Commit b7088025 authored by mdj33's avatar mdj33 Committed by vipwzw

bls improve

parent a0eade8b
name: build
name: ci_base
on: [push,pull_request]
jobs:
......
name: build
name: ci_cross2eth
on: [push,pull_request]
jobs:
......
name: build
name: ci_mix
on: [push,pull_request]
jobs:
......
name: build
name: ci_paracross
on: [push,pull_request]
jobs:
......
name: ci_paracross_bls
on: [push,pull_request]
jobs:
ci_paracross:
name: ci_paracross_bls
runs-on: ubuntu-latest
steps:
- name: Set up Golang
uses: actions/setup-go@v2
with:
go-version: 1.15
id: go
- name: set go env
run: export PATH=${PATH}:`go env GOPATH`/bin
- name: checkout
uses: actions/checkout@v2
- name: deploy
run: |
make docker-compose dapp=paracross extra=1
make docker-compose-down dapp=paracross
name: build
name: ci_relay
on: [push,pull_request]
jobs:
......
......@@ -131,6 +131,9 @@ mainLoopCheckCommitTxDoneForkHeight=4320000
#无平行链交易的主链区块间隔,平行链产生一个空块,从高度0开始,配置[blockHeight:interval],比如["0:50","1000:100"]
emptyBlockInterval=["0:50"]
[consensus.sub.para.bls]
blsSign=false
[store]
name="kvmvccmavl"
......
......@@ -70,29 +70,28 @@ type client struct {
}
type subConfig struct {
WriteBlockSeconds int64 `json:"writeBlockSeconds,omitempty"`
ParaRemoteGrpcClient string `json:"paraRemoteGrpcClient,omitempty"`
StartHeight int64 `json:"startHeight,omitempty"`
WaitMainBlockNum int64 `json:"waitMainBlockNum,omitempty"`
GenesisStartHeightSame bool `json:"genesisStartHeightSame,omitempty"`
EmptyBlockInterval []string `json:"emptyBlockInterval,omitempty"`
AuthAccount string `json:"authAccount,omitempty"`
WaitBlocks4CommitMsg int32 `json:"waitBlocks4CommitMsg,omitempty"`
GenesisAmount int64 `json:"genesisAmount,omitempty"`
MainBlockHashForkHeight int64 `json:"mainBlockHashForkHeight,omitempty"`
WaitConsensStopTimes uint32 `json:"waitConsensStopTimes,omitempty"`
MaxCacheCount int64 `json:"maxCacheCount,omitempty"`
MaxSyncErrCount int32 `json:"maxSyncErrCount,omitempty"`
BatchFetchBlockCount int64 `json:"batchFetchBlockCount,omitempty"`
ParaConsensStartHeight int64 `json:"paraConsensStartHeight,omitempty"`
MultiDownloadOpen bool `json:"multiDownloadOpen,omitempty"`
MultiDownInvNumPerJob int64 `json:"multiDownInvNumPerJob,omitempty"`
MultiDownJobBuffNum uint32 `json:"multiDownJobBuffNum,omitempty"`
MultiDownServerRspTime uint32 `json:"multiDownServerRspTime,omitempty"`
RmCommitParamMainHeight int64 `json:"rmCommitParamMainHeight,omitempty"`
JumpDownloadClose bool `json:"jumpDownloadClose,omitempty"`
BlsSign bool `json:"blsSign,omitempty"`
BlsLeaderSwitchIntval int32 `json:"blsLeaderSwitchIntval,omitempty"`
WriteBlockSeconds int64 `json:"writeBlockSeconds,omitempty"`
ParaRemoteGrpcClient string `json:"paraRemoteGrpcClient,omitempty"`
StartHeight int64 `json:"startHeight,omitempty"`
WaitMainBlockNum int64 `json:"waitMainBlockNum,omitempty"`
GenesisStartHeightSame bool `json:"genesisStartHeightSame,omitempty"`
EmptyBlockInterval []string `json:"emptyBlockInterval,omitempty"`
AuthAccount string `json:"authAccount,omitempty"`
WaitBlocks4CommitMsg int32 `json:"waitBlocks4CommitMsg,omitempty"`
GenesisAmount int64 `json:"genesisAmount,omitempty"`
MainBlockHashForkHeight int64 `json:"mainBlockHashForkHeight,omitempty"`
WaitConsensStopTimes uint32 `json:"waitConsensStopTimes,omitempty"`
MaxCacheCount int64 `json:"maxCacheCount,omitempty"`
MaxSyncErrCount int32 `json:"maxSyncErrCount,omitempty"`
BatchFetchBlockCount int64 `json:"batchFetchBlockCount,omitempty"`
ParaConsensStartHeight int64 `json:"paraConsensStartHeight,omitempty"`
MultiDownloadOpen bool `json:"multiDownloadOpen,omitempty"`
MultiDownInvNumPerJob int64 `json:"multiDownInvNumPerJob,omitempty"`
MultiDownJobBuffNum uint32 `json:"multiDownJobBuffNum,omitempty"`
MultiDownServerRspTime uint32 `json:"multiDownServerRspTime,omitempty"`
RmCommitParamMainHeight int64 `json:"rmCommitParamMainHeight,omitempty"`
JumpDownloadClose bool `json:"jumpDownloadClose,omitempty"`
Bls *blsConfig `json:"bls,omitempty"`
}
// New function to init paracross env
......@@ -347,7 +346,7 @@ func (client *client) ProcEvent(msg *queue.Message) bool {
case P2pSubLeaderSyncMsg:
err := client.blsSignCli.rcvLeaderSyncTx(sub.GetSyncMsg())
if err != nil {
plog.Error("paracross ProcEvent leader sync msg", "err", err)
plog.Error("bls.event.paracross ProcEvent leader sync msg", "err", err)
}
default:
plog.Error("paracross ProcEvent not support", "ty", sub.GetTy())
......
......@@ -6,9 +6,9 @@ package para
import (
"bytes"
"math"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
......@@ -21,29 +21,44 @@ import (
)
const (
maxRcvTxCount = 100 //channel buffer, max 100 nodes, 1 height tx or 1 txgroup per node
leaderSyncInt = 15 //15s heartbeat sync interval
defLeaderSwitchInt = 100 //每隔100个共识高度切换一次leader,大约6小时(按50个空块间隔计算)
delaySubP2pTopic = 10 //30s to sub p2p topic
maxRcvTxCount = 1000 //channel buffer, max 100 nodes, 1 height tx or 1 txgroup per node
defLeaderHeardTickInt = 10 //10s heart tick sync interval
defWatchLeaderSyncInt = 60 //60s watch leader heard tick interval
defLeaderSwitchInt = 1000 //每隔1000个共识高度切换一次leader,大约6小时(按50个空块间隔计算)
delaySubP2pTopic = 10 //30s to sub p2p topic
defConsensHeightThreshold = 40 //共识高度和chainHeight差值阈值,超过此阈值,则任一共识done节点可尽快发送共识交易,不限于leader
paraBlsSignTopic = "PARA-BLS-SIGN-TOPIC"
)
type blsConfig struct {
BlsSign bool `json:"blsSign,omitempty"`
LeaderHeardTickInt int32 `json:"leaderHeardTickInt,omitempty"`
WatchLeaderSyncInt int32 `json:"watchLeaderSyncInt,omitempty"`
LeaderSwitchInt int32 `json:"leaderSwitchIntval,omitempty"`
ConsensHeightDiffThreshold int32 `json:"consensHeightDiffThreshold,omitempty"`
//支持只配置部分nodegroup地址即可聚合,另一部分地址不聚合直接发送交易
PartNodeGroup int32 `json:"partNodeGroup,omitempty"`
}
type blsClient struct {
paraClient *client
selfID string
cryptoCli crypto.Crypto
blsPriKey crypto.PrivKey
blsPubKey crypto.PubKey
peersBlsPubKey map[string]crypto.PubKey
commitsPool map[int64]*pt.ParaBlsSignSumDetails
rcvCommitTxCh chan []*pt.ParacrossCommitAction
leaderOffset int32
leaderSwitchInt int32
feedDog uint32
quit chan struct{}
mutex sync.Mutex
typeNode uint32
paraClient *client
selfID string
cryptoCli crypto.Crypto
blsPriKey crypto.PrivKey
blsPubKey crypto.PubKey
peersBlsPubKey map[string]crypto.PubKey
commitsPool map[int64]*pt.ParaBlsSignSumDetails
rcvCommitTxCh chan []*pt.ParacrossCommitAction
leaderOffset int32
leaderSwitchInt int32
leaderHeardTickInt int32
watchLeaderSyncInt int32
consensHeightDiffThreshold int32
partNodeGroup int32
feedDog uint32
quit chan struct{}
typeNode uint32
}
func newBlsClient(para *client, cfg *subConfig) *blsClient {
......@@ -59,10 +74,26 @@ func newBlsClient(para *client, cfg *subConfig) *blsClient {
b.rcvCommitTxCh = make(chan []*pt.ParacrossCommitAction, maxRcvTxCount)
b.quit = make(chan struct{})
b.leaderSwitchInt = defLeaderSwitchInt
b.typeNode = pt.ParaCommitNode
if cfg.BlsLeaderSwitchIntval > 0 {
b.leaderSwitchInt = cfg.BlsLeaderSwitchIntval
if cfg.Bls.LeaderSwitchInt > 0 {
b.leaderSwitchInt = cfg.Bls.LeaderSwitchInt
}
b.leaderHeardTickInt = defLeaderHeardTickInt
if cfg.Bls.LeaderHeardTickInt > 0 {
b.leaderHeardTickInt = cfg.Bls.LeaderHeardTickInt
}
b.watchLeaderSyncInt = defWatchLeaderSyncInt
if cfg.Bls.WatchLeaderSyncInt > 0 {
b.watchLeaderSyncInt = cfg.Bls.WatchLeaderSyncInt
}
b.consensHeightDiffThreshold = defConsensHeightThreshold
if cfg.Bls.ConsensHeightDiffThreshold > 0 {
b.consensHeightDiffThreshold = cfg.Bls.ConsensHeightDiffThreshold
}
if cfg.Bls.PartNodeGroup > 0 {
b.partNodeGroup = cfg.Bls.PartNodeGroup
}
b.typeNode = pt.ParaCommitNode
return b
}
......@@ -80,40 +111,48 @@ func (b *blsClient) procLeaderSync() {
return
}
var feedDogTicker <-chan time.Time
var watchDogTicker <-chan time.Time
var heardTicker <-chan time.Time
var watchLeaderTicker <-chan time.Time
p2pTimer := time.After(delaySubP2pTopic * time.Second)
var count uint32
out:
for {
select {
case <-feedDogTicker:
case <-heardTicker:
//leader需要定期喂狗
_, _, base, off, isLeader := b.getLeaderInfo()
_, _, base, off, isLeader, _ := b.getLeaderInfo()
if isLeader {
count++
count = count & 0xffffff
act := &pt.ParaP2PSubMsg{Ty: P2pSubLeaderSyncMsg}
act.Value = &pt.ParaP2PSubMsg_SyncMsg{SyncMsg: &pt.LeaderSyncInfo{ID: b.selfID, BaseIdx: base, Offset: off}}
act.Value = &pt.ParaP2PSubMsg_SyncMsg{SyncMsg: &pt.LeaderSyncInfo{ID: b.selfID, BaseIdx: base, Offset: off, Count: count}}
err := b.paraClient.SendPubP2PMsg(paraBlsSignTopic, types.Encode(act))
if err != nil {
plog.Error("para.procLeaderSync feed dog", "err", err)
plog.Error("para.procLeaderSync heard ticker", "err", err)
}
plog.Info("procLeaderSync feed dog", "id", b.selfID, "base", base, "off", off)
plog.Debug("bls.event.procLeaderSync send heard tick", "self accout", b.selfID, "base", base, "off", off, "count", count)
}
case <-watchDogTicker:
case <-watchLeaderTicker:
//排除不在Nodegroup里面的Node
if !b.isValidNodes(b.selfID) {
plog.Info("procLeaderSync watchdog, not in nodegroup", "self", b.selfID)
plog.Warn("procLeaderSync watch, not in nodegroup", "self", b.selfID)
continue
}
//至少1分钟内要收到leader喂狗消息,否则认为leader挂了,index++
if atomic.LoadUint32(&b.feedDog) == 0 {
nodes, leader, _, off, _ := b.getLeaderInfo()
nodes, leader, base, off, _, _ := b.getLeaderInfo()
if len(nodes) <= 0 {
continue
}
atomic.StoreInt32(&b.leaderOffset, (off+1)%int32(len(nodes)))
plog.Info("procLeaderSync watchdog", "fail node", nodes[leader], "newOffset", atomic.LoadInt32(&b.leaderOffset))
newOff := (off + 1) % int32(len(nodes))
atomic.StoreInt32(&b.leaderOffset, newOff)
plog.Warn("bls.event.procLeaderSync watchdog", "failLeader", leader, "newLeader", nodes[newOff],
"base", base, "off", off, "newleader", newOff)
//leader切换,重新发送commit msg
b.paraClient.commitMsgClient.resetNotify()
}
atomic.StoreUint32(&b.feedDog, 0)
......@@ -124,8 +163,8 @@ out:
p2pTimer = time.After(delaySubP2pTopic * time.Second)
continue
}
feedDogTicker = time.NewTicker(leaderSyncInt * time.Second).C
watchDogTicker = time.NewTicker(time.Minute).C
heardTicker = time.NewTicker(time.Second * time.Duration(b.leaderHeardTickInt)).C
watchLeaderTicker = time.NewTicker(time.Second * time.Duration(b.watchLeaderSyncInt)).C
case <-b.quit:
break out
}
......@@ -134,14 +173,15 @@ out:
//处理leader sync tx, 需接受同步的数据,两个节点基本的共识高度相同, 两个共同leader需相同
func (b *blsClient) rcvLeaderSyncTx(sync *pt.LeaderSyncInfo) error {
nodes, _, base, off, isLeader := b.getLeaderInfo()
nodes, _, base, off, isLeader, _ := b.getLeaderInfo()
if len(nodes) <= 0 {
return errors.Wrapf(pt.ErrParaNodeGroupNotSet, "id=%s", b.selfID)
}
syncLeader := (sync.BaseIdx + sync.Offset) % int32(len(nodes))
plog.Info("bls.event.rcvLeaderSyncTx", "from.leader", sync.ID, "self", b.selfID,
"fromBase", sync.BaseIdx, "selfBase", base, "from.Off", sync.Offset, "selfOff", off, "count", sync.Count)
//接受同步数据需要两个节点基本的共识高度相同, 两个共同leader需相同
if sync.BaseIdx != base || nodes[syncLeader] != sync.ID {
return errors.Wrapf(types.ErrNotSync, "peer base=%d,id=%s,self.Base=%d,id=%s", sync.BaseIdx, sync.ID, base, nodes[syncLeader])
if sync.BaseIdx != base {
return errors.Wrapf(types.ErrNotSync, "leaderSync.base diff,peer=%s, base=%d,self.Base=%d,self=%s", sync.ID, sync.BaseIdx, base, b.selfID)
}
//如果leader节点冲突,取大者
if isLeader && off > sync.Offset {
......@@ -155,21 +195,21 @@ func (b *blsClient) rcvLeaderSyncTx(sync *pt.LeaderSyncInfo) error {
return nil
}
func (b *blsClient) getLeaderInfo() ([]string, int32, int32, int32, bool) {
func (b *blsClient) getLeaderInfo() ([]string, string, int32, int32, bool, int64) {
//在未同步前 不处理聚合消息
if !b.paraClient.commitMsgClient.isSync() {
return nil, 0, 0, 0, false
return nil, "", 0, 0, false, 0
}
nodes, _ := b.getSuperNodes()
if len(nodes) <= 0 {
return nil, 0, 0, 0, false
return nil, "", 0, 0, false, 0
}
h := b.paraClient.commitMsgClient.getConsensusHeight()
consensHeight := b.paraClient.commitMsgClient.getConsensusHeight()
//间隔的除数再根据nodes取余数,平均覆盖所有节点
baseIdx := int32((h / int64(b.leaderSwitchInt)) % int64(len(nodes)))
baseIdx := int32((consensHeight / int64(b.leaderSwitchInt)) % int64(len(nodes)))
offIdx := atomic.LoadInt32(&b.leaderOffset)
leaderIdx := (baseIdx + offIdx) % int32(len(nodes))
return nodes, leaderIdx, baseIdx, offIdx, nodes[leaderIdx] == b.selfID
return nodes, nodes[leaderIdx], baseIdx, offIdx, nodes[leaderIdx] == b.selfID, consensHeight
}
func (b *blsClient) getSuperGroupNodes() ([]string, string) {
......@@ -228,6 +268,14 @@ func (b *blsClient) isValidNodes(id string) bool {
return strings.Contains(nodes, id)
}
func (b *blsClient) clearDonePool(consensHeight int64) {
for h, _ := range b.commitsPool {
if h <= consensHeight {
delete(b.commitsPool, h)
}
}
}
//1. 要等到达成共识了才发送,不然处理未达成共识的各种场景会比较复杂,而且浪费手续费
func (b *blsClient) procAggregateTxs() {
defer b.paraClient.wg.Done()
......@@ -239,24 +287,25 @@ out:
for {
select {
case commits := <-b.rcvCommitTxCh:
b.mutex.Lock()
integrateCommits(b.commitsPool, commits)
nodes, leader, _, _, isLeader, consensHeight := b.getLeaderInfo()
//清空已共识过的高度
b.clearDonePool(consensHeight)
//支持可配的只部分nodegroup地址参与聚合,另一部分直接发送
calcNodes := len(nodes)
if b.partNodeGroup > 0 && int(b.partNodeGroup) < calcNodes {
calcNodes = int(b.partNodeGroup)
}
//commitsPool里面任一高度满足共识,则认为done
nodes, _ := b.getSuperNodes()
if !isMostCommitDone(len(nodes), b.commitsPool) {
b.mutex.Unlock()
if !isMostCommitDone(calcNodes, b.commitsPool, isLeader, leader) {
continue
}
//自己是Leader,则聚合并发送交易
_, _, _, _, isLeader := b.getLeaderInfo()
if isLeader {
//自己是Leader,或共识高度超过阈值则聚合并发送交易
if isLeader || int32(math.Abs(float64(b.paraClient.commitMsgClient.chainHeight-consensHeight))) > b.consensHeightDiffThreshold {
_ = b.sendAggregateTx(nodes)
}
//聚合签名总共消耗大约1.5ms
//清空txsBuff,重新收集
b.commitsPool = make(map[int64]*pt.ParaBlsSignSumDetails)
b.mutex.Unlock()
case <-b.quit:
break out
......@@ -266,7 +315,6 @@ out:
func (b *blsClient) sendAggregateTx(nodes []string) error {
dones := filterDoneCommits(len(nodes), b.commitsPool)
plog.Info("sendAggregateTx filterDone", "commits", len(dones))
if len(dones) <= 0 {
return nil
}
......@@ -302,7 +350,7 @@ func (b *blsClient) rcvCommitTx(tx *types.Transaction) error {
}
if len(commits) > 0 {
plog.Debug("rcvCommitTx tx", "addr", tx.From(), "height", commits[0].Status.Height)
plog.Debug("bls.event.rcvCommitTx tx", "addr", tx.From(), "height", commits[0].Status.Height, "end", commits[len(commits)-1].Status.Height)
}
b.rcvCommitTxCh <- commits
......@@ -370,17 +418,18 @@ func integrateCommits(pool map[int64]*pt.ParaBlsSignSumDetails, commits []*pt.Pa
}
//txBuff中任一高度满足done则认为ok,有可能某些未达成的高度是冗余的,达成共识的高度发给链最终判决
func isMostCommitDone(peers int, txsBuff map[int64]*pt.ParaBlsSignSumDetails) bool {
if peers <= 0 {
func isMostCommitDone(nodes int, txsBuff map[int64]*pt.ParaBlsSignSumDetails, isLeader bool, leader string) bool {
if nodes <= 0 {
return false
}
for i, v := range txsBuff {
most, _ := util.GetMostCommit(v.Msgs)
if util.IsCommitDone(peers, most) {
plog.Info("blssign.isMostCommitDone", "height", i, "most", most, "peers", peers)
if util.IsCommitDone(nodes, most) {
plog.Info("bls.event.integrateCommits.mostCommitDone", "height", i, "peers", nodes, "isleader", isLeader, "leader", leader, "addrs", strings.Join(v.Addrs, ","))
return true
}
plog.Debug("bls.event.integrateCommits.isMostCommitDone.NOT", "height", i, "most", most, "nodes", nodes, "isleader", isLeader, "leader", leader, "addrs", strings.Join(v.Addrs, ","))
}
return false
}
......@@ -391,16 +440,16 @@ func filterDoneCommits(peers int, pool map[int64]*pt.ParaBlsSignSumDetails) []*p
for i, v := range pool {
most, hash := util.GetMostCommit(v.Msgs)
if !util.IsCommitDone(peers, most) {
plog.Debug("blssign.filterDoneCommits not commit done", "height", i)
continue
}
seq = append(seq, i)
//只保留与most相同的commits做聚合签名使用
a := &pt.ParaBlsSignSumDetails{Height: i, Msgs: [][]byte{[]byte(hash)}}
a := &pt.ParaBlsSignSumDetails{Height: i}
for j, m := range v.Msgs {
if bytes.Equal([]byte(hash), m) {
a.Addrs = append(a.Addrs, v.Addrs[j])
a.Msgs = append(a.Msgs, []byte(hash))
a.Signs = append(a.Signs, v.Signs[j])
}
}
......@@ -442,7 +491,6 @@ func (b *blsClient) aggregateCommit2Action(nodes []string, commits []*pt.ParaBls
}
a.Bls.Sign = sign.Bytes()
bits, remains := util.SetAddrsBitMap(nodes, v.Addrs)
plog.Debug("AggregateCommit2Action", "nodes", nodes, "addr", v.Addrs, "bits", common.ToHex(bits), "height", v.Height)
if len(remains) > 0 {
plog.Info("bls.signDoneCommits", "remains", remains)
}
......@@ -514,7 +562,7 @@ func (b *blsClient) blsSign(commits []*pt.ParacrossCommitAction) error {
return errors.Wrapf(types.ErrInvalidParam, "addr=%s,height=%d", b.selfID, cmt.Status.Height)
}
cmt.Bls.Sign = sign
plog.Info("bls sign msg", "data", common.ToHex(data), "height", cmt.Status.Height, "sign", len(cmt.Bls.Sign), "src", len(sign))
plog.Debug("bls sign msg", "data", common.ToHex(data), "height", cmt.Status.Height, "sign", len(cmt.Bls.Sign), "src", len(sign))
}
return nil
}
......@@ -585,9 +633,6 @@ func (b *blsClient) verifyBlsSign(addr string, commit *pt.ParacrossCommitAction)
}
func (b *blsClient) showTxBuffInfo() *pt.ParaBlsSignSumInfo {
b.mutex.Lock()
defer b.mutex.Unlock()
var ret pt.ParaBlsSignSumInfo
reply, err := b.paraClient.SendFetchP2PTopic()
......
......@@ -3,7 +3,9 @@
#1. 订阅P2P topic
1. 以PARA-BLS-SIGN-TOPIC为topic在P2P订阅,平行链内部节点间通过p2p广播同步消息,比如这里bls签名交易和leader同步消息
1. 以PARA-BLS-SIGN-TOPIC为topic在P2P订阅,平行链内部节点间通过p2p DHT机制广播同步消息,
1. 消息有两类,一类是leader心跳和监听消息,leader广播心跳消息,候选节点监听心跳,如果超时没收到,则自动选择下一个节点为leader
1. 另一类是bls签名共识交易,广播出来,由leader节点聚合发送
#2. 协商leader
1. 考虑到leader轮换发送共识交易,每隔一定共识高度比如100就会轮换下一个节点为leader发送交易,当前共识高度/100后对nodegroup 地址取余base值就是当前leader地址
......@@ -15,6 +17,8 @@
#3. 发送聚合共识交易
1. 共识交易P2P广播给所有订阅的节点,leader节点负责聚合后上链,如果收集的签名交易不超过2/3节点,则不发送上链交易,聚合交易最终在主链达成共识
1. 节点广播共识交易后,超过一定时间共识高度没增长,重新发送共识交易
1. leader节点长时间未发送成功共识交易,共识高度超过和链高度阈值,任一达成共识的节点都可发送
1. 用户可手工设置达成共识的节点数,可以少于2/3个,其他节点可以选择非聚合签名,直接发送
#4. BLS聚合签名算法
......
......@@ -6,6 +6,7 @@ package para
import (
"context"
"encoding/hex"
"time"
"strings"
......@@ -191,10 +192,9 @@ func (client *commitMsgClient) createCommitTx() {
if tx == nil {
return
}
//bls sign, send to p2p
if client.paraClient.subCfg.BlsSign {
//send to p2p pubsub
plog.Info("para commitMs send to p2p", "hash", common.ToHex(tx.Hash()))
//如果配置了blsSign 则发送到p2p的leader节点来聚合发送,否则发送到主链
if client.paraClient.subCfg.Bls.BlsSign {
plog.Debug("bls.event.para bls commitMs send to p2p", "hash", common.ToHex(tx.Hash()))
act := &pt.ParaP2PSubMsg{Ty: P2pSubCommitTx, Value: &pt.ParaP2PSubMsg_CommitTx{CommitTx: tx}}
client.paraClient.SendPubP2PMsg(paraBlsSignTopic, types.Encode(act))
return
......@@ -255,16 +255,24 @@ func (client *commitMsgClient) pushCommitTx(signTx *types.Transaction) {
client.sendMsgCh <- signTx
}
//根据收集的commit action,签名发送, 比如BLS签名后的commit msg
func (client *commitMsgClient) sendCommitActions(acts []*pt.ParacrossCommitAction) {
//如果当前正在发送交易,则取消此次发送,待发送被确认或取消后再触发. 考虑到已经聚合共识成功,又收到某节点消息场景,会多发送交易
curTx := client.getCurrentTx()
if curTx != nil {
plog.Info("paracommitmsg isSendingCommitMsg, cancel this operation", "sending.tx", common.ToHex(curTx.Hash()))
return
}
txs, _, err := client.createCommitMsgTxs(acts)
if err != nil {
return
}
plog.Debug("paracommitmsg sendCommitActions", "txhash", common.ToHex(txs.Hash()))
plog.Info("paracommitmsg sendCommitActions", "txhash", common.ToHex(txs.Hash()))
for i, msg := range acts {
plog.Debug("paracommitmsg sendCommitActions", "idx", i, "height", msg.Status.Height, "mainheight", msg.Status.MainBlockHeight,
"blockhash", common.HashHex(msg.Status.BlockHash), "mainHash", common.HashHex(msg.Status.MainBlockHash),
"addrsmap", common.ToHex(msg.Bls.AddrsMap), "sign", common.ToHex(msg.Bls.Sign))
"addrsmap", hex.EncodeToString(msg.Bls.AddrsMap), "sign", common.ToHex(msg.Bls.Sign))
}
client.pushCommitTx(txs)
}
......@@ -428,7 +436,7 @@ func (client *commitMsgClient) getSendingTx(startHeight, endHeight int64) (*type
commits = append(commits, &pt.ParacrossCommitAction{Status: stat})
}
if client.paraClient.subCfg.BlsSign {
if client.paraClient.subCfg.Bls.BlsSign {
err = client.paraClient.blsSignCli.blsSign(commits)
if err != nil {
plog.Error("paracommitmsg bls sign", "err", err)
......
......@@ -60,8 +60,8 @@ func (client *client) Query_LeaderInfo(req *types.ReqNil) (types.Message, error)
if client == nil {
return nil, fmt.Errorf("%s", "client not bind message queue.")
}
nodes, leader, base, off, isLeader := client.blsSignCli.getLeaderInfo()
return &pt.ElectionStatus{IsLeader: isLeader, Leader: &pt.LeaderSyncInfo{ID: nodes[leader], BaseIdx: base, Offset: off}}, nil
_, leader, base, off, isLeader, _ := client.blsSignCli.getLeaderInfo()
return &pt.ElectionStatus{IsLeader: isLeader, Leader: &pt.LeaderSyncInfo{ID: leader, BaseIdx: base, Offset: off}}, nil
}
func (client *client) Query_CommitTxInfo(req *types.ReqNil) (types.Message, error) {
......
#!/usr/bin/env bash
# shellcheck disable=SC2128
set -x
set +x
PARA_CLI="docker exec ${NODE3} /root/chain33-cli --paraName user.p.para. --rpc_laddr http://localhost:8901"
......@@ -51,14 +51,12 @@ function para_init() {
sed -i $xsedfix 's/^authAccount=.*/authAccount="1NLHPEcbTWWxxU3dGUZBhayjrCHD3psX7k"/g' chain33.para31.toml
sed -i $xsedfix 's/^authAccount=.*/authAccount="1MCftFynyvG2F4ED5mdHYgziDxx6vDrScs"/g' chain33.para30.toml
para_set_toml chain33.para29.toml "$PARANAME_GAME" "$1"
# 一个节点不配置 blsSign
para_set_toml chain33.para29.toml "$PARANAME_GAME"
sed -i $xsedfix 's/^authAccount=.*/authAccount="1KSBd17H7ZK8iT37aJztFB22XGwsPTdwE4"/g' chain33.para29.toml
# 监督节点
para_set_toml chain33.para28.toml "$PARANAME" "$1"
para_set_toml chain33.para27.toml "$PARANAME" "$1"
para_set_toml chain33.para26.toml "$PARANAME" "$1"
para_set_toml chain33.para25.toml "$PARANAME" "$1"
para_set_toml chain33.para28.toml "$PARANAME"
sed -i $xsedfix 's/^authAccount=.*/authAccount="'"$ADDR_28"'"/g' chain33.para28.toml # 0x3a35610ba6e1e72d7878f4c819e6a6768668cb5481f423ef04b6a11e0e16e44f
}
......@@ -81,8 +79,7 @@ function para_set_toml() {
if [ -n "$3" ]; then
echo "${1} blssign=$3"
sed -i $xsedfix '/types=\["dht"\]/!b;n;cenable=true' "${1}"
sed -i $xsedfix '/emptyBlockInterval=/!b;n;cblsSign=true' "${1}"
sed -i $xsedfix '/blsSign=/!b;n;cblsLeaderSwitchIntval=10' "${1}"
sed -i $xsedfix 's/^blsSign=.*/blsSign=true/g' "${1}"
fi
......@@ -518,7 +515,7 @@ function para_cross_transfer_withdraw_for_token() {
echo "=========== # 3.asset_withdraw from parachain ============="
${CLI} send para asset_withdraw --paraName user.p.para. -a 111 -s FZM -n test -t 12qyocayNF7Lv6C9qW4avxs2E7U41fKSfv -k 12qyocayNF7Lv6C9qW4avxs2E7U41fKSfv
local times=100
local times=200
while true; do
acc=$(${CLI} asset balance -a 12qyocayNF7Lv6C9qW4avxs2E7U41fKSfv --asset_symbol FZM --asset_exec token -e paracross | jq -r ".balance")
acc_para=$(${PARA_CLI} asset balance -a 12qyocayNF7Lv6C9qW4avxs2E7U41fKSfv --asset_symbol token.FZM --asset_exec paracross -e paracross | jq -r ".balance")
......@@ -528,7 +525,7 @@ function para_cross_transfer_withdraw_for_token() {
times=$((times - 1))
if [ $times -le 0 ]; then
echo "para_cross_transfer_withdraw failed"
# exit 1
exit 1
fi
else
echo "para_cross_transfer_withdraw success"
......@@ -1253,7 +1250,6 @@ function para_test() {
}
function paracross() {
set -x
if [ "${2}" == "init" ]; then
para_init "${3}"
elif [ "${2}" == "config" ]; then
......
......@@ -495,7 +495,7 @@ message LeaderSyncInfo {
string ID = 1; //self id
int32 baseIdx = 2; //calculated by corrent consensus height and remainder by len(nodes)
int32 offset = 3;
uint32 count = 4; //发送计数器
}
message ParaP2PSubMsg {
......
......@@ -4763,6 +4763,7 @@ type LeaderSyncInfo struct {
ID string `protobuf:"bytes,1,opt,name=ID,proto3" json:"ID,omitempty"` //self id
BaseIdx int32 `protobuf:"varint,2,opt,name=baseIdx,proto3" json:"baseIdx,omitempty"` //calculated by corrent consensus height and remainder by len(nodes)
Offset int32 `protobuf:"varint,3,opt,name=offset,proto3" json:"offset,omitempty"`
Count uint32 `protobuf:"varint,4,opt,name=count,proto3" json:"count,omitempty"` //发送计数器
}
func (x *LeaderSyncInfo) Reset() {
......@@ -4818,6 +4819,13 @@ func (x *LeaderSyncInfo) GetOffset() int32 {
return 0
}
func (x *LeaderSyncInfo) GetCount() uint32 {
if x != nil {
return x.Count
}
return 0
}
type ParaP2PSubMsg struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
......@@ -5680,34 +5688,35 @@ var file_paracross_proto_rawDesc = []byte{
0x70, 0x65, 0x73, 0x2e, 0x50, 0x61, 0x72, 0x61, 0x42, 0x6c, 0x73, 0x53, 0x69, 0x67, 0x6e, 0x53,
0x75, 0x6d, 0x44, 0x65, 0x74, 0x61, 0x69, 0x6c, 0x73, 0x53, 0x68, 0x6f, 0x77, 0x52, 0x04, 0x69,
0x6e, 0x66, 0x6f, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x18, 0x02, 0x20,
0x03, 0x28, 0x09, 0x52, 0x06, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x22, 0x52, 0x0a, 0x0e, 0x4c,
0x03, 0x28, 0x09, 0x52, 0x06, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x22, 0x68, 0x0a, 0x0e, 0x4c,
0x65, 0x61, 0x64, 0x65, 0x72, 0x53, 0x79, 0x6e, 0x63, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x0e, 0x0a,
0x02, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x49, 0x44, 0x12, 0x18, 0x0a,
0x07, 0x62, 0x61, 0x73, 0x65, 0x49, 0x64, 0x78, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x07,
0x62, 0x61, 0x73, 0x65, 0x49, 0x64, 0x78, 0x12, 0x16, 0x0a, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65,
0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x22,
0x8d, 0x01, 0x0a, 0x0d, 0x50, 0x61, 0x72, 0x61, 0x50, 0x32, 0x50, 0x53, 0x75, 0x62, 0x4d, 0x73,
0x67, 0x12, 0x0e, 0x0a, 0x02, 0x74, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x02, 0x74,
0x79, 0x12, 0x30, 0x0a, 0x08, 0x63, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x54, 0x78, 0x18, 0x0a, 0x20,
0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x54, 0x72, 0x61, 0x6e,
0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x48, 0x00, 0x52, 0x08, 0x63, 0x6f, 0x6d, 0x6d, 0x69,
0x74, 0x54, 0x78, 0x12, 0x31, 0x0a, 0x07, 0x73, 0x79, 0x6e, 0x63, 0x4d, 0x73, 0x67, 0x18, 0x0b,
0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x4c, 0x65, 0x61,
0x64, 0x65, 0x72, 0x53, 0x79, 0x6e, 0x63, 0x49, 0x6e, 0x66, 0x6f, 0x48, 0x00, 0x52, 0x07, 0x73,
0x79, 0x6e, 0x63, 0x4d, 0x73, 0x67, 0x42, 0x07, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22,
0x5b, 0x0a, 0x0e, 0x45, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x75,
0x73, 0x12, 0x1a, 0x0a, 0x08, 0x69, 0x73, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, 0x01, 0x20,
0x01, 0x28, 0x08, 0x52, 0x08, 0x69, 0x73, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x2d, 0x0a,
0x06, 0x6c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e,
0x74, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x53, 0x79, 0x6e, 0x63,
0x49, 0x6e, 0x66, 0x6f, 0x52, 0x06, 0x6c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x22, 0x1d, 0x0a, 0x09,
0x42, 0x6c, 0x73, 0x50, 0x75, 0x62, 0x4b, 0x65, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79,
0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x32, 0x39, 0x0a, 0x09, 0x70,
0x61, 0x72, 0x61, 0x63, 0x72, 0x6f, 0x73, 0x73, 0x12, 0x2c, 0x0a, 0x06, 0x49, 0x73, 0x53, 0x79,
0x6e, 0x63, 0x12, 0x0d, 0x2e, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x52, 0x65, 0x71, 0x4e, 0x69,
0x6c, 0x1a, 0x11, 0x2e, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x49, 0x73, 0x43, 0x61, 0x75, 0x67,
0x68, 0x74, 0x55, 0x70, 0x22, 0x00, 0x42, 0x0a, 0x5a, 0x08, 0x2e, 0x2e, 0x2f, 0x74, 0x79, 0x70,
0x65, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x12,
0x14, 0x0a, 0x05, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x05,
0x63, 0x6f, 0x75, 0x6e, 0x74, 0x22, 0x8d, 0x01, 0x0a, 0x0d, 0x50, 0x61, 0x72, 0x61, 0x50, 0x32,
0x50, 0x53, 0x75, 0x62, 0x4d, 0x73, 0x67, 0x12, 0x0e, 0x0a, 0x02, 0x74, 0x79, 0x18, 0x01, 0x20,
0x01, 0x28, 0x05, 0x52, 0x02, 0x74, 0x79, 0x12, 0x30, 0x0a, 0x08, 0x63, 0x6f, 0x6d, 0x6d, 0x69,
0x74, 0x54, 0x78, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x74, 0x79, 0x70, 0x65,
0x73, 0x2e, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x48, 0x00, 0x52,
0x08, 0x63, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x54, 0x78, 0x12, 0x31, 0x0a, 0x07, 0x73, 0x79, 0x6e,
0x63, 0x4d, 0x73, 0x67, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x74, 0x79, 0x70,
0x65, 0x73, 0x2e, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x53, 0x79, 0x6e, 0x63, 0x49, 0x6e, 0x66,
0x6f, 0x48, 0x00, 0x52, 0x07, 0x73, 0x79, 0x6e, 0x63, 0x4d, 0x73, 0x67, 0x42, 0x07, 0x0a, 0x05,
0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x5b, 0x0a, 0x0e, 0x45, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f,
0x6e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x69, 0x73, 0x4c, 0x65, 0x61,
0x64, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x08, 0x69, 0x73, 0x4c, 0x65, 0x61,
0x64, 0x65, 0x72, 0x12, 0x2d, 0x0a, 0x06, 0x6c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, 0x02, 0x20,
0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x4c, 0x65, 0x61, 0x64,
0x65, 0x72, 0x53, 0x79, 0x6e, 0x63, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x06, 0x6c, 0x65, 0x61, 0x64,
0x65, 0x72, 0x22, 0x1d, 0x0a, 0x09, 0x42, 0x6c, 0x73, 0x50, 0x75, 0x62, 0x4b, 0x65, 0x79, 0x12,
0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65,
0x79, 0x32, 0x39, 0x0a, 0x09, 0x70, 0x61, 0x72, 0x61, 0x63, 0x72, 0x6f, 0x73, 0x73, 0x12, 0x2c,
0x0a, 0x06, 0x49, 0x73, 0x53, 0x79, 0x6e, 0x63, 0x12, 0x0d, 0x2e, 0x74, 0x79, 0x70, 0x65, 0x73,
0x2e, 0x52, 0x65, 0x71, 0x4e, 0x69, 0x6c, 0x1a, 0x11, 0x2e, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2e,
0x49, 0x73, 0x43, 0x61, 0x75, 0x67, 0x68, 0x74, 0x55, 0x70, 0x22, 0x00, 0x42, 0x0a, 0x5a, 0x08,
0x2e, 0x2e, 0x2f, 0x74, 0x79, 0x70, 0x65, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment