Commit 98fd0905 authored by mdj33's avatar mdj33 Committed by 33cn

para self consenus from fork height

parent de56ac48
...@@ -50,8 +50,7 @@ var ( ...@@ -50,8 +50,7 @@ var (
minerPrivateKey = "6da92a632ab7deb67d38c0f6560bcfed28167998f6496db64c258d5e8393a81b" minerPrivateKey = "6da92a632ab7deb67d38c0f6560bcfed28167998f6496db64c258d5e8393a81b"
searchHashMatchDepth int32 = 100 searchHashMatchDepth int32 = 100
mainBlockHashForkHeight int64 = 209186 //calc block hash fork height in main chain mainBlockHashForkHeight int64 = 209186 //calc block hash fork height in main chain
mainParaSelfConsensusForkHeight int64 = types.MaxHeight //support paracross commit tx fork height in main chain mainParaSelfConsensusForkHeight int64 = types.MaxHeight //support paracross commit tx fork height in main chain: ForkParacrossCommitTx
curMainChainHeight int64 //当前实时的主链高度
) )
func init() { func init() {
...@@ -208,6 +207,9 @@ func (client *client) InitBlock() { ...@@ -208,6 +207,9 @@ func (client *client) InitBlock() {
client.SetCurrentBlock(block) client.SetCurrentBlock(block)
} }
plog.Debug("para consensus init parameter", "mainBlockHashForkHeight", mainBlockHashForkHeight)
plog.Debug("para consensus init parameter", "mainParaSelfConsensusForkHeight", mainParaSelfConsensusForkHeight)
} }
// GetStartSeq get startSeq in mainchain // GetStartSeq get startSeq in mainchain
...@@ -724,8 +726,6 @@ func (client *client) createBlock(lastBlock *types.Block, txs []*types.Transacti ...@@ -724,8 +726,6 @@ func (client *client) createBlock(lastBlock *types.Block, txs []*types.Transacti
newblock.MainHash = mainBlock.Seq.Hash newblock.MainHash = mainBlock.Seq.Hash
newblock.MainHeight = mainBlock.Detail.Block.Height newblock.MainHeight = mainBlock.Detail.Block.Height
curMainChainHeight = mainBlock.Detail.Block.Height
err = client.WriteBlock(lastBlock.StateHash, &newblock, seq) err = client.WriteBlock(lastBlock.StateHash, &newblock, seq)
plog.Debug("para create new Block", "newblock.ParentHash", common.ToHex(newblock.ParentHash), plog.Debug("para create new Block", "newblock.ParentHash", common.ToHex(newblock.ParentHash),
......
...@@ -41,6 +41,7 @@ type consensStatus struct { ...@@ -41,6 +41,7 @@ type consensStatus struct {
func (client *commitMsgClient) handler() { func (client *commitMsgClient) handler() {
var isSync bool var isSync bool
var isRollback bool
var notification []int64 //记录每次系统重启后 min and current height var notification []int64 //记录每次系统重启后 min and current height
var finishHeight int64 var finishHeight int64
var sendingHeight int64 //当前发送的最大高度 var sendingHeight int64 //当前发送的最大高度
...@@ -75,20 +76,30 @@ out: ...@@ -75,20 +76,30 @@ out:
} }
notification[1] = height notification[1] = height
if finishHeight >= notification[1] { if finishHeight >= notification[1] {
finishHeight = notification[1] - 1 //分叉场景,finish设置为最小值,等待主链共识高度重新设定finishHeight
finishHeight = notification[0] - 1
} }
} }
isRollback = false
case height := <-client.delMsgNotify: case height := <-client.delMsgNotify:
if len(notification) > 0 && height <= notification[1] { if notification == nil {
continue
}
if height <= notification[1] {
notification[1] = height - 1 notification[1] = height - 1
} }
if height <= sendingHeight && client.currentTx != nil { if height <= sendingHeight && client.currentTx != nil {
sendingMsgs = nil sendingMsgs = nil
client.currentTx = nil client.currentTx = nil
} }
//在分叉的主链上,有可能在del完全之前收到共识消息后sync又置为true,然后发送消息,不过影响不大,共识消息间隔比较长 //在分叉的主链上,回滚会连续回滚,回滚结束前不会add block,停止发送同时也忽略共识消息,回滚结束后根据共识高度重新设定finishHeight
// 如果分叉高度大于当前已完成高度,说明新的主链也收到了finish的tx,不需要重发,也就不需要重新设定
if height <= finishHeight {
finishHeight = notification[0] - 1
}
isSync = false isSync = false
isRollback = true
plog.Debug("para del block", "delHeight", height) plog.Debug("para del block", "delHeight", height)
case block := <-client.mainBlockAdd: case block := <-client.mainBlockAdd:
...@@ -134,11 +145,11 @@ out: ...@@ -134,11 +145,11 @@ out:
client.checkTxCommitTimes = 0 client.checkTxCommitTimes = 0
sendMsgCh <- client.currentTx sendMsgCh <- client.currentTx
txhash := common.ToHex(signTx.Hash()) plog.Info("paracommitmsg sending", "txhash", common.ToHex(signTx.Hash()), "exec", string(signTx.Execer))
for i, msg := range sendingMsgs { for i, msg := range sendingMsgs {
plog.Info("paracommitmsg sending", "idx", i, "height", msg.Height, "mainheight", msg.MainBlockHeight, plog.Info("paracommitmsg sending", "idx", i, "height", msg.Height, "mainheight", msg.MainBlockHeight,
"blockhash", common.HashHex(msg.BlockHash), "mainHash", common.HashHex(msg.MainBlockHash), "blockhash", common.HashHex(msg.BlockHash), "mainHash", common.HashHex(msg.MainBlockHash),
"txhash", txhash, "from", client.paraClient.authAccount) "from", client.paraClient.authAccount)
} }
} }
...@@ -151,44 +162,36 @@ out: ...@@ -151,44 +162,36 @@ out:
"mainHeigt", rsp.mainStatus.Height, "mainlockhash", common.ToHex(rsp.mainStatus.BlockHash), "mainHeigt", rsp.mainStatus.Height, "mainlockhash", common.ToHex(rsp.mainStatus.BlockHash),
"selfHeight", rsp.selfStatus.Height, "selfHash", common.ToHex(rsp.selfStatus.BlockHash), "sync", isSync) "selfHeight", rsp.selfStatus.Height, "selfHash", common.ToHex(rsp.selfStatus.BlockHash), "sync", isSync)
if notification == nil { if notification == nil || isRollback {
continue continue
} }
//所有节点还没有共识场景或新节点或重启节点catchingUp场景,要等到收到区块高度大于主链共识高度时候发送,在catchingup时候本身共识高度和块高度一起增长 //所有节点还没有共识场景或新节点或重启节点catchingUp场景,要等到收到区块高度大于主链共识高度时候发送,在catchingup时候本身共识高度和块高度一起增长
if selfConsensusHeight == -1 || (notification[1] > mainConsensHeight) { if notification[1] > mainConsensHeight {
isSync = true isSync = true
} }
//如果自共识高度在参与共识后小于主链共识高度,则本节点共识可能出现问题,停止发送
//未共识过的小于当前共识高度的区块,可以不参与共识, 如果是新节点,一直等到同步的区块达到了共识高度,才设置同步参与共识 if selfConsensusHeight < mainConsensHeight && selfConsensusHeight != -1 {
isSync = false
if finishHeight < selfConsensusHeight { continue
finishHeight = selfConsensusHeight
} }
// 自共识分叉高度切换场景, 分叉高度前平行链共识高度是-1,分叉高度后,需要重发tx平行链共识高度才能增长 //未共识过的小于当前共识高度的区块,可以不参与共识, 如果是新节点,一直等到同步的区块达到了共识高度,才设置同步参与共识
if isMainCommitHeightForked() && selfConsensusHeight == -1 && mainConsensHeight > selfConsensusHeight { //在某些特殊场景下,比如平行链连接的主链节点分叉后又恢复,主链的共识高度低于分叉高度时候,主链上形成共识空洞,需要从共识高度重新发送而不是分叉高度
finishHeight = selfConsensusHeight //共识高度和分叉高度不一致其中一个原因是共识交易组里面某个高度分叉了,分叉的主链节点执行成功,而其他主链节点执行失败
if finishHeight < mainConsensHeight {
finishHeight = mainConsensHeight
sendingMsgs = nil
client.currentTx = nil
} }
//系统每次重启都有检查一次共识,如果共识高度落后于系统起来后完成的第一个高度或最小高度,说明可能有共识空洞,需要把从当前共识高度到完成的 //系统每次重启都有检查一次共识,如果共识高度落后于系统起来后完成的第一个高度或最小高度,说明可能有共识空洞,需要把从当前共识高度到完成的
//最大高度重发一遍,直到确认收到,发过的最小到最大高度也要重发是因为之前空洞原因共识不连续,即便满足2/3节点也不会增长,需要重发来触发commit //最大高度重发一遍,直到确认收到,发过的最小到最大高度也要重发是因为之前空洞原因共识不连续,即便满足2/3节点也不会增长,需要重发来触发commit
//此处也整合了当前consensus height=-1 场景 //此处也整合了当前consensus height=-1 场景
// 需要是<而不是<=, 因为notification[0]被认为是系统起来后已经发送过的 // 需要是<而不是<=, 因为notification[0]被认为是系统起来后已经发送过的
nextConsensHeight := selfConsensusHeight + 1 nextConsensHeight := mainConsensHeight + 1
if nextConsensHeight < notification[0] { if nextConsensHeight < notification[0] {
notification[0] = nextConsensHeight notification[0] = nextConsensHeight
finishHeight = selfConsensusHeight
sendingMsgs = nil
client.currentTx = nil
}
//在某些特殊场景下,比如平行链连接的主链节点分叉后又恢复,主链的共识高度低于分叉高度时候,主链上形成共识空洞,需要从共识高度重新发送而不是分叉高度
//共识高度和分叉高度不一致其中一个原因是共识交易组里面某个高度分叉了,分叉的主链节点执行成功,而其他主链节点执行失败
//理论上来说selfConsensusHeight只能小于等于mainConsensusHeihgt,因为在主链先共识之后才会同步到平行链
//此处主链共识高度应该会开始追赶平行链高度,在这种异常场景下,可能会有重复发送
if mainConsensHeight < selfConsensusHeight {
plog.Info("para consensus reset", "finishHeight", finishHeight, "mainHeight", mainConsensHeight, "selfHeight", selfConsensusHeight)
finishHeight = mainConsensHeight finishHeight = mainConsensHeight
sendingMsgs = nil sendingMsgs = nil
client.currentTx = nil client.currentTx = nil
...@@ -253,7 +256,7 @@ func (client *commitMsgClient) batchCalcTxGroup(notifications []*pt.ParacrossNod ...@@ -253,7 +256,7 @@ func (client *commitMsgClient) batchCalcTxGroup(notifications []*pt.ParacrossNod
var rawTxs types.Transactions var rawTxs types.Transactions
for _, status := range notifications { for _, status := range notifications {
execName := pt.ParaX execName := pt.ParaX
if isMainCommitHeightForked() { if isMainCommitHeightForked(status.MainBlockHeight) {
execName = paracross.GetExecName() execName = paracross.GetExecName()
} }
tx, err := paracross.CreateRawCommitTx4MainChain(status, execName, 0) tx, err := paracross.CreateRawCommitTx4MainChain(status, execName, 0)
...@@ -273,7 +276,7 @@ func (client *commitMsgClient) batchCalcTxGroup(notifications []*pt.ParacrossNod ...@@ -273,7 +276,7 @@ func (client *commitMsgClient) batchCalcTxGroup(notifications []*pt.ParacrossNod
func (client *commitMsgClient) singleCalcTx(status *pt.ParacrossNodeStatus) (*types.Transaction, error) { func (client *commitMsgClient) singleCalcTx(status *pt.ParacrossNodeStatus) (*types.Transaction, error) {
execName := pt.ParaX execName := pt.ParaX
if isMainCommitHeightForked() { if isMainCommitHeightForked(status.MainBlockHeight) {
execName = paracross.GetExecName() execName = paracross.GetExecName()
} }
tx, err := paracross.CreateRawCommitTx4MainChain(status, execName, 0) tx, err := paracross.CreateRawCommitTx4MainChain(status, execName, 0)
...@@ -350,8 +353,8 @@ func checkTxInMainBlock(targetTx *types.Transaction, detail *types.BlockDetail) ...@@ -350,8 +353,8 @@ func checkTxInMainBlock(targetTx *types.Transaction, detail *types.BlockDetail)
} }
func isMainCommitHeightForked() bool { func isMainCommitHeightForked(height int64) bool {
return curMainChainHeight > mainParaSelfConsensusForkHeight+100 return height > mainParaSelfConsensusForkHeight
} }
//当前未考虑获取key非常多失败的场景, 如果获取height非常多,block模块会比较大,但是使用完了就释放了 //当前未考虑获取key非常多失败的场景, 如果获取height非常多,block模块会比较大,但是使用完了就释放了
...@@ -552,10 +555,6 @@ out: ...@@ -552,10 +555,6 @@ out:
continue continue
} }
status.mainStatus = &result status.mainStatus = &result
//如果没有开启平行链自共识, 采用主链共识, 平行链自共识开启会影响发送tx高度的判断
if !isMainCommitHeightForked() {
status.selfStatus = status.mainStatus
}
consensusRst <- &status consensusRst <- &status
} }
} }
......
...@@ -86,6 +86,10 @@ func checkCommitInfo(commit *pt.ParacrossCommitAction) error { ...@@ -86,6 +86,10 @@ func checkCommitInfo(commit *pt.ParacrossCommitAction) error {
if commit.Status == nil { if commit.Status == nil {
return types.ErrInvalidParam return types.ErrInvalidParam
} }
clog.Debug("paracross.Commit check input", "height", commit.Status.Height, "mainHeight", commit.Status.MainBlockHeight,
"mainHash", hex.EncodeToString(commit.Status.MainBlockHash), "blockHash", hex.EncodeToString(commit.Status.BlockHash),
"preBlockHash", hex.EncodeToString(commit.Status.PreBlockHash))
if commit.Status.Height == 0 { if commit.Status.Height == 0 {
if len(commit.Status.Title) == 0 || len(commit.Status.BlockHash) == 0 { if len(commit.Status.Title) == 0 || len(commit.Status.BlockHash) == 0 {
return types.ErrInvalidParam return types.ErrInvalidParam
...@@ -212,7 +216,7 @@ func (a *action) Commit(commit *pt.ParacrossCommitAction) (*types.Receipt, error ...@@ -212,7 +216,7 @@ func (a *action) Commit(commit *pt.ParacrossCommitAction) (*types.Receipt, error
if err != nil { if err != nil {
return nil, err return nil, err
} }
clog.Debug("paracross.Commit check", "input", commit.Status)
if !validTitle(commit.Status.Title) { if !validTitle(commit.Status.Title) {
return nil, pt.ErrInvalidTitle return nil, pt.ErrInvalidTitle
} }
...@@ -297,15 +301,19 @@ func (a *action) Commit(commit *pt.ParacrossCommitAction) (*types.Receipt, error ...@@ -297,15 +301,19 @@ func (a *action) Commit(commit *pt.ParacrossCommitAction) (*types.Receipt, error
} }
receipt = makeCommitReceipt(a.fromaddr, commit, &copyStat, stat) receipt = makeCommitReceipt(a.fromaddr, commit, &copyStat, stat)
} }
clog.Info("paracross.Commit commit", "stat", stat, "notes", len(nodes)) clog.Info("paracross.Commit commit", "stat.title", stat.Title, "stat.height", stat.Height, "notes", len(nodes))
for i, v := range stat.Details.Addrs {
clog.Info("paracross.Commit commit detail", "addr", v, "hash", hex.EncodeToString(stat.Details.BlockHash[i]))
}
if commit.Status.Height > titleStatus.Height+1 { if commit.Status.Height > titleStatus.Height+1 {
saveTitleHeight(a.db, calcTitleHeightKey(commit.Status.Title, commit.Status.Height), stat) saveTitleHeight(a.db, calcTitleHeightKey(commit.Status.Title, commit.Status.Height), stat)
return receipt, nil //平行链由主链共识无缝切换,即接收第一个收到的高度,可以不从0开始
} if !(types.IsPara() && titleStatus.Height == -1) {
for i, v := range stat.Details.Addrs { return receipt, nil
clog.Debug("paracross.Commit stat detail", "addr", v, "hash", hex.EncodeToString(stat.Details.BlockHash[i])) }
} }
commitCount := len(stat.Details.Addrs) commitCount := len(stat.Details.Addrs)
most, mostHash := getMostCommit(stat) most, mostHash := getMostCommit(stat)
if !isCommitDone(stat, nodes, most) { if !isCommitDone(stat, nodes, most) {
...@@ -320,13 +328,13 @@ func (a *action) Commit(commit *pt.ParacrossCommitAction) (*types.Receipt, error ...@@ -320,13 +328,13 @@ func (a *action) Commit(commit *pt.ParacrossCommitAction) (*types.Receipt, error
blockHash, err := getBlockHash(a.api, stat.Height) blockHash, err := getBlockHash(a.api, stat.Height)
if err != nil { if err != nil {
clog.Error("paracross.Commit getBlockHash local", "err", err.Error(), "commitheight", commit.Status.Height, clog.Error("paracross.Commit para getBlockHash local", "err", err.Error(), "commitheight", commit.Status.Height,
"commitHash", hex.EncodeToString(commit.Status.BlockHash), "mainHash", hex.EncodeToString(commit.Status.MainBlockHash), "commitHash", hex.EncodeToString(commit.Status.BlockHash), "mainHash", hex.EncodeToString(commit.Status.MainBlockHash),
"mainHeight", commit.Status.MainBlockHeight) "mainHeight", commit.Status.MainBlockHeight)
return receipt, nil return receipt, nil
} }
if !bytes.Equal(blockHash.Hash, []byte(mostHash)) { if !bytes.Equal(blockHash.Hash, []byte(mostHash)) {
clog.Error("paracross.Commit blockHash not match", "selfBlockHash", hex.EncodeToString(blockHash.Hash), clog.Error("paracross.Commit para blockHash not match", "selfBlockHash", hex.EncodeToString(blockHash.Hash),
"mostHash", hex.EncodeToString([]byte(mostHash)), "commitHeight", commit.Status.Height, "mostHash", hex.EncodeToString([]byte(mostHash)), "commitHeight", commit.Status.Height,
"commitMainHash", hex.EncodeToString(commit.Status.MainBlockHash), "commitMainHeight", commit.Status.MainBlockHeight) "commitMainHash", hex.EncodeToString(commit.Status.MainBlockHash), "commitMainHeight", commit.Status.MainBlockHeight)
return receipt, nil return receipt, nil
...@@ -345,7 +353,7 @@ func (a *action) Commit(commit *pt.ParacrossCommitAction) (*types.Receipt, error ...@@ -345,7 +353,7 @@ func (a *action) Commit(commit *pt.ParacrossCommitAction) (*types.Receipt, error
saveTitle(a.db, calcTitleKey(commit.Status.Title), titleStatus) saveTitle(a.db, calcTitleKey(commit.Status.Title), titleStatus)
clog.Info("paracross.Commit commit done", "height", commit.Status.Height, clog.Info("paracross.Commit commit done", "height", commit.Status.Height,
"cross tx count", len(commit.Status.CrossTxHashs), "status", titleStatus) "cross tx count", len(commit.Status.CrossTxHashs), "statusBlockHash", hex.EncodeToString(titleStatus.BlockHash))
//parallel chain not need to process cross commit tx here //parallel chain not need to process cross commit tx here
if types.IsPara() { if types.IsPara() {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment