Commit 02443aa3 authored by mdj33's avatar mdj33 Committed by vipwzw

adjust commit msg

parent 4d9a73bf
...@@ -163,10 +163,9 @@ func New(cfg *types.Consensus, sub []byte) queue.Module { ...@@ -163,10 +163,9 @@ func New(cfg *types.Consensus, sub []byte) queue.Module {
paraClient: para, paraClient: para,
waitMainBlocks: waitBlocks, waitMainBlocks: waitBlocks,
waitConsensStopTimes: waitConsensTimes, waitConsensStopTimes: waitConsensTimes,
commitMsgNotify: make(chan int64, 1), commitNotify: make(chan int64, 1),
delMsgNotify: make(chan int64, 1), resetNotify: make(chan int64, 1),
mainBlockAdd: make(chan *types.BlockDetail, 1), chainHeight: -1,
minerSwitch: make(chan bool, 1),
quit: make(chan struct{}), quit: make(chan struct{}),
} }
c.SetChild(para) c.SetChild(para)
...@@ -393,7 +392,7 @@ func (client *client) WriteBlock(prev []byte, paraBlock *types.Block, seq int64) ...@@ -393,7 +392,7 @@ func (client *client) WriteBlock(prev []byte, paraBlock *types.Block, seq int64)
client.SetCurrentBlock(blkdetail.Block) client.SetCurrentBlock(blkdetail.Block)
if client.authAccount != "" { if client.authAccount != "" {
client.commitMsgClient.onBlockAdded(blkdetail.Block.Height) client.commitMsgClient.updateChainHeight(blockDetail.Block.Height, false)
} }
return nil return nil
...@@ -430,7 +429,8 @@ func (client *client) DelBlock(block *types.Block, seq int64) error { ...@@ -430,7 +429,8 @@ func (client *client) DelBlock(block *types.Block, seq int64) error {
if resp.GetData().(*types.Reply).IsOk { if resp.GetData().(*types.Reply).IsOk {
if client.authAccount != "" { if client.authAccount != "" {
client.commitMsgClient.onBlockDeleted(blocks.Items[0].Block.Height) client.commitMsgClient.updateChainHeight(blocks.Items[0].Block.Height, true)
} }
} else { } else {
reply := resp.GetData().(*types.Reply) reply := resp.GetData().(*types.Reply)
......
...@@ -5,12 +5,14 @@ ...@@ -5,12 +5,14 @@
package para package para
import ( import (
"bytes"
"context" "context"
"time" "time"
"strings" "strings"
"sync/atomic"
"unsafe"
"github.com/33cn/chain33/common" "github.com/33cn/chain33/common"
"github.com/33cn/chain33/common/crypto" "github.com/33cn/chain33/common/crypto"
"github.com/33cn/chain33/types" "github.com/33cn/chain33/types"
...@@ -28,217 +30,205 @@ type commitMsgClient struct { ...@@ -28,217 +30,205 @@ type commitMsgClient struct {
paraClient *client paraClient *client
waitMainBlocks int32 //等待平行链共识消息在主链上链并成功的块数,超出会重发共识消息,最小是2 waitMainBlocks int32 //等待平行链共识消息在主链上链并成功的块数,超出会重发共识消息,最小是2
waitConsensStopTimes uint32 //共识高度低于完成高度, reset高度重发等待的次数 waitConsensStopTimes uint32 //共识高度低于完成高度, reset高度重发等待的次数
commitMsgNotify chan int64 commitNotify chan int64
delMsgNotify chan int64 resetNotify chan int64
mainBlockAdd chan *types.BlockDetail sendMsgCh chan *types.Transaction
minerSwitch chan bool minerSwitch int32
currentTx *types.Transaction currentTx unsafe.Pointer
chainHeight int64
sendingHeight int64
consensHeight int64
authAccountIn int32
isRollBack int32
checkTxCommitTimes int32 checkTxCommitTimes int32
privateKey crypto.PrivKey privateKey crypto.PrivKey
quit chan struct{} quit chan struct{}
} }
type commitConsensRsp struct { // 1. 链高度回滚,低于当前发送高度,需要重新计算当前发送高度,不然不会重新发送回滚的高度
status *pt.ParacrossStatus // 2. 定时轮询是在比如锁定解锁钱包这类外部条件变化时候,其他输入条件不会触发时候及时响应,不然任何一个外部条件变化都触发一下发送,可能条件比较多
authAccountIn bool //是否授权账户包含在node group addrs
}
func (client *commitMsgClient) handler() { func (client *commitMsgClient) handler() {
var isSync bool
var isRollback bool
var notification []int64 //记录每次系统重启后 min and current height
var finishHeight int64 = -1
var sendingHeight int64 //当前发送的最大高度
var sendingMsgs []*pt.ParacrossNodeStatus
var readTick <-chan time.Time var readTick <-chan time.Time
var ticker *time.Ticker
var lastAuthAccountIn bool
var consensStopTimes uint32 var consensStopTimes uint32
client.paraClient.wg.Add(1) client.paraClient.wg.Add(1)
consensusCh := make(chan *commitConsensRsp, 1) go client.getConsensusHeight()
go client.getConsensusHeight(consensusCh)
if client.paraClient.authAccount != "" {
client.paraClient.wg.Add(1) client.paraClient.wg.Add(1)
sendMsgCh := make(chan *types.Transaction, 1) client.sendMsgCh = make(chan *types.Transaction, 1)
go client.sendCommitMsg(sendMsgCh) go client.sendCommitMsg()
ticker := time.NewTicker(time.Second * time.Duration(minerInterval))
readTick = ticker.C
defer ticker.Stop()
}
out: out:
for { for {
select { select {
case height := <-client.commitMsgNotify: //正常的触发检查
if notification == nil { case <-client.commitNotify:
notification = append(notification, height) //回滚场景
notification = append(notification, height) if atomic.LoadInt64(&client.chainHeight) < client.sendingHeight {
finishHeight = height - 1 client.resetSendTx()
} else { }
//[0] need update to min value if any, [1] always get current height, as for fork case, the height may lower than before client.procSendTx()
if height < notification[0] { //发送出错场景,需要reset 重发
notification[0] = height case <-client.resetNotify:
finishHeight = height - 1 client.resetSendTx()
} client.procSendTx()
notification[1] = height //例行检查发送
if finishHeight >= notification[1] { case <-readTick:
//分叉场景,finish设置为最小值,等待主链共识高度重新设定finishHeight consensStopTimes = client.checkConsensusStop(consensStopTimes)
finishHeight = notification[0] - 1 client.procSendTx()
case <-client.quit:
break out
} }
} }
isRollback = false
case height := <-client.delMsgNotify: client.paraClient.wg.Done()
if notification == nil { }
continue
func (client *commitMsgClient) resetSendTx() {
client.sendingHeight = 0
client.setCurrentTx(nil)
}
func (client *commitMsgClient) procSendTx() {
plog.Debug("para readTick", "notify", atomic.LoadInt64(&client.chainHeight),
"finishHeight", client.sendingHeight, "txIsNil", client.currentTx == nil, "sync", client.isSync())
if client.isSendingCommitMsg() || !client.isSync() {
return
} }
if height <= notification[1] {
notification[1] = height - 1 consensHeight := atomic.LoadInt64(&client.consensHeight)
chainHeight := atomic.LoadInt64(&client.chainHeight)
if client.sendingHeight < consensHeight {
client.sendingHeight = consensHeight
} }
if height <= sendingHeight && client.currentTx != nil {
sendingMsgs = nil //如果是在主链共识场景,共识高度可能大于平行链的链高度
client.currentTx = nil if chainHeight < consensHeight {
return
} }
//在分叉的主链上,回滚会连续回滚,回滚结束前不会add block,停止发送同时也忽略共识消息,回滚结束后根据共识高度重新设定finishHeight
//如果分叉高度大于当前已完成高度,说明新的主链也收到了finish的tx,不需要重发,也就不需要重新设定 //已发送,未共识场景
if height <= finishHeight { if client.sendingHeight > consensHeight {
finishHeight = notification[0] - 1 return
} }
isRollback = true
plog.Debug("para del block", "delHeight", height)
case block := <-client.mainBlockAdd: if client.sendingHeight < chainHeight {
client.paraClient.mtx.Lock() signTx, count := client.getSendingTx(client.sendingHeight, chainHeight)
isCaughtUp := client.paraClient.isCaughtUp if signTx == nil {
client.paraClient.mtx.Unlock() return
if client.currentTx != nil && isCaughtUp { }
exist := checkTxInMainBlock(client.currentTx, block) client.sendingHeight = client.sendingHeight + count
if exist { client.setCurrentTx(signTx)
finishHeight = sendingHeight
sendingMsgs = nil
client.currentTx = nil
} else {
client.checkTxCommitTimes++
if client.checkTxCommitTimes > client.waitMainBlocks {
//超过等待最大次数,reset,重新组织发送,防止一直发送同一笔消息
sendingMsgs = nil
client.currentTx = nil
client.checkTxCommitTimes = 0 client.checkTxCommitTimes = 0
client.sendMsgCh <- signTx
} }
}
func (client *commitMsgClient) isSync() bool {
chainHeight := atomic.LoadInt64(&client.chainHeight)
if chainHeight < 0 {
return false
} }
if atomic.LoadInt32(&client.authAccountIn) != 1 {
return false
} }
case <-readTick: if atomic.LoadInt32(&client.minerSwitch) == 0 {
plog.Debug("para readTick", "notify", notification, "sending", len(sendingMsgs), return false
"finishHeight", finishHeight, "txIsNil", client.currentTx == nil, "sync", isSync) }
if atomic.LoadInt32(&client.isRollBack) == 1 {
return false
}
return true
if notification != nil && finishHeight < notification[1] && client.currentTx == nil && isSync { }
count := notification[1] - finishHeight
func (client *commitMsgClient) getSendingTx(startHeight, endHeight int64) (*types.Transaction, int64) {
count := endHeight - startHeight
if count > types.TxGroupMaxCount { if count > types.TxGroupMaxCount {
count = types.TxGroupMaxCount count = types.TxGroupMaxCount
} }
status, err := client.getNodeStatus(finishHeight+1, finishHeight+count) status, err := client.getNodeStatus(startHeight+1, startHeight+count)
if err != nil { if err != nil {
plog.Error("para commit msg read tick", "err", err.Error()) plog.Error("para commit msg read tick", "err", err.Error())
continue return nil, 0
} }
if len(status) == 0 { if len(status) == 0 {
continue return nil, 0
} }
signTx, count, err := client.calcCommitMsgTxs(status) signTx, count, err := client.calcCommitMsgTxs(status)
if err != nil || signTx == nil { if err != nil || signTx == nil {
continue return nil, 0
} }
sendingHeight = finishHeight + count
sendingMsgs = status[:count]
client.currentTx = signTx
client.checkTxCommitTimes = 0
sendMsgCh <- client.currentTx
sendingMsgs := status[:count]
plog.Debug("paracommitmsg sending", "txhash", common.ToHex(signTx.Hash()), "exec", string(signTx.Execer)) plog.Debug("paracommitmsg sending", "txhash", common.ToHex(signTx.Hash()), "exec", string(signTx.Execer))
for i, msg := range sendingMsgs { for i, msg := range sendingMsgs {
plog.Debug("paracommitmsg sending", "idx", i, "height", msg.Height, "mainheight", msg.MainBlockHeight, plog.Debug("paracommitmsg sending", "idx", i, "height", msg.Height, "mainheight", msg.MainBlockHeight,
"blockhash", common.HashHex(msg.BlockHash), "mainHash", common.HashHex(msg.MainBlockHash), "blockhash", common.HashHex(msg.BlockHash), "mainHash", common.HashHex(msg.MainBlockHash),
"from", client.paraClient.authAccount) "from", client.paraClient.authAccount)
} }
}
//获取正在共识的高度,同步有两层意思,一个是主链跟其他节点完成了同步,另一个是当前平行链节点的高度追赶上了共识高度 return signTx, count
//一般来说高度增长从小到大: notifiy[0] -- selfConsensusHeight(mainHeight) -- finishHeight -- sendingHeight -- notify[1] }
case rsp := <-consensusCh:
consensHeight := rsp.status.Height
plog.Info("para consensus rcv", "notify", notification, "sending", len(sendingMsgs),
"consensHeight", rsp.status.Height, "finishHeight", finishHeight, "authIn", rsp.authAccountIn, "sync", isSync, "miner", readTick != nil)
plog.Debug("para consensus rcv", "consensBlockHash", common.ToHex(rsp.status.BlockHash))
//每次账户加入nodegroup 重新设置finishHeight 重新发送,防止曾经发送过,又退出group场景
if !lastAuthAccountIn && rsp.authAccountIn {
finishHeight = consensHeight
}
lastAuthAccountIn = rsp.authAccountIn
if notification == nil || isRollback || !rsp.authAccountIn { func (client *commitMsgClient) setCurrentTx(tx *types.Transaction) {
isSync = false atomic.StorePointer(&client.currentTx, unsafe.Pointer(tx))
continue }
}
//所有节点还没有共识场景或新节点或重启节点catchingUp场景,要等到收到区块高度大于共识高度时候发送,在catchingup时候本身共识高度和块高度一起增长 func (client *commitMsgClient) getCurrentTx() *types.Transaction {
if notification[1] > consensHeight { return (*types.Transaction)(atomic.LoadPointer(&client.currentTx))
isSync = true }
}
// 共识高度追赶上完成高度之后再发,不然继续发浪费手续费 func (client *commitMsgClient) isSendingCommitMsg() bool {
if finishHeight > consensHeight { return client.getCurrentTx() != nil
if consensStopTimes < client.waitConsensStopTimes { }
isSync = false
consensStopTimes++
continue
}
//reset finishHeight to consensHeight and resent func (client *commitMsgClient) updateChainHeight(height int64, isDel bool) {
finishHeight = consensHeight if isDel {
atomic.StoreInt32(&client.isRollBack, 1)
} else {
atomic.StoreInt32(&client.isRollBack, 0)
} }
//未共识过的小于当前共识高度的区块,可以不参与共识, 如果是新节点,一直等到同步的区块达到了共识高度,才设置同步参与共识 atomic.StoreInt64(&client.chainHeight, height)
//在某些特殊场景下,比如平行链连接的主链节点分叉后又恢复,主链的共识高度低于分叉高度时候,主链上形成共识空洞,需要从共识高度重新发送而不是分叉高度
//共识高度和分叉高度不一致其中一个原因是共识交易组里面某个高度分叉了,分叉的主链节点执行成功,而其他主链节点执行失败,共识高度停留在交易组最小高度-1
//而分叉高度是交易组里面的某个高度
if finishHeight <= consensHeight {
finishHeight = consensHeight
consensStopTimes = 0
}
//系统每次重启都有检查一次共识,如果共识高度落后于系统起来后完成的第一个高度或最小高度,说明可能有共识空洞,需要重发 }
// 需要是<而不是<=, 因为notification[0]被认为是系统起来后已经发送过的
nextConsensHeight := consensHeight + 1
if nextConsensHeight < notification[0] {
notification[0] = nextConsensHeight
finishHeight = consensHeight
}
case miner := <-client.minerSwitch: //TODO 非平行鏈的commit tx 去主鏈查詢
plog.Info("para consensus mining", "miner", miner) func (client *commitMsgClient) checkSendingTxDone(txs map[string]bool) {
//停止挖矿 tx := client.getCurrentTx()
if !miner { if tx == nil {
readTick = nil return
if ticker != nil {
ticker.Stop()
}
plog.Info("para consensus stop mining")
continue
} }
//开启挖矿
if readTick == nil {
ticker = time.NewTicker(time.Second * time.Duration(minerInterval))
readTick = ticker.C
plog.Info("para consensus start mining")
if txs[string(tx.Hash())] {
client.setCurrentTx(nil)
atomic.StoreInt32(&client.checkTxCommitTimes, 0)
//继续处理
client.commitNotify <- 1
return
} }
case <-client.quit: atomic.AddInt32(&client.checkTxCommitTimes, 1)
break out if atomic.LoadInt32(&client.checkTxCommitTimes) >= client.waitMainBlocks {
} //重新发送
client.resetNotify <- 1
} }
client.paraClient.wg.Done()
} }
func (client *commitMsgClient) calcCommitMsgTxs(notifications []*pt.ParacrossNodeStatus) (*types.Transaction, int64, error) { func (client *commitMsgClient) calcCommitMsgTxs(notifications []*pt.ParacrossNodeStatus) (*types.Transaction, int64, error) {
...@@ -317,7 +307,7 @@ func (client *commitMsgClient) singleCalcTx(status *pt.ParacrossNodeStatus) (*ty ...@@ -317,7 +307,7 @@ func (client *commitMsgClient) singleCalcTx(status *pt.ParacrossNodeStatus) (*ty
} }
func (client *commitMsgClient) sendCommitMsg(ch chan *types.Transaction) { func (client *commitMsgClient) sendCommitMsg() {
var err error var err error
var tx *types.Transaction var tx *types.Transaction
var resendTimer <-chan time.Time var resendTimer <-chan time.Time
...@@ -325,7 +315,7 @@ func (client *commitMsgClient) sendCommitMsg(ch chan *types.Transaction) { ...@@ -325,7 +315,7 @@ func (client *commitMsgClient) sendCommitMsg(ch chan *types.Transaction) {
out: out:
for { for {
select { select {
case tx = <-ch: case tx = <-client.sendMsgCh:
err = client.sendCommitMsgTx(tx) err = client.sendCommitMsgTx(tx)
if err != nil && (err != types.ErrBalanceLessThanTenTimesFee && err != types.ErrNoBalance) { if err != nil && (err != types.ErrBalanceLessThanTenTimesFee && err != types.ErrNoBalance) {
resendTimer = time.After(time.Second * 2) resendTimer = time.After(time.Second * 2)
...@@ -361,18 +351,6 @@ func (client *commitMsgClient) sendCommitMsgTx(tx *types.Transaction) error { ...@@ -361,18 +351,6 @@ func (client *commitMsgClient) sendCommitMsgTx(tx *types.Transaction) error {
} }
func checkTxInMainBlock(targetTx *types.Transaction, detail *types.BlockDetail) bool {
txMap := make(map[string]bool)
for i, tx := range detail.Block.Txs {
if bytes.HasSuffix(tx.Execer, []byte(pt.ParaX)) && detail.Receipts[i].Ty == types.ExecOk {
txMap[string(tx.Hash())] = true
}
}
return txMap[string(targetTx.Hash())]
}
func isParaSelfConsensusForked(height int64) bool { func isParaSelfConsensusForked(height int64) bool {
return height > mainParaSelfConsensusForkHeight return height > mainParaSelfConsensusForkHeight
} }
...@@ -491,29 +469,6 @@ func (client *commitMsgClient) getGenesisNodeStatus() (*pt.ParacrossNodeStatus, ...@@ -491,29 +469,6 @@ func (client *commitMsgClient) getGenesisNodeStatus() (*pt.ParacrossNodeStatus,
return &status, nil return &status, nil
} }
func (client *commitMsgClient) onBlockAdded(height int64) error {
select {
case client.commitMsgNotify <- height:
case <-client.quit:
}
return nil
}
func (client *commitMsgClient) onBlockDeleted(height int64) {
select {
case client.delMsgNotify <- height:
case <-client.quit:
}
}
func (client *commitMsgClient) onMainBlockAdded(block *types.BlockDetail) {
select {
case client.mainBlockAdd <- block:
case <-client.quit:
}
}
//only sync once, as main usually sync, here just need the first sync status after start up //only sync once, as main usually sync, here just need the first sync status after start up
func (client *commitMsgClient) mainSync() error { func (client *commitMsgClient) mainSync() error {
req := &types.ReqNil{} req := &types.ReqNil{}
...@@ -532,7 +487,20 @@ func (client *commitMsgClient) mainSync() error { ...@@ -532,7 +487,20 @@ func (client *commitMsgClient) mainSync() error {
} }
func (client *commitMsgClient) getConsensusHeight(consensusRst chan *commitConsensRsp) { func (client *commitMsgClient) checkConsensusStop(consensStopTimes uint32) uint32 {
if client.sendingHeight > atomic.LoadInt64(&client.consensHeight) && !client.isSendingCommitMsg() {
consensStopTimes++
if consensStopTimes > client.waitConsensStopTimes {
client.resetSendTx()
return 0
}
return consensStopTimes
}
return 0
}
func (client *commitMsgClient) getConsensusHeight() {
ticker := time.NewTicker(time.Second * time.Duration(consensusInterval)) ticker := time.NewTicker(time.Second * time.Duration(consensusInterval))
isSync := false isSync := false
defer ticker.Stop() defer ticker.Stop()
...@@ -577,7 +545,15 @@ out: ...@@ -577,7 +545,15 @@ out:
} }
authExist = strings.Contains(nodes, client.paraClient.authAccount) authExist = strings.Contains(nodes, client.paraClient.authAccount)
} }
consensusRst <- &commitConsensRsp{status: status, authAccountIn: authExist}
//consensusRst <- &commitConsensRsp{status: status, authAccountIn: authExist}
atomic.StoreInt64(&client.consensHeight, status.Height)
if authExist {
atomic.StoreInt32(&client.authAccountIn, 1)
} else {
atomic.StoreInt32(&client.authAccountIn, 0)
}
} }
} }
...@@ -674,10 +650,12 @@ func (client *commitMsgClient) onWalletStatus(status *types.WalletStatus) { ...@@ -674,10 +650,12 @@ func (client *commitMsgClient) onWalletStatus(status *types.WalletStatus) {
return return
} }
select { if status.IsWalletLock {
case client.minerSwitch <- !status.IsWalletLock: atomic.StoreInt32(&client.minerSwitch, 0)
case <-client.quit: } else {
atomic.StoreInt32(&client.minerSwitch, 1)
} }
} }
func (client *commitMsgClient) onWalletAccount(acc *types.Account) { func (client *commitMsgClient) onWalletAccount(acc *types.Account) {
...@@ -690,10 +668,8 @@ func (client *commitMsgClient) onWalletAccount(acc *types.Account) { ...@@ -690,10 +668,8 @@ func (client *commitMsgClient) onWalletAccount(acc *types.Account) {
return return
} }
select { atomic.StoreInt32(&client.minerSwitch, 1)
case client.minerSwitch <- true:
case <-client.quit:
}
} }
func (client *commitMsgClient) fetchPriKey() error { func (client *commitMsgClient) fetchPriKey() error {
...@@ -731,3 +707,179 @@ func (client *commitMsgClient) fetchPriKey() error { ...@@ -731,3 +707,179 @@ func (client *commitMsgClient) fetchPriKey() error {
plog.Info("para commit fetchPriKey success") plog.Info("para commit fetchPriKey success")
return nil return nil
} }
//func (client *commitMsgClient) handler() {
// var isSync bool
// var isRollback bool
// var notification []int64 //记录每次系统重启后 min and current height
// var finishHeight int64 = -1
// var sendingHeight int64 //当前发送的最大高度
// var readTick <-chan time.Time
// var ticker *time.Ticker
// var lastAuthAccountIn bool
// var consensStopTimes uint32
//
// client.paraClient.wg.Add(1)
// consensusCh := make(chan *commitConsensRsp, 1)
// go client.getConsensusHeight(consensusCh)
//
// client.paraClient.wg.Add(1)
// sendMsgCh := make(chan *types.Transaction, 1)
// go client.sendCommitMsg(sendMsgCh)
//
//out:
// for {
// select {
// case height := <-client.commitMsgNotify:
// if notification == nil {
// notification = append(notification, height)
// notification = append(notification, height)
// finishHeight = height - 1
// } else {
// //[0] need update to min value if any, [1] always get current height, as for fork case, the height may lower than before
// if height < notification[0] {
// notification[0] = height
// finishHeight = height - 1
// }
// notification[1] = height
// if finishHeight >= notification[1] {
// //分叉场景,finish设置为最小值,等待主链共识高度重新设定finishHeight
// finishHeight = notification[0] - 1
// }
// }
// isRollback = false
//
// case height := <-client.delMsgNotify:
// if notification == nil {
// continue
// }
// if height <= notification[1] {
// notification[1] = height - 1
// }
// if height <= sendingHeight && client.currentTx != nil {
// client.currentTx = nil
// }
// //在分叉的主链上,回滚会连续回滚,回滚结束前不会add block,停止发送同时也忽略共识消息,回滚结束后根据共识高度重新设定finishHeight
// //如果分叉高度大于当前已完成高度,说明新的主链也收到了finish的tx,不需要重发,也就不需要重新设定
// if height <= finishHeight {
// finishHeight = notification[0] - 1
// }
// isRollback = true
// plog.Debug("para del block", "delHeight", height)
//
//
// case <-readTick:
// plog.Debug("para readTick", "notify", notification,
// "finishHeight", finishHeight, "txIsNil", client.currentTx == nil, "sync", isSync)
//
// if notification != nil && finishHeight < notification[1] && client.currentTx == nil && isSync {
// count := notification[1] - finishHeight
// if count > types.TxGroupMaxCount {
// count = types.TxGroupMaxCount
// }
// status, err := client.getNodeStatus(finishHeight+1, finishHeight+count)
// if err != nil {
// plog.Error("para commit msg read tick", "err", err.Error())
// continue
// }
// if len(status) == 0 {
// continue
// }
//
// signTx, count, err := client.calcCommitMsgTxs(status)
// if err != nil || signTx == nil {
// continue
// }
// sendingHeight = finishHeight + count
// sendingMsgs := status[:count]
// client.currentTx = signTx
// client.checkTxCommitTimes = 0
// sendMsgCh <- client.currentTx
//
// plog.Debug("paracommitmsg sending", "txhash", common.ToHex(signTx.Hash()), "exec", string(signTx.Execer))
// for i, msg := range sendingMsgs {
// plog.Debug("paracommitmsg sending", "idx", i, "height", msg.Height, "mainheight", msg.MainBlockHeight,
// "blockhash", common.HashHex(msg.BlockHash), "mainHash", common.HashHex(msg.MainBlockHash),
// "from", client.paraClient.authAccount)
// }
// }
//
// //获取正在共识的高度,同步有两层意思,一个是主链跟其他节点完成了同步,另一个是当前平行链节点的高度追赶上了共识高度
// //一般来说高度增长从小到大: notifiy[0] -- selfConsensusHeight(mainHeight) -- finishHeight -- sendingHeight -- notify[1]
// case rsp := <-consensusCh:
// consensHeight := rsp.status.Height
// plog.Info("para consensus rcv", "notify", notification,
// "consensHeight", rsp.status.Height, "finishHeight", finishHeight, "authIn", rsp.authAccountIn, "sync", isSync, "miner", readTick != nil)
// plog.Debug("para consensus rcv", "consensBlockHash", common.ToHex(rsp.status.BlockHash))
//
// //每次账户加入nodegroup 重新设置finishHeight 重新发送,防止曾经发送过,又退出group场景
// if !lastAuthAccountIn && rsp.authAccountIn {
// finishHeight = consensHeight
// }
// lastAuthAccountIn = rsp.authAccountIn
//
// if notification == nil || isRollback || !rsp.authAccountIn {
// isSync = false
// continue
// }
//
// //所有节点还没有共识场景或新节点或重启节点catchingUp场景,要等到收到区块高度大于共识高度时候发送,在catchingup时候本身共识高度和块高度一起增长
// if notification[1] > consensHeight {
// isSync = true
// }
//
// // 共识高度追赶上完成高度之后再发,不然继续发浪费手续费
// if finishHeight > consensHeight {
// if consensStopTimes < client.waitConsensStopTimes {
// isSync = false
// consensStopTimes++
// continue
// }
//
// //reset finishHeight to consensHeight and resent
// finishHeight = consensHeight
// }
//
// //未共识过的小于当前共识高度的区块,可以不参与共识, 如果是新节点,一直等到同步的区块达到了共识高度,才设置同步参与共识
// //在某些特殊场景下,比如平行链连接的主链节点分叉后又恢复,主链的共识高度低于分叉高度时候,主链上形成共识空洞,需要从共识高度重新发送而不是分叉高度
// //共识高度和分叉高度不一致其中一个原因是共识交易组里面某个高度分叉了,分叉的主链节点执行成功,而其他主链节点执行失败,共识高度停留在交易组最小高度-1
// //而分叉高度是交易组里面的某个高度
// if finishHeight <= consensHeight {
// finishHeight = consensHeight
// consensStopTimes = 0
// }
//
// //系统每次重启都有检查一次共识,如果共识高度落后于系统起来后完成的第一个高度或最小高度,说明可能有共识空洞,需要重发
// // 需要是<而不是<=, 因为notification[0]被认为是系统起来后已经发送过的
// nextConsensHeight := consensHeight + 1
// if nextConsensHeight < notification[0] {
// notification[0] = nextConsensHeight
// finishHeight = consensHeight
// }
//
// case miner := <-client.minerSwitch:
// plog.Info("para consensus mining", "miner", miner)
// //停止挖矿
// if !miner {
// readTick = nil
// if ticker != nil {
// ticker.Stop()
// }
// plog.Info("para consensus stop mining")
// continue
// }
// //开启挖矿
// if readTick == nil {
// ticker = time.NewTicker(time.Second * time.Duration(minerInterval))
// readTick = ticker.C
// plog.Info("para consensus start mining")
//
// }
//
// case <-client.quit:
// break out
// }
// }
//
// client.paraClient.wg.Done()
//}
...@@ -15,7 +15,7 @@ import ( ...@@ -15,7 +15,7 @@ import (
"github.com/33cn/chain33/common" "github.com/33cn/chain33/common"
"github.com/33cn/chain33/types" "github.com/33cn/chain33/types"
paraexec "github.com/33cn/plugin/plugin/dapp/paracross/executor" paraexec "github.com/33cn/plugin/plugin/dapp/paracross/executor"
paracross "github.com/33cn/plugin/plugin/dapp/paracross/types" pt "github.com/33cn/plugin/plugin/dapp/paracross/types"
) )
func (client *client) setLocalDb(set *types.LocalDBSet) error { func (client *client) setLocalDb(set *types.LocalDBSet) error {
...@@ -59,7 +59,7 @@ func (client *client) getLocalDb(set *types.LocalDBGet, count int) ([][]byte, er ...@@ -59,7 +59,7 @@ func (client *client) getLocalDb(set *types.LocalDBGet, count int) ([][]byte, er
return reply.Values, nil return reply.Values, nil
} }
func (client *client) addLocalBlock(height int64, block *paracross.ParaLocalDbBlock) error { func (client *client) addLocalBlock(height int64, block *pt.ParaLocalDbBlock) error {
set := &types.LocalDBSet{} set := &types.LocalDBSet{}
key := calcTitleHeightKey(types.GetTitle(), height) key := calcTitleHeightKey(types.GetTitle(), height)
...@@ -74,8 +74,26 @@ func (client *client) addLocalBlock(height int64, block *paracross.ParaLocalDbBl ...@@ -74,8 +74,26 @@ func (client *client) addLocalBlock(height int64, block *paracross.ParaLocalDbBl
return client.setLocalDb(set) return client.setLocalDb(set)
} }
func (client *client) createLocalBlock(lastBlock *paracross.ParaLocalDbBlock, txs []*types.Transaction, mainBlock *types.BlockSeq) error { func (client *client) checkTxInMainBlock(detail *types.BlockDetail) {
var newblock paracross.ParaLocalDbBlock if !client.isCaughtUp {
return
}
txMap := make(map[string]bool)
for i, tx := range detail.Block.Txs {
if bytes.HasSuffix(tx.Execer, []byte(pt.ParaX)) && detail.Receipts[i].Ty == types.ExecOk {
txMap[string(tx.Hash())] = true
}
}
//return txMap[string(targetTx.Hash())]
client.commitMsgClient.checkSendingTxDone(txMap)
}
func (client *client) createLocalBlock(lastBlock *pt.ParaLocalDbBlock, txs []*types.Transaction, mainBlock *types.BlockSeq) error {
var newblock pt.ParaLocalDbBlock
newblock.Height = lastBlock.Height + 1 newblock.Height = lastBlock.Height + 1
newblock.MainHash = mainBlock.Seq.Hash newblock.MainHash = mainBlock.Seq.Hash
...@@ -85,7 +103,12 @@ func (client *client) createLocalBlock(lastBlock *paracross.ParaLocalDbBlock, tx ...@@ -85,7 +103,12 @@ func (client *client) createLocalBlock(lastBlock *paracross.ParaLocalDbBlock, tx
newblock.Txs = txs newblock.Txs = txs
return client.addLocalBlock(newblock.Height, &newblock) err := client.addLocalBlock(newblock.Height, &newblock)
if err != nil {
return err
}
client.checkTxInMainBlock(mainBlock.Detail)
return nil
} }
func (client *client) createLocalGenesisBlock(genesis *types.Block) error { func (client *client) createLocalGenesisBlock(genesis *types.Block) error {
...@@ -137,7 +160,7 @@ func (client *client) getLastLocalHeight() (int64, error) { ...@@ -137,7 +160,7 @@ func (client *client) getLastLocalHeight() (int64, error) {
} }
func (client *client) getLocalBlockByHeight(height int64) (*paracross.ParaLocalDbBlock, error) { func (client *client) getLocalBlockByHeight(height int64) (*pt.ParaLocalDbBlock, error) {
key := calcTitleHeightKey(types.GetTitle(), height) key := calcTitleHeightKey(types.GetTitle(), height)
set := &types.LocalDBGet{Keys: [][]byte{key}} set := &types.LocalDBGet{Keys: [][]byte{key}}
...@@ -149,7 +172,7 @@ func (client *client) getLocalBlockByHeight(height int64) (*paracross.ParaLocalD ...@@ -149,7 +172,7 @@ func (client *client) getLocalBlockByHeight(height int64) (*paracross.ParaLocalD
return nil, types.ErrNotFound return nil, types.ErrNotFound
} }
var block paracross.ParaLocalDbBlock var block pt.ParaLocalDbBlock
err = types.Decode(value[0], &block) err = types.Decode(value[0], &block)
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -175,7 +198,7 @@ func (client *client) getLocalBlockSeq(height int64) (int64, []byte, error) { ...@@ -175,7 +198,7 @@ func (client *client) getLocalBlockSeq(height int64) (int64, []byte, error) {
//根据匹配上的chainblock,设置当前localdb block //根据匹配上的chainblock,设置当前localdb block
func (client *client) alignLocalBlock2ChainBlock(chainBlock *types.Block) error { func (client *client) alignLocalBlock2ChainBlock(chainBlock *types.Block) error {
localBlock := &paracross.ParaLocalDbBlock{ localBlock := &pt.ParaLocalDbBlock{
Height: chainBlock.Height, Height: chainBlock.Height,
MainHeight: chainBlock.MainHeight, MainHeight: chainBlock.MainHeight,
MainHash: chainBlock.MainHash, MainHash: chainBlock.MainHash,
...@@ -211,7 +234,7 @@ func (client *client) getLastLocalBlockSeq() (int64, []byte, error) { ...@@ -211,7 +234,7 @@ func (client *client) getLastLocalBlockSeq() (int64, []byte, error) {
} }
func (client *client) getLastLocalBlock() (*paracross.ParaLocalDbBlock, error) { func (client *client) getLastLocalBlock() (*pt.ParaLocalDbBlock, error) {
height, err := client.getLastLocalHeight() height, err := client.getLastLocalHeight()
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -277,7 +300,7 @@ func (client *client) getMatchedBlockOnChain(startHeight int64) (int64, *types.B ...@@ -277,7 +300,7 @@ func (client *client) getMatchedBlockOnChain(startHeight int64) (int64, *types.B
"new currSeq", mainSeq, "new preMainBlockHash", hex.EncodeToString(block.MainHash)) "new currSeq", mainSeq, "new preMainBlockHash", hex.EncodeToString(block.MainHash))
return mainSeq, block, nil return mainSeq, block, nil
} }
return -2, nil, paracross.ErrParaCurHashNotMatch return -2, nil, pt.ErrParaCurHashNotMatch
} }
func (client *client) switchMatchedBlockOnChain(startHeight int64) (int64, []byte, error) { func (client *client) switchMatchedBlockOnChain(startHeight int64) (int64, []byte, error) {
...@@ -331,7 +354,7 @@ func (client *client) switchLocalHashMatchedBlock() (int64, []byte, error) { ...@@ -331,7 +354,7 @@ func (client *client) switchLocalHashMatchedBlock() (int64, []byte, error) {
"currSeq", mainSeq, "currMainBlockHash", hex.EncodeToString(block.MainHash)) "currSeq", mainSeq, "currMainBlockHash", hex.EncodeToString(block.MainHash))
return mainSeq, block.MainHash, nil return mainSeq, block.MainHash, nil
} }
return -2, nil, paracross.ErrParaCurHashNotMatch return -2, nil, pt.ErrParaCurHashNotMatch
} }
// preBlockHash to identify the same main node // preBlockHash to identify the same main node
...@@ -362,27 +385,23 @@ func (client *client) RequestTx(currSeq int64, preMainBlockHash []byte) ([]*type ...@@ -362,27 +385,23 @@ func (client *client) RequestTx(currSeq int64, preMainBlockHash []byte) ([]*type
} }
client.mtx.Unlock() client.mtx.Unlock()
if client.authAccount != "" {
client.commitMsgClient.onMainBlockAdded(blockSeq.Detail)
}
return txs, blockSeq, nil return txs, blockSeq, nil
} }
//not consistent case be processed at below //not consistent case be processed at below
plog.Error("RequestTx", "preMainHash", hex.EncodeToString(preMainBlockHash), "currSeq preMainHash", hex.EncodeToString(blockSeq.Detail.Block.ParentHash), plog.Error("RequestTx", "preMainHash", hex.EncodeToString(preMainBlockHash), "currSeq preMainHash", hex.EncodeToString(blockSeq.Detail.Block.ParentHash),
"currSeq mainHash", hex.EncodeToString(blockSeq.Seq.Hash), "curr seq", currSeq, "ty", blockSeq.Seq.Type, "currSeq Mainheight", blockSeq.Detail.Block.Height) "currSeq mainHash", hex.EncodeToString(blockSeq.Seq.Hash), "curr seq", currSeq, "ty", blockSeq.Seq.Type, "currSeq Mainheight", blockSeq.Detail.Block.Height)
return nil, nil, paracross.ErrParaCurHashNotMatch return nil, nil, pt.ErrParaCurHashNotMatch
} }
//lastSeq < CurrSeq case: //lastSeq < CurrSeq case:
//lastSeq = currSeq-1, main node not update //lastSeq = currSeq-1, main node not update
if lastSeq+1 == currSeq { if lastSeq+1 == currSeq {
plog.Debug("Waiting new sequence from main chain") plog.Debug("Waiting new sequence from main chain")
return nil, nil, paracross.ErrParaWaitingNewSeq return nil, nil, pt.ErrParaWaitingNewSeq
} }
// 1. lastSeq < currSeq-1 // 1. lastSeq < currSeq-1
// 2. lastSeq >= currSeq and seq not consistent or fork case // 2. lastSeq >= currSeq and seq not consistent or fork case
return nil, nil, paracross.ErrParaCurHashNotMatch return nil, nil, pt.ErrParaCurHashNotMatch
} }
func (client *client) CreateBlock() { func (client *client) CreateBlock() {
...@@ -395,7 +414,7 @@ func (client *client) CreateBlock() { ...@@ -395,7 +414,7 @@ func (client *client) CreateBlock() {
for { for {
txs, mainBlock, err := client.RequestTx(currSeq, lastSeqMainHash) txs, mainBlock, err := client.RequestTx(currSeq, lastSeqMainHash)
if err != nil { if err != nil {
if err == paracross.ErrParaCurHashNotMatch { if err == pt.ErrParaCurHashNotMatch {
preSeq, preSeqMainHash, err := client.switchHashMatchedBlock() preSeq, preSeqMainHash, err := client.switchHashMatchedBlock()
if err == nil { if err == nil {
currSeq = preSeq + 1 currSeq = preSeq + 1
......
...@@ -534,7 +534,7 @@ func (a *action) commitTxDoneStep2(nodeStatus *pt.ParacrossNodeStatus, stat *pt. ...@@ -534,7 +534,7 @@ func (a *action) commitTxDoneStep2(nodeStatus *pt.ParacrossNodeStatus, stat *pt.
if !bytes.Equal(selfBlockHash.Hash, nodeStatus.BlockHash) { if !bytes.Equal(selfBlockHash.Hash, nodeStatus.BlockHash) {
clog.Error("paracross.CommitDone mosthash not match", "height", nodeStatus.Height, clog.Error("paracross.CommitDone mosthash not match", "height", nodeStatus.Height,
"blockHash", hex.EncodeToString(selfBlockHash.Hash), "mosthash", hex.EncodeToString(nodeStatus.BlockHash)) "blockHash", hex.EncodeToString(selfBlockHash.Hash), "mosthash", hex.EncodeToString(nodeStatus.BlockHash))
return nil, pt.ErrParaCurHashNotMatch return nil, types.ErrConsensusHashErr
} }
//平行连进行奖励分配 //平行连进行奖励分配
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment