Commit 22038f33 authored by mdj33's avatar mdj33 Committed by 33cn

get consensus status by block hash in main chain

parent 7a2a62c4
......@@ -33,12 +33,6 @@ type commitMsgClient struct {
quit chan struct{}
}
//获取主链和平行链本身节点的平行链共识状态
type consensStatus struct {
mainStatus *pt.ParacrossStatus
selfStatus *pt.ParacrossStatus
}
func (client *commitMsgClient) handler() {
var isSync bool
var isRollback bool
......@@ -49,7 +43,7 @@ func (client *commitMsgClient) handler() {
var readTick <-chan time.Time
client.paraClient.wg.Add(1)
consensusCh := make(chan *consensStatus, 1)
consensusCh := make(chan *pt.ParacrossStatus, 1)
go client.getConsensusHeight(consensusCh)
client.paraClient.wg.Add(1)
......@@ -156,32 +150,25 @@ out:
//获取正在共识的高度,同步有两层意思,一个是主链跟其他节点完成了同步,另一个是当前平行链节点的高度追赶上了共识高度
//一般来说高度增长从小到大: notifiy[0] -- selfConsensusHeight(mainHeight) -- finishHeight -- sendingHeight -- notify[1]
case rsp := <-consensusCh:
selfConsensusHeight := rsp.selfStatus.Height
mainConsensHeight := rsp.mainStatus.Height
consensHeight := rsp.Height
plog.Info("para consensus rcv", "notify", notification, "sending", len(sendingMsgs),
"mainHeigt", rsp.mainStatus.Height, "mainHash", common.ToHex(rsp.mainStatus.BlockHash),
"selfHeight", rsp.selfStatus.Height, "selfHash", common.ToHex(rsp.selfStatus.BlockHash), "sync", isSync)
"consensHeigt", rsp.Height, "consensBlockHash", common.ToHex(rsp.BlockHash), "sync", isSync)
if notification == nil || isRollback {
continue
}
//所有节点还没有共识场景或新节点或重启节点catchingUp场景,要等到收到区块高度大于主链共识高度时候发送,在catchingup时候本身共识高度和块高度一起增长
if notification[1] > mainConsensHeight {
//所有节点还没有共识场景或新节点或重启节点catchingUp场景,要等到收到区块高度大于共识高度时候发送,在catchingup时候本身共识高度和块高度一起增长
if notification[1] > consensHeight {
isSync = true
}
//如果自共识高度在参与共识后小于主链共识高度,则本节点共识可能出现问题,停止发送
if selfConsensusHeight < mainConsensHeight && selfConsensusHeight != -1 {
isSync = false
continue
}
//未共识过的小于当前共识高度的区块,可以不参与共识, 如果是新节点,一直等到同步的区块达到了共识高度,才设置同步参与共识
//在某些特殊场景下,比如平行链连接的主链节点分叉后又恢复,主链的共识高度低于分叉高度时候,主链上形成共识空洞,需要从共识高度重新发送而不是分叉高度
//共识高度和分叉高度不一致其中一个原因是共识交易组里面某个高度分叉了,分叉的主链节点执行成功,而其他主链节点执行失败,共识高度停留在交易组最小高度-1
//而分叉高度是交易组里面的某个高度
if finishHeight < mainConsensHeight {
finishHeight = mainConsensHeight
if finishHeight < consensHeight {
finishHeight = consensHeight
sendingMsgs = nil
client.currentTx = nil
}
......@@ -190,10 +177,10 @@ out:
//最大高度重发一遍,直到确认收到,发过的最小到最大高度也要重发是因为之前空洞原因共识不连续,即便满足2/3节点也不会增长,需要重发来触发commit
//此处也整合了当前consensus height=-1 场景
// 需要是<而不是<=, 因为notification[0]被认为是系统起来后已经发送过的
nextConsensHeight := mainConsensHeight + 1
nextConsensHeight := consensHeight + 1
if nextConsensHeight < notification[0] {
notification[0] = nextConsensHeight
finishHeight = mainConsensHeight
finishHeight = consensHeight
sendingMsgs = nil
client.currentTx = nil
}
......@@ -257,7 +244,7 @@ func (client *commitMsgClient) batchCalcTxGroup(notifications []*pt.ParacrossNod
var rawTxs types.Transactions
for _, status := range notifications {
execName := pt.ParaX
if isMainCommitHeightForked(status.MainBlockHeight) {
if isParaSelfConsensusForked(status.MainBlockHeight) {
execName = paracross.GetExecName()
}
tx, err := paracross.CreateRawCommitTx4MainChain(status, execName, 0)
......@@ -277,7 +264,7 @@ func (client *commitMsgClient) batchCalcTxGroup(notifications []*pt.ParacrossNod
func (client *commitMsgClient) singleCalcTx(status *pt.ParacrossNodeStatus) (*types.Transaction, error) {
execName := pt.ParaX
if isMainCommitHeightForked(status.MainBlockHeight) {
if isParaSelfConsensusForked(status.MainBlockHeight) {
execName = paracross.GetExecName()
}
tx, err := paracross.CreateRawCommitTx4MainChain(status, execName, 0)
......@@ -354,7 +341,7 @@ func checkTxInMainBlock(targetTx *types.Transaction, detail *types.BlockDetail)
}
func isMainCommitHeightForked(height int64) bool {
func isParaSelfConsensusForked(height int64) bool {
return height > mainParaSelfConsensusForkHeight
}
......@@ -497,7 +484,7 @@ func (client *commitMsgClient) mainSync() error {
}
func (client *commitMsgClient) getConsensusHeight(consensusRst chan *consensStatus) {
func (client *commitMsgClient) getConsensusHeight(consensusRst chan *pt.ParacrossStatus) {
ticker := time.NewTicker(time.Second * time.Duration(consensusInterval))
isSync := false
defer ticker.Stop()
......@@ -516,47 +503,60 @@ out:
isSync = true
}
var status consensStatus
//从本地查询共识高度
ret, err := client.paraClient.GetAPI().QueryChain(&types.ChainExecutor{
Driver: "paracross",
FuncName: "GetTitle",
Param: types.Encode(&types.ReqString{Data: types.GetTitle()}),
})
if err != nil {
plog.Error("getConsensusHeight ", "err", err.Error())
continue
}
resp, ok := ret.(*pt.ParacrossStatus)
if !ok {
plog.Error("getConsensusHeight ParacrossStatus nok")
if !client.paraClient.isCaughtUp {
plog.Debug("getConsensusHeight para is CatchingUp")
continue
}
status.selfStatus = resp
//获取主链共识高度
reply, err := client.paraClient.grpcClient.QueryChain(context.Background(), &types.ChainExecutor{
Driver: "paracross",
FuncName: "GetTitle",
Param: types.Encode(&types.ReqString{Data: types.GetTitle()}),
})
_, block, err := client.paraClient.getLastBlockInfo()
if err != nil {
plog.Error("getMainConsensusHeight", "err", err.Error())
continue
}
if !reply.GetIsOk() {
plog.Info("getMainConsensusHeight nok", "error", reply.GetMsg())
continue
}
var result pt.ParacrossStatus
err = types.Decode(reply.Msg, &result)
if err != nil {
plog.Error("getMainConsensusHeight decode", "err", err.Error())
continue
var status *pt.ParacrossStatus
if isParaSelfConsensusForked(block.MainHeight) {
//从本地查询共识高度
ret, err := client.paraClient.GetAPI().QueryChain(&types.ChainExecutor{
Driver: "paracross",
FuncName: "GetTitle",
Param: types.Encode(&types.ReqString{Data: types.GetTitle()}),
})
if err != nil {
plog.Error("getConsensusHeight ", "err", err.Error())
continue
}
resp, ok := ret.(*pt.ParacrossStatus)
if !ok {
plog.Error("getConsensusHeight ParacrossStatus nok")
continue
}
status = resp
} else {
//获取主链共识高度
reply, err := client.paraClient.grpcClient.QueryChain(context.Background(), &types.ChainExecutor{
Driver: "paracross",
FuncName: "GetTitleByHash",
Param: types.Encode(&pt.ReqParacrossTitleHash{Title: types.GetTitle(), BlockHash: block.MainHash}),
})
if err != nil {
plog.Error("getMainConsensusHeight", "err", err.Error())
continue
}
if !reply.GetIsOk() {
plog.Info("getMainConsensusHeight nok", "error", reply.GetMsg())
continue
}
var result pt.ParacrossStatus
err = types.Decode(reply.Msg, &result)
if err != nil {
plog.Error("getMainConsensusHeight decode", "err", err.Error())
continue
}
status = &result
}
status.mainStatus = &result
consensusRst <- &status
consensusRst <- status
}
}
......
......@@ -357,6 +357,12 @@ func (a *action) Commit(commit *pt.ParacrossCommitAction) (*types.Receipt, error
titleStatus.BlockHash = commit.Status.BlockHash
saveTitle(a.db, calcTitleKey(commit.Status.Title), titleStatus)
if types.IsDappFork(a.exec.GetMainHeight(), pt.ParaX, pt.ForkCommitTx) {
key := calcTitleHashKey(commit.Status.Title, hex.EncodeToString(commit.Status.MainBlockHash))
saveTitle(a.db, key, titleStatus)
receipt.KV = append(receipt.KV, &types.KeyValue{Key: key, Value: types.Encode(titleStatus)})
}
clog.Info("paracross.Commit commit done", "height", commit.Status.Height,
"cross tx count", len(commit.Status.CrossTxHashs), "statusBlockHash", hex.EncodeToString(titleStatus.BlockHash))
......
......@@ -13,6 +13,7 @@ import (
var (
title string
titleHeight string
titleHash string
configNodes string
localTx string
localTitle string
......@@ -23,6 +24,7 @@ var (
func setPrefix() {
title = "mavl-paracross-title-"
titleHeight = "mavl-paracross-titleHeight-"
titleHash = "mavl-paracross-titleHash-"
configNodes = "paracross-nodes-"
localTx = "LODB-paracross-titleHeightAddr-"
localTitle = "LODB-paracross-title-"
......@@ -38,6 +40,10 @@ func calcTitleHeightKey(title string, height int64) []byte {
return []byte(fmt.Sprintf(titleHeight+"%s-%d", title, height))
}
func calcTitleHashKey(title string, blockHash string) []byte {
return []byte(fmt.Sprintf(titleHash+"%s-%s", title, blockHash))
}
func calcLocalHeightKey(title string, height int64) []byte {
return []byte(fmt.Sprintf(localTitleHeight+"%s-%d", title, height))
}
......
......@@ -5,6 +5,8 @@
package executor
import (
"encoding/hex"
dbm "github.com/33cn/chain33/common/db"
"github.com/33cn/chain33/types"
pt "github.com/33cn/plugin/plugin/dapp/paracross/types"
......@@ -19,6 +21,24 @@ func (p *Paracross) Query_GetTitle(in *types.ReqString) (types.Message, error) {
return p.paracrossGetHeight(in.GetData())
}
// Query_GetTitleHash query paracross title by block hash
func (p *Paracross) Query_GetTitleByHash(in *pt.ReqParacrossTitleHash) (types.Message, error) {
if in == nil {
return nil, types.ErrInvalidParam
}
if !types.IsDappFork(p.GetMainHeight(), pt.ParaX, pt.ForkCommitTx) {
block, err := p.GetAPI().GetBlockByHashes(&types.ReqHashes{Hashes: [][]byte{in.BlockHash}})
if err != nil || block == nil {
return nil, types.ErrHashNotExist
}
return p.paracrossGetHeight(in.GetTitle())
}
return p.paracrossGetHeightByHash(in)
}
//Query_ListTitles query paracross titles list
func (p *Paracross) Query_ListTitles(in *types.ReqNil) (types.Message, error) {
return p.paracrossListTitles()
......@@ -80,6 +100,14 @@ func (p *Paracross) paracrossGetHeight(title string) (types.Message, error) {
return ret, nil
}
func (p *Paracross) paracrossGetHeightByHash(in *pt.ReqParacrossTitleHash) (types.Message, error) {
ret, err := getTitle(p.GetStateDB(), calcTitleHashKey(in.GetTitle(), hex.EncodeToString(in.GetBlockHash())))
if err != nil {
return nil, errors.Cause(err)
}
return ret, nil
}
func (p *Paracross) paracrossListTitles() (types.Message, error) {
return listLocalTitles(p.GetLocalDB())
}
......
......@@ -126,6 +126,11 @@ message RespParacrossTitles {
repeated ReceiptParacrossDone titles = 1;
}
message ReqParacrossTitleHash {
string title = 1;
bytes blockHash = 2;
}
// 跨链转账相关
message ParacrossAsset {
// input
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment