Commit 15f0aa0a authored by mdj33's avatar mdj33 Committed by vipwzw

para self consens height fork in txGroup issue

parent 85dddaa4
...@@ -124,6 +124,7 @@ function start() { ...@@ -124,6 +124,7 @@ function start() {
done done
miner "${CLI}" miner "${CLI}"
# miner "${CLI4}"
block_wait "${CLI}" 1 block_wait "${CLI}" 1
echo "=========== check genesis hash ========== " echo "=========== check genesis hash ========== "
...@@ -276,10 +277,10 @@ function transfer() { ...@@ -276,10 +277,10 @@ function transfer() {
echo "=========== # transfer =============" echo "=========== # transfer ============="
hashes=() hashes=()
for ((i = 0; i < 10; i++)); do for ((i = 0; i < 10; i++)); do
hash=$(${CLI} send coins transfer -a 1 -n test -t 14KEKbYtKKQm4wMthSK9J4La4nAiidGozt -k 4257D8692EF7FE13C68B65D6A52F03933DB2FA5CE8FAF210B5B8B80C721CED01) hash=$(${1} send coins transfer -a 1 -n test -t 14KEKbYtKKQm4wMthSK9J4La4nAiidGozt -k 4257D8692EF7FE13C68B65D6A52F03933DB2FA5CE8FAF210B5B8B80C721CED01)
hashes=("${hashes[@]}" "$hash") hashes=("${hashes[@]}" "$hash")
done done
block_wait "${CLI}" 1 block_wait "${1}" 1
echo "len: ${#hashes[@]}" echo "len: ${#hashes[@]}"
if [ "${#hashes[@]}" != 10 ]; then if [ "${#hashes[@]}" != 10 ]; then
echo "tx number wrong" echo "tx number wrong"
...@@ -287,7 +288,7 @@ function transfer() { ...@@ -287,7 +288,7 @@ function transfer() {
fi fi
for ((i = 0; i < ${#hashes[*]}; i++)); do for ((i = 0; i < ${#hashes[*]}; i++)); do
txs=$(${CLI} tx query_hash -s "${hashes[$i]}" | jq ".txs") txs=$(${1} tx query_hash -s "${hashes[$i]}" | jq ".txs")
if [ -z "${txs}" ]; then if [ -z "${txs}" ]; then
echo "cannot find tx" echo "cannot find tx"
exit 1 exit 1
...@@ -295,19 +296,19 @@ function transfer() { ...@@ -295,19 +296,19 @@ function transfer() {
done done
echo "=========== # withdraw =============" echo "=========== # withdraw ============="
hash=$(${CLI} send coins transfer -a 2 -n deposit -t 1wvmD6RNHzwhY4eN75WnM6JcaAvNQ4nHx -k CC38546E9E659D15E6B4893F0AB32A06D103931A8230B0BDE71459D2B27D6944) hash=$(${1} send coins transfer -a 2 -n deposit -t 1wvmD6RNHzwhY4eN75WnM6JcaAvNQ4nHx -k CC38546E9E659D15E6B4893F0AB32A06D103931A8230B0BDE71459D2B27D6944)
echo "${hash}" echo "${hash}"
block_wait "${CLI}" 1 block_wait "${1}" 1
before=$(${CLI} account balance -a 14KEKbYtKKQm4wMthSK9J4La4nAiidGozt -e retrieve | jq -r ".balance") before=$(${1} account balance -a 14KEKbYtKKQm4wMthSK9J4La4nAiidGozt -e retrieve | jq -r ".balance")
if [ "${before}" == "0.0000" ]; then if [ "${before}" == "0.0000" ]; then
echo "wrong ticket balance, should not be zero" echo "wrong ticket balance, should not be zero"
exit 1 exit 1
fi fi
hash=$(${CLI} send coins withdraw -a 1 -n withdraw -e retrieve -k CC38546E9E659D15E6B4893F0AB32A06D103931A8230B0BDE71459D2B27D6944) hash=$(${1} send coins withdraw -a 1 -n withdraw -e retrieve -k CC38546E9E659D15E6B4893F0AB32A06D103931A8230B0BDE71459D2B27D6944)
echo "${hash}" echo "${hash}"
block_wait "${CLI}" 1 block_wait "${1}" 1
txs=$(${CLI} tx query_hash -s "${hash}" | jq ".txs") txs=$(${1} tx query_hash -s "${hash}" | jq ".txs")
if [ "${txs}" == "null" ]; then if [ "${txs}" == "null" ]; then
echo "withdraw cannot find tx" echo "withdraw cannot find tx"
exit 1 exit 1
...@@ -316,7 +317,8 @@ function transfer() { ...@@ -316,7 +317,8 @@ function transfer() {
function base_config() { function base_config() {
sync sync
transfer transfer "${CLI}"
# transfer "${CLI4}"
} }
function dapp_run() { function dapp_run() {
......
...@@ -633,14 +633,15 @@ func (client *client) CreateBlock() { ...@@ -633,14 +633,15 @@ func (client *client) CreateBlock() {
lastSeqMainHash = blockOnMain.Detail.Block.ParentHash lastSeqMainHash = blockOnMain.Detail.Block.ParentHash
} }
_, lastBlock, err := client.getLastBlockInfo() lastBlockSeq, lastBlock, err := client.getLastBlockInfo()
if err != nil { if err != nil {
plog.Error("Parachain getLastBlockInfo fail", "err", err) plog.Error("Parachain getLastBlockInfo fail", "err", err)
time.Sleep(time.Second) time.Sleep(time.Second)
continue continue
} }
plog.Info("Parachain process block", "lastBlockSeq", lastSeq, "curSeq", currSeq, plog.Info("Parachain process block", "lastSeq", lastSeq, "curSeq", currSeq,
"lastBlockHeight",lastBlock.Height,"lastBlockSeq",lastBlockSeq,
"currSeqMainHeight", lastSeqMainHeight, "currSeqMainHash", common.ToHex(lastSeqMainHash), "currSeqMainHeight", lastSeqMainHeight, "currSeqMainHash", common.ToHex(lastSeqMainHash),
"lastBlockMainHeight", lastBlock.MainHeight, "lastBlockMainHash", common.ToHex(lastBlock.MainHash), "seqTy", blockOnMain.Seq.Type) "lastBlockMainHeight", lastBlock.MainHeight, "lastBlockMainHash", common.ToHex(lastBlock.MainHash), "seqTy", blockOnMain.Seq.Type)
......
...@@ -33,6 +33,12 @@ type commitMsgClient struct { ...@@ -33,6 +33,12 @@ type commitMsgClient struct {
quit chan struct{} quit chan struct{}
} }
//获取主链和平行链本身节点的平行链共识状态
type consensStatus struct{
mainStatus *pt.ParacrossStatus
selfStatus *pt.ParacrossStatus
}
func (client *commitMsgClient) handler() { func (client *commitMsgClient) handler() {
var isSync bool var isSync bool
var notification []int64 //记录每次系统重启后 min and current height var notification []int64 //记录每次系统重启后 min and current height
...@@ -42,7 +48,7 @@ func (client *commitMsgClient) handler() { ...@@ -42,7 +48,7 @@ func (client *commitMsgClient) handler() {
var readTick <-chan time.Time var readTick <-chan time.Time
client.paraClient.wg.Add(1) client.paraClient.wg.Add(1)
consensusCh := make(chan *pt.ParacrossStatus, 1) consensusCh := make(chan *consensStatus, 1)
go client.getConsensusHeight(consensusCh) go client.getConsensusHeight(consensusCh)
client.paraClient.wg.Add(1) client.paraClient.wg.Add(1)
...@@ -81,6 +87,9 @@ out: ...@@ -81,6 +87,9 @@ out:
sendingMsgs = nil sendingMsgs = nil
client.currentTx = nil client.currentTx = nil
} }
//在分叉的主链上,有可能在del完全之前收到共识消息后sync又置为true,然后发送消息,不过影响不大,共识消息间隔比较长
isSync = false
plog.Debug("para del block", "delHeight", height)
case block := <-client.mainBlockAdd: case block := <-client.mainBlockAdd:
if client.currentTx != nil && client.paraClient.isCaughtUp { if client.currentTx != nil && client.paraClient.isCaughtUp {
...@@ -92,19 +101,18 @@ out: ...@@ -92,19 +101,18 @@ out:
} else { } else {
client.checkTxCommitTimes++ client.checkTxCommitTimes++
if client.checkTxCommitTimes > client.waitMainBlocks { if client.checkTxCommitTimes > client.waitMainBlocks {
//需要从rawtx构建,nonce需要改,不然会认为重复交易 //超过等待最大次数,reset,重新组织发送,防止一直发送同一笔消息
signTx, _, err := client.calcCommitMsgTxs(sendingMsgs) sendingMsgs = nil
if err != nil || signTx == nil { client.currentTx = nil
continue
}
client.currentTx = signTx
client.checkTxCommitTimes = 0 client.checkTxCommitTimes = 0
sendMsgCh <- client.currentTx
} }
} }
} }
case <-readTick: case <-readTick:
plog.Debug("para readTick", "notify", notification, "sending", len(sendingMsgs),
"finishHeight", finishHeight, "txIsNil", client.currentTx==nil,"sync",isSync)
if notification != nil && finishHeight < notification[1] && client.currentTx == nil && isSync { if notification != nil && finishHeight < notification[1] && client.currentTx == nil && isSync {
count := notification[1] - finishHeight count := notification[1] - finishHeight
if count > types.TxGroupMaxCount { if count > types.TxGroupMaxCount {
...@@ -134,37 +142,44 @@ out: ...@@ -134,37 +142,44 @@ out:
} }
//获取正在共识的高度,同步有两层意思,一个是主链跟其他节点完成了同步,另一个是当前平行链节点的高度追赶上了共识高度 //获取正在共识的高度,同步有两层意思,一个是主链跟其他节点完成了同步,另一个是当前平行链节点的高度追赶上了共识高度
//一般来说高度增长从小到大: notifiy[0] -- selfConsensusHeight(mainHeight) -- finishHeight -- sendingHeight -- notify[1]
case rsp := <-consensusCh: case rsp := <-consensusCh:
consensusHeight := rsp.Height selfConsensusHeight := rsp.selfStatus.Height
mainConsensHeight := rsp.mainStatus.Height
plog.Info("para consensus rcv", "notify", notification, "sending", len(sendingMsgs), plog.Info("para consensus rcv", "notify", notification, "sending", len(sendingMsgs),
"consens heigt", rsp.Height, "consens blockhash", common.HashHex(rsp.BlockHash), "sync", isSync) "mainHeigt", rsp.mainStatus.Height, "mainlockhash", common.ToHex(rsp.mainStatus.BlockHash),
"selfHeight",rsp.selfStatus.Height,"selfHash",common.ToHex(rsp.selfStatus.BlockHash),"sync", isSync)
//所有节点还没有共识场景或新节点catchingUp场景,要等到收到区块高度大于共识高度时候发送 //所有节点还没有共识场景或新节点或重启节点catchingUp场景,要等到收到区块高度大于主链共识高度时候发送,在catchingup时候本身共识高度和块高度一起增长
if consensusHeight == -1 || (notification != nil && notification[1] > consensusHeight) { if selfConsensusHeight == -1 || (notification != nil && notification[1] > mainConsensHeight ) {
isSync = true isSync = true
} }
//未共识过的小于当前共识高度的区块,可以不参与共识 //未共识过的小于当前共识高度的区块,可以不参与共识, 如果是新节点,一直等到同步的区块达到了共识高度,才设置同步参与共识
//如果是新节点,一直等到同步的区块达到了共识高度,才设置同步参与共识 if notification != nil && finishHeight < selfConsensusHeight {
if notification != nil && finishHeight < consensusHeight { finishHeight = selfConsensusHeight
finishHeight = consensusHeight
} }
//如果正在发送的共识高度小于已经共识的高度,则取消发送,考虑新节点正在catchingup且新节点的加入能达成2/3共识场景,每次最多发送20 tx,
//但是由于addblock 正在catchingup,没办法确认tx,新tx达成了新的共识高度,需要把sendingmsg置nil,以发送下一笔共识交易
if sendingHeight <= consensusHeight && client.currentTx != nil {
sendingMsgs = nil
client.currentTx = nil
continue
}
//系统每次重启都有检查一次共识,如果共识高度落后于系统起来后完成的第一个高度或最小高度,说明可能有共识空洞,需要把从当前共识高度到完成的 //系统每次重启都有检查一次共识,如果共识高度落后于系统起来后完成的第一个高度或最小高度,说明可能有共识空洞,需要把从当前共识高度到完成的
//最大高度重发一遍,直到确认收到,发过的最小到最大高度也要重发是因为之前空洞原因共识不连续,即便满足2/3节点也不会增长,需要重发来触发commit //最大高度重发一遍,直到确认收到,发过的最小到最大高度也要重发是因为之前空洞原因共识不连续,即便满足2/3节点也不会增长,需要重发来触发commit
//此处也整合了当前consensus height=-1 场景 //此处也整合了当前consensus height=-1 场景
nextConsensHeight := consensusHeight + 1 // 需要是<而不是<=, 因为notification[0]被认为是系统起来后已经发送过的
nextConsensHeight := selfConsensusHeight + 1
if notification != nil && nextConsensHeight < notification[0] { if notification != nil && nextConsensHeight < notification[0] {
notification[0] = nextConsensHeight notification[0] = nextConsensHeight
finishHeight = nextConsensHeight - 1 finishHeight = selfConsensusHeight
sendingMsgs = nil
client.currentTx = nil
}
//在某些特殊场景下,比如平行链连接的主链节点分叉后又恢复,主链的共识高度低于分叉高度时候,主链上形成共识空洞,需要从共识高度重新发送而不是分叉高度
//共识高度和分叉高度不一致其中一个原因是共识交易组里面某个高度分叉了,分叉的主链节点执行成功,而其他主链节点执行失败
//理论上来说selfConsensusHeight只能小于等于mainConsensusHeihgt,因为在主链先共识之后才会同步到平行链
//此处主链共识高度应该会开始追赶平行链高度,在这种异常场景下,可能会有重复发送
if mainConsensHeight < selfConsensusHeight {
plog.Info("para consensus reset","finishHeight",finishHeight,"mainHeight",mainConsensHeight,"selfHeight",selfConsensusHeight)
finishHeight = mainConsensHeight
sendingMsgs = nil sendingMsgs = nil
client.currentTx = nil client.currentTx = nil
} }
...@@ -456,7 +471,7 @@ func (client *commitMsgClient) mainSync() error { ...@@ -456,7 +471,7 @@ func (client *commitMsgClient) mainSync() error {
} }
func (client *commitMsgClient) getConsensusHeight(consensusRst chan *pt.ParacrossStatus) { func (client *commitMsgClient) getConsensusHeight(consensusRst chan *consensStatus) {
ticker := time.NewTicker(time.Second * time.Duration(consensusInterval)) ticker := time.NewTicker(time.Second * time.Duration(consensusInterval))
isSync := false isSync := false
defer ticker.Stop() defer ticker.Stop()
...@@ -475,20 +490,47 @@ out: ...@@ -475,20 +490,47 @@ out:
isSync = true isSync = true
} }
var req types.ChainExecutor var status consensStatus
req.Driver = "paracross"
req.FuncName = "GetTitle"
req.Param = types.Encode(&types.ReqString{Data: types.GetTitle()})
//从本地查询共识高度 //从本地查询共识高度
ret, err := client.paraClient.GetAPI().QueryChain(&req) ret, err := client.paraClient.GetAPI().QueryChain(&types.ChainExecutor{
Driver:"paracross",
FuncName:"GetTitle",
Param:types.Encode(&types.ReqString{Data: types.GetTitle()}),
})
if err != nil { if err != nil {
plog.Error("getConsensusHeight ", "err", err.Error()) plog.Error("getConsensusHeight ", "err", err.Error())
continue continue
} }
if resp, ok := ret.(*pt.ParacrossStatus); ok { resp, ok := ret.(*pt.ParacrossStatus)
consensusRst <- resp if !ok {
plog.Error("getConsensusHeight ParacrossStatus nok")
continue
}
status.selfStatus = resp
//获取主链共识高度
reply, err := client.paraClient.grpcClient.QueryChain(context.Background(),&types.ChainExecutor{
Driver:"paracross",
FuncName:"GetTitle",
Param:types.Encode(&types.ReqString{Data: types.GetTitle()}),
})
if err != nil {
plog.Error("getMainConsensusHeight", "err", err.Error())
continue
}
if !reply.GetIsOk() {
plog.Info("getMainConsensusHeight nok", "error", reply.GetMsg())
continue
}
var result pt.ParacrossStatus
err = types.Decode(reply.Msg, &result)
if err != nil{
plog.Error("getMainConsensusHeight decode", "err", err.Error())
continue
} }
status.mainStatus = &result
consensusRst <- &status
} }
} }
......
...@@ -41,6 +41,9 @@ ...@@ -41,6 +41,9 @@
1. 因为某种原因,比如超过2/3节点崩溃或者数据不一致,系统在某一个高度没有产生共识,共识系统会把已收到的交易记下,即便记录已经达到共识但是因为共识高度 1. 因为某种原因,比如超过2/3节点崩溃或者数据不一致,系统在某一个高度没有产生共识,共识系统会把已收到的交易记下,即便记录已经达到共识但是因为共识高度
并不是连续的,或者说因为共识空洞,后面来的共识也只是记录,不会触发done,只有和数据库共识连续的共识commit才能触发done,所以一旦产生空洞,需要 并不是连续的,或者说因为共识空洞,后面来的共识也只是记录,不会触发done,只有和数据库共识连续的共识commit才能触发done,所以一旦产生空洞,需要
从共识开始处连续发送后续交易,而不能只发送空洞的共识数据 从共识开始处连续发送后续交易,而不能只发送空洞的共识数据
1. 主链在云端场景,平行链都连到一个分叉的主链节点,平行链都可以共识,主链没分叉节点不能,主链分叉节点后来回退并同步主分支后,平行链节点需要重新同步,
特别是平行链起初发送20tx的交易组,结果在第10个height分叉了,主链共识高度为-1,平行链共识正常,待分叉主节点从第10个节点恢复时候,平行链节点需要
从0开始重新发布共识消息,因为当前共识高度为-1
## 发送失败策略 ## 发送失败策略
1. 当前策略是是要么单个交易,要么一个交易组发送共识消息,要么全部成功,要么全部失败,如果失败,也就是交易在新块里面找不到,超过2个块会重发当前 1. 当前策略是是要么单个交易,要么一个交易组发送共识消息,要么全部成功,要么全部失败,如果失败,也就是交易在新块里面找不到,超过2个块会重发当前
......
...@@ -101,7 +101,8 @@ emptyBlockInterval=50 ...@@ -101,7 +101,8 @@ emptyBlockInterval=50
authAccount="" authAccount=""
#等待平行链共识消息在主链上链并成功的块数,超出会重发共识消息,最小是2 #等待平行链共识消息在主链上链并成功的块数,超出会重发共识消息,最小是2
waitBlocks4CommitMsg=2 waitBlocks4CommitMsg=2
searchHashMatchedBlockDepth=100 #云端主链节点切换后,平行链适配新主链节点block,回溯查找和自己记录的相同blockhash的深度
searchHashMatchedBlockDepth=10000
#创世地址额度 #创世地址额度
genesisAmount=100000000 genesisAmount=100000000
......
...@@ -254,14 +254,14 @@ func (a *action) Commit(commit *pt.ParacrossCommitAction) (*types.Receipt, error ...@@ -254,14 +254,14 @@ func (a *action) Commit(commit *pt.ParacrossCommitAction) (*types.Receipt, error
if !bytes.Equal(blockHash.Hash, commit.Status.MainBlockHash) && commit.Status.Height > 0 { if !bytes.Equal(blockHash.Hash, commit.Status.MainBlockHash) && commit.Status.Height > 0 {
clog.Error("paracross.Commit blockHash not match", "db", hex.EncodeToString(blockHash.Hash), clog.Error("paracross.Commit blockHash not match", "db", hex.EncodeToString(blockHash.Hash),
"commit tx", hex.EncodeToString(commit.Status.MainBlockHash), "commitHeight", commit.Status.Height, "commit tx", hex.EncodeToString(commit.Status.MainBlockHash), "commitHeight", commit.Status.Height,
"from", a.fromaddr) "commitMainHeight",commit.Status.MainBlockHeight,"from", a.fromaddr)
return nil, types.ErrBlockHashNoMatch return nil, types.ErrBlockHashNoMatch
} }
} }
clog.Debug("paracross.Commit check input done") clog.Debug("paracross.Commit check input done")
// 在完成共识之后来的, 增加 record log, 只记录不修改已经达成的共识 // 在完成共识之后来的, 增加 record log, 只记录不修改已经达成的共识
if commit.Status.Height <= titleStatus.Height { if commit.Status.Height <= titleStatus.Height {
clog.Info("paracross.Commit record", "node", a.fromaddr, "titile", commit.Status.Title, clog.Debug("paracross.Commit record", "node", a.fromaddr, "titile", commit.Status.Title,
"height", commit.Status.Height) "height", commit.Status.Height)
return makeRecordReceipt(a.fromaddr, commit), nil return makeRecordReceipt(a.fromaddr, commit), nil
} }
...@@ -398,7 +398,7 @@ func (a *action) execCrossTx(tx *types.TransactionDetail, commit *pt.ParacrossCo ...@@ -398,7 +398,7 @@ func (a *action) execCrossTx(tx *types.TransactionDetail, commit *pt.ParacrossCo
func (a *action) execCrossTxs(commit *pt.ParacrossCommitAction) (*types.Receipt, error) { func (a *action) execCrossTxs(commit *pt.ParacrossCommitAction) (*types.Receipt, error) {
var receipt types.Receipt var receipt types.Receipt
for i := 0; i < len(commit.Status.CrossTxHashs); i++ { for i := 0; i < len(commit.Status.CrossTxHashs); i++ {
clog.Info("paracross.Commit commitDone", "do cross number", i, "hash", clog.Debug("paracross.Commit commitDone", "do cross number", i, "hash",
hex.EncodeToString(commit.Status.CrossTxHashs[i]), hex.EncodeToString(commit.Status.CrossTxHashs[i]),
"res", util.BitMapBit(commit.Status.CrossTxResult, uint32(i))) "res", util.BitMapBit(commit.Status.CrossTxResult, uint32(i)))
if util.BitMapBit(commit.Status.CrossTxResult, uint32(i)) { if util.BitMapBit(commit.Status.CrossTxResult, uint32(i)) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment