Commit 2dec6ca8 authored by mdj33's avatar mdj33 Committed by vipwzw

proc commit msg

parent c9cabf8d
......@@ -181,6 +181,7 @@ func New(cfg *types.Consensus, sub []byte) queue.Module {
waitConsensStopTimes: waitConsensTimes,
commitCh: make(chan int64, 1),
resetCh: make(chan int64, 1),
verifyCh: make(chan []byte, 1),
consensHeight: -2,
sendingHeight: -1,
quit: make(chan struct{}),
......
......@@ -36,7 +36,7 @@ func TestFilterTxsForPara(t *testing.T) {
types.Init(Title, cfg)
detail, filterTxs, _ := createTestTxs(t)
rst := paraexec.FilterTxsForPara(Title, detail)
rst := paraexec.FilterTxsForParaByBlock(Title, detail)
assert.Equal(t, filterTxs, rst)
......@@ -201,7 +201,7 @@ func TestAddMinerTx(t *testing.T) {
Txs: filterTxs}
para := new(client)
para.privateKey = priKey
para.addMinerTx(nil, block, localBlock)
para.blockSyncClient.addMinerTx(nil, block, localBlock)
assert.Equal(t, 1, len(block.Txs))
}
......
......@@ -13,6 +13,8 @@ import (
"sync/atomic"
"unsafe"
"bytes"
"github.com/33cn/chain33/common"
"github.com/33cn/chain33/common/crypto"
"github.com/33cn/chain33/types"
......@@ -32,6 +34,7 @@ type commitMsgClient struct {
waitConsensStopTimes uint32 //共识高度低于完成高度, reset高度重发等待的次数
commitCh chan int64
resetCh chan int64
verifyCh chan []byte
sendMsgCh chan *types.Transaction
minerSwitch int32
currentTx unsafe.Pointer
......@@ -67,17 +70,23 @@ func (client *commitMsgClient) handler() {
out:
for {
select {
//正常commit 入口
case <-client.commitCh:
//回滚场景
if atomic.LoadInt64(&client.chainHeight) < client.sendingHeight {
client.clearSendingTx()
//正常commitMsg 入口
case height := <-client.commitCh:
//如果回滚高度小于发送高度,需要reset发送参数,回滚完成后重新发送
if height < client.sendingHeight {
client.procResetSendTx()
continue
}
client.procSendTx()
//出错场景入口,需要reset 重发
case <-client.resetCh:
client.clearSendingTx()
client.procSendTx()
client.procResetSendTx()
//发送成功后,验证是否commitTx上链
case verifyTx := <-client.verifyCh:
client.procVerifyTx(verifyTx)
//例行检查发送入口
case <-readTick:
consensStopTimes = client.checkConsensusStop(consensStopTimes)
......@@ -91,52 +100,101 @@ out:
client.paraClient.wg.Done()
}
func (client *commitMsgClient) commitNotify() {
client.commitCh <- 1
func (client *commitMsgClient) commitNotify(height int64) {
client.commitCh <- height
}
// reset notify 是为了保证与其他channel的串行执行顺序,
// 在channel外clearSendingTx 虽然变量有原子锁,但是并行的
func (client *commitMsgClient) resetNotify() {
client.resetCh <- 1
}
func (client *commitMsgClient) clearSendingTx() {
func (client *commitMsgClient) verifyNotify(verifyTx []byte) {
client.verifyCh <- verifyTx
}
func (client *commitMsgClient) resetSendEnv() {
client.sendingHeight = -1
client.clearCurrentTx()
}
func (client *commitMsgClient) clearCurrentTx() {
client.setCurrentTx(nil)
}
func (client *commitMsgClient) procSendTx() {
plog.Info("para commitMsg---status", "chainHeight", atomic.LoadInt64(&client.chainHeight), "sendingHeight", client.sendingHeight,
"consensHeight", atomic.LoadInt64(&client.consensHeight), "isSendingTx", client.isSendingCommitMsg(), "sync", client.isSync())
consensHeight := atomic.LoadInt64(&client.consensHeight)
chainHeight := atomic.LoadInt64(&client.chainHeight)
sendingHeight := client.sendingHeight
plog.Info("para commitMsg---status", "chainHeight", chainHeight, "sendingHeight", sendingHeight,
"consensHeight", consensHeight, "isSendingTx", client.isSendingCommitMsg(), "sync", client.isSync())
if client.isSendingCommitMsg() || !client.isSync() {
return
}
consensHeight := atomic.LoadInt64(&client.consensHeight)
chainHeight := atomic.LoadInt64(&client.chainHeight)
if client.sendingHeight < consensHeight {
client.sendingHeight = consensHeight
if sendingHeight < consensHeight {
sendingHeight = consensHeight
}
//1.如果是在主链共识场景,共识高度可能大于平行链的链高度
//2.已发送,未共识场景
if chainHeight < consensHeight || client.sendingHeight > consensHeight {
if chainHeight < consensHeight || sendingHeight > consensHeight {
return
}
if client.sendingHeight < chainHeight {
signTx, count := client.getSendingTx(client.sendingHeight, chainHeight)
if sendingHeight < chainHeight {
signTx, count := client.getSendingTx(sendingHeight, chainHeight)
if signTx == nil {
return
}
client.sendingHeight = client.sendingHeight + count
client.checkTxCommitTimes = 0
client.sendingHeight = sendingHeight + count
client.setCurrentTx(signTx)
atomic.StoreInt32(&client.checkTxCommitTimes, 0)
client.sendMsgCh <- signTx
}
}
func (client *commitMsgClient) procVerifyTx(verifyTx []byte) {
curTx := client.getCurrentTx()
if curTx == nil {
return
}
if len(verifyTx) == 0 {
client.checkTxCommitTimes++
if client.checkTxCommitTimes >= client.waitMainBlocks {
client.checkTxCommitTimes = 0
client.procResetSendTx()
}
return
}
if bytes.Equal(curTx.Hash(), verifyTx) {
client.clearCurrentTx()
client.procSendTx()
}
}
func (client *commitMsgClient) procResetSendTx() {
client.resetSendEnv()
client.procSendTx()
}
func (client *commitMsgClient) checkConsensusStop(consensStopTimes uint32) uint32 {
if client.sendingHeight > atomic.LoadInt64(&client.consensHeight) && !client.isSendingCommitMsg() {
if consensStopTimes > client.waitConsensStopTimes {
client.resetSendEnv()
return 0
}
return consensStopTimes + 1
}
return 0
}
func (client *commitMsgClient) isSync() bool {
height := atomic.LoadInt64(&client.chainHeight)
if height <= 0 {
......@@ -209,54 +267,6 @@ func (client *commitMsgClient) getSendingTx(startHeight, endHeight int64) (*type
return signTx, count
}
func (client *commitMsgClient) setCurrentTx(tx *types.Transaction) {
atomic.StorePointer(&client.currentTx, unsafe.Pointer(tx))
}
func (client *commitMsgClient) getCurrentTx() *types.Transaction {
return (*types.Transaction)(atomic.LoadPointer(&client.currentTx))
}
func (client *commitMsgClient) isSendingCommitMsg() bool {
return client.getCurrentTx() != nil
}
func (client *commitMsgClient) updateChainHeight(height int64, isDel bool) {
if isDel {
atomic.StoreInt32(&client.isRollBack, 1)
} else {
atomic.StoreInt32(&client.isRollBack, 0)
}
atomic.StoreInt64(&client.chainHeight, height)
client.commitNotify()
}
//TODO 非平行鏈的commit tx 去主鏈查詢
func (client *commitMsgClient) checkSendingTxDone(txs map[string]bool) {
tx := client.getCurrentTx()
if tx == nil {
return
}
if txs[string(tx.Hash())] {
client.setCurrentTx(nil)
atomic.StoreInt32(&client.checkTxCommitTimes, 0)
//继续处理
client.commitNotify()
return
}
atomic.AddInt32(&client.checkTxCommitTimes, 1)
if atomic.LoadInt32(&client.checkTxCommitTimes) >= client.waitMainBlocks {
atomic.StoreInt32(&client.checkTxCommitTimes, 0)
//重新发送
client.resetNotify()
}
}
func (client *commitMsgClient) calcCommitMsgTxs(notifications []*pt.ParacrossNodeStatus) (*types.Transaction, int64, error) {
txs, count, err := client.batchCalcTxGroup(notifications)
if err != nil {
......@@ -333,6 +343,31 @@ func (client *commitMsgClient) singleCalcTx(status *pt.ParacrossNodeStatus) (*ty
}
func (client *commitMsgClient) setCurrentTx(tx *types.Transaction) {
atomic.StorePointer(&client.currentTx, unsafe.Pointer(tx))
}
func (client *commitMsgClient) getCurrentTx() *types.Transaction {
return (*types.Transaction)(atomic.LoadPointer(&client.currentTx))
}
func (client *commitMsgClient) isSendingCommitMsg() bool {
return client.getCurrentTx() != nil
}
func (client *commitMsgClient) updateChainHeight(height int64, isDel bool) {
if isDel {
atomic.StoreInt32(&client.isRollBack, 1)
} else {
atomic.StoreInt32(&client.isRollBack, 0)
}
atomic.StoreInt64(&client.chainHeight, height)
client.commitNotify(height)
}
func (client *commitMsgClient) sendCommitMsg() {
var err error
var tx *types.Transaction
......@@ -513,18 +548,6 @@ func (client *commitMsgClient) mainSync() error {
}
func (client *commitMsgClient) checkConsensusStop(consensStopTimes uint32) uint32 {
if client.sendingHeight > atomic.LoadInt64(&client.consensHeight) && !client.isSendingCommitMsg() {
if consensStopTimes > client.waitConsensStopTimes {
client.clearSendingTx()
return 0
}
return consensStopTimes + 1
}
return 0
}
func (client *commitMsgClient) getConsensusHeight() {
ticker := time.NewTicker(time.Second * time.Duration(consensusInterval))
isSync := false
......
......@@ -35,12 +35,16 @@ func (client *client) addLocalBlock(height int64, block *pt.ParaLocalDbBlock) er
}
func (client *client) checkCommitTxSuccess(txs []*pt.TxDetail) {
if atomic.LoadInt32(&client.isCaughtUp) != 1 || !client.commitMsgClient.isSendingCommitMsg() {
if atomic.LoadInt32(&client.isCaughtUp) != 1 {
return
}
txMap := make(map[string]bool)
curTx := client.commitMsgClient.getCurrentTx()
if curTx == nil {
return
}
txMap := make(map[string]bool)
if types.IsParaExecName(string(curTx.Execer)) {
for _, tx := range txs {
if bytes.HasSuffix(tx.Tx.Execer, []byte(pt.ParaX)) && tx.Receipt.Ty == types.ExecOk {
......@@ -55,8 +59,11 @@ func (client *client) checkCommitTxSuccess(txs []*pt.TxDetail) {
}
}
client.commitMsgClient.checkSendingTxDone(txMap)
if txMap[string(curTx.Hash())] {
client.commitMsgClient.verifyNotify(curTx.Hash())
} else {
client.commitMsgClient.verifyNotify(nil)
}
}
func (client *client) createLocalBlock(lastBlock *pt.ParaLocalDbBlock, txs []*types.Transaction, mainBlock *pt.ParaTxDetail) error {
......
......@@ -382,7 +382,7 @@ func (client *BlockSyncClient) rollbackBlock(block *types.Block) error {
if resp.GetData().(*types.Reply).IsOk {
if client.paraClient.authAccount != "" {
client.paraClient.commitMsgClient.updateChainHeight(blocks.Items[0].Block.Height, true)
client.paraClient.commitMsgClient.updateChainHeight(blocks.Items[0].Block.Height-1, true)
}
} else {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment