Commit 66d7594d authored by madengji's avatar madengji Committed by 33cn

correct collect addr error

parent 5ea68097
......@@ -93,7 +93,7 @@ type subConfig struct {
MultiDownServerRspTime uint32 `json:"multiDownServerRspTime,omitempty"`
RmCommitParamMainHeight int64 `json:"rmCommitParamMainHeight,omitempty"`
JumpDownloadClose bool `json:"jumpDownloadClose,omitempty"`
BlsSignOff bool `json:"blsSignOff,omitempty"`
BlsSign bool `json:"blsSign,omitempty"`
BlsLeaderSwitchInt int32 `json:"blsLeaderSwitchInt,omitempty"`
}
......
......@@ -24,8 +24,9 @@ const (
maxRcvTxCount = 100 //channel buffer, max 100 nodes, 1 height tx or 1 txgroup per node
leaderSyncInt = 15 //15s heartbeat sync interval
defLeaderSwitchInt = 100 //每隔100个共识高度切换一次leader,大约6小时(按50个空块间隔计算)
delaySubP2pTopic = 30 //30s to sub p2p topic
defaultParaBlsSignTopic = "PARA-BLS-SIGN-TOPIC"
paraBlsSignTopic = "PARA-BLS-SIGN-TOPIC"
)
//新增需要保证顺序
......@@ -82,7 +83,7 @@ func (b *blsClient) procLeaderSync() {
var feedDogTicker <-chan time.Time
var watchDogTicker <-chan time.Time
p2pTimer := time.After(time.Minute)
p2pTimer := time.After(delaySubP2pTopic * time.Second)
out:
for {
select {
......@@ -92,7 +93,7 @@ out:
if isLeader {
act := &pt.ParaP2PSubMsg{Ty: P2pSubLeaderSyncMsg}
act.Value = &pt.ParaP2PSubMsg_SyncMsg{SyncMsg: &pt.LeaderSyncInfo{ID: b.selfID, BaseIdx: base, Offset: off}}
err := b.paraClient.SendPubP2PMsg(defaultParaBlsSignTopic, types.Encode(act))
err := b.paraClient.SendPubP2PMsg(paraBlsSignTopic, types.Encode(act))
if err != nil {
plog.Error("para.procLeaderSync feed dog", "err", err)
}
......@@ -112,10 +113,10 @@ out:
atomic.StoreUint32(&b.feedDog, 0)
case <-p2pTimer:
err := b.paraClient.SendSubP2PTopic(defaultParaBlsSignTopic)
err := b.paraClient.SendSubP2PTopic(paraBlsSignTopic)
if err != nil {
plog.Error("procLeaderSync.SubP2PTopic", "err", err)
p2pTimer = time.After(time.Minute)
p2pTimer = time.After(delaySubP2pTopic * time.Second)
continue
}
feedDogTicker = time.NewTicker(leaderSyncInt * time.Second).C
......@@ -201,7 +202,7 @@ out:
if isLeader {
b.sendAggregateTx(nodes)
}
//聚合签名总共消耗大约1.5ms
//清空txsBuff,重新收集
b.commitsPool = make(map[int64]*pt.ParaBlsSignSumDetails)
b.mutex.Unlock()
......@@ -218,7 +219,7 @@ func (b *blsClient) sendAggregateTx(nodes []string) error {
if len(dones) <= 0 {
return nil
}
acts, err := b.AggregateCommit2Action(nodes, dones)
acts, err := aggregateCommit2Action(nodes, dones)
if err != nil {
plog.Error("sendAggregateTx AggregateCommit2Action", "err", err)
return err
......@@ -273,7 +274,7 @@ func (b *blsClient) checkCommitTx(txs []*types.Transaction) ([]*pt.ParacrossComm
if tx.From() != commit.Bls.Addrs[0] {
return nil, errors.Wrapf(types.ErrFromAddr, "from=%s,bls addr=%s", tx.From(), commit.Bls.Addrs[0])
}
//验证bls 签名
//验证bls 签名,大概40ms, 是secp 80倍
err = b.verifyBlsSign(tx.From(), commit)
if err != nil {
return nil, errors.Wrapf(pt.ErrBlsSignVerify, "from=%s", tx.From())
......@@ -284,6 +285,15 @@ func (b *blsClient) checkCommitTx(txs []*types.Transaction) ([]*pt.ParacrossComm
return commits, nil
}
func hasCommited(addrs []string, addr string) (bool, int) {
for i, a := range addrs {
if a == addr {
return true, i
}
}
return false, 0
}
//整合相同高度commits
func integrateCommits(pool map[int64]*pt.ParaBlsSignSumDetails, commits []*pt.ParacrossCommitAction) {
for _, cmt := range commits {
......@@ -291,14 +301,13 @@ func integrateCommits(pool map[int64]*pt.ParaBlsSignSumDetails, commits []*pt.Pa
pool[cmt.Status.Height] = &pt.ParaBlsSignSumDetails{Height: cmt.Status.Height}
}
a := pool[cmt.Status.Height]
for i, v := range a.Addrs {
//节点更新交易参数的场景
if v == cmt.Bls.Addrs[0] {
found, i := hasCommited(a.Addrs, cmt.Bls.Addrs[0])
if found {
a.Msgs[i] = types.Encode(cmt.Status)
a.Signs[i] = cmt.Bls.Sign
continue
}
}
a.Addrs = append(a.Addrs, cmt.Bls.Addrs[0])
a.Msgs = append(a.Msgs, types.Encode(cmt.Status))
a.Signs = append(a.Signs, cmt.Bls.Sign)
......@@ -322,26 +331,25 @@ func isMostCommitDone(peers int, txsBuff map[int64]*pt.ParaBlsSignSumDetails) bo
}
//找出共识并达到2/3的commits, 并去除与共识不同的commits,为后面聚合签名做准备
func filterDoneCommits(peers int, txs map[int64]*pt.ParaBlsSignSumDetails) []*pt.ParaBlsSignSumDetails {
func filterDoneCommits(peers int, pool map[int64]*pt.ParaBlsSignSumDetails) []*pt.ParaBlsSignSumDetails {
var seq []int64
for i, v := range txs {
for i, v := range pool {
most, hash := getMostCommit(v.Msgs)
if !isCommitDone(peers, most) {
plog.Debug("blssign.filterDoneCommits not commit done", "height", i)
delete(txs, i)
continue
}
seq = append(seq, i)
//只保留与most相同的commits做聚合签名使用
a := &pt.ParaBlsSignSumDetails{Msgs: [][]byte{[]byte(hash)}}
a := &pt.ParaBlsSignSumDetails{Height: i, Msgs: [][]byte{[]byte(hash)}}
for j, m := range v.Msgs {
if bytes.Equal([]byte(hash), m) {
a.Addrs = append(a.Addrs, v.Addrs[j])
a.Signs = append(a.Signs, v.Signs[j])
}
}
txs[i] = a
pool[i] = a
}
if len(seq) <= 0 {
......@@ -350,7 +358,6 @@ func filterDoneCommits(peers int, txs map[int64]*pt.ParaBlsSignSumDetails) []*pt
//从低到高找出连续的commits
sort.Slice(seq, func(i, j int) bool { return seq[i] < seq[j] })
plog.Debug("blssign.filterDoneCommits", "seq", seq)
var signs []*pt.ParaBlsSignSumDetails
//共识高度要连续,不连续则退出
lastSeq := seq[0] - 1
......@@ -358,7 +365,7 @@ func filterDoneCommits(peers int, txs map[int64]*pt.ParaBlsSignSumDetails) []*pt
if lastSeq+1 != h {
return signs
}
signs = append(signs, txs[h])
signs = append(signs, pool[h])
lastSeq = h
}
return signs
......@@ -366,7 +373,7 @@ func filterDoneCommits(peers int, txs map[int64]*pt.ParaBlsSignSumDetails) []*pt
}
//聚合多个签名为一个签名,并设置地址bitmap
func (b *blsClient) AggregateCommit2Action(nodes []string, commits []*pt.ParaBlsSignSumDetails) ([]*pt.ParacrossCommitAction, error) {
func aggregateCommit2Action(nodes []string, commits []*pt.ParaBlsSignSumDetails) ([]*pt.ParacrossCommitAction, error) {
var notify []*pt.ParacrossCommitAction
for _, v := range commits {
......@@ -375,13 +382,14 @@ func (b *blsClient) AggregateCommit2Action(nodes []string, commits []*pt.ParaBls
types.Decode(v.Msgs[0], s)
a.Status = s
sign, err := b.aggregateSigns(v.Signs)
sign, err := aggregateSigns(v.Signs)
if err != nil {
return nil, errors.Wrapf(err, "bls aggreate=%s", v.Addrs)
return nil, errors.Wrapf(err, "bls aggregate=%s", v.Addrs)
}
signData := sign.Serialize()
a.Bls.Sign = append(a.Bls.Sign, signData[:]...)
bits, remains := setAddrsBitMap(nodes, v.Addrs)
plog.Debug("AggregateCommit2Action", "nodes", nodes, "addr", v.Addrs, "bits", common.ToHex(bits), "height", v.Height)
if len(remains) > 0 {
plog.Info("bls.signDoneCommits", "remains", remains)
}
......@@ -391,7 +399,7 @@ func (b *blsClient) AggregateCommit2Action(nodes []string, commits []*pt.ParaBls
return notify, nil
}
func (b *blsClient) aggregateSigns(signs [][]byte) (*g2pubs.Signature, error) {
func aggregateSigns(signs [][]byte) (*g2pubs.Signature, error) {
var signatures []*g2pubs.Signature
for _, data := range signs {
var s [48]byte
......
......@@ -188,11 +188,11 @@ func (client *commitMsgClient) createCommitTx() {
return
}
//bls sign, send to p2p
if !client.paraClient.subCfg.BlsSignOff {
if client.paraClient.subCfg.BlsSign {
//send to p2p pubsub
plog.Info("para commitMs send to p2p", "hash", common.ToHex(tx.Hash()))
act := &pt.ParaP2PSubMsg{Ty: P2pSubCommitTx, Value: &pt.ParaP2PSubMsg_CommitTx{CommitTx: tx}}
client.paraClient.SendPubP2PMsg(defaultParaBlsSignTopic, types.Encode(act))
client.paraClient.SendPubP2PMsg(paraBlsSignTopic, types.Encode(act))
return
}
client.pushCommitTx(tx)
......@@ -314,15 +314,7 @@ func (client *commitMsgClient) reSendCommitTx(tx *types.Transaction) bool {
if client.checkTxCommitTimes < client.waitMainBlocks {
return false
}
client.checkTxCommitTimes = 0
//bls聚合签名场景,发送未成功上链,继续发送交易
if !client.paraClient.subCfg.BlsSignOff {
//resend tx
client.sendMsgCh <- tx
return false
}
//非聚合签名场景,发送未成功,触发重新构建交易发送
client.resetSendEnv()
return true
}
......@@ -429,7 +421,7 @@ func (client *commitMsgClient) getSendingTx(startHeight, endHeight int64) (*type
commits = append(commits, &pt.ParacrossCommitAction{Status: stat})
}
if !client.paraClient.subCfg.BlsSignOff {
if client.paraClient.subCfg.BlsSign {
err = client.paraClient.blsSignCli.blsSign(commits)
if err != nil {
plog.Error("paracommitmsg bls sign", "err", err)
......
......@@ -7,8 +7,6 @@ package para
import (
"testing"
"encoding/hex"
"github.com/33cn/chain33/queue"
_ "github.com/33cn/chain33/system"
drivers "github.com/33cn/chain33/system/consensus"
......@@ -73,10 +71,3 @@ func TestSetSelfConsEnable(t *testing.T) {
assert.Equal(t, ep1, para.commitMsgClient.selfConsEnableList)
}
func TestSetAddrsBitMap(t *testing.T) {
nodes := []string{"1KSBd17H7ZK8iT37aJztFB22XGwsPTdwE4", "1JRNjdEqp4LJ5fqycUBm9ayCKSeeskgMKR", "1NLHPEcbTWWxxU3dGUZBhayjrCHD3psX7k", "1MCftFynyvG2F4ED5mdHYgziDxx6vDrScs"}
addrs := []string{"1KSBd17H7ZK8iT37aJztFB22XGwsPTdwE4"}
val, remain := setAddrsBitMap(nodes, addrs)
t.Log("val", hex.EncodeToString(val), "remain", remain)
}
This diff is collapsed.
......@@ -9,7 +9,6 @@ import (
"github.com/33cn/chain33/types"
pt "github.com/33cn/plugin/plugin/dapp/paracross/types"
"github.com/pkg/errors"
)
//Exec_Commit consensus commit tx exec process
......@@ -18,7 +17,7 @@ func (e *Paracross) Exec_Commit(payload *pt.ParacrossCommitAction, tx *types.Tra
receipt, err := a.Commit(payload)
if err != nil {
clog.Error("Paracross commit failed", "error", err, "hash", hex.EncodeToString(tx.Hash()))
return nil, errors.Cause(err)
return nil, err
}
return receipt, nil
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment