Commit de555afc authored by madengji's avatar madengji Committed by 33cn

para consens add bully and bls sign

parent e4dacdad
package para
import (
"errors"
"sync"
"time"
"github.com/33cn/chain33/queue"
"github.com/33cn/chain33/types"
pt "github.com/33cn/plugin/plugin/dapp/paracross/types"
)
const (
ELECTION = iota
COORDINATOR
OK
CLOSE
)
// Bully is a `struct` representing a single node used by the `Bully Algorithm`.
//
// NOTE: More details about the `Bully algorithm` can be found here
// https://en.wikipedia.org/wiki/Bully_algorithm .
type Bully struct {
ID string
coordinator string
nodegroup map[string]bool
inNodeGroup bool
mu *sync.RWMutex
receiveChan chan *pt.ElectionMsg
electionChan chan *pt.ElectionMsg
paraClient *client
qClient queue.Client
wg *sync.WaitGroup
quit chan struct{}
}
func NewBully(para *client, ID string, wg *sync.WaitGroup) (*Bully, error) {
b := &Bully{
paraClient: para,
ID: ID,
nodegroup: make(map[string]bool),
electionChan: make(chan *pt.ElectionMsg, 1),
receiveChan: make(chan *pt.ElectionMsg),
wg: wg,
mu: &sync.RWMutex{},
quit: make(chan struct{}),
}
return b, nil
}
func (b *Bully) SetParaAPI(cli queue.Client) {
b.qClient = cli
}
func (b *Bully) UpdateValidNodes(nodes []string) {
b.mu.Lock()
defer b.mu.Unlock()
b.nodegroup = make(map[string]bool)
for _, n := range nodes {
plog.Info("bully node update", "node", n)
b.nodegroup[n] = true
}
//退出nodegroup
if b.inNodeGroup && !b.nodegroup[b.ID] {
_ = b.Send("", CLOSE, nil)
}
b.inNodeGroup = b.nodegroup[b.ID]
}
func (b *Bully) isValidNode(ID string) bool {
b.mu.Lock()
defer b.mu.Unlock()
return b.nodegroup[ID]
}
func (b *Bully) Close() {
close(b.quit)
}
func (b *Bully) Receive(msg *pt.ElectionMsg) {
plog.Info("bully rcv", "type", msg.Type)
switch msg.Type {
case CLOSE:
if msg.PeerID == b.Coordinator() {
b.SetCoordinator(b.ID)
b.Elect()
}
case OK:
if msg.ToID != b.ID {
return
}
select {
case b.electionChan <- msg:
return
case <-time.After(200 * time.Millisecond):
return
}
default:
b.receiveChan <- msg
}
}
func (b *Bully) sendMsg(ty int64, data interface{}) error {
msg := b.qClient.NewMessage("p2p", ty, data)
err := b.qClient.Send(msg, true)
if err != nil {
return err
}
resp, err := b.qClient.Wait(msg)
if err != nil {
return err
}
if resp.GetData().(*types.Reply).IsOk {
return nil
}
return errors.New(string(resp.GetData().(*types.Reply).GetMsg()))
}
func (b *Bully) Send(toId string, msgTy int32, data []byte) error {
act := &pt.ParaP2PSubMsg{Ty: P2pSubElectMsg}
act.Value = &pt.ParaP2PSubMsg_Election{Election: &pt.ElectionMsg{ToID: toId, PeerID: b.ID, Type: msgTy, Data: data}}
plog.Info("bull sendmsg")
err := b.paraClient.SendPubP2PMsg(types.Encode(act))
//err := b.sendMsg(types.EventPubTopicMsg, &types.PublishTopicMsg{Topic: "consensus", Msg: types.Encode(act)})
plog.Info("bully ret")
return err
}
// SetCoordinator sets `ID` as the new `b.coordinator` if `ID` is greater than
// `b.coordinator` or equal to `b.ID`.
func (b *Bully) SetCoordinator(ID string) {
b.mu.Lock()
defer b.mu.Unlock()
if ID > b.coordinator || ID == b.ID {
b.coordinator = ID
}
}
// Coordinator returns `b.coordinator`.
//
// NOTE: This function is thread-safe.
func (b *Bully) Coordinator() string {
b.mu.RLock()
defer b.mu.RUnlock()
return b.coordinator
}
func (b *Bully) IsSelfCoordinator() bool {
b.mu.RLock()
defer b.mu.RUnlock()
return b.ID == b.coordinator
}
// Elect handles the leader election mechanism of the `Bully algorithm`.
func (b *Bully) Elect() {
_ = b.Send("", ELECTION, nil)
select {
case <-b.electionChan:
return
case <-time.After(time.Second):
b.SetCoordinator(b.ID)
_ = b.Send("", COORDINATOR, nil)
return
}
}
func (b *Bully) Run() {
defer b.wg.Done()
var feedDog = false
var feedDogTiker <-chan time.Time
var watchDogTiker <-chan time.Time
plog.Info("bully init")
onceTimer := time.NewTimer(time.Minute)
out:
for {
select {
case msg := <-b.receiveChan:
switch msg.Type {
case ELECTION:
if msg.PeerID < b.ID {
_ = b.Send(msg.PeerID, OK, nil)
b.Elect()
}
case COORDINATOR:
if !b.isValidNode(msg.PeerID) {
continue
}
b.SetCoordinator(msg.PeerID)
if b.coordinator < b.ID {
b.Elect()
}
feedDog = true
}
case <-onceTimer.C:
feedDogTiker = time.NewTicker(20 * time.Second).C
watchDogTiker = time.NewTicker(time.Minute).C
case <-feedDogTiker:
plog.Info("bully feed dog tiker", "is", b.IsSelfCoordinator(), "valid", b.isValidNode(b.ID))
//leader需要定期喂狗
if b.IsSelfCoordinator() && b.isValidNode(b.ID) {
_ = b.Send("", COORDINATOR, nil)
}
case <-watchDogTiker:
//至少1分钟内要收到leader喂狗消息,否则认为leader挂了,重新选举
if !feedDog {
b.Elect()
plog.Info("bully watchdog triger")
}
feedDog = false
case <-b.quit:
break out
}
}
}
# para bully algorithm
#1. 广播
1. 新节点起来后会监听leader节点的喂狗消息, 超时未收到,则发起选举流程
1. 如果收到消息后,发现比自己小,则发起选举流程
1. 如果本节点不是最大节点,则会收到比自己大的节点的OK回复,并自己大的节点整个网络发起选举流程,最后收到leader的通知
1. 如果本节点是最大节点,则不会收到ok,则最后确定自己是leader
1. leader节点会定期广播宣示主权的喂狗消息,如果有节点超过一定时间未收到喂狗消息,则发起选举,比非leader节点定期发心跳消息要少
1. leader节点会查看自己是否在nodegroup里面,如果不在,则发送close消息,触发重新选举 
......@@ -15,10 +15,10 @@ import (
"sync/atomic"
"errors"
"time"
"github.com/33cn/chain33/client/api"
"github.com/33cn/chain33/common"
"github.com/33cn/chain33/common/crypto"
"github.com/33cn/chain33/common/merkle"
"github.com/33cn/chain33/queue"
......@@ -40,6 +40,12 @@ const (
poolMainBlockSec int64 = 5
defaultEmptyBlockInterval int64 = 50 //write empty block every interval blocks in mainchain
defaultSearchMatchedBlockDepth int32 = 10000
defaultParaBlsSignTopic = "BLS-SIGN"
)
const (
P2pSubCommitTx = iota
P2pSubElectMsg
)
var (
......@@ -52,6 +58,10 @@ func init() {
drivers.QueryData.Register("para", &client{})
}
type ClientInter interface {
SendPubP2PMsg(data []byte) error
}
type client struct {
*drivers.BaseClient
grpcClient types.Chain33Client
......@@ -65,6 +75,8 @@ type client struct {
wg sync.WaitGroup
subCfg *subConfig
dldCfg *downloadClient
bullyCli *Bully
blsSignCli *blsClient
isClosed int32
quitCreate chan struct{}
}
......@@ -191,72 +203,22 @@ func New(cfg *types.Consensus, sub []byte) queue.Module {
para.blockSyncClient.maxSyncErrCount = subcfg.MaxSyncErrCount
}
para.multiDldCli = &multiDldClient{
paraClient: para,
invNumPerJob: defaultInvNumPerJob,
jobBufferNum: defaultJobBufferNum,
serverTimeout: maxServerRspTimeout,
}
if subcfg.MultiDownInvNumPerJob > 0 {
para.multiDldCli.invNumPerJob = subcfg.MultiDownInvNumPerJob
}
if subcfg.MultiDownJobBuffNum > 0 {
para.multiDldCli.jobBufferNum = subcfg.MultiDownJobBuffNum
}
if subcfg.MultiDownServerRspTime > 0 {
para.multiDldCli.serverTimeout = subcfg.MultiDownServerRspTime
}
para.multiDldCli = newMultiDldCli(para, &subcfg)
para.jumpDldCli = &jumpDldClient{paraClient: para}
c.SetChild(para)
return para
}
//["0:50","100:20","500:30"]
func parseEmptyBlockInterval(cfg []string) ([]*emptyBlockInterval, error) {
var emptyInter []*emptyBlockInterval
if len(cfg) == 0 {
interval := &emptyBlockInterval{startHeight: 0, interval: defaultEmptyBlockInterval}
emptyInter = append(emptyInter, interval)
return emptyInter, nil
}
list := make(map[int64]int64)
var seq []int64
for _, e := range cfg {
ret, err := divideStr2Int64s(e, ":")
if len(subcfg.AuthAccount) > 0 {
para.blsSignCli = newBlsClient(para, &subcfg)
cli, err := NewBully(para, subcfg.AuthAccount, &para.wg)
if err != nil {
plog.Error("parse empty block inter config", "str", e)
return nil, err
panic("bully create err")
}
seq = append(seq, ret[0])
list[ret[0]] = ret[1]
}
sort.Slice(seq, func(i, j int) bool { return seq[i] < seq[j] })
for _, h := range seq {
emptyInter = append(emptyInter, &emptyBlockInterval{startHeight: h, interval: list[h]})
para.bullyCli = cli
para.bullyCli.SetParaAPI(para.GetQueueClient())
}
return emptyInter, nil
}
func checkEmptyBlockInterval(in []*emptyBlockInterval) error {
for i := 0; i < len(in); i++ {
if i == 0 && in[i].startHeight != 0 {
plog.Error("EmptyBlockInterval,first blockHeight should be 0", "height", in[i].startHeight)
return types.ErrInvalidParam
}
if i > 0 && in[i].startHeight <= in[i-1].startHeight {
plog.Error("EmptyBlockInterval,blockHeight should be sequence", "preHeight", in[i-1].startHeight, "laterHeight", in[i].startHeight)
return types.ErrInvalidParam
}
if in[i].interval <= 0 {
plog.Error("EmptyBlockInterval,interval should > 0", "height", in[i].startHeight)
return types.ErrInvalidParam
}
}
return nil
c.SetChild(para)
return para
}
//para 不检查任何的交易
......@@ -270,6 +232,11 @@ func (client *client) Close() {
close(client.commitMsgClient.quit)
close(client.quitCreate)
close(client.blockSyncClient.quitChan)
if len(client.subCfg.AuthAccount) > 0 {
close(client.blsSignCli.quit)
client.bullyCli.Close()
}
client.wg.Wait()
client.BaseClient.Close()
......@@ -287,12 +254,21 @@ func (client *client) SetQueueClient(c queue.Client) {
client.InitBlock()
})
go client.EventLoop()
client.wg.Add(1)
go client.commitMsgClient.handler()
client.wg.Add(1)
go client.CreateBlock()
client.wg.Add(1)
go client.blockSyncClient.syncBlocks()
if len(client.subCfg.AuthAccount) > 0 {
client.wg.Add(1)
go client.blsSignCli.procRcvSignTxs()
client.wg.Add(1)
go client.bullyCli.Run()
}
}
func (client *client) InitBlock() {
......@@ -405,54 +381,67 @@ func (client *client) CreateGenesisTx() (ret []*types.Transaction) {
}
func (client *client) ProcEvent(msg *queue.Message) bool {
return false
}
if msg.Ty == types.EventReceiveSubData {
if req, ok := msg.GetData().(*types.TopicData); ok {
func (client *client) isCaughtUp() bool {
return atomic.LoadInt32(&client.caughtUp) == 1
}
//IsCaughtUp 是否追上最新高度,
func (client *client) Query_IsCaughtUp(req *types.ReqNil) (types.Message, error) {
if client == nil {
return nil, fmt.Errorf("%s", "client not bind message queue.")
var sub pt.ParaP2PSubMsg
err := types.Decode(req.Data, &sub)
if err != nil {
plog.Error("paracross ProcEvent decode", "ty", types.EventReceiveSubData)
return true
}
plog.Info("paracross Recv from", req.GetFrom(), "topic:", req.GetTopic(), "ty", sub.GetTy())
switch sub.GetTy() {
case P2pSubCommitTx:
client.blsSignCli.rcvCommitTx(sub.GetCommitTx())
case P2pSubElectMsg:
client.bullyCli.Receive(sub.GetElection())
}
return &types.IsCaughtUp{Iscaughtup: client.isCaughtUp()}, nil
}
} else {
plog.Error("paracross ProcEvent topicData", "ty", types.EventReceiveSubData)
}
func (client *client) Query_LocalBlockInfo(req *types.ReqInt) (types.Message, error) {
if client == nil {
return nil, fmt.Errorf("%s", "client not bind message queue.")
return true
}
var block *pt.ParaLocalDbBlock
var err error
if req.Height <= -1 {
block, err = client.getLastLocalBlock()
return false
}
func (client *client) sendP2PMsg(ty int64, data interface{}) error {
msg := client.GetQueueClient().NewMessage("p2p", ty, data)
err := client.GetQueueClient().Send(msg, true)
if err != nil {
return nil, err
return err
}
} else {
block, err = client.getLocalBlockByHeight(req.Height)
resp, err := client.GetQueueClient().Wait(msg)
if err != nil {
return nil, err
return err
}
if resp.GetData().(*types.Reply).IsOk {
return nil
}
return errors.New(string(resp.GetData().(*types.Reply).GetMsg()))
}
blockInfo := &pt.ParaLocalDbBlockInfo{
Height: block.Height,
MainHash: common.ToHex(block.MainHash),
MainHeight: block.MainHeight,
ParentMainHash: common.ToHex(block.ParentMainHash),
BlockTime: block.BlockTime,
}
func (client *client) SendPubP2PMsg(msg []byte) error {
data := &types.PublishTopicMsg{Topic: "consensus", Msg: msg}
return client.sendP2PMsg(types.EventPubTopicMsg, data)
for _, tx := range block.Txs {
blockInfo.Txs = append(blockInfo.Txs, common.ToHex(tx.Hash()))
}
}
func (client *client) subP2PTopic() error {
return client.sendP2PMsg(types.EventSubTopic, &types.SubTopic{Module: "consensus", Topic: defaultParaBlsSignTopic})
}
//TODO 本节点退出时候会自动removeTopic吗
func (client *client) rmvP2PTopic() error {
return client.sendP2PMsg(types.EventRemoveTopic, &types.RemoveTopic{Module: "consensus", Topic: defaultParaBlsSignTopic})
return blockInfo, nil
}
func (client *client) isCaughtUp() bool {
return atomic.LoadInt32(&client.caughtUp) == 1
}
func checkMinerTx(current *types.BlockDetail) error {
......@@ -482,29 +471,52 @@ func checkMinerTx(current *types.BlockDetail) error {
return nil
}
// Query_CreateNewAccount 通知para共识模块钱包创建了一个新的账户
func (client *client) Query_CreateNewAccount(acc *types.Account) (types.Message, error) {
if acc == nil {
return nil, types.ErrInvalidParam
}
plog.Info("Query_CreateNewAccount", "acc", acc.Addr)
// 需要para共识这边处理新创建的账户是否是超级节点发送commit共识交易的账户
client.commitMsgClient.onWalletAccount(acc)
return &types.Reply{IsOk: true, Msg: []byte("OK")}, nil
//比较newBlock是不是最优区块
func (client *client) CmpBestBlock(newBlock *types.Block, cmpBlock *types.Block) bool {
return false
}
// Query_WalletStatus 通知para共识模块钱包锁状态有变化
func (client *client) Query_WalletStatus(walletStatus *types.WalletStatus) (types.Message, error) {
if walletStatus == nil {
return nil, types.ErrInvalidParam
//["0:50","100:20","500:30"]
func parseEmptyBlockInterval(cfg []string) ([]*emptyBlockInterval, error) {
var emptyInter []*emptyBlockInterval
if len(cfg) == 0 {
interval := &emptyBlockInterval{startHeight: 0, interval: defaultEmptyBlockInterval}
emptyInter = append(emptyInter, interval)
return emptyInter, nil
}
plog.Info("Query_WalletStatus", "walletStatus", walletStatus.IsWalletLock)
// 需要para共识这边根据walletStatus.IsWalletLock锁的状态开启/关闭发送共识交易
client.commitMsgClient.onWalletStatus(walletStatus)
return &types.Reply{IsOk: true, Msg: []byte("OK")}, nil
list := make(map[int64]int64)
var seq []int64
for _, e := range cfg {
ret, err := divideStr2Int64s(e, ":")
if err != nil {
plog.Error("parse empty block inter config", "str", e)
return nil, err
}
seq = append(seq, ret[0])
list[ret[0]] = ret[1]
}
sort.Slice(seq, func(i, j int) bool { return seq[i] < seq[j] })
for _, h := range seq {
emptyInter = append(emptyInter, &emptyBlockInterval{startHeight: h, interval: list[h]})
}
return emptyInter, nil
}
//比较newBlock是不是最优区块
func (client *client) CmpBestBlock(newBlock *types.Block, cmpBlock *types.Block) bool {
return false
func checkEmptyBlockInterval(in []*emptyBlockInterval) error {
for i := 0; i < len(in); i++ {
if i == 0 && in[i].startHeight != 0 {
plog.Error("EmptyBlockInterval,first blockHeight should be 0", "height", in[i].startHeight)
return types.ErrInvalidParam
}
if i > 0 && in[i].startHeight <= in[i-1].startHeight {
plog.Error("EmptyBlockInterval,blockHeight should be sequence", "preHeight", in[i-1].startHeight, "laterHeight", in[i].startHeight)
return types.ErrInvalidParam
}
if in[i].interval <= 0 {
plog.Error("EmptyBlockInterval,interval should > 0", "height", in[i].startHeight)
return types.ErrInvalidParam
}
}
return nil
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package para
import (
"bytes"
"math/big"
"sort"
"time"
"github.com/33cn/chain33/common"
"github.com/33cn/chain33/types"
pt "github.com/33cn/plugin/plugin/dapp/paracross/types"
"github.com/phoreproject/bls/g2pubs"
"github.com/pkg/errors"
)
const (
maxRcvTxCount = 100 //max 100 nodes, 1 height tx or 1 txgroup per node
)
type blsClient struct {
paraClient *client
selfID string
blsPriKey *g2pubs.SecretKey
blsPubKey *g2pubs.PublicKey
peers map[string]bool
peersBlsPubKey map[string]*g2pubs.PublicKey
txsBuff map[int64]*pt.ParaBlsSignSumDetails
rcvCommitTxCh chan []*pt.ParacrossCommitAction
quit chan struct{}
}
func newBlsClient(para *client, cfg *subConfig) *blsClient {
b := &blsClient{paraClient: para}
b.selfID = cfg.AuthAccount
b.peers = make(map[string]bool)
b.peersBlsPubKey = make(map[string]*g2pubs.PublicKey)
b.txsBuff = make(map[int64]*pt.ParaBlsSignSumDetails)
b.rcvCommitTxCh = make(chan []*pt.ParacrossCommitAction, maxRcvTxCount)
b.quit = make(chan struct{})
return b
}
//1. 要等到达成共识了才发送,不然处理未达成共识的各种场景会比较复杂,而且浪费手续费
func (b *blsClient) procRcvSignTxs() {
defer b.paraClient.wg.Done()
if len(b.selfID) <= 0 {
return
}
p2pTimer := time.NewTimer(time.Minute)
out:
for {
select {
case commits := <-b.rcvCommitTxCh:
collectSigns(b.txsBuff, commits)
nodes := b.paraClient.commitMsgClient.authNodes.Load().([]string)
if !isMostCommitDone(len(nodes), b.txsBuff) {
continue
}
//清空txsBuff,重新收集
txsBuff := b.txsBuff
b.txsBuff = make(map[int64]*pt.ParaBlsSignSumDetails)
//自己是Coordinator,则聚合交易
if b.paraClient.bullyCli.IsSelfCoordinator() {
dones := filterDoneCommits(len(nodes), txsBuff)
if len(dones) > 0 {
continue
}
acts, err := b.transferCommit2Action(dones)
if err != nil {
continue
}
b.paraClient.commitMsgClient.sendCommitActions(acts)
}
case <-p2pTimer.C:
if len(b.selfID) > 0 {
//tle := cfg.GetTitle()
plog.Info("send p2p topic------------------------------")
b.paraClient.subP2PTopic()
plog.Info("rcv p2p topic-------------------------------")
}
case <-b.quit:
break out
}
}
}
func (b *blsClient) rcvCommitTx(tx *types.Transaction) error {
if !tx.CheckSign() {
return types.ErrSign
}
if !b.paraClient.commitMsgClient.isValidNode(tx.From()) {
b.updatePeers(tx.From(), false)
return pt.ErrParaNodeAddrNotExisted
}
b.updatePeers(tx.From(), true)
txs := []*types.Transaction{tx}
if count := tx.GetGroupCount(); count > 0 {
group, err := tx.GetTxGroup()
if err != nil {
return err
}
txs = group.Txs
}
commits, err := b.getCommitInfo(txs)
if err != nil {
return err
}
b.rcvCommitTxCh <- commits
return nil
}
func (b *blsClient) getCommitInfo(txs []*types.Transaction) ([]*pt.ParacrossCommitAction, error) {
var commits []*pt.ParacrossCommitAction
for _, tx := range txs {
var act pt.ParacrossAction
err := types.Decode(tx.Payload, &act)
if err != nil {
return nil, errors.Wrap(err, "decode act")
}
if act.Ty != pt.ParacrossActionCommit {
return nil, types.ErrInvalidParam
}
commit := act.GetCommit()
if tx.From() != commit.Bls.Addrs[0] {
return nil, types.ErrFromAddr
}
err = b.verifyBlsSign(tx.From(), commit)
if err != nil {
return nil, pt.ErrBlsSignVerify
}
commits = append(commits, commit)
}
return commits, nil
}
func collectSigns(txsBuff map[int64]*pt.ParaBlsSignSumDetails, commits []*pt.ParacrossCommitAction) {
for _, cmt := range commits {
if _, ok := txsBuff[cmt.Status.Height]; !ok {
txsBuff[cmt.Status.Height] = &pt.ParaBlsSignSumDetails{Height: cmt.Status.Height}
}
a := txsBuff[cmt.Status.Height]
for i, v := range a.Addrs {
//节点更新交易参数的场景
if v == cmt.Bls.Addrs[0] {
a.Msgs[i] = types.Encode(cmt.Status)
a.Signs[i] = cmt.Bls.Sign
continue
}
}
a.Addrs = append(a.Addrs, cmt.Bls.Addrs[0])
a.Msgs = append(a.Msgs, types.Encode(cmt.Status))
a.Signs = append(a.Signs, cmt.Bls.Sign)
}
}
func isMostCommitDone(peers int, txsBuff map[int64]*pt.ParaBlsSignSumDetails) bool {
for i, v := range txsBuff {
most, _ := getMostCommit(v.Msgs)
if isCommitDone(peers, most) {
plog.Info("blssign.isMostCommitDone", "height", i, "most", most, "peers", peers)
return true
}
}
return false
}
func filterDoneCommits(peers int, txs map[int64]*pt.ParaBlsSignSumDetails) []*pt.ParaBlsSignSumDetails {
var seq []int64
for i, v := range txs {
most, hash := getMostCommit(v.Msgs)
if !isCommitDone(peers, most) {
plog.Info("blssign.filterDoneCommits not commit done", "height", i)
delete(txs, i)
continue
}
seq = append(seq, i)
//只保留相同的commits
a := &pt.ParaBlsSignSumDetails{Msgs: [][]byte{[]byte(hash)}}
for j, m := range v.Msgs {
if bytes.Equal([]byte(hash), m) {
a.Addrs = append(a.Addrs, v.Addrs[j])
a.Signs = append(a.Signs, v.Signs[j])
}
}
txs[i] = a
}
if len(seq) <= 0 {
plog.Info("blssign.filterDoneCommits nil")
return nil
}
sort.Slice(seq, func(i, j int) bool { return seq[i] < seq[j] })
plog.Info("blssign.filterDoneCommits", "seq", seq)
var signs []*pt.ParaBlsSignSumDetails
//共识高度要连续,不连续则退出
lastSeq := seq[0] - 1
for _, h := range seq {
if lastSeq+1 != h {
return signs
}
signs = append(signs, txs[h])
lastSeq = h
}
return signs
}
func (b *blsClient) transferCommit2Action(commits []*pt.ParaBlsSignSumDetails) ([]*pt.ParacrossCommitAction, error) {
var notify []*pt.ParacrossCommitAction
for _, v := range commits {
a := &pt.ParacrossCommitAction{}
s := &pt.ParacrossNodeStatus{}
types.Decode(v.Msgs[0], s)
a.Status = s
sign, err := b.aggregateSigns(v.Signs)
if err != nil {
return nil, err
}
signData := sign.Serialize()
copy(a.Bls.Sign, signData[:])
nodes := b.paraClient.commitMsgClient.authNodes.Load().([]string)
bits, remains := setAddrsBitMap(nodes, v.Addrs)
if len(remains) > 0 {
plog.Info("bls.signDoneCommits", "remains", remains)
}
a.Bls.AddrsMap = bits
notify = append(notify, a)
}
return notify, nil
}
func (b *blsClient) aggregateSigns(signs [][]byte) (*g2pubs.Signature, error) {
var signatures []*g2pubs.Signature
for _, data := range signs {
var s [48]byte
copy(s[:], data)
signKey, err := g2pubs.DeserializeSignature(s)
if err != nil {
return nil, err
}
signatures = append(signatures, signKey)
}
return g2pubs.AggregateSignatures(signatures), nil
}
func (b *blsClient) updatePeers(peer string, add bool) {
if _, ok := b.peers[peer]; ok {
if !add {
delete(b.peers, peer)
}
return
}
if add {
b.peers[peer] = true
}
}
func (b *blsClient) setBlsPriKey(secpPrkKey []byte) {
b.blsPriKey = getBlsPriKey(secpPrkKey)
b.blsPubKey = g2pubs.PrivToPub(b.blsPriKey)
serial := b.blsPubKey.Serialize()
plog.Info("para commit get pub bls", "pubkey", common.ToHex(serial[:]))
}
//to repeat get prikey's hash until in range of bls's private key
func getBlsPriKey(key []byte) *g2pubs.SecretKey {
var newKey [common.Sha256Len]byte
copy(newKey[:], key[:])
for {
plog.Info("para commit getBlsPriKey", "keys", common.ToHex(newKey[:]))
secret := g2pubs.DeserializeSecretKey(newKey)
if nil != secret.GetFRElement() {
serial := secret.Serialize()
plog.Info("para commit getBlsPriKey", "final keys", common.ToHex(serial[:]), "string", secret.String())
return secret
}
copy(newKey[:], common.Sha256(newKey[:]))
}
}
func (b *blsClient) blsSign(commits []*pt.ParacrossCommitAction) error {
for _, cmt := range commits {
data := types.Encode(cmt.Status)
plog.Debug("blsign msg", "data", common.ToHex(data), "height", cmt.Status.Height)
sign := g2pubs.Sign(data, b.blsPriKey).Serialize()
cmt.Bls = &pt.ParacrossCommitBlsInfo{Sign: sign[:], Addrs: []string{b.selfID}}
}
return nil
}
//设置nodes范围内的bitmap,如果addrs在node不存在,也不设置,返回未命中的addrs
func setAddrsBitMap(nodes, addrs []string) ([]byte, map[string]bool) {
rst := big.NewInt(0)
addrsMap := make(map[string]bool)
for _, n := range addrs {
addrsMap[n] = true
}
for i, a := range nodes {
if _, exist := addrsMap[a]; exist {
rst.SetBit(rst, i, 1)
delete(addrsMap, a)
}
}
return rst.Bytes(), addrsMap
}
func getMostCommit(commits [][]byte) (int, string) {
stats := make(map[string]int)
n := len(commits)
for i := 0; i < n; i++ {
if _, ok := stats[string(commits[i])]; ok {
stats[string(commits[i])]++
} else {
stats[string(commits[i])] = 1
}
}
most := -1
var hash string
for k, v := range stats {
if v > most {
most = v
hash = k
}
}
return most, hash
}
func isCommitDone(nodes, mostSame int) bool {
return 3*mostSame > 2*nodes
}
func (b *blsClient) getBlsPubKey(addr string) (*g2pubs.PublicKey, error) {
//先从缓存中获取
if v, ok := b.peersBlsPubKey[addr]; ok {
return v, nil
}
//缓存没有,则从statedb获取
cfg := b.paraClient.GetAPI().GetConfig()
ret, err := b.paraClient.GetAPI().QueryChain(&types.ChainExecutor{
Driver: "paracross",
FuncName: "GetNodeAddrInfo",
Param: types.Encode(&pt.ReqParacrossNodeInfo{Title: cfg.GetTitle(), Addr: addr}),
})
if err != nil {
plog.Error("commitmsg.GetNodeAddrInfo ", "err", err.Error())
return nil, err
}
resp, ok := ret.(*pt.ParaNodeAddrIdStatus)
if !ok {
plog.Error("commitmsg.getNodeGroupAddrs rsp nok")
return nil, err
}
//pubKeys := make([]*g2pubs.PublicKey, 0)
val, err := common.FromHex(resp.BlsPubKey)
if err != nil {
plog.Error("verifyBlsSign.fromhex", "p", addr)
return nil, err
}
k := [96]byte{}
copy(k[:], val)
pubKey, err := g2pubs.DeserializePublicKey(k)
if err != nil {
plog.Error("verifyBlsSign.DeserializePublicKey", "key", addr)
return nil, err
}
b.peersBlsPubKey[addr] = pubKey
return pubKey, nil
}
func (b *blsClient) verifyBlsSign(addr string, commit *pt.ParacrossCommitAction) error {
//1. 获取对应公钥
pubKey, err := b.getBlsPubKey(addr)
if err != nil {
plog.Error("verifyBlsSign pub key not exist", "addr", addr)
return err
}
//2. 获取bls签名
signkey := [48]byte{}
copy(signkey[:], commit.Bls.Sign)
sign, err := g2pubs.DeserializeSignature(signkey)
if err != nil {
plog.Error("verifyBlsSign.DeserializeSignature", "key", common.ToHex(commit.Bls.Sign))
return err
}
//3. 获取签名前原始msg
msg := types.Encode(commit.Status)
if !g2pubs.Verify(msg, pubKey, sign) {
plog.Error("paracross.Commit bls sign verify", "title", commit.Status.Title, "height", commit.Status.Height,
"addrsMap", common.ToHex(commit.Bls.AddrsMap), "sign", common.ToHex(commit.Bls.Sign), "addr", addr)
plog.Error("paracross.commit bls sign verify", "data", common.ToHex(msg), "height", commit.Status.Height)
return pt.ErrBlsSignVerify
}
return nil
}
func (b *blsClient) showTxBuffInfo() *pt.ParaBlsSignSumInfo {
var seq []int64
var ret pt.ParaBlsSignSumInfo
for k, _ := range b.txsBuff {
seq = append(seq, k)
}
sort.Slice(seq, func(i, j int) bool { return seq[i] < seq[j] })
for _, h := range seq {
ret.Info = append(ret.Info, b.txsBuff[h])
}
return &ret
}
# para bls sign
#1. 订阅topic
以自身账户为topic订阅,向平行链内部节点间广播
#2. 协商leader
广播之后一段时间,是寻找leader的过程
#3. 发送共识交易
1. 共识交易发给leader,由leader聚合后上链,如果收集的签名交易不超过2/3节点,则不发送上链交易
1. 共识交易发送后,等待回复(广播形式),如果超时未回复,则重新发送
2. leader的轮换算法可以下一步考虑
......@@ -61,6 +61,7 @@ type commitMsgClient struct {
txFeeRate int64
selfConsEnableList []*paraSelfConsEnable //适配在自共识合约配置前有自共识的平行链项目,fork之后,采用合约配置
privateKey crypto.PrivKey
authNodes atomic.Value
blsPriKey *g2pubs.SecretKey
blsPubKey *g2pubs.PublicKey
quit chan struct{}
......@@ -96,11 +97,11 @@ out:
//出错场景入口,需要reset 重发
case <-client.resetCh:
client.resetSend()
client.sendCommitTx()
//例行检查发送入口
client.createCommitTx()
//例行检查发送入口,及时触发未发送共识
case <-readTick:
client.procChecks(checkParams)
client.sendCommitTx()
client.createCommitTx()
case <-client.quit:
break out
......@@ -121,10 +122,7 @@ func (client *commitMsgClient) updateChainHeightNotify(height int64, isDel bool)
atomic.StoreInt64(&client.chainHeight, height)
client.checkRollback(height)
if !client.isSendingCommitMsg() {
client.sendCommitTx()
}
client.createCommitTx()
}
// reset notify 提供重设发送参数,发送tx的入口
......@@ -135,7 +133,7 @@ func (client *commitMsgClient) resetNotify() {
//新的区块产生,检查是否有commitTx正在发送入口
func (client *commitMsgClient) commitTxCheckNotify(block *types.ParaTxDetail) {
if client.checkCommitTxSuccess(block) {
client.sendCommitTx()
client.createCommitTx()
}
}
......@@ -160,7 +158,28 @@ func (client *commitMsgClient) getConsensusHeight() int64 {
return status.Height
}
func (client *commitMsgClient) sendCommitTx() {
func (client *commitMsgClient) createCommitTx() {
tx := client.getCommitTx()
if tx == nil {
return
}
//bls sign, send to p2p
if !client.paraClient.subCfg.BlsSignOff {
//send to p2p pubsub
plog.Info("para commitMs send to p2p", "hash", common.ToHex(tx.Hash()))
client.paraClient.SendPubP2PMsg(types.Encode(tx))
return
}
client.pushCommitTx(tx)
}
//四个触发:1,新增区块 2,10s tick例行检查 3,发送交易成功上链 4,异常重发
//1&2 只要共识高度追赶上了sendingHeight,就可以继续发送,即便当前节点发送交易仍未上链也直接取消发送新交易
func (client *commitMsgClient) getCommitTx() *types.Transaction {
client.mutex.Lock()
defer client.mutex.Unlock()
......@@ -172,122 +191,144 @@ func (client *commitMsgClient) sendCommitTx() {
chainHeight := atomic.LoadInt64(&client.chainHeight)
sendingHeight := client.sendingHeight
if sendingHeight < consensHeight {
sendingHeight = consensHeight
}
isSync := client.isSync()
plog.Info("para commitMsg---status", "chainHeight", chainHeight, "sendingHeight", sendingHeight,
"consensHeight", consensHeight, "isSendingTx", client.isSendingCommitMsg(), "sync", isSync)
if client.isSendingCommitMsg() || !isSync {
return
}
if sendingHeight < consensHeight {
sendingHeight = consensHeight
if !isSync {
return nil
}
//1.如果是在主链共识场景,共识高度可能大于平行链的链高度
//2.已发送,未共识场景
if chainHeight < consensHeight || sendingHeight > consensHeight {
return
if sendingHeight > consensHeight || consensHeight > chainHeight || sendingHeight >= chainHeight {
return nil
}
if sendingHeight < chainHeight {
//满足 sendingHeight <= consensHeight <= chainHeight && sendingHeight < chainHeight
signTx, count := client.getSendingTx(sendingHeight, chainHeight)
if signTx == nil {
return
return nil
}
client.checkTxCommitTimes = 0
client.sendingHeight = sendingHeight + count
client.setCurrentTx(signTx)
client.sendMsgCh <- signTx
}
return signTx
}
func (client *commitMsgClient) checkCommitTxSuccess(block *types.ParaTxDetail) bool {
//client.checkTxCommitTimes和client.sendingHeight锁的场景可以区分
//发送commitTx,可能跟checkCommitTxSuccess获取全局变量冲突,加锁, 如果有仍未成功上链的交易,直接覆盖重置
func (client *commitMsgClient) pushCommitTx(signTx *types.Transaction) {
client.mutex.Lock()
defer client.mutex.Unlock()
curTx := client.getCurrentTx()
if curTx == nil {
return false
client.checkTxCommitTimes = 0
client.setCurrentTx(signTx)
client.sendMsgCh <- signTx
}
func (client *commitMsgClient) sendCommitActions(acts []*pt.ParacrossCommitAction) {
txs, _, err := client.calcCommitMsgTxs(acts)
if err != nil {
return
}
plog.Debug("paracommitmsg sendCommitActions", "txhash", common.ToHex(txs.Hash()))
for i, msg := range acts {
plog.Debug("paracommitmsg sendCommitActions", "idx", i, "height", msg.Status.Height, "mainheight", msg.Status.MainBlockHeight,
"blockhash", common.HashHex(msg.Status.BlockHash), "mainHash", common.HashHex(msg.Status.MainBlockHash),
"addrsmap", common.ToHex(msg.Bls.AddrsMap))
}
client.pushCommitTx(txs)
}
//只处理AddType block,回滚的不处理
if block.Type == types.AddBlock {
//使用map 比每个交易hash byte比较效率应该会高些
txMap := make(map[string]bool)
func (client *commitMsgClient) checkTxIn(block *types.ParaTxDetail, tx *types.Transaction) bool {
//committx是平行链交易
if types.IsParaExecName(string(curTx.Execer)) {
if types.IsParaExecName(string(tx.Execer)) {
for _, tx := range block.TxDetails {
if bytes.HasSuffix(tx.Tx.Execer, []byte(pt.ParaX)) && tx.Receipt.Ty == types.ExecOk {
txMap[string(tx.Tx.Hash())] = true
return true
}
}
} else {
// committx是主链交易,需要向主链查询,平行链获取到的只是过滤了的平行链交易
//如果正在追赶,则暂时不去主链查找,减少耗时
if !client.paraClient.isCaughtUp() {
return false
}
receipt, _ := client.paraClient.QueryTxOnMainByHash(curTx.Hash())
if receipt != nil && receipt.Receipt.Ty == types.ExecOk {
txMap[string(curTx.Hash())] = true
}
}
//验证通过
if txMap[string(curTx.Hash())] {
client.setCurrentTx(nil)
//主链交易,向主链查询,平行链获取到的只是过滤了的平行链交易
receipt, _ := client.paraClient.QueryTxOnMainByHash(tx.Hash())
if receipt != nil && receipt.Receipt.Ty == types.ExecOk {
return true
}
}
return client.reSendCommitTx(block.Type)
return false
}
func (client *commitMsgClient) reSendCommitTx(addType int64) bool {
func (client *commitMsgClient) checkCommitTxSuccess(block *types.ParaTxDetail) bool {
client.mutex.Lock()
defer client.mutex.Unlock()
curTx := client.getCurrentTx()
if curTx == nil {
return false
}
//当前addType是回滚,则不计数,如果有累计则撤销上次累计次数,重新计数
if addType != types.AddBlock {
if block.Type != types.AddBlock {
if client.checkTxCommitTimes > 0 {
client.checkTxCommitTimes--
}
return false
}
if client.checkTxIn(block, curTx) {
client.setCurrentTx(nil)
return true
}
return client.reSendCommitTx(curTx)
}
func (client *commitMsgClient) reSendCommitTx(tx *types.Transaction) bool {
client.checkTxCommitTimes++
if client.checkTxCommitTimes < client.waitMainBlocks {
return false
}
client.checkTxCommitTimes = 0
//bls聚合签名场景,发送未成功上链,继续发送交易
if !client.paraClient.subCfg.BlsSignOff {
//resend tx
client.sendMsgCh <- tx
return false
}
//非聚合签名场景,发送未成功,触发重新构建交易发送
client.resetSendEnv()
return true
}
//如果共识高度一直没有追上发送高度,且当前发送高度已经上链,说明共识一直没达成,安全起见,超过停止次数后,重发
func (client *commitMsgClient) checkConsensusStop(consensStopTimes uint32) uint32 {
//如果共识高度一直没有追上发送高度,超出等待时间后,说明共识一直没达成,安全起见,超过停止次数后,重发
func (client *commitMsgClient) checkConsensusStop(checks *commitCheckParams) {
client.mutex.Lock()
defer client.mutex.Unlock()
consensHeight := client.getConsensusHeight()
if client.sendingHeight > consensHeight && !client.isSendingCommitMsg() {
if consensStopTimes > client.waitConsensStopTimes {
plog.Debug("para commitMsg-checkConsensusStop", "times", consensStopTimes)
if client.sendingHeight > consensHeight {
checks.consensStopTimes += 1
if checks.consensStopTimes > client.waitConsensStopTimes {
plog.Debug("para commitMsg-checkConsensusStop", "times", checks.consensStopTimes)
checks.consensStopTimes = 0
client.resetSendEnv()
return 0
}
return consensStopTimes + 1
}
return 0
return
}
func (client *commitMsgClient) checkAuthAccountIn() {
nodes, err := client.getNodeGroupAddrs()
nodeStr, err := client.getNodeGroupAddrs()
if err != nil {
return
}
authExist := strings.Contains(nodes, client.authAccount)
authExist := strings.Contains(nodeStr, client.authAccount)
//如果授权节点重新加入,需要从当前共识高度重新发送
if !client.authAccountIn && authExist {
......@@ -295,10 +336,15 @@ func (client *commitMsgClient) checkAuthAccountIn() {
}
client.authAccountIn = authExist
nodes := strings.Split(nodeStr, ",")
client.paraClient.bullyCli.UpdateValidNodes(nodes)
client.authNodes.Store(nodes)
}
func (client *commitMsgClient) procChecks(checks *commitCheckParams) {
checks.consensStopTimes = client.checkConsensusStop(checks.consensStopTimes)
client.checkConsensusStop(checks)
client.checkAuthAccountIn()
}
......@@ -369,13 +415,13 @@ func (client *commitMsgClient) getSendingTx(startHeight, endHeight int64) (*type
}
if !client.paraClient.subCfg.BlsSignOff {
err = client.blsSign(commits)
err = client.paraClient.blsSignCli.blsSign(commits)
if err != nil {
return nil, 0
}
}
signTx, count, err := client.calcCommitMsgTxs(commits, atomic.LoadInt64(&client.txFeeRate))
signTx, count, err := client.calcCommitMsgTxs(commits)
if err != nil || signTx == nil {
return nil, 0
}
......@@ -391,10 +437,10 @@ func (client *commitMsgClient) getSendingTx(startHeight, endHeight int64) (*type
return signTx, count
}
func (client *commitMsgClient) calcCommitMsgTxs(notifications []*pt.ParacrossCommitAction, feeRate int64) (*types.Transaction, int64, error) {
txs, count, err := client.batchCalcTxGroup(notifications, feeRate)
func (client *commitMsgClient) calcCommitMsgTxs(notifications []*pt.ParacrossCommitAction) (*types.Transaction, int64, error) {
txs, count, err := client.batchCalcTxGroup(notifications, atomic.LoadInt64(&client.txFeeRate))
if err != nil {
txs, err = client.singleCalcTx((notifications)[0], feeRate)
txs, err = client.singleCalcTx((notifications)[0], atomic.LoadInt64(&client.txFeeRate))
if err != nil {
plog.Error("single calc tx", "height", notifications[0].Status.Height)
......@@ -447,7 +493,10 @@ func (client *commitMsgClient) getExecName(commitHeight int64) string {
func (client *commitMsgClient) batchCalcTxGroup(notifications []*pt.ParacrossCommitAction, feeRate int64) (*types.Transaction, int, error) {
var rawTxs types.Transactions
cfg := client.paraClient.GetAPI().GetConfig()
for _, notify := range notifications {
for i, notify := range notifications {
if i >= int(types.MaxTxGroupSize) {
break
}
execName := client.getExecName(notify.Status.Height)
tx, err := paracross.CreateRawCommitTx4MainChain(cfg, notify, execName, feeRate)
if err != nil {
......@@ -917,11 +966,8 @@ func (client *commitMsgClient) fetchPriKey() error {
}
client.privateKey = priKey
client.blsPriKey = getBlsPriKey(priKey.Bytes())
client.blsPubKey = g2pubs.PrivToPub(client.blsPriKey)
serial := client.blsPubKey.Serialize()
plog.Info("para commit get pub bls", "final keys", common.ToHex(serial[:]))
plog.Info("para commit fetchPriKey success")
client.paraClient.blsSignCli.setBlsPriKey(priKey.Bytes())
return nil
}
......@@ -978,3 +1024,13 @@ func (client *commitMsgClient) isSelfConsEnable(height int64) bool {
return false
}
func (client *commitMsgClient) isValidNode(addr string) bool {
nodes := client.authNodes.Load().([]string)
for _, v := range nodes {
if v == addr {
return true
}
}
return false
}
......@@ -553,6 +553,7 @@ out:
}
//如果当前正在追赶,暂不处理
if client.commitMsgClient.authAccount != "" && client.isCaughtUp() && len(paraTxs.Items) > 0 {
//在追赶上之后,每次seq只请求一个,只检查第一个即可
client.commitMsgClient.commitTxCheckNotify(paraTxs.Items[0])
}
......
......@@ -68,6 +68,26 @@ type multiDldClient struct {
mtx sync.Mutex
}
func newMultiDldCli(para *client, cfg *subConfig) *multiDldClient {
multi := &multiDldClient{
paraClient: para,
invNumPerJob: defaultInvNumPerJob,
jobBufferNum: defaultJobBufferNum,
serverTimeout: maxServerRspTimeout,
}
if cfg.MultiDownInvNumPerJob > 0 {
multi.invNumPerJob = cfg.MultiDownInvNumPerJob
}
if cfg.MultiDownJobBuffNum > 0 {
multi.jobBufferNum = cfg.MultiDownJobBuffNum
}
if cfg.MultiDownServerRspTime > 0 {
multi.serverTimeout = cfg.MultiDownServerRspTime
}
return multi
}
func (m *multiDldClient) getInvs(startHeight, endHeight int64) []*inventory {
var invs []*inventory
if endHeight > startHeight && endHeight-startHeight > maxRollbackHeight {
......
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package para
import (
"fmt"
"github.com/33cn/chain33/common"
"github.com/33cn/chain33/types"
pt "github.com/33cn/plugin/plugin/dapp/paracross/types"
)
//IsCaughtUp 是否追上最新高度,
func (client *client) Query_IsCaughtUp(req *types.ReqNil) (types.Message, error) {
if client == nil {
return nil, fmt.Errorf("%s", "client not bind message queue.")
}
return &types.IsCaughtUp{Iscaughtup: client.isCaughtUp()}, nil
}
func (client *client) Query_LocalBlockInfo(req *types.ReqInt) (types.Message, error) {
if client == nil {
return nil, fmt.Errorf("%s", "client not bind message queue.")
}
var block *pt.ParaLocalDbBlock
var err error
if req.Height <= -1 {
block, err = client.getLastLocalBlock()
if err != nil {
return nil, err
}
} else {
block, err = client.getLocalBlockByHeight(req.Height)
if err != nil {
return nil, err
}
}
blockInfo := &pt.ParaLocalDbBlockInfo{
Height: block.Height,
MainHash: common.ToHex(block.MainHash),
MainHeight: block.MainHeight,
ParentMainHash: common.ToHex(block.ParentMainHash),
BlockTime: block.BlockTime,
}
for _, tx := range block.Txs {
blockInfo.Txs = append(blockInfo.Txs, common.ToHex(tx.Hash()))
}
return blockInfo, nil
}
func (client *client) Query_LeaderInfo(req *types.ReqNil) (types.Message, error) {
if client == nil {
return nil, fmt.Errorf("%s", "client not bind message queue.")
}
isLeader := client.bullyCli.IsSelfCoordinator()
leader := client.bullyCli.Coordinator()
return &pt.ElectionStatus{IsLeader: isLeader, LeaderId: leader}, nil
}
func (client *client) Query_CommitTxInfo(req *types.ReqNil) (types.Message, error) {
if client == nil {
return nil, fmt.Errorf("%s", "client not bind message queue.")
}
rt := client.blsSignCli.showTxBuffInfo()
return rt, nil
}
// Query_CreateNewAccount 通知para共识模块钱包创建了一个新的账户
func (client *client) Query_CreateNewAccount(acc *types.Account) (types.Message, error) {
if acc == nil {
return nil, types.ErrInvalidParam
}
plog.Info("Query_CreateNewAccount", "acc", acc.Addr)
// 需要para共识这边处理新创建的账户是否是超级节点发送commit共识交易的账户
client.commitMsgClient.onWalletAccount(acc)
return &types.Reply{IsOk: true, Msg: []byte("OK")}, nil
}
// Query_WalletStatus 通知para共识模块钱包锁状态有变化
func (client *client) Query_WalletStatus(walletStatus *types.WalletStatus) (types.Message, error) {
if walletStatus == nil {
return nil, types.ErrInvalidParam
}
plog.Info("Query_WalletStatus", "walletStatus", walletStatus.IsWalletLock)
// 需要para共识这边根据walletStatus.IsWalletLock锁的状态开启/关闭发送共识交易
client.commitMsgClient.onWalletStatus(walletStatus)
return &types.Reply{IsOk: true, Msg: []byte("OK")}, nil
}
......@@ -44,6 +44,8 @@ func ParcCmd() *cobra.Command {
GetBlockInfoCmd(),
GetLocalBlockInfoCmd(),
GetConsensDoneInfoCmd(),
LeaderCmd(),
CmtTxInfoCmd(),
)
return cmd
}
......@@ -950,6 +952,40 @@ func isSync(cmd *cobra.Command, args []string) {
ctx.Run()
}
// IsSyncCmd query parachain is sync
func LeaderCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "leader",
Short: "node leader info",
Run: leaderInfo,
}
return cmd
}
func leaderInfo(cmd *cobra.Command, args []string) {
rpcLaddr, _ := cmd.Flags().GetString("rpc_laddr")
var res pt.ElectionStatus
ctx := jsonclient.NewRPCCtx(rpcLaddr, "paracross.GetParaNodeLeaderInfo", nil, &res)
ctx.Run()
}
// IsSyncCmd query parachain is sync
func CmtTxInfoCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "cmtinfo",
Short: "commit tx info",
Run: cmtTxInfo,
}
return cmd
}
func cmtTxInfo(cmd *cobra.Command, args []string) {
rpcLaddr, _ := cmd.Flags().GetString("rpc_laddr")
var res pt.ParaBlsSignSumInfo
ctx := jsonclient.NewRPCCtx(rpcLaddr, "paracross.GetParaCmtTxInfo", nil, &res)
ctx.Run()
}
func consusHeight(cmd *cobra.Command, args []string) {
rpcLaddr, _ := cmd.Flags().GetString("rpc_laddr")
paraName, _ := cmd.Flags().GetString("paraName")
......
......@@ -133,8 +133,8 @@ func checkCommitInfo(cfg *types.Chain33Config, commit *pt.ParacrossNodeStatus) e
}
//区块链中不能使用float类型
func isCommitDone(nodes map[string]struct{}, mostSame int) bool {
return 3*mostSame > 2*len(nodes)
func isCommitDone(nodes, mostSame int) bool {
return 3*mostSame > 2*nodes
}
func makeCommitReceipt(addr string, commit *pt.ParacrossNodeStatus, prev, current *pt.ParacrossHeightStatus) *types.Receipt {
......@@ -226,14 +226,14 @@ func makeDoneReceipt(cfg *types.Chain33Config, execMainHeight, execHeight int64,
}
}
func getMostCommit(stat *pt.ParacrossHeightStatus) (int, string) {
func GetMostCommit(commits [][]byte) (int, string) {
stats := make(map[string]int)
n := len(stat.Details.Addrs)
n := len(commits)
for i := 0; i < n; i++ {
if _, ok := stats[string(stat.Details.BlockHash[i])]; ok {
stats[string(stat.Details.BlockHash[i])]++
if _, ok := stats[string(commits[i])]; ok {
stats[string(commits[i])]++
} else {
stats[string(stat.Details.BlockHash[i])] = 1
stats[string(commits[i])] = 1
}
}
most := -1
......@@ -343,7 +343,7 @@ func paraCheckSelfConsOn(cfg *types.Chain33Config, db dbm.KV, commit *pt.Paracro
return true, nil, nil
}
func (a *action) preCheckCommitInfo(commit *pt.ParacrossNodeStatus) error {
func (a *action) preCheckCommitInfo(commit *pt.ParacrossNodeStatus, commitAddr string) error {
cfg := a.api.GetConfig()
err := checkCommitInfo(cfg, commit)
if err != nil {
......@@ -354,18 +354,19 @@ func (a *action) preCheckCommitInfo(commit *pt.ParacrossNodeStatus) error {
if err != nil {
return err
}
if !validNode(a.fromaddr, nodesMap) {
return errors.Wrapf(pt.ErrNodeNotForTheTitle, "not validNode:%s", a.fromaddr)
//允许a.fromAddr非nodegroup成员,将来也许可以代nodegroup提交共识交易
if !validNode(commitAddr, nodesMap) {
return errors.Wrapf(pt.ErrNodeNotForTheTitle, "not validNode:%s", commitAddr)
}
titleStatus, err := getTitle(a.db, calcTitleKey(commit.Title))
if err != nil {
return errors.Wrapf(err, "getTitle:%s", a.fromaddr)
return errors.Wrapf(err, "getTitle:%s", commitAddr)
}
if titleStatus.Height+1 == commit.Height && commit.Height > 0 && !pt.IsParaForkHeight(cfg, commit.MainBlockHeight, pt.ForkLoopCheckCommitTxDone) {
if !bytes.Equal(titleStatus.BlockHash, commit.PreBlockHash) {
clog.Error("paracross.Commit", "check PreBlockHash", common.ToHex(titleStatus.BlockHash),
"commit tx", common.ToHex(commit.PreBlockHash), "commitheit", commit.Height, "from", a.fromaddr)
"commit tx", common.ToHex(commit.PreBlockHash), "commitheit", commit.Height, "from", commitAddr)
return pt.ErrParaBlockHashNoMatch
}
}
......@@ -378,7 +379,7 @@ func (a *action) preCheckCommitInfo(commit *pt.ParacrossNodeStatus) error {
if !cfg.IsPara() {
blockHash, err := getBlockHash(a.api, commit.MainBlockHeight)
if err != nil {
clog.Error("paracross.Commit getBlockHash", "err", err, "commit tx height", commit.MainBlockHeight, "from", a.fromaddr)
clog.Error("paracross.Commit getBlockHash", "err", err, "commit tx height", commit.MainBlockHeight, "from", commitAddr)
return err
}
dbMainHash = blockHash.Hash
......@@ -386,7 +387,7 @@ func (a *action) preCheckCommitInfo(commit *pt.ParacrossNodeStatus) error {
} else {
block, err := getBlockInfo(a.api, commit.Height)
if err != nil {
clog.Error("paracross.Commit getBlockInfo", "err", err, "height", commit.Height, "from", a.fromaddr)
clog.Error("paracross.Commit getBlockInfo", "err", err, "height", commit.Height, "from", commitAddr)
return err
}
dbMainHash = block.MainHash
......@@ -397,7 +398,7 @@ func (a *action) preCheckCommitInfo(commit *pt.ParacrossNodeStatus) error {
if !bytes.Equal(dbMainHash, commit.MainBlockHash) && commit.Height > 0 {
clog.Error("paracross.Commit blockHash not match", "isMain", !cfg.IsPara(), "db", common.ToHex(dbMainHash),
"commit", common.ToHex(commit.MainBlockHash), "commitHeight", commit.Height,
"commitMainHeight", commit.MainBlockHeight, "from", a.fromaddr)
"commitMainHeight", commit.MainBlockHeight, "from", commitAddr)
return types.ErrBlockHashNoMatch
}
......@@ -414,12 +415,6 @@ func (a *action) Commit(commit *pt.ParacrossCommitAction) (*types.Receipt, error
return receipt, err
}
}
err := a.preCheckCommitInfo(commit.Status)
if err != nil {
return nil, err
}
if commit.Bls != nil {
return a.commitBls(commit)
}
......@@ -450,6 +445,11 @@ func (a *action) commitBls(commit *pt.ParacrossCommitAction) (*types.Receipt, er
func (a *action) proCommitMsg(commit *pt.ParacrossNodeStatus, commitAddr string) (*types.Receipt, error) {
cfg := a.api.GetConfig()
err := a.preCheckCommitInfo(commit, commitAddr)
if err != nil {
return nil, err
}
nodes, _, err := a.getNodesGroup(commit.Title)
if err != nil {
return nil, err
......@@ -553,7 +553,7 @@ func (a *action) verifyBlsSign(commit *pt.ParacrossCommitAction) ([]string, erro
}
//1. 获取addr对应的bls 公钥
signAddrs := getAddrsByBitMap(nodesArry, commit.Bls.Addrs)
signAddrs := getAddrsByBitMap(nodesArry, commit.Bls.AddrsMap)
var pubs []string
for _, addr := range signAddrs {
pub, err := getAddrBlsPubKey(a.db, commit.Status.Title, addr)
......@@ -597,7 +597,7 @@ func (a *action) verifyBlsSign(commit *pt.ParacrossCommitAction) ([]string, erro
if !g2pubs.Verify(msg, aPub, sign) {
clog.Error("paracross.Commit bls sign verify", "title", commit.Status.Title, "height", commit.Status.Height,
"addrsMap", common.ToHex(commit.Bls.Addrs), "sign", common.ToHex(commit.Bls.Sign), "addr", signAddrs, "nodes", nodesArry)
"addrsMap", common.ToHex(commit.Bls.AddrsMap), "sign", common.ToHex(commit.Bls.Sign), "addr", signAddrs, "nodes", nodesArry)
clog.Error("paracross.commit bls sign verify", "data", common.ToHex(msg), "height", commit.Status.Height)
return nil, pt.ErrBlsSignVerify
}
......@@ -615,8 +615,8 @@ func (a *action) commitTxDone(nodeStatus *pt.ParacrossNodeStatus, stat *pt.Parac
}
commitCount := len(stat.Details.Addrs)
most, mostHash := getMostCommit(stat)
if !isCommitDone(nodes, most) {
most, mostHash := GetMostCommit(stat.Details.BlockHash)
if !isCommitDone(len(nodes), most) {
return receipt, nil
}
clog.Debug("paracross.Commit commit ----pass", "most", most, "mostHash", common.ToHex([]byte(mostHash)))
......@@ -794,8 +794,8 @@ func (a *action) commitTxDoneByStat(stat *pt.ParacrossHeightStatus, titleStatus
updateCommitAddrs(stat, nodes)
commitCount := len(stat.Details.Addrs)
most, mostHash := getMostCommit(stat)
if !isCommitDone(nodes, most) {
most, mostHash := GetMostCommit(stat.Details.BlockHash)
if !isCommitDone(len(nodes), most) {
return nil, nil
}
clog.Debug("paracross.commitTxDoneByStat ----pass", "most", most, "mostHash", common.ToHex([]byte(mostHash)))
......
......@@ -274,7 +274,7 @@ func (a *action) stageVote(config *pt.ConfigVoteInfo) (*types.Receipt, error) {
stat.Votes = updateVotes(stat.Votes, nodes)
most, vote := getMostVote(stat.Votes)
if !isCommitDone(nodes, most) {
if !isCommitDone(len(nodes), most) {
return makeStageConfigReceipt(copyStat, stat), nil
}
clog.Info("paracross.stageVote ----pass", "most", most, "pass", vote)
......
......@@ -587,7 +587,7 @@ func (a *action) nodeVote(config *pt.ParaNodeAddrConfig) (*types.Receipt, error)
stat.Votes = updateVotes(stat.Votes, nodes)
most, vote := getMostVote(stat.Votes)
if !isCommitDone(nodes, most) {
if !isCommitDone(len(nodes), most) {
superManagerPass, err := a.checkIsSuperManagerVote(config, nodes)
if err != nil {
return nil, err
......
......@@ -267,7 +267,8 @@ message ReplyQuerySelfStages {
message ParacrossCommitBlsInfo {
bytes sign = 1;
bytes addrs = 2; //addrs' bitmap
bytes addrsMap = 2; //addrs' bitmap
repeated string addrs = 3; //addr's array
}
message ParacrossCommitAction {
......@@ -414,6 +415,39 @@ message ParaLocalDbBlockInfo {
repeated string txs = 6;
}
message ParaBlsSignSumDetails {
repeated string addrs = 1;
repeated bytes msgs = 2;
repeated bytes signs = 3;
int64 height = 4;
}
message ParaBlsSignSumInfo {
repeated ParaBlsSignSumDetails info = 1;
}
message ElectionMsg {
string toID = 1; //target id
string peerID = 2; //src id
int32 type = 3;
bytes data = 4;
}
message ParaP2PSubMsg {
int32 ty = 1;
oneof value {
Transaction commitTx = 10;
ElectionMsg election = 11;
}
}
message ElectionStatus {
bool isLeader = 1;
string leaderId = 2;
}
service paracross {
rpc IsSync(ReqNil) returns (IsCaughtUp) {}
}
\ No newline at end of file
......@@ -52,3 +52,41 @@ func (c *Jrpc) GetParaLocalBlockInfo(in *types.ReqInt, result *interface{}) erro
*result = data
return nil
}
// GetParaLocalBlockInfo query para chain the download layer's local height
func (c *channelClient) GetParaNodeLeaderInfo(ctx context.Context, in *types.ReqNil) (*pt.ElectionStatus, error) {
data, err := c.QueryConsensusFunc("para", "LeaderInfo", in)
if err != nil {
return nil, err
}
return data.(*pt.ElectionStatus), nil
}
// GetParaLocalBlockInfo query para local height
func (c *Jrpc) GetParaNodeLeaderInfo(in *types.ReqNil, result *interface{}) error {
data, err := c.cli.GetParaNodeLeaderInfo(context.Background(), in)
if err != nil {
return err
}
*result = data
return nil
}
// GetParaLocalBlockInfo query para chain the download layer's local height
func (c *channelClient) GetParaCmtTxInfo(ctx context.Context, in *types.ReqNil) (*pt.ParaBlsSignSumInfo, error) {
data, err := c.QueryConsensusFunc("para", "CommitTxInfo", in)
if err != nil {
return nil, err
}
return data.(*pt.ParaBlsSignSumInfo), nil
}
// GetParaLocalBlockInfo query para local height
func (c *Jrpc) GetParaCmtTxInfo(in *types.ReqNil, result *interface{}) error {
data, err := c.cli.GetParaCmtTxInfo(context.Background(), in)
if err != nil {
return err
}
*result = data
return nil
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment