Commit 09bd7466 authored by mdj33's avatar mdj33 Committed by vipwzw

improve

parent 2cba6e6a
...@@ -118,13 +118,9 @@ func (client *commitMsgClient) procSendTx() { ...@@ -118,13 +118,9 @@ func (client *commitMsgClient) procSendTx() {
client.sendingHeight = consensHeight client.sendingHeight = consensHeight
} }
//如果是在主链共识场景,共识高度可能大于平行链的链高度 //1.如果是在主链共识场景,共识高度可能大于平行链的链高度
if chainHeight < consensHeight { //2.已发送,未共识场景
return if chainHeight < consensHeight || client.sendingHeight > consensHeight{
}
//已发送,未共识场景
if client.sendingHeight > consensHeight {
return return
} }
...@@ -508,12 +504,11 @@ func (client *commitMsgClient) mainSync() error { ...@@ -508,12 +504,11 @@ func (client *commitMsgClient) mainSync() error {
func (client *commitMsgClient) checkConsensusStop(consensStopTimes uint32) uint32 { func (client *commitMsgClient) checkConsensusStop(consensStopTimes uint32) uint32 {
if client.sendingHeight > atomic.LoadInt64(&client.consensHeight) && !client.isSendingCommitMsg() { if client.sendingHeight > atomic.LoadInt64(&client.consensHeight) && !client.isSendingCommitMsg() {
consensStopTimes++
if consensStopTimes > client.waitConsensStopTimes { if consensStopTimes > client.waitConsensStopTimes {
client.clearSendingTx() client.clearSendingTx()
return 0 return 0
} }
return consensStopTimes return consensStopTimes+1
} }
return 0 return 0
...@@ -555,6 +550,7 @@ out: ...@@ -555,6 +550,7 @@ out:
if err != nil { if err != nil {
continue continue
} }
atomic.StoreInt64(&client.consensHeight, status.Height)
authExist := false authExist := false
if client.paraClient.authAccount != "" { if client.paraClient.authAccount != "" {
...@@ -564,15 +560,13 @@ out: ...@@ -564,15 +560,13 @@ out:
} }
authExist = strings.Contains(nodes, client.paraClient.authAccount) authExist = strings.Contains(nodes, client.paraClient.authAccount)
} }
//consensusRst <- &commitConsensRsp{status: status, authAccountIn: authExist}
atomic.StoreInt64(&client.consensHeight, status.Height)
if authExist { if authExist {
atomic.StoreInt32(&client.authAccountIn, 1) atomic.StoreInt32(&client.authAccountIn, 1)
} else { } else {
atomic.StoreInt32(&client.authAccountIn, 0) atomic.StoreInt32(&client.authAccountIn, 0)
} }
plog.Info("para getConsensusHeight", "height", status.Height, "AccoutIn", authExist)
plog.Debug("para getConsensusHeight", "height", status.Height, "AccoutIn", authExist)
} }
} }
...@@ -727,179 +721,3 @@ func (client *commitMsgClient) fetchPriKey() error { ...@@ -727,179 +721,3 @@ func (client *commitMsgClient) fetchPriKey() error {
plog.Info("para commit fetchPriKey success") plog.Info("para commit fetchPriKey success")
return nil return nil
} }
//func (client *commitMsgClient) handler() {
// var isSync bool
// var isRollback bool
// var notification []int64 //记录每次系统重启后 min and current height
// var finishHeight int64 = -1
// var sendingHeight int64 //当前发送的最大高度
// var readTick <-chan time.Time
// var ticker *time.Ticker
// var lastAuthAccountIn bool
// var consensStopTimes uint32
//
// client.paraClient.wg.Add(1)
// consensusCh := make(chan *commitConsensRsp, 1)
// go client.getConsensusHeight(consensusCh)
//
// client.paraClient.wg.Add(1)
// sendMsgCh := make(chan *types.Transaction, 1)
// go client.sendCommitMsg(sendMsgCh)
//
//out:
// for {
// select {
// case height := <-client.commitMsgNotify:
// if notification == nil {
// notification = append(notification, height)
// notification = append(notification, height)
// finishHeight = height - 1
// } else {
// //[0] need update to min value if any, [1] always get current height, as for fork case, the height may lower than before
// if height < notification[0] {
// notification[0] = height
// finishHeight = height - 1
// }
// notification[1] = height
// if finishHeight >= notification[1] {
// //分叉场景,finish设置为最小值,等待主链共识高度重新设定finishHeight
// finishHeight = notification[0] - 1
// }
// }
// isRollback = false
//
// case height := <-client.delMsgNotify:
// if notification == nil {
// continue
// }
// if height <= notification[1] {
// notification[1] = height - 1
// }
// if height <= sendingHeight && client.currentTx != nil {
// client.currentTx = nil
// }
// //在分叉的主链上,回滚会连续回滚,回滚结束前不会add block,停止发送同时也忽略共识消息,回滚结束后根据共识高度重新设定finishHeight
// //如果分叉高度大于当前已完成高度,说明新的主链也收到了finish的tx,不需要重发,也就不需要重新设定
// if height <= finishHeight {
// finishHeight = notification[0] - 1
// }
// isRollback = true
// plog.Debug("para del block", "delHeight", height)
//
//
// case <-readTick:
// plog.Debug("para readTick", "notify", notification,
// "finishHeight", finishHeight, "txIsNil", client.currentTx == nil, "sync", isSync)
//
// if notification != nil && finishHeight < notification[1] && client.currentTx == nil && isSync {
// count := notification[1] - finishHeight
// if count > types.TxGroupMaxCount {
// count = types.TxGroupMaxCount
// }
// status, err := client.getNodeStatus(finishHeight+1, finishHeight+count)
// if err != nil {
// plog.Error("para commit msg read tick", "err", err.Error())
// continue
// }
// if len(status) == 0 {
// continue
// }
//
// signTx, count, err := client.calcCommitMsgTxs(status)
// if err != nil || signTx == nil {
// continue
// }
// sendingHeight = finishHeight + count
// sendingMsgs := status[:count]
// client.currentTx = signTx
// client.checkTxCommitTimes = 0
// sendMsgCh <- client.currentTx
//
// plog.Debug("paracommitmsg sending", "txhash", common.ToHex(signTx.Hash()), "exec", string(signTx.Execer))
// for i, msg := range sendingMsgs {
// plog.Debug("paracommitmsg sending", "idx", i, "height", msg.Height, "mainheight", msg.MainBlockHeight,
// "blockhash", common.HashHex(msg.BlockHash), "mainHash", common.HashHex(msg.MainBlockHash),
// "from", client.paraClient.authAccount)
// }
// }
//
// //获取正在共识的高度,同步有两层意思,一个是主链跟其他节点完成了同步,另一个是当前平行链节点的高度追赶上了共识高度
// //一般来说高度增长从小到大: notifiy[0] -- selfConsensusHeight(mainHeight) -- finishHeight -- sendingHeight -- notify[1]
// case rsp := <-consensusCh:
// consensHeight := rsp.status.Height
// plog.Info("para consensus rcv", "notify", notification,
// "consensHeight", rsp.status.Height, "finishHeight", finishHeight, "authIn", rsp.authAccountIn, "sync", isSync, "miner", readTick != nil)
// plog.Debug("para consensus rcv", "consensBlockHash", common.ToHex(rsp.status.BlockHash))
//
// //每次账户加入nodegroup 重新设置finishHeight 重新发送,防止曾经发送过,又退出group场景
// if !lastAuthAccountIn && rsp.authAccountIn {
// finishHeight = consensHeight
// }
// lastAuthAccountIn = rsp.authAccountIn
//
// if notification == nil || isRollback || !rsp.authAccountIn {
// isSync = false
// continue
// }
//
// //所有节点还没有共识场景或新节点或重启节点catchingUp场景,要等到收到区块高度大于共识高度时候发送,在catchingup时候本身共识高度和块高度一起增长
// if notification[1] > consensHeight {
// isSync = true
// }
//
// // 共识高度追赶上完成高度之后再发,不然继续发浪费手续费
// if finishHeight > consensHeight {
// if consensStopTimes < client.waitConsensStopTimes {
// isSync = false
// consensStopTimes++
// continue
// }
//
// //reset finishHeight to consensHeight and resent
// finishHeight = consensHeight
// }
//
// //未共识过的小于当前共识高度的区块,可以不参与共识, 如果是新节点,一直等到同步的区块达到了共识高度,才设置同步参与共识
// //在某些特殊场景下,比如平行链连接的主链节点分叉后又恢复,主链的共识高度低于分叉高度时候,主链上形成共识空洞,需要从共识高度重新发送而不是分叉高度
// //共识高度和分叉高度不一致其中一个原因是共识交易组里面某个高度分叉了,分叉的主链节点执行成功,而其他主链节点执行失败,共识高度停留在交易组最小高度-1
// //而分叉高度是交易组里面的某个高度
// if finishHeight <= consensHeight {
// finishHeight = consensHeight
// consensStopTimes = 0
// }
//
// //系统每次重启都有检查一次共识,如果共识高度落后于系统起来后完成的第一个高度或最小高度,说明可能有共识空洞,需要重发
// // 需要是<而不是<=, 因为notification[0]被认为是系统起来后已经发送过的
// nextConsensHeight := consensHeight + 1
// if nextConsensHeight < notification[0] {
// notification[0] = nextConsensHeight
// finishHeight = consensHeight
// }
//
// case miner := <-client.minerSwitch:
// plog.Info("para consensus mining", "miner", miner)
// //停止挖矿
// if !miner {
// readTick = nil
// if ticker != nil {
// ticker.Stop()
// }
// plog.Info("para consensus stop mining")
// continue
// }
// //开启挖矿
// if readTick == nil {
// ticker = time.NewTicker(time.Second * time.Duration(minerInterval))
// readTick = ticker.C
// plog.Info("para consensus start mining")
//
// }
//
// case <-client.quit:
// break out
// }
// }
//
// client.paraClient.wg.Done()
//}
...@@ -75,7 +75,7 @@ func (client *client) addLocalBlock(height int64, block *pt.ParaLocalDbBlock) er ...@@ -75,7 +75,7 @@ func (client *client) addLocalBlock(height int64, block *pt.ParaLocalDbBlock) er
} }
func (client *client) checkCommitTxSuccess(detail *types.BlockDetail) { func (client *client) checkCommitTxSuccess(detail *types.BlockDetail) {
if !client.isCaughtUp { if !client.isCaughtUp || !client.commitMsgClient.isSendingCommitMsg(){
return return
} }
...@@ -87,7 +87,6 @@ func (client *client) checkCommitTxSuccess(detail *types.BlockDetail) { ...@@ -87,7 +87,6 @@ func (client *client) checkCommitTxSuccess(detail *types.BlockDetail) {
} }
} }
//return txMap[string(targetTx.Hash())]
client.commitMsgClient.checkSendingTxDone(txMap) client.commitMsgClient.checkSendingTxDone(txMap)
} }
...@@ -220,6 +219,7 @@ func (client *client) getLastLocalBlockSeq() (int64, []byte, error) { ...@@ -220,6 +219,7 @@ func (client *client) getLastLocalBlockSeq() (int64, []byte, error) {
} }
} }
plog.Info("Parachain getLastLocalBlockSeq from block")
//说明localDb获取存在错误,从chain获取 //说明localDb获取存在错误,从chain获取
mainSeq, chainBlock, err := client.getLastBlockMainInfo() mainSeq, chainBlock, err := client.getLastBlockMainInfo()
if err != nil { if err != nil {
...@@ -474,7 +474,7 @@ func (client *client) CreateBlock() { ...@@ -474,7 +474,7 @@ func (client *client) CreateBlock() {
} }
if err != nil { if err != nil {
plog.Error("para DownloadBlocks", "type", mainBlock.Seq.Type, "err", err.Error()) plog.Error("para CreateBlock", "type", mainBlock.Seq.Type, "err", err.Error())
time.Sleep(time.Second) time.Sleep(time.Second)
continue continue
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment