Commit b134b423 authored by madengji, committed by 33cn

update bls sign leader function

parent ebe61688
......@@ -47,12 +47,16 @@ enableReduceLocaldb=true
enablePushSubscribe=false
[p2p]
enable=false
types=["dht"]
enable=true
driver="leveldb"
dbPath="paradatadir/addrbook"
dbCache=4
grpcLogFile="grpc33.log"
[p2p.sub.dht]
isSeed=true
[rpc]
# avoid conflicting with the main-chain config
......@@ -71,7 +75,7 @@ maxTxNumPerAccount=10000
[consensus]
name="para"
genesisBlockTime=1514533394
genesisBlockTime=1514533390
genesis="14KEKbYtKKQm4wMthSK9J4La4nAiidGozt"
minerExecs=["paracross"] #configure the mining contract
......
package para
import (
"errors"
"sync"
"time"
"github.com/33cn/chain33/queue"
"github.com/33cn/chain33/types"
pt "github.com/33cn/plugin/plugin/dapp/paracross/types"
)
const (
ELECTION = iota
COORDINATOR
OK
CLOSE
)
// Bully is a `struct` representing a single node used by the `Bully Algorithm`.
//
// NOTE: More details about the `Bully algorithm` can be found here
// https://en.wikipedia.org/wiki/Bully_algorithm .
type Bully struct {
ID string
coordinator string
nodegroup map[string]bool
inNodeGroup bool
mu *sync.RWMutex
receiveChan chan *pt.ElectionMsg
electionChan chan *pt.ElectionMsg
paraClient *client
qClient queue.Client
wg *sync.WaitGroup
quit chan struct{}
}
func NewBully(para *client, ID string, wg *sync.WaitGroup) (*Bully, error) {
b := &Bully{
paraClient: para,
ID: ID,
nodegroup: make(map[string]bool),
electionChan: make(chan *pt.ElectionMsg, 1),
receiveChan: make(chan *pt.ElectionMsg),
wg: wg,
mu: &sync.RWMutex{},
quit: make(chan struct{}),
}
return b, nil
}
func (b *Bully) SetParaAPI(cli queue.Client) {
b.qClient = cli
}
func (b *Bully) UpdateValidNodes(nodes []string) {
b.mu.Lock()
defer b.mu.Unlock()
b.nodegroup = make(map[string]bool)
for _, n := range nodes {
plog.Info("bully node update", "node", n)
b.nodegroup[n] = true
}
//this node has exited the nodegroup
if b.inNodeGroup && !b.nodegroup[b.ID] {
_ = b.Send("", CLOSE, nil)
}
b.inNodeGroup = b.nodegroup[b.ID]
}
func (b *Bully) isValidNode(ID string) bool {
b.mu.Lock()
defer b.mu.Unlock()
return b.nodegroup[ID]
}
func (b *Bully) Close() {
close(b.quit)
}
func (b *Bully) Receive(msg *pt.ElectionMsg) {
plog.Info("bully rcv", "type", msg.Type)
switch msg.Type {
case CLOSE:
if msg.PeerID == b.Coordinator() {
b.SetCoordinator(b.ID)
b.Elect()
}
case OK:
if msg.ToID != b.ID {
return
}
select {
case b.electionChan <- msg:
return
case <-time.After(200 * time.Millisecond):
return
}
default:
b.receiveChan <- msg
}
}
func (b *Bully) sendMsg(ty int64, data interface{}) error {
msg := b.qClient.NewMessage("p2p", ty, data)
err := b.qClient.Send(msg, true)
if err != nil {
return err
}
resp, err := b.qClient.Wait(msg)
if err != nil {
return err
}
if resp.GetData().(*types.Reply).IsOk {
return nil
}
return errors.New(string(resp.GetData().(*types.Reply).GetMsg()))
}
func (b *Bully) Send(toId string, msgTy int32, data []byte) error {
act := &pt.ParaP2PSubMsg{Ty: P2pSubElectMsg}
act.Value = &pt.ParaP2PSubMsg_Election{Election: &pt.ElectionMsg{ToID: toId, PeerID: b.ID, Type: msgTy, Data: data}}
plog.Info("bull sendmsg")
err := b.paraClient.SendPubP2PMsg(types.Encode(act))
//err := b.sendMsg(types.EventPubTopicMsg, &types.PublishTopicMsg{Topic: "consensus", Msg: types.Encode(act)})
plog.Info("bully ret")
return err
}
// SetCoordinator sets `ID` as the new `b.coordinator` if `ID` is greater than
// `b.coordinator` or equal to `b.ID`.
func (b *Bully) SetCoordinator(ID string) {
b.mu.Lock()
defer b.mu.Unlock()
if ID > b.coordinator || ID == b.ID {
b.coordinator = ID
}
}
// Coordinator returns `b.coordinator`.
//
// NOTE: This function is thread-safe.
func (b *Bully) Coordinator() string {
b.mu.RLock()
defer b.mu.RUnlock()
return b.coordinator
}
func (b *Bully) IsSelfCoordinator() bool {
b.mu.RLock()
defer b.mu.RUnlock()
return b.ID == b.coordinator
}
// Elect handles the leader election mechanism of the `Bully algorithm`.
func (b *Bully) Elect() {
_ = b.Send("", ELECTION, nil)
select {
case <-b.electionChan:
return
case <-time.After(time.Second):
b.SetCoordinator(b.ID)
_ = b.Send("", COORDINATOR, nil)
return
}
}
func (b *Bully) Run() {
defer b.wg.Done()
var feedDog = false
var feedDogTicker <-chan time.Time
var watchDogTicker <-chan time.Time
plog.Info("bully init")
onceTimer := time.NewTimer(time.Minute)
out:
for {
select {
case msg := <-b.receiveChan:
switch msg.Type {
case ELECTION:
if msg.PeerID < b.ID {
_ = b.Send(msg.PeerID, OK, nil)
b.Elect()
}
case COORDINATOR:
if !b.isValidNode(msg.PeerID) {
continue
}
b.SetCoordinator(msg.PeerID)
if b.coordinator < b.ID {
b.Elect()
}
feedDog = true
}
case <-onceTimer.C:
feedDogTicker = time.NewTicker(20 * time.Second).C
watchDogTicker = time.NewTicker(time.Minute).C
case <-feedDogTicker:
plog.Info("bully feed dog ticker", "is", b.IsSelfCoordinator(), "valid", b.isValidNode(b.ID))
//the leader must feed the watchdog periodically
if b.IsSelfCoordinator() && b.isValidNode(b.ID) {
_ = b.Send("", COORDINATOR, nil)
}
case <-watchDogTicker:
//a feed-dog message from the leader must arrive at least once a minute, otherwise the leader is considered dead and a new election starts
if !feedDog {
b.Elect()
plog.Info("bully watchdog triger")
}
feedDog = false
case <-b.quit:
break out
}
}
}
# para bully algorithm
#1. Broadcast
1. After a new node starts, it listens for the leader's feed-dog (heartbeat) messages; if none arrives before the timeout, it starts an election
1. If it receives a message from a node with a smaller ID, it starts an election
1. If this node is not the largest node, it receives OK replies from the larger nodes, and those larger nodes run the election across the whole network; in the end this node receives the leader's announcement
1. If this node is the largest node, it receives no OK replies and so concludes that it is the leader (the winning rule is sketched below)
1. The leader periodically broadcasts feed-dog messages asserting leadership; if a node goes too long without receiving one, it starts an election. This takes fewer messages than having every non-leader node send periodic heartbeats
1. The leader checks whether it is still in the nodegroup; if not, it sends a CLOSE message, triggering a new election 
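A minimal, self-contained sketch of the winning rule above (a standalone illustration, not part of the plugin): with string IDs compared lexically, as `SetCoordinator` does, the election converges on the largest ID.

```go
package main

import "fmt"

// electedLeader returns the node that wins a bully election:
// the largest ID, compared lexically as in SetCoordinator.
func electedLeader(ids []string) string {
	leader := ""
	for _, id := range ids {
		if id > leader {
			leader = id
		}
	}
	return leader
}

func main() {
	fmt.Println(electedLeader([]string{"nodeA", "nodeC", "nodeB"})) // nodeC
}
```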
......@@ -15,10 +15,10 @@ import (
"sync/atomic"
"errors"
"time"
"github.com/33cn/chain33/client/api"
"github.com/33cn/chain33/common"
"github.com/33cn/chain33/common/crypto"
"github.com/33cn/chain33/common/merkle"
"github.com/33cn/chain33/queue"
......@@ -28,6 +28,7 @@ import (
"github.com/33cn/chain33/types"
paracross "github.com/33cn/plugin/plugin/dapp/paracross/types"
pt "github.com/33cn/plugin/plugin/dapp/paracross/types"
"github.com/pkg/errors"
)
const (
......@@ -40,12 +41,6 @@ const (
poolMainBlockSec int64 = 5
defaultEmptyBlockInterval int64 = 50 //write empty block every interval blocks in mainchain
defaultSearchMatchedBlockDepth int32 = 10000
defaultParaBlsSignTopic = "BLS-SIGN"
)
const (
P2pSubCommitTx = iota
P2pSubElectMsg
)
var (
......@@ -58,10 +53,6 @@ func init() {
drivers.QueryData.Register("para", &client{})
}
type ClientInter interface {
SendPubP2PMsg(data []byte) error
}
type client struct {
*drivers.BaseClient
grpcClient types.Chain33Client
......@@ -73,19 +64,18 @@ type client struct {
jumpDldCli *jumpDldClient
minerPrivateKey crypto.PrivKey
wg sync.WaitGroup
cfg *types.Consensus
subCfg *subConfig
dldCfg *downloadClient
bullyCli *Bully
blsSignCli *blsClient
isClosed int32
quitCreate chan struct{}
quit chan struct{}
}
type subConfig struct {
WriteBlockSeconds int64 `json:"writeBlockSeconds,omitempty"`
ParaRemoteGrpcClient string `json:"paraRemoteGrpcClient,omitempty"`
StartHeight int64 `json:"startHeight,omitempty"`
GenesisBlockTime int64 `json:"genesisBlockTime,omitempty"`
GenesisStartHeightSame bool `json:"genesisStartHeightSame,omitempty"`
EmptyBlockInterval []string `json:"emptyBlockInterval,omitempty"`
AuthAccount string `json:"authAccount,omitempty"`
......@@ -104,6 +94,7 @@ type subConfig struct {
RmCommitParamMainHeight int64 `json:"rmCommitParamMainHeight,omitempty"`
JumpDownloadClose bool `json:"jumpDownloadClose,omitempty"`
BlsSignOff bool `json:"blsSignOff,omitempty"`
BlsLeaderSwitchInt int32 `json:"blsLeaderSwitchInt,omitempty"`
}
// New function to init paracross env
......@@ -121,10 +112,6 @@ func New(cfg *types.Consensus, sub []byte) queue.Module {
subcfg.WriteBlockSeconds = poolMainBlockSec
}
if subcfg.GenesisBlockTime <= 0 {
subcfg.GenesisBlockTime = defaultGenesisBlockTime
}
emptyInterval, err := parseEmptyBlockInterval(subcfg.EmptyBlockInterval)
if err != nil {
panic("para EmptyBlockInterval config not correct")
......@@ -157,65 +144,22 @@ func New(cfg *types.Consensus, sub []byte) queue.Module {
para := &client{
BaseClient: c,
minerPrivateKey: priKey,
cfg: cfg,
subCfg: &subcfg,
quitCreate: make(chan struct{}),
quit: make(chan struct{}),
}
para.dldCfg = &downloadClient{}
para.dldCfg.emptyInterval = append(para.dldCfg.emptyInterval, emptyInterval...)
para.commitMsgClient = &commitMsgClient{
paraClient: para,
authAccount: subcfg.AuthAccount,
waitMainBlocks: waitBlocks4CommitMsg,
waitConsensStopTimes: waitConsensStopTimes,
consensHeight: -2,
sendingHeight: -1,
consensDoneHeight: -1,
resetCh: make(chan interface{}, 1),
quit: make(chan struct{}),
}
if subcfg.WaitBlocks4CommitMsg > 0 {
para.commitMsgClient.waitMainBlocks = subcfg.WaitBlocks4CommitMsg
}
if subcfg.WaitConsensStopTimes > 0 {
para.commitMsgClient.waitConsensStopTimes = subcfg.WaitConsensStopTimes
}
// set the parachain consensus start height: when the consensus height is -1, i.e. consensus has never been reached, allow consensus to start from a configured non-zero height
//note: setting ParaConsensStartHeight is only supported after the main chain's LoopCheckCommitTxDoneForkHeight
if subcfg.ParaConsensStartHeight > 0 {
para.commitMsgClient.consensDoneHeight = subcfg.ParaConsensStartHeight - 1
}
para.blockSyncClient = &blockSyncClient{
paraClient: para,
notifyChan: make(chan bool, 1),
quitChan: make(chan struct{}),
maxCacheCount: defaultMaxCacheCount,
maxSyncErrCount: defaultMaxSyncErrCount,
}
if subcfg.MaxCacheCount > 0 {
para.blockSyncClient.maxCacheCount = subcfg.MaxCacheCount
}
if subcfg.MaxSyncErrCount > 0 {
para.blockSyncClient.maxSyncErrCount = subcfg.MaxSyncErrCount
}
para.commitMsgClient = newCommitMsgCli(para, &subcfg)
para.blockSyncClient = newBlockSyncCli(para, &subcfg)
para.multiDldCli = newMultiDldCli(para, &subcfg)
para.jumpDldCli = &jumpDldClient{paraClient: para}
para.jumpDldCli = newJumpDldCli(para, &subcfg)
if len(subcfg.AuthAccount) > 0 {
para.blsSignCli = newBlsClient(para, &subcfg)
cli, err := NewBully(para, subcfg.AuthAccount, &para.wg)
if err != nil {
panic("bully create err")
}
para.bullyCli = cli
para.bullyCli.SetParaAPI(para.GetQueueClient())
}
para.blsSignCli = newBlsClient(para, &subcfg)
c.SetChild(para)
return para
......@@ -230,12 +174,9 @@ func (client *client) CheckBlock(parent *types.Block, current *types.BlockDetail
func (client *client) Close() {
atomic.StoreInt32(&client.isClosed, 1)
close(client.commitMsgClient.quit)
close(client.quitCreate)
close(client.quit)
close(client.blockSyncClient.quitChan)
if len(client.subCfg.AuthAccount) > 0 {
close(client.blsSignCli.quit)
client.bullyCli.Close()
}
close(client.blsSignCli.quit)
client.wg.Wait()
......@@ -262,12 +203,9 @@ func (client *client) SetQueueClient(c queue.Client) {
client.wg.Add(1)
go client.blockSyncClient.syncBlocks()
if len(client.subCfg.AuthAccount) > 0 {
client.wg.Add(1)
go client.blsSignCli.procRcvSignTxs()
client.wg.Add(1)
go client.bullyCli.Run()
}
client.wg.Add(2)
go client.blsSignCli.procAggregateTxs()
go client.blsSignCli.procLeaderSync()
}
......@@ -301,7 +239,10 @@ func (client *client) InitBlock() {
// genesis block
newblock := &types.Block{}
newblock.Height = 0
newblock.BlockTime = client.subCfg.GenesisBlockTime
newblock.BlockTime = defaultGenesisBlockTime
if client.cfg.GenesisBlockTime > 0 {
newblock.BlockTime = client.cfg.GenesisBlockTime
}
newblock.ParentHash = zeroHash[:]
newblock.MainHash = mainHash
......@@ -383,19 +324,26 @@ func (client *client) CreateGenesisTx() (ret []*types.Transaction) {
func (client *client) ProcEvent(msg *queue.Message) bool {
if msg.Ty == types.EventReceiveSubData {
if req, ok := msg.GetData().(*types.TopicData); ok {
var sub pt.ParaP2PSubMsg
err := types.Decode(req.Data, &sub)
if err != nil {
plog.Error("paracross ProcEvent decode", "ty", types.EventReceiveSubData)
return true
}
plog.Info("paracross Recv from", req.GetFrom(), "topic:", req.GetTopic(), "ty", sub.GetTy())
plog.Info("paracross ProcEvent from", "from", req.GetFrom(), "topic:", req.GetTopic(), "ty", sub.GetTy())
switch sub.GetTy() {
case P2pSubCommitTx:
client.blsSignCli.rcvCommitTx(sub.GetCommitTx())
case P2pSubElectMsg:
client.bullyCli.Receive(sub.GetElection())
err := client.blsSignCli.rcvCommitTx(sub.GetCommitTx())
if err != nil {
plog.Error("paracross ProcEvent commit tx", "err", err, "txhash", common.ToHex(sub.GetCommitTx().Hash()), "from", sub.GetCommitTx().From())
}
case P2pSubLeaderSyncMsg:
err := client.blsSignCli.rcvLeaderSyncTx(sub.GetSyncMsg())
if err != nil {
plog.Info("paracross ProcEvent leader sync msg", "err", err)
}
default:
plog.Error("paracross ProcEvent not support", "ty", sub.GetTy())
}
} else {
......@@ -408,35 +356,40 @@ func (client *client) ProcEvent(msg *queue.Message) bool {
return false
}
func (client *client) sendP2PMsg(ty int64, data interface{}) error {
func (client *client) sendP2PMsg(ty int64, data interface{}) ([]byte, error) {
msg := client.GetQueueClient().NewMessage("p2p", ty, data)
err := client.GetQueueClient().Send(msg, true)
if err != nil {
return err
return nil, errors.Wrapf(err, "ty=%d", ty)
}
resp, err := client.GetQueueClient().Wait(msg)
if err != nil {
return err
return nil, errors.Wrapf(err, "wait ty=%d", ty)
}
if resp.GetData().(*types.Reply).IsOk {
return nil
return resp.GetData().(*types.Reply).Msg, nil
}
return errors.New(string(resp.GetData().(*types.Reply).GetMsg()))
return nil, errors.Wrapf(types.ErrInvalidParam, "resp msg=%s", string(resp.GetData().(*types.Reply).GetMsg()))
}
func (client *client) SendPubP2PMsg(msg []byte) error {
data := &types.PublishTopicMsg{Topic: "consensus", Msg: msg}
return client.sendP2PMsg(types.EventPubTopicMsg, data)
// p2p pub/sub messaging
func (client *client) SendPubP2PMsg(topic string, msg []byte) error {
data := &types.PublishTopicMsg{Topic: topic, Msg: msg}
_, err := client.sendP2PMsg(types.EventPubTopicMsg, data)
return err
}
func (client *client) subP2PTopic() error {
return client.sendP2PMsg(types.EventSubTopic, &types.SubTopic{Module: "consensus", Topic: defaultParaBlsSignTopic})
func (client *client) SendSubP2PTopic(topic string) error {
data := &types.SubTopic{Topic: topic, Module: "consensus"}
_, err := client.sendP2PMsg(types.EventSubTopic, data)
return err
}
//TODO: is the topic removed automatically when this node exits?
func (client *client) rmvP2PTopic() error {
return client.sendP2PMsg(types.EventRemoveTopic, &types.RemoveTopic{Module: "consensus", Topic: defaultParaBlsSignTopic})
func (client *client) SendRmvP2PTopic(topic string) error {
data := &types.RemoveTopic{Topic: topic, Module: "consensus"}
_, err := client.sendP2PMsg(types.EventRemoveTopic, data)
return err
}
......
......@@ -8,7 +8,9 @@ import (
"bytes"
"math/big"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/33cn/chain33/common"
......@@ -19,19 +21,33 @@ import (
)
const (
maxRcvTxCount = 100 //max 100 nodes, 1 height tx or 1 txgroup per node
maxRcvTxCount = 100 //channel buffer, max 100 nodes, 1 height tx or 1 txgroup per node
leaderSyncInt = 15 //15s heartbeat sync interval
defLeaderSwitchInt = 100 //switch the leader every 100 consensus heights, roughly 6 hours (assuming a 50-block empty-block interval)
defaultParaBlsSignTopic = "PARA-BLS-SIGN-TOPIC"
)
//new types must be appended to preserve ordering
const (
P2pSubCommitTx = 1
P2pSubLeaderSyncMsg = 2
)
type blsClient struct {
paraClient *client
selfID string
blsPriKey *g2pubs.SecretKey
blsPubKey *g2pubs.PublicKey
peers map[string]bool
peersBlsPubKey map[string]*g2pubs.PublicKey
txsBuff map[int64]*pt.ParaBlsSignSumDetails
rcvCommitTxCh chan []*pt.ParacrossCommitAction
quit chan struct{}
paraClient *client
selfID string
blsPriKey *g2pubs.SecretKey
blsPubKey *g2pubs.PublicKey
peers map[string]bool
peersBlsPubKey map[string]*g2pubs.PublicKey
commitsPool map[int64]*pt.ParaBlsSignSumDetails
rcvCommitTxCh chan []*pt.ParacrossCommitAction
leaderOffset int32
leaderSwitchInt int32
feedDog uint32
quit chan struct{}
mutex sync.Mutex
}
func newBlsClient(para *client, cfg *subConfig) *blsClient {
......@@ -39,107 +55,228 @@ func newBlsClient(para *client, cfg *subConfig) *blsClient {
b.selfID = cfg.AuthAccount
b.peers = make(map[string]bool)
b.peersBlsPubKey = make(map[string]*g2pubs.PublicKey)
b.txsBuff = make(map[int64]*pt.ParaBlsSignSumDetails)
b.commitsPool = make(map[int64]*pt.ParaBlsSignSumDetails)
b.rcvCommitTxCh = make(chan []*pt.ParacrossCommitAction, maxRcvTxCount)
b.quit = make(chan struct{})
b.leaderSwitchInt = defLeaderSwitchInt
if cfg.BlsLeaderSwitchInt > 0 {
b.leaderSwitchInt = cfg.BlsLeaderSwitchInt
}
return b
}
//1. send only after consensus is reached; otherwise handling the various unreached-consensus cases gets complex and wastes tx fees
func (b *blsClient) procRcvSignTxs() {
/*
1. if the current leaderIndex equals this node's index in the nodegroup, this node is the leader and is responsible for sending the consensus tx
2. the current leader sends a feed-dog message every 15s to show it is live
3. every node runs a watchdog timer; on timeout, leaderIndex++ to look for a new live leader
4. once a node receives a new feed-dog message, it adopts the message's index as its leaderIndex; on conflict with its own leaderIndex, the larger one wins
5. every fixed number of consensus heights, e.g. 100, the leader must rotate, triggering leaderIndex++, so sending the consensus tx rotates evenly across leaders
*/
func (b *blsClient) procLeaderSync() {
defer b.paraClient.wg.Done()
if len(b.selfID) <= 0 {
return
}
p2pTimer := time.NewTimer(time.Minute)
var feedDogTicker <-chan time.Time
var watchDogTicker <-chan time.Time
p2pTimer := time.After(time.Minute)
out:
for {
select {
case commits := <-b.rcvCommitTxCh:
collectSigns(b.txsBuff, commits)
nodes := b.paraClient.commitMsgClient.authNodes.Load().([]string)
if !isMostCommitDone(len(nodes), b.txsBuff) {
continue
case <-feedDogTicker:
//the leader feeds the watchdog periodically
_, _, base, off, isLeader := b.getLeaderInfo()
if isLeader {
act := &pt.ParaP2PSubMsg{Ty: P2pSubLeaderSyncMsg}
act.Value = &pt.ParaP2PSubMsg_SyncMsg{SyncMsg: &pt.LeaderSyncInfo{ID: b.selfID, BaseIdx: base, Offset: off}}
err := b.paraClient.SendPubP2PMsg(defaultParaBlsSignTopic, types.Encode(act))
if err != nil {
plog.Error("para.procLeaderSync feed dog", "err", err)
}
plog.Info("procLeaderSync feed dog", "id", b.selfID, "base", base, "off", off)
}
//clear txsBuff and start collecting again
txsBuff := b.txsBuff
b.txsBuff = make(map[int64]*pt.ParaBlsSignSumDetails)
//if this node is the Coordinator, aggregate the txs
if b.paraClient.bullyCli.IsSelfCoordinator() {
dones := filterDoneCommits(len(nodes), txsBuff)
if len(dones) > 0 {
continue
}
acts, err := b.transferCommit2Action(dones)
if err != nil {
case <-watchDogTicker:
//a feed-dog message from the leader must arrive within 1 minute, otherwise the leader is considered dead and index++
if atomic.LoadUint32(&b.feedDog) == 0 {
nodes, leader, _, off, _ := b.getLeaderInfo()
if len(nodes) <= 0 {
continue
}
b.paraClient.commitMsgClient.sendCommitActions(acts)
atomic.StoreInt32(&b.leaderOffset, (off+1)%int32(len(nodes)))
plog.Info("procLeaderSync watchdog", "fail node", nodes[leader], "newOffset", atomic.LoadInt32(&b.leaderOffset))
}
case <-p2pTimer.C:
if len(b.selfID) > 0 {
//tle := cfg.GetTitle()
plog.Info("send p2p topic------------------------------")
b.paraClient.subP2PTopic()
plog.Info("rcv p2p topic-------------------------------")
atomic.StoreUint32(&b.feedDog, 0)
case <-p2pTimer:
err := b.paraClient.SendSubP2PTopic(defaultParaBlsSignTopic)
if err != nil {
plog.Error("procLeaderSync.SubP2PTopic", "err", err)
p2pTimer = time.After(time.Minute)
continue
}
feedDogTicker = time.NewTicker(leaderSyncInt * time.Second).C
watchDogTicker = time.NewTicker(time.Minute).C
case <-b.quit:
break out
}
}
}
//handle the leader sync tx: to accept the synced data, both nodes must share the same base consensus height and agree on the same leader
func (b *blsClient) rcvLeaderSyncTx(sync *pt.LeaderSyncInfo) error {
nodes, _, base, off, isLeader := b.getLeaderInfo()
if len(nodes) <= 0 {
return errors.Wrapf(pt.ErrParaNodeGroupNotSet, "id=%s", b.selfID)
}
syncLeader := (sync.BaseIdx + sync.Offset) % int32(len(nodes))
//accepting sync data requires both nodes to share the same base consensus height and the same leader
if sync.BaseIdx != base || nodes[syncLeader] != sync.ID {
return errors.Wrapf(types.ErrNotSync, "peer base=%d,id=%s,self.Base=%d,id=%s", sync.BaseIdx, sync.ID, base, nodes[syncLeader])
}
//if the leaders conflict, the larger offset wins
if isLeader && off > sync.Offset {
return errors.Wrapf(types.ErrNotSync, "self leader off=%d bigger than sync=%d", off, sync.Offset)
}
//update to the latest synced offset
atomic.StoreInt32(&b.leaderOffset, sync.Offset)
//do not feed the dog if the two nodes are out of sync, to keep out-of-sync or malicious nodes from feeding it
atomic.StoreUint32(&b.feedDog, 1)
return nil
}
func (b *blsClient) rcvCommitTx(tx *types.Transaction) error {
if !tx.CheckSign() {
return types.ErrSign
func (b *blsClient) getLeaderInfo() ([]string, int32, int32, int32, bool) {
nodes, _ := b.getSuperNodes()
if len(nodes) <= 0 {
return nil, 0, 0, 0, false
}
h := b.paraClient.commitMsgClient.getConsensusHeight()
//divide the height by the switch interval, then mod by len(nodes) to cover all nodes evenly
baseIdx := int32((h / int64(b.leaderSwitchInt)) % int64(len(nodes)))
offIdx := atomic.LoadInt32(&b.leaderOffset)
leaderIdx := (baseIdx + offIdx) % int32(len(nodes))
return nodes, leaderIdx, baseIdx, offIdx, nodes[leaderIdx] == b.selfID
}
func (b *blsClient) getSuperNodes() ([]string, string) {
nodeStr, err := b.paraClient.commitMsgClient.getNodeGroupAddrs()
if err != nil {
return nil, ""
}
return strings.Split(nodeStr, ","), nodeStr
}
func (b *blsClient) isValidNodes(id string) bool {
_, nodes := b.getSuperNodes()
return strings.Contains(nodes, id)
}
//1. send only after consensus is reached; otherwise handling the various unreached-consensus cases gets complex and wastes tx fees
func (b *blsClient) procAggregateTxs() {
defer b.paraClient.wg.Done()
if len(b.selfID) <= 0 {
return
}
if !b.paraClient.commitMsgClient.isValidNode(tx.From()) {
out:
for {
select {
case commits := <-b.rcvCommitTxCh:
b.mutex.Lock()
integrateCommits(b.commitsPool, commits)
//if any height in commitsPool reaches consensus, treat it as done
nodes, _ := b.getSuperNodes()
if !isMostCommitDone(len(nodes), b.commitsPool) {
b.mutex.Unlock()
continue
}
//if this node is the leader, aggregate and send the tx
_, _, _, _, isLeader := b.getLeaderInfo()
if isLeader {
b.sendAggregateTx(nodes)
}
//clear the commits pool and start collecting again
b.commitsPool = make(map[int64]*pt.ParaBlsSignSumDetails)
b.mutex.Unlock()
case <-b.quit:
break out
}
}
}
func (b *blsClient) sendAggregateTx(nodes []string) error {
dones := filterDoneCommits(len(nodes), b.commitsPool)
plog.Info("sendAggregateTx filterDone", "commits", len(dones))
if len(dones) <= 0 {
return nil
}
acts, err := b.AggregateCommit2Action(nodes, dones)
if err != nil {
plog.Error("sendAggregateTx AggregateCommit2Action", "err", err)
return err
}
b.paraClient.commitMsgClient.sendCommitActions(acts)
return nil
}
func (b *blsClient) rcvCommitTx(tx *types.Transaction) error {
if !b.isValidNodes(tx.From()) {
b.updatePeers(tx.From(), false)
return pt.ErrParaNodeAddrNotExisted
}
b.updatePeers(tx.From(), true)
txs := []*types.Transaction{tx}
if count := tx.GetGroupCount(); count > 0 {
group, err := tx.GetTxGroup()
if err != nil {
return err
return errors.Wrap(err, "GetTxGroup")
}
txs = group.Txs
}
commits, err := b.getCommitInfo(txs)
commits, err := b.checkCommitTx(txs)
if err != nil {
return err
return errors.Wrap(err, "checkCommitTx")
}
b.updatePeers(tx.From(), true)
b.rcvCommitTxCh <- commits
return nil
}
func (b *blsClient) getCommitInfo(txs []*types.Transaction) ([]*pt.ParacrossCommitAction, error) {
func (b *blsClient) checkCommitTx(txs []*types.Transaction) ([]*pt.ParacrossCommitAction, error) {
var commits []*pt.ParacrossCommitAction
for _, tx := range txs {
//verify the tx signature
if !tx.CheckSign() {
return nil, errors.Wrapf(types.ErrSign, "hash=%s", common.ToHex(tx.Hash()))
}
var act pt.ParacrossAction
err := types.Decode(tx.Payload, &act)
if err != nil {
return nil, errors.Wrap(err, "decode act")
}
if act.Ty != pt.ParacrossActionCommit {
return nil, types.ErrInvalidParam
return nil, errors.Wrapf(types.ErrInvalidParam, "act ty=%d", act.Ty)
}
//the tx signer and the bls signer must be the same user
commit := act.GetCommit()
if tx.From() != commit.Bls.Addrs[0] {
return nil, types.ErrFromAddr
return nil, errors.Wrapf(types.ErrFromAddr, "from=%s,bls addr=%s", tx.From(), commit.Bls.Addrs[0])
}
//verify the bls signature
err = b.verifyBlsSign(tx.From(), commit)
if err != nil {
return nil, pt.ErrBlsSignVerify
return nil, errors.Wrapf(pt.ErrBlsSignVerify, "from=%s", tx.From())
}
commits = append(commits, commit)
}
......@@ -147,12 +284,13 @@ func (b *blsClient) getCommitInfo(txs []*types.Transaction) ([]*pt.ParacrossComm
return commits, nil
}
func collectSigns(txsBuff map[int64]*pt.ParaBlsSignSumDetails, commits []*pt.ParacrossCommitAction) {
//merge commits of the same height
func integrateCommits(pool map[int64]*pt.ParaBlsSignSumDetails, commits []*pt.ParacrossCommitAction) {
for _, cmt := range commits {
if _, ok := txsBuff[cmt.Status.Height]; !ok {
txsBuff[cmt.Status.Height] = &pt.ParaBlsSignSumDetails{Height: cmt.Status.Height}
if _, ok := pool[cmt.Status.Height]; !ok {
pool[cmt.Status.Height] = &pt.ParaBlsSignSumDetails{Height: cmt.Status.Height}
}
a := txsBuff[cmt.Status.Height]
a := pool[cmt.Status.Height]
for i, v := range a.Addrs {
//the node re-submitted with updated tx parameters
if v == cmt.Bls.Addrs[0] {
......@@ -167,7 +305,12 @@ func collectSigns(txsBuff map[int64]*pt.ParaBlsSignSumDetails, commits []*pt.Par
}
}
//if any height in the buffer is done, report ok; some unfinished heights may be redundant, and the heights that reached consensus go to the chain for the final verdict
func isMostCommitDone(peers int, txsBuff map[int64]*pt.ParaBlsSignSumDetails) bool {
if peers <= 0 {
return false
}
for i, v := range txsBuff {
most, _ := getMostCommit(v.Msgs)
if isCommitDone(peers, most) {
......@@ -178,18 +321,19 @@ func isMostCommitDone(peers int, txsBuff map[int64]*pt.ParaBlsSignSumDetails) bo
return false
}
//find the commits that reached the 2/3 consensus and drop the commits that disagree with it, preparing for aggregate signing
func filterDoneCommits(peers int, txs map[int64]*pt.ParaBlsSignSumDetails) []*pt.ParaBlsSignSumDetails {
var seq []int64
for i, v := range txs {
most, hash := getMostCommit(v.Msgs)
if !isCommitDone(peers, most) {
plog.Info("blssign.filterDoneCommits not commit done", "height", i)
plog.Debug("blssign.filterDoneCommits not commit done", "height", i)
delete(txs, i)
continue
}
seq = append(seq, i)
//keep only the matching commits
//keep only the commits that match the majority, for use in aggregate signing
a := &pt.ParaBlsSignSumDetails{Msgs: [][]byte{[]byte(hash)}}
for j, m := range v.Msgs {
if bytes.Equal([]byte(hash), m) {
......@@ -201,12 +345,12 @@ func filterDoneCommits(peers int, txs map[int64]*pt.ParaBlsSignSumDetails) []*pt
}
if len(seq) <= 0 {
plog.Info("blssign.filterDoneCommits nil")
return nil
}
//collect consecutive commits from low to high
sort.Slice(seq, func(i, j int) bool { return seq[i] < seq[j] })
plog.Info("blssign.filterDoneCommits", "seq", seq)
plog.Debug("blssign.filterDoneCommits", "seq", seq)
var signs []*pt.ParaBlsSignSumDetails
//consensus heights must be consecutive; stop at the first gap
lastSeq := seq[0] - 1
......@@ -221,22 +365,22 @@ func filterDoneCommits(peers int, txs map[int64]*pt.ParaBlsSignSumDetails) []*pt
}
func (b *blsClient) transferCommit2Action(commits []*pt.ParaBlsSignSumDetails) ([]*pt.ParacrossCommitAction, error) {
//aggregate multiple signatures into a single one and set the address bitmap
func (b *blsClient) AggregateCommit2Action(nodes []string, commits []*pt.ParaBlsSignSumDetails) ([]*pt.ParacrossCommitAction, error) {
var notify []*pt.ParacrossCommitAction
for _, v := range commits {
a := &pt.ParacrossCommitAction{}
a := &pt.ParacrossCommitAction{Bls: &pt.ParacrossCommitBlsInfo{}}
s := &pt.ParacrossNodeStatus{}
types.Decode(v.Msgs[0], s)
a.Status = s
sign, err := b.aggregateSigns(v.Signs)
if err != nil {
return nil, err
return nil, errors.Wrapf(err, "bls aggreate=%s", v.Addrs)
}
signData := sign.Serialize()
copy(a.Bls.Sign, signData[:])
nodes := b.paraClient.commitMsgClient.authNodes.Load().([]string)
a.Bls.Sign = append(a.Bls.Sign, signData[:]...)
bits, remains := setAddrsBitMap(nodes, v.Addrs)
if len(remains) > 0 {
plog.Info("bls.signDoneCommits", "remains", remains)
......@@ -254,7 +398,7 @@ func (b *blsClient) aggregateSigns(signs [][]byte) (*g2pubs.Signature, error) {
copy(s[:], data)
signKey, err := g2pubs.DeserializeSignature(s)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "aggregateSigns")
}
signatures = append(signatures, signKey)
}
......@@ -262,15 +406,15 @@ func (b *blsClient) aggregateSigns(signs [][]byte) (*g2pubs.Signature, error) {
return g2pubs.AggregateSignatures(signatures), nil
}
func (b *blsClient) updatePeers(peer string, add bool) {
if _, ok := b.peers[peer]; ok {
func (b *blsClient) updatePeers(id string, add bool) {
if _, ok := b.peers[id]; ok {
if !add {
delete(b.peers, peer)
delete(b.peers, id)
}
return
}
if add {
b.peers[peer] = true
b.peers[id] = true
}
}
......@@ -285,7 +429,7 @@ func (b *blsClient) setBlsPriKey(secpPrkKey []byte) {
//repeatedly hash the private key until the result falls within the valid range of a bls private key
func getBlsPriKey(key []byte) *g2pubs.SecretKey {
var newKey [common.Sha256Len]byte
copy(newKey[:], key[:])
copy(newKey[:], key)
for {
plog.Info("para commit getBlsPriKey", "keys", common.ToHex(newKey[:]))
secret := g2pubs.DeserializeSecretKey(newKey)
......@@ -302,9 +446,14 @@ func getBlsPriKey(key []byte) *g2pubs.SecretKey {
func (b *blsClient) blsSign(commits []*pt.ParacrossCommitAction) error {
for _, cmt := range commits {
data := types.Encode(cmt.Status)
plog.Debug("blsign msg", "data", common.ToHex(data), "height", cmt.Status.Height)
cmt.Bls = &pt.ParacrossCommitBlsInfo{Addrs: []string{b.selfID}}
sign := g2pubs.Sign(data, b.blsPriKey).Serialize()
cmt.Bls = &pt.ParacrossCommitBlsInfo{Sign: sign[:], Addrs: []string{b.selfID}}
if len(sign) <= 0 {
return errors.Wrapf(types.ErrInvalidParam, "addr=%s,prikey=%d,height=%d", b.selfID, len(b.blsPriKey.Serialize()), cmt.Status.Height)
}
cmt.Bls.Sign = append(cmt.Bls.Sign, sign[:]...)
plog.Debug("blsign msg", "data", common.ToHex(data), "height", cmt.Status.Height, "sign", len(cmt.Bls.Sign), "src", len(sign))
}
return nil
}
......@@ -397,8 +546,7 @@ func (b *blsClient) verifyBlsSign(addr string, commit *pt.ParacrossCommitAction)
//1. fetch the corresponding public key
pubKey, err := b.getBlsPubKey(addr)
if err != nil {
plog.Error("verifyBlsSign pub key not exist", "addr", addr)
return err
return errors.Wrapf(err, "pub key not exist to addr=%s", addr)
}
//2. deserialize the bls signature
......@@ -406,8 +554,7 @@ func (b *blsClient) verifyBlsSign(addr string, commit *pt.ParacrossCommitAction)
copy(signkey[:], commit.Bls.Sign)
sign, err := g2pubs.DeserializeSignature(signkey)
if err != nil {
plog.Error("verifyBlsSign.DeserializeSignature", "key", common.ToHex(commit.Bls.Sign))
return err
return errors.Wrapf(err, "DeserializeSignature key=%s", common.ToHex(commit.Bls.Sign))
}
//3. rebuild the original message that was signed
......@@ -423,15 +570,24 @@ func (b *blsClient) verifyBlsSign(addr string, commit *pt.ParacrossCommitAction)
}
func (b *blsClient) showTxBuffInfo() *pt.ParaBlsSignSumInfo {
b.mutex.Lock()
defer b.mutex.Unlock()
var seq []int64
var ret pt.ParaBlsSignSumInfo
for k := range b.txsBuff {
for k := range b.commitsPool {
seq = append(seq, k)
}
sort.Slice(seq, func(i, j int) bool { return seq[i] < seq[j] })
for _, h := range seq {
ret.Info = append(ret.Info, b.txsBuff[h])
for _, s := range seq {
sum := b.commitsPool[s]
info := &pt.ParaBlsSignSumDetailsShow{Height: s}
for i, a := range sum.Addrs {
info.Addrs = append(info.Addrs, a)
info.Msgs = append(info.Msgs, common.ToHex(sum.Msgs[i]))
}
ret.Info = append(ret.Info, info)
}
return &ret
}
# para bls sign
# Parachain BLS aggregate signature
>The parachain consensus nodes form an internal network over P2P. The consensus txs that each node used to send to the main chain individually are now first broadcast internally, and the leader node aggregates them into a single consensus tx sent to the main chain
#1. Subscribe to a topic
Each node subscribes with its own account as the topic and broadcasts to the other parachain nodes
#1. Subscribe to the P2P topic
1. Nodes subscribe on P2P with PARA-BLS-SIGN-TOPIC as the topic; the parachain nodes sync messages to each other via p2p broadcast, here the bls-signed commit txs and the leader sync messages
#2. Negotiate the leader
For a while after the broadcast starts, the nodes search for a leader
#3. Send consensus txs
1. Consensus txs are sent to the leader, which aggregates them and submits on-chain; if the collected signed txs do not exceed 2/3 of the nodes, no on-chain tx is sent
1. After a consensus tx is sent, wait for a reply (via broadcast); if no reply arrives before the timeout, resend
2. A leader rotation algorithm can be considered as a next step
1. To rotate which leader sends the consensus tx, the leader switches every fixed number of consensus heights, e.g. 100: the current consensus height / 100, taken modulo the number of nodegroup addresses, gives the base value that selects the current leader address (see the sketch after this list)
1. Some leader nodes may be zombies, so every node listens for the leader's 15s heartbeat; if none arrives for over 1min the leader is considered dead and the role skips to the next node. For this, an offset is maintained: on timeout, offset++, and (base + offset) modulo len(nodes) determines the next leader. Once a node sees that it is the leader, it starts sending sync messages and the offset stops growing
1. As the consensus height grows, base grows while offset stays fixed, and together they determine the new leader
1. In the special case where several leaders sync at the same time, the nodes converge to the leader with the largest index
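A standalone sketch of the base/offset arithmetic described above (hypothetical helper name; the in-tree logic lives in `getLeaderInfo`):

```go
package main

import "fmt"

// leaderIdx mirrors getLeaderInfo: base rotates with the consensus height,
// offset skips over dead leaders, and their sum mod len(nodes) picks the leader.
func leaderIdx(consensHeight int64, switchInt int32, nodes int, offset int32) int32 {
	base := int32((consensHeight / int64(switchInt)) % int64(nodes))
	return (base + offset) % int32(nodes)
}

func main() {
	// 5 nodes, switching every 100 heights: at height 250 node 2 leads;
	// if it dies and offset grows to 1, node 3 takes over.
	fmt.Println(leaderIdx(250, 100, 5, 0)) // 2
	fmt.Println(leaderIdx(250, 100, 5, 1)) // 3
}
```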
#3. Send the aggregated consensus tx
1. Consensus txs are P2P-broadcast to all subscribed nodes, and the leader aggregates them and submits on-chain; if the collected signed txs do not exceed 2/3 of the nodes (the 2/3 rule is sketched after this list), no on-chain tx is sent; the aggregated tx finally reaches consensus on the main chain
1. After a node broadcasts its consensus tx, if the consensus height does not grow within a certain time, the consensus tx is resent
1. A leader rotation algorithm can be considered as a next step
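The 2/3 threshold above can be sketched as follows (an illustration only; the in-tree check is `isCommitDone`, whose exact comparison may differ):

```go
package main

import "fmt"

// isCommitDone reports whether strictly more than 2/3 of the peers
// committed the same status, using integer math to avoid rounding.
func isCommitDone(peers, mostSame int) bool {
	return peers > 0 && 3*mostSame > 2*peers
}

func main() {
	fmt.Println(isCommitDone(5, 3)) // false: 3 of 5 is not more than 2/3
	fmt.Println(isCommitDone(5, 4)) // true
}
```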
#4. BLS aggregate signature algorithm
1. A BLS private key is about half the size of a SECP256 one; the SECP256 private key is hashed repeatedly until it falls into the BLS range and is used as the BLS private key, from which the BLS public key is derived
1. The BLS public keys are registered in the main-chain nodegroup and are used together with the aggregate signature to verify BLS signatures, which also prevents the BLS leader node from cheating
1. For the same height every node signs the identical consensus message, so only one copy is kept; the signatures are aggregated into one and the public-key info is compressed into a bitmap (as sketched below), all sent as a single tx
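A hedged sketch of the address-bitmap compression in the last step (the in-tree `setAddrsBitMap` may order bits differently):

```go
package main

import "fmt"

// addrsBitMap sets bit i when the i-th nodegroup address signed,
// compressing the signer set into len(nodes)/8 rounded-up bytes.
func addrsBitMap(nodes, signers []string) []byte {
	bits := make([]byte, (len(nodes)+7)/8)
	for i, n := range nodes {
		for _, s := range signers {
			if s == n {
				bits[i/8] |= 1 << uint(i%8)
			}
		}
	}
	return bits
}

func main() {
	nodes := []string{"addr0", "addr1", "addr2", "addr3"}
	fmt.Printf("%08b\n", addrsBitMap(nodes, []string{"addr1", "addr3"})) // [00001010]
}
```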
......
......@@ -24,7 +24,6 @@ import (
"github.com/33cn/chain33/types"
paracross "github.com/33cn/plugin/plugin/dapp/paracross/types"
pt "github.com/33cn/plugin/plugin/dapp/paracross/types"
"github.com/phoreproject/bls/g2pubs"
"github.com/pkg/errors"
)
......@@ -32,8 +31,8 @@ const (
consensusInterval = 10 //about 1 new block interval
minerInterval = 10 //fork probability rises with the 5s main-block interval; 10s removes some fork rollbacks
waitBlocks4CommitMsg int32 = 5 //resend the commit msg if it is unconfirmed after this many blocks
waitConsensStopTimes uint32 = 30 //30*10s = 5min
waitBlocks4CommitMsg int32 = 5 //resend the commit msg if it is unconfirmed after this many blocks
waitConsensStopTimes uint32 = 3 //3*10s
)
type paraSelfConsEnable struct {
......@@ -61,9 +60,6 @@ type commitMsgClient struct {
txFeeRate int64
selfConsEnableList []*paraSelfConsEnable //for parachains that had self-consensus before the self-consensus contract config existed; after the fork, the contract config is used
privateKey crypto.PrivKey
authNodes atomic.Value
blsPriKey *g2pubs.SecretKey
blsPubKey *g2pubs.PublicKey
quit chan struct{}
mutex sync.Mutex
}
......@@ -72,6 +68,34 @@ type commitCheckParams struct {
consensStopTimes uint32
}
func newCommitMsgCli(para *client, cfg *subConfig) *commitMsgClient {
cli := &commitMsgClient{
paraClient: para,
authAccount: cfg.AuthAccount,
waitMainBlocks: waitBlocks4CommitMsg,
waitConsensStopTimes: waitConsensStopTimes,
consensHeight: -2,
sendingHeight: -1,
consensDoneHeight: -1,
resetCh: make(chan interface{}, 1),
quit: make(chan struct{}),
}
if cfg.WaitBlocks4CommitMsg > 0 {
cli.waitMainBlocks = cfg.WaitBlocks4CommitMsg
}
if cfg.WaitConsensStopTimes > 0 {
cli.waitConsensStopTimes = cfg.WaitConsensStopTimes
}
// set the parachain consensus start height: when the consensus height is -1, i.e. consensus has never been reached, allow consensus to start from a configured non-zero height
//note: setting ParaConsensStartHeight is only supported after the main chain's LoopCheckCommitTxDoneForkHeight
if cfg.ParaConsensStartHeight > 0 {
cli.consensDoneHeight = cfg.ParaConsensStartHeight - 1
}
return cli
}
// 1. If the chain height rolls back below the current sending height, the sending height must be recalculated, otherwise the rolled-back heights will not be resent
// 2. The periodic poll reacts promptly to external condition changes, such as locking or unlocking the wallet, that no other input triggers; the alternative, triggering a send on every external change, would involve too many conditions
func (client *commitMsgClient) handler() {
......@@ -163,18 +187,15 @@ func (client *commitMsgClient) createCommitTx() {
if tx == nil {
return
}
//bls sign, send to p2p
if !client.paraClient.subCfg.BlsSignOff {
//send to p2p pubsub
plog.Info("para commitMs send to p2p", "hash", common.ToHex(tx.Hash()))
client.paraClient.SendPubP2PMsg(types.Encode(tx))
act := &pt.ParaP2PSubMsg{Ty: P2pSubCommitTx, Value: &pt.ParaP2PSubMsg_CommitTx{CommitTx: tx}}
client.paraClient.SendPubP2PMsg(defaultParaBlsSignTopic, types.Encode(act))
return
}
client.pushCommitTx(tx)
}
//four triggers: 1. a new block 2. the routine 10s tick check 3. a tx successfully sent on chain 4. resend on error
......@@ -231,7 +252,7 @@ func (client *commitMsgClient) pushCommitTx(signTx *types.Transaction) {
}
func (client *commitMsgClient) sendCommitActions(acts []*pt.ParacrossCommitAction) {
txs, _, err := client.calcCommitMsgTxs(acts)
txs, _, err := client.createCommitMsgTxs(acts)
if err != nil {
return
}
......@@ -239,7 +260,7 @@ func (client *commitMsgClient) sendCommitActions(acts []*pt.ParacrossCommitActio
for i, msg := range acts {
plog.Debug("paracommitmsg sendCommitActions", "idx", i, "height", msg.Status.Height, "mainheight", msg.Status.MainBlockHeight,
"blockhash", common.HashHex(msg.Status.BlockHash), "mainHash", common.HashHex(msg.Status.MainBlockHash),
"addrsmap", common.ToHex(msg.Bls.AddrsMap))
"addrsmap", common.ToHex(msg.Bls.AddrsMap), "sign", common.ToHex(msg.Bls.Sign))
}
client.pushCommitTx(txs)
}
......@@ -313,14 +334,13 @@ func (client *commitMsgClient) checkConsensusStop(checks *commitCheckParams) {
consensHeight := client.getConsensusHeight()
if client.sendingHeight > consensHeight {
checks.consensStopTimes += 1
checks.consensStopTimes++
if checks.consensStopTimes > client.waitConsensStopTimes {
plog.Debug("para commitMsg-checkConsensusStop", "times", checks.consensStopTimes)
checks.consensStopTimes = 0
client.resetSendEnv()
}
}
return
}
func (client *commitMsgClient) checkAuthAccountIn() {
......@@ -336,11 +356,6 @@ func (client *commitMsgClient) checkAuthAccountIn() {
}
client.authAccountIn = authExist
nodes := strings.Split(nodeStr, ",")
client.paraClient.bullyCli.UpdateValidNodes(nodes)
client.authNodes.Store(nodes)
}
func (client *commitMsgClient) procChecks(checks *commitCheckParams) {
......@@ -417,11 +432,12 @@ func (client *commitMsgClient) getSendingTx(startHeight, endHeight int64) (*type
if !client.paraClient.subCfg.BlsSignOff {
err = client.paraClient.blsSignCli.blsSign(commits)
if err != nil {
plog.Error("paracommitmsg bls sign", "err", err)
return nil, 0
}
}
signTx, count, err := client.calcCommitMsgTxs(commits)
signTx, count, err := client.createCommitMsgTxs(commits)
if err != nil || signTx == nil {
return nil, 0
}
......@@ -437,7 +453,7 @@ func (client *commitMsgClient) getSendingTx(startHeight, endHeight int64) (*type
return signTx, count
}
func (client *commitMsgClient) calcCommitMsgTxs(notifications []*pt.ParacrossCommitAction) (*types.Transaction, int64, error) {
func (client *commitMsgClient) createCommitMsgTxs(notifications []*pt.ParacrossCommitAction) (*types.Transaction, int64, error) {
txs, count, err := client.batchCalcTxGroup(notifications, atomic.LoadInt64(&client.txFeeRate))
if err != nil {
txs, err = client.singleCalcTx((notifications)[0], atomic.LoadInt64(&client.txFeeRate))
......@@ -566,6 +582,15 @@ func (client *commitMsgClient) sendCommitTxOut(tx *types.Transaction) error {
}
func needResentErr(err error) bool {
switch err {
case nil, types.ErrBalanceLessThanTenTimesFee, types.ErrNoBalance, types.ErrDupTx, types.ErrTxExist, types.ErrTxExpire:
return false
default:
return true
}
}
func (client *commitMsgClient) sendCommitMsg() {
var err error
var tx *types.Transaction
......@@ -583,7 +608,7 @@ out:
}
continue
}
if err != nil && (err != types.ErrBalanceLessThanTenTimesFee && err != types.ErrNoBalance) {
if needResentErr(err) {
resendTimer = time.After(time.Second * 2)
}
case <-resendTimer:
......@@ -1023,13 +1048,3 @@ func (client *commitMsgClient) isSelfConsEnable(height int64) bool {
}
return false
}
func (client *commitMsgClient) isValidNode(addr string) bool {
nodes := client.authNodes.Load().([]string)
for _, v := range nodes {
if v == addr {
return true
}
}
return false
}
......@@ -527,7 +527,7 @@ func (client *client) CreateBlock() {
out:
for {
select {
case <-client.quitCreate:
case <-client.quit:
break out
default:
count, err := client.getBatchSeqCount(currSeq)
......
......@@ -28,6 +28,10 @@ type jumpDldClient struct {
wg sync.WaitGroup
}
func newJumpDldCli(para *client, cfg *subConfig) *jumpDldClient {
return &jumpDldClient{paraClient: para}
}
//verify the block hashes fetched by height against the block hashes of the corresponding heights from the previous step
func verifyBlockHash(heights []*types.BlockInfo, blocks []*types.ParaTxDetail) error {
heightMap := make(map[int64][]byte)
......
......@@ -59,11 +59,8 @@ func (client *client) Query_LeaderInfo(req *types.ReqNil) (types.Message, error)
if client == nil {
return nil, fmt.Errorf("%s", "client not bind message queue.")
}
isLeader := client.bullyCli.IsSelfCoordinator()
leader := client.bullyCli.Coordinator()
return &pt.ElectionStatus{IsLeader: isLeader, LeaderId: leader}, nil
nodes, leader, base, off, isLeader := client.blsSignCli.getLeaderInfo()
return &pt.ElectionStatus{IsLeader: isLeader, Leader: &pt.LeaderSyncInfo{ID: nodes[leader], BaseIdx: base, Offset: off}}, nil
}
func (client *client) Query_CommitTxInfo(req *types.ReqNil) (types.Message, error) {
......
......@@ -67,6 +67,23 @@ const (
blockSyncStateFinished
)
func newBlockSyncCli(para *client, cfg *subConfig) *blockSyncClient {
cli := &blockSyncClient{
paraClient: para,
notifyChan: make(chan bool, 1),
quitChan: make(chan struct{}),
maxCacheCount: defaultMaxCacheCount,
maxSyncErrCount: defaultMaxSyncErrCount,
}
if cfg.MaxCacheCount > 0 {
cli.maxCacheCount = cfg.MaxCacheCount
}
if cfg.MaxSyncErrCount > 0 {
cli.maxSyncErrCount = cfg.MaxSyncErrCount
}
return cli
}
//syncHasCaughtUp reports whether block sync has caught up; called by the sending layer
func (client *blockSyncClient) syncHasCaughtUp() bool {
return atomic.LoadInt32(&client.isSyncCaughtUpAtom) == 1
......@@ -414,7 +431,7 @@ func (client *blockSyncClient) addBlock(lastBlock *types.Block, localBlock *pt.P
newBlock.BlockTime = localBlock.BlockTime
newBlock.MainHash = localBlock.MainHash
newBlock.MainHeight = localBlock.MainHeight
if newBlock.Height == 1 && newBlock.BlockTime < client.paraClient.subCfg.GenesisBlockTime {
if newBlock.Height == 1 && newBlock.BlockTime < client.paraClient.cfg.GenesisBlockTime {
panic("genesisBlockTime bigger than the 1st block time, need rmv db and reset genesisBlockTime")
}
err = client.writeBlock(lastBlock.StateHash, &newBlock)
......
......@@ -66,14 +66,14 @@ func TestCalcCommitMsgTxs(t *testing.T) {
}
commit1 := &pt.ParacrossCommitAction{Status: nt1}
notify := []*pt.ParacrossCommitAction{commit1}
tx, count, err := client.calcCommitMsgTxs(notify, 0)
tx, count, err := client.createCommitMsgTxs(notify)
assert.Nil(t, err)
assert.Equal(t, int64(1), count)
assert.NotNil(t, tx)
commit1 = &pt.ParacrossCommitAction{Status: nt2}
notify = append(notify, commit1)
tx, count, err = client.calcCommitMsgTxs(notify, 0)
tx, count, err = client.createCommitMsgTxs(notify)
assert.Nil(t, err)
assert.Equal(t, int64(2), count)
assert.NotNil(t, tx)
......
......@@ -659,7 +659,7 @@ function para_create_nodegroup() {
local JR="0x8ed5ba075c27015e2c6da399b42da4cd272d4082b55f05c85d84b1308ec87bdb4aeea70dbef3e754eae99a6be0c0e49512d7e9197712f8538ce3d57c1b2d88e17b37f0e419f55333f6e841261a8d3151552fd7d4fd8e19f4f38a413395aab26e"
local NL="0x872e3ac07998deb12045ee48c52a8ba5d2538dc85123866fb330112eb0b805ce23f31bfde3a485cd89fac48eab48560005d12f714ca3786c7f47fe3b5edb1dc7838677c041c89cee4caf9225c1d68346bfcde3365ada0a627fbd77bc72e9b356"
local MC="0x87c58bb6cce41842462a0030335bb95948dcfba77e47e2d8ee893c0b2c34ac20d08c9e98a883ef2a6492d0ad808ace9a1730e8bae5d3b0861aaf743449df5de510073e2991c7274cab47f327e48d7eacf300e4b24174dae2e8603d1904b8a015"
local blspubs=$E5","$KS","$JR","$NL","$MC
local blspubs=$E5,$KS,$JR,$NL,$MC
txhash=$(${PARA_CLI} send para nodegroup apply -a "1E5saiXVb9mW8wcWUUZjsHJPZs5GmdzuSY,1KSBd17H7ZK8iT37aJztFB22XGwsPTdwE4,1JRNjdEqp4LJ5fqycUBm9ayCKSeeskgMKR,1NLHPEcbTWWxxU3dGUZBhayjrCHD3psX7k,1MCftFynyvG2F4ED5mdHYgziDxx6vDrScs" -p "$blspubs" -c 6 -k 0xd165c84ed37c2a427fea487470ee671b7a0495d68d82607cafbc6348bf23bec5)
echo "tx=$txhash"
query_tx "${PARA_CLI}" "${txhash}"
......
......@@ -326,6 +326,7 @@ func superNodeCmd() *cobra.Command {
cmd.AddCommand(getNodeInfoCmd())
cmd.AddCommand(getNodeIDInfoCmd())
cmd.AddCommand(getNodeListCmd())
cmd.AddCommand(nodeModifyCmd())
return cmd
}
......@@ -485,7 +486,7 @@ func nodeModifyCmd() *cobra.Command {
Short: "super node modify parameters",
Run: createNodeModifyTx,
}
addNodeCancelFlags(cmd)
addNodeModifyFlags(cmd)
return cmd
}
......
......@@ -549,7 +549,7 @@ func (a *action) proCommitMsg(commit *pt.ParacrossNodeStatus, commitAddr string)
func (a *action) verifyBlsSign(commit *pt.ParacrossCommitAction) ([]string, error) {
_, nodesArry, err := a.getNodesGroup(commit.Status.Title)
if err != nil {
return nil, err
return nil, errors.Wrapf(err, "getNodegroup")
}
//1. 获取addr对应的bls 公钥
......@@ -558,8 +558,7 @@ func (a *action) verifyBlsSign(commit *pt.ParacrossCommitAction) ([]string, erro
for _, addr := range signAddrs {
pub, err := getAddrBlsPubKey(a.db, commit.Status.Title, addr)
if err != nil {
clog.Error("verifyBlsSign pub key not exist", "addr", addr)
return nil, err
return nil, errors.Wrapf(err, "pubkey not exist to addr=%s", addr)
}
pubs = append(pubs, pub)
}
......@@ -568,14 +567,12 @@ func (a *action) verifyBlsSign(commit *pt.ParacrossCommitAction) ([]string, erro
k := [96]byte{}
val, err := common.FromHex(p)
if err != nil {
clog.Error("verifyBlsSign.fromhex", "p", p)
return nil, err
return nil, errors.Wrapf(err, "fromhex.p=%s", p)
}
copy(k[:], val)
key, err := g2pubs.DeserializePublicKey(k)
if err != nil {
clog.Error("verifyBlsSign.DeserializePublicKey", "key", p)
return nil, err
return nil, errors.Wrapf(err, "DeserializePublicKey=%s", p)
}
pubKeys = append(pubKeys, key)
......@@ -588,8 +585,7 @@ func (a *action) verifyBlsSign(commit *pt.ParacrossCommitAction) ([]string, erro
copy(signkey[:], commit.Bls.Sign)
sign, err := g2pubs.DeserializeSignature(signkey)
if err != nil {
clog.Error("verifyBlsSign.DeserializeSignature", "key", common.ToHex(commit.Bls.Sign))
return nil, err
return nil, errors.Wrapf(err, "DeserializeSignature,key=%s", common.ToHex(commit.Bls.Sign))
}
//4. 获取签名前原始msg
......
......@@ -169,7 +169,7 @@ func (suite *CommitTestSuite) TestSetup() {
}
func fillRawCommitTx(suite suite.Suite) (*types.Transaction, error) {
st1 := pt.ParacrossNodeStatus{
st1 := &pt.ParacrossNodeStatus{
MainBlockHash: MainBlockHash10,
MainBlockHeight: MainBlockHeight,
Title: Title,
......@@ -184,7 +184,8 @@ func fillRawCommitTx(suite suite.Suite) (*types.Transaction, error) {
CrossTxResult: []byte("abc"),
CrossTxHashs: [][]byte{},
}
tx, err := pt.CreateRawCommitTx4MainChain(chain33TestCfg, &st1, pt.GetExecName(chain33TestCfg), 0)
act := &pt.ParacrossCommitAction{Status: st1}
tx, err := pt.CreateRawCommitTx4MainChain(chain33TestCfg, act, pt.GetExecName(chain33TestCfg), 0)
if err != nil {
suite.T().Error("TestExec", "create tx failed", err)
}
......@@ -718,19 +719,12 @@ func createCrossParaTx(s suite.Suite, to []byte) (*types.Transaction, error) {
func createCrossCommitTx(s suite.Suite) (*types.Transaction, error) {
status := &pt.ParacrossNodeStatus{MainBlockHash: []byte("hash"), MainBlockHeight: 0, Title: Title}
tx, err := pt.CreateRawCommitTx4MainChain(chain33TestCfg, status, Title+pt.ParaX, 0)
act := &pt.ParacrossCommitAction{Status: status}
tx, err := pt.CreateRawCommitTx4MainChain(chain33TestCfg, act, Title+pt.ParaX, 0)
assert.Nil(s.T(), err, "create asset transfer failed")
if err != nil {
return nil, err
}
//tx, err = signTx(s, tx, privFrom)
//assert.Nil(s.T(), err, "sign asset transfer failed")
//if err != nil {
// return nil, err
//}
return tx, nil
}
......
......@@ -416,35 +416,42 @@ message ParaLocalDbBlockInfo {
}
message ParaBlsSignSumDetails {
repeated string addrs = 1;
repeated bytes msgs = 2;
repeated bytes signs = 3;
int64 height = 4;
int64 height = 1;
repeated string addrs = 2;
repeated bytes msgs = 3;
repeated bytes signs = 4;
}
message ParaBlsSignSumDetailsShow {
int64 height = 1;
repeated string addrs = 2;
repeated string msgs = 3;
}
message ParaBlsSignSumInfo {
repeated ParaBlsSignSumDetails info = 1;
repeated ParaBlsSignSumDetailsShow info = 1;
}
message ElectionMsg {
string toID = 1; //target id
string peerID = 2; //src id
int32 type = 3;
bytes data = 4;
message LeaderSyncInfo {
string ID = 1; //self id
int32 baseIdx = 2; //calculated by corrent consensus height and remainder by len(nodes)
int32 offset = 3;
}
message ParaP2PSubMsg {
int32 ty = 1;
oneof value {
Transaction commitTx = 10;
ElectionMsg election = 11;
Transaction commitTx = 10;
LeaderSyncInfo syncMsg = 11;
}
}
message ElectionStatus {
bool isLeader = 1;
string leaderId = 2;
LeaderSyncInfo leader = 2;
}
......
......@@ -57,6 +57,6 @@ var (
ErrKeyNotExist = errors.New("ErrKeyNotExist")
// ErrConsensClosed consensus closed
ErrConsensClosed = errors.New("ErrConsensClosed")
//ErrConsBlsSignVerify bls12-381 aggregate sign verify
//ErrBlsSignVerify bls12-381 aggregate sign verify
ErrBlsSignVerify = errors.New("ErrBlsSignVerify")
)
......@@ -49,6 +49,10 @@ func (mem *Mempool) SetQueueClient(client queue.Client) {
reply, err = mem.mainGrpcCli.SendTransaction(context.Background(), tx)
case types.EventGetProperFee:
reply, err = mem.mainGrpcCli.GetProperFee(context.Background(), &types.ReqProperFee{})
case types.EventGetMempoolSize:
// EventGetMempoolSize: return the mempool size
msg.Reply(mem.client.NewMessage("rpc", types.EventMempoolSize, &types.MempoolSize{Size: 1000000}))
continue
default:
msg.Reply(client.NewMessage(mem.key, types.EventReply, types.ErrActionNotSupport))
continue
......