Commit ef0127f4 authored by mdj33's avatar mdj33 Committed by vipwzw

improve commit msg

parent 71d7abb0
......@@ -61,7 +61,7 @@ type client struct {
*drivers.BaseClient
grpcClient types.Chain33Client
execAPI api.ExecutorAPI
isCaughtUp int32
caughtUp int32
commitMsgClient *commitMsgClient
blockSyncClient *BlockSyncClient
authAccount string
......@@ -176,9 +176,6 @@ func New(cfg *types.Consensus, sub []byte) queue.Module {
paraClient: para,
waitMainBlocks: waitBlocks,
waitConsensStopTimes: waitConsensTimes,
commitCh: make(chan int64, 1),
resetCh: make(chan int64, 1),
verifyCh: make(chan []byte, 1),
consensHeight: -2,
sendingHeight: -1,
quit: make(chan struct{}),
......@@ -329,18 +326,17 @@ func (client *client) ProcEvent(msg *queue.Message) bool {
return false
}
func (client *client) isCaughtUp() bool {
return atomic.LoadInt32(&client.caughtUp) == 1
}
//IsCaughtUp 是否追上最新高度,
func (client *client) Query_IsCaughtUp(req *types.ReqNil) (types.Message, error) {
if client == nil {
return nil, fmt.Errorf("%s", "client not bind message queue.")
}
caughtUp := false
if atomic.LoadInt32(&client.isCaughtUp) == 1 {
caughtUp = true
}
return &types.IsCaughtUp{Iscaughtup: caughtUp}, nil
return &types.IsCaughtUp{Iscaughtup: client.isCaughtUp()}, nil
}
func checkMinerTx(current *types.BlockDetail) error {
......
......@@ -15,6 +15,8 @@ import (
"bytes"
"sync"
"github.com/33cn/chain33/common"
"github.com/33cn/chain33/common/crypto"
"github.com/33cn/chain33/types"
......@@ -24,7 +26,7 @@ import (
)
var (
consensusInterval = 5 //about 1 new block interval
consensusInterval = 10 //about 1 new block interval
minerInterval = 10 //5s的主块间隔后分叉概率增加,10s可以消除一些分叉回退
)
......@@ -32,9 +34,6 @@ type commitMsgClient struct {
paraClient *client
waitMainBlocks int32 //等待平行链共识消息在主链上链并成功的块数,超出会重发共识消息,最小是2
waitConsensStopTimes uint32 //共识高度低于完成高度, reset高度重发等待的次数
commitCh chan int64
resetCh chan int64
verifyCh chan []byte
sendMsgCh chan *types.Transaction
minerSwitch int32
currentTx unsafe.Pointer
......@@ -46,6 +45,7 @@ type commitMsgClient struct {
checkTxCommitTimes int32
privateKey crypto.PrivKey
quit chan struct{}
mutex sync.Mutex
}
type commitCheckParams struct {
......@@ -74,27 +74,10 @@ func (client *commitMsgClient) handler() {
out:
for {
select {
//正常commitMsg 入口
case height := <-client.commitCh:
//如果回滚高度小于发送高度,需要reset发送参数,回滚完成后重新发送
if height < client.sendingHeight {
client.resetSendEnv()
}
client.procSendTx()
//出错场景入口,需要reset 重发
case <-client.resetCh:
client.resetSendEnv()
client.procSendTx()
//发送成功后,验证是否commitTx上链
case verifyTx := <-client.verifyCh:
client.procVerifyTx(verifyTx)
//例行检查发送入口
case <-readTick:
client.procChecks(checkParams)
client.procSendTx()
client.sendCommitTx()
case <-client.quit:
break out
......@@ -104,26 +87,38 @@ out:
client.paraClient.wg.Done()
}
func (client *commitMsgClient) commitNotify(height int64) {
client.commitCh <- height
//chain height更新时候入口
func (client *commitMsgClient) updateChainHeightNotify(height int64, isDel bool) {
if isDel {
atomic.StoreInt32(&client.isRollBack, 1)
} else {
atomic.StoreInt32(&client.isRollBack, 0)
}
atomic.StoreInt64(&client.chainHeight, height)
client.checkRollback(height)
client.sendCommitTx()
}
// reset notify 是为了保证与其他channel的串行执行顺序,
// 在channel外clearSendingTx 虽然变量有原子锁,但是并行的
// reset notify 提供重设发送参数,发送tx的入口
func (client *commitMsgClient) resetNotify() {
client.resetCh <- 1
client.mutex.Lock()
defer client.mutex.Unlock()
client.resetSendEnv()
}
func (client *commitMsgClient) verifyNotify(verifyTx []byte) {
client.verifyCh <- verifyTx
//新的区块产生,检查是否有commitTx正在发送入口
func (client *commitMsgClient) commitTxCheckNotify(txs []*pt.TxDetail) {
if client.checkCommitTxSuccess(txs) {
client.sendCommitTx()
}
}
func (client *commitMsgClient) resetSendEnv() {
client.sendingHeight = -1
client.clearCurrentTx()
}
func (client *commitMsgClient) clearCurrentTx() {
client.setCurrentTx(nil)
}
......@@ -137,7 +132,10 @@ func (client *commitMsgClient) getConsensusHeight() int64 {
return status.Height
}
func (client *commitMsgClient) procSendTx() {
func (client *commitMsgClient) sendCommitTx() {
client.mutex.Lock()
defer client.mutex.Unlock()
consensHeight := client.getConsensusHeight()
chainHeight := atomic.LoadInt64(&client.chainHeight)
sendingHeight := client.sendingHeight
......@@ -171,29 +169,64 @@ func (client *commitMsgClient) procSendTx() {
}
func (client *commitMsgClient) procVerifyTx(verifyTx []byte) {
curTx := client.getCurrentTx()
if curTx == nil {
return
}
if bytes.Equal(curTx.Hash(), verifyTx) {
client.clearCurrentTx()
client.procSendTx()
return
func (client *commitMsgClient) verifyTx(curTx *types.Transaction, verifyTxs map[string]bool) bool {
if verifyTxs[string(curTx.Hash())] {
client.setCurrentTx(nil)
return true
}
client.checkTxCommitTimes++
if client.checkTxCommitTimes >= client.waitMainBlocks {
client.checkTxCommitTimes = 0
client.resetSendEnv()
client.procSendTx()
return true
}
return false
}
func (client *commitMsgClient) checkCommitTxSuccess(txs []*pt.TxDetail) bool {
client.mutex.Lock()
defer client.mutex.Unlock()
curTx := client.getCurrentTx()
if curTx == nil {
return false
}
txMap := make(map[string]bool)
//committx是平行链交易
if types.IsParaExecName(string(curTx.Execer)) {
for _, tx := range txs {
if bytes.HasSuffix(tx.Tx.Execer, []byte(pt.ParaX)) && tx.Receipt.Ty == types.ExecOk {
txMap[string(tx.Tx.Hash())] = true
}
}
// committx是主链交易,需要向主链查询
} else {
//如果正在追赶,则暂时不去主链查找,减少耗时
if !client.paraClient.isCaughtUp() {
return false
}
receipt, _ := client.paraClient.QueryTxOnMainByHash(curTx.Hash())
if receipt != nil && receipt.Receipt.Ty == types.ExecOk {
txMap[string(curTx.Hash())] = true
}
}
//如果没找到且当前正在追赶,则不计数,如果找到了,即便当前在追赶,也立即处理
if !txMap[string(curTx.Hash())] && !client.paraClient.isCaughtUp() {
return false
}
return client.verifyTx(curTx, txMap)
}
//如果共识高度一直没有追上发送高度,且当前发送高度已经上链,说明共识一直没达成,安全起见,超过停止次数后,重发
func (client *commitMsgClient) checkConsensusStop(consensStopTimes uint32) uint32 {
client.mutex.Lock()
defer client.mutex.Unlock()
consensHeight := client.getConsensusHeight()
if client.sendingHeight > consensHeight && !client.isSendingCommitMsg() {
if consensStopTimes > client.waitConsensStopTimes {
......@@ -208,6 +241,9 @@ func (client *commitMsgClient) checkConsensusStop(consensStopTimes uint32) uint3
}
func (client *commitMsgClient) checkAuthAccountIn() {
client.mutex.Lock()
defer client.mutex.Unlock()
nodes, err := client.getNodeGroupAddrs()
if err != nil {
return
......@@ -223,7 +259,7 @@ func (client *commitMsgClient) checkAuthAccountIn() {
}
func (client *commitMsgClient) procChecks(checks *commitCheckParams) {
checks.consensStopTimes = client.checkConsensusStop(checks.consensStopTimes)
//checks.consensStopTimes = client.checkConsensusStop(checks.consensStopTimes)
client.checkAuthAccountIn()
}
......@@ -255,8 +291,8 @@ func (client *commitMsgClient) isSync() bool {
return false
}
if atomic.LoadInt32(&client.paraClient.isCaughtUp) != 1 {
plog.Info("para is not Sync", "isCaughtUp", atomic.LoadInt32(&client.paraClient.isCaughtUp))
if !client.paraClient.isCaughtUp() {
plog.Info("para is not Sync", "caughtUp", client.paraClient.isCaughtUp())
return false
}
......@@ -387,16 +423,31 @@ func (client *commitMsgClient) isSendingCommitMsg() bool {
return client.getCurrentTx() != nil
}
func (client *commitMsgClient) updateChainHeight(height int64, isDel bool) {
if isDel {
atomic.StoreInt32(&client.isRollBack, 1)
} else {
atomic.StoreInt32(&client.isRollBack, 0)
func (client *commitMsgClient) checkRollback(height int64) {
client.mutex.Lock()
defer client.mutex.Unlock()
if height < client.sendingHeight {
client.resetSendEnv()
}
}
atomic.StoreInt64(&client.chainHeight, height)
func (client *commitMsgClient) sendCommitTxOut(tx *types.Transaction) error {
if tx == nil {
return nil
}
resp, err := client.paraClient.grpcClient.SendTransaction(context.Background(), tx)
if err != nil {
plog.Error("sendCommitTxOut send tx", "tx", common.ToHex(tx.Hash()), "err", err.Error())
return err
}
if !resp.GetIsOk() {
plog.Error("sendCommitTxOut send tx Nok", "tx", common.ToHex(tx.Hash()), "err", string(resp.GetMsg()))
return errors.New(string(resp.GetMsg()))
}
client.commitNotify(height)
return nil
}
......@@ -409,13 +460,13 @@ out:
for {
select {
case tx = <-client.sendMsgCh:
err = client.sendCommitMsgTx(tx)
err = client.sendCommitTxOut(tx)
if err != nil && (err != types.ErrBalanceLessThanTenTimesFee && err != types.ErrNoBalance) {
resendTimer = time.After(time.Second * 2)
}
case <-resendTimer:
if err != nil && tx != nil {
client.sendCommitMsgTx(tx)
client.sendCommitTxOut(tx)
}
case <-client.quit:
break out
......@@ -425,25 +476,6 @@ out:
client.paraClient.wg.Done()
}
func (client *commitMsgClient) sendCommitMsgTx(tx *types.Transaction) error {
if tx == nil {
return nil
}
resp, err := client.paraClient.grpcClient.SendTransaction(context.Background(), tx)
if err != nil {
plog.Error("sendCommitMsgTx send tx", "tx", common.ToHex(tx.Hash()), "err", err.Error())
return err
}
if !resp.GetIsOk() {
plog.Error("sendCommitMsgTx send tx Nok", "tx", common.ToHex(tx.Hash()), "err", string(resp.GetMsg()))
return errors.New(string(resp.GetMsg()))
}
return nil
}
func isParaSelfConsensusForked(height int64) bool {
return height > mainParaSelfConsensusForkHeight
}
......@@ -531,7 +563,7 @@ func (client *commitMsgClient) getNodeStatus(start, end int64) ([]*pt.ParacrossN
//3,如果形如xxoxx的块排列,x代表commit空块,o代表实际的块,即只要不全部是commit块,也要全部打包一起发出去
//如果=0 意味着全部是paracross commit tx,延迟发送
if needSentTxs == 0 && len(ret) < types.TxGroupMaxCount {
plog.Debug("para commitmsg getNodeStatus all self consensus commit tx,send delay", "start", start, "end", end)
plog.Debug("para commitmsg all self-consensus commit tx,send delay", "start", start, "end", end)
return nil, nil
}
......
......@@ -34,44 +34,6 @@ func (client *client) addLocalBlock(height int64, block *pt.ParaLocalDbBlock) er
return client.setLocalDb(set)
}
func (client *client) checkCommitTxSuccess(txs []*pt.TxDetail) {
curTx := client.commitMsgClient.getCurrentTx()
if curTx == nil {
return
}
txMap := make(map[string]bool)
if types.IsParaExecName(string(curTx.Execer)) {
for _, tx := range txs {
if bytes.HasSuffix(tx.Tx.Execer, []byte(pt.ParaX)) && tx.Receipt.Ty == types.ExecOk {
txMap[string(tx.Tx.Hash())] = true
}
}
} else {
//如果正在追赶,则暂时不去主链查找,减少耗时
if atomic.LoadInt32(&client.isCaughtUp) != 1 {
return
}
//去主链查询
receipt, _ := client.QueryTxOnMainByHash(curTx.Hash())
if receipt != nil && receipt.Receipt.Ty == types.ExecOk {
txMap[string(curTx.Hash())] = true
}
}
//如果没找到且当前正在追赶,则不计数,如果找到了,即便当前在追赶,也通知
if !txMap[string(curTx.Hash())] && atomic.LoadInt32(&client.isCaughtUp) != 1 {
return
}
if txMap[string(curTx.Hash())] {
client.commitMsgClient.verifyNotify(curTx.Hash())
} else {
client.commitMsgClient.verifyNotify(nil)
}
}
func (client *client) createLocalBlock(lastBlock *pt.ParaLocalDbBlock, txs []*types.Transaction, mainBlock *pt.ParaTxDetail) error {
var newblock pt.ParaLocalDbBlock
......@@ -87,7 +49,7 @@ func (client *client) createLocalBlock(lastBlock *pt.ParaLocalDbBlock, txs []*ty
if err != nil {
return err
}
client.checkCommitTxSuccess(mainBlock.TxDetails)
client.commitMsgClient.commitTxCheckNotify(mainBlock.TxDetails)
return err
}
......@@ -346,9 +308,9 @@ func (client *client) getBatchSeqCount(currSeq int64) (int64, error) {
if lastSeq > currSeq {
if lastSeq-currSeq > emptyBlockInterval {
atomic.StoreInt32(&client.isCaughtUp, 0)
atomic.StoreInt32(&client.caughtUp, 0)
} else {
atomic.StoreInt32(&client.isCaughtUp, 1)
atomic.StoreInt32(&client.caughtUp, 1)
}
if batchFetchSeqEnable && lastSeq-currSeq > batchFetchSeqNum {
return batchFetchSeqNum, nil
......
......@@ -10,15 +10,13 @@ import (
"encoding/hex"
"errors"
"sync/atomic"
"github.com/33cn/chain33/common"
"github.com/33cn/chain33/types"
)
func (client *client) setLocalDb(set *types.LocalDBSet) error {
//如果追赶上主链了,则落盘
if atomic.LoadInt32(&client.isCaughtUp) == 1 {
if client.isCaughtUp() {
set.Txid = 1
}
......
......@@ -214,7 +214,7 @@ func (client *BlockSyncClient) syncBlocksIfNeed() (bool, error) {
client.setSyncCaughtUp(isSyncCaughtUp)
if client.paraClient.authAccount != "" {
client.printDebugInfo("Para sync - add block commit", "isSyncCaughtUp", isSyncCaughtUp)
client.paraClient.commitMsgClient.updateChainHeight(lastBlock.Height+1, false)
client.paraClient.commitMsgClient.updateChainHeightNotify(lastBlock.Height+1, false)
}
}
......@@ -233,7 +233,7 @@ func (client *BlockSyncClient) syncBlocksIfNeed() (bool, error) {
client.setSyncCaughtUp(false)
if client.paraClient.authAccount != "" {
client.printDebugInfo("Para sync - rollback block commit", "isSyncCaughtUp", false)
client.paraClient.commitMsgClient.updateChainHeight(lastBlock.Height-1, true)
client.paraClient.commitMsgClient.updateChainHeightNotify(lastBlock.Height-1, true)
}
}
......
......@@ -322,11 +322,14 @@ function para_cross_transfer_withdraw() {
echo "=========== # para cross transfer/withdraw test ============="
paracrossAddr=1HPkPopVe3ERfvaAgedDtJQ792taZFEHCe
${CLI} account list
${CLI} send coins transfer -a 10 -n test -t $paracrossAddr -k 4257D8692EF7FE13C68B65D6A52F03933DB2FA5CE8FAF210B5B8B80C721CED01
hash=$(${CLI} send coins transfer -a 10 -n test -t $paracrossAddr -k 4257D8692EF7FE13C68B65D6A52F03933DB2FA5CE8FAF210B5B8B80C721CED01)
echo "${hash}"
query_tx "${CLI}" "${hash}"
hash=$(${CLI} send para asset_transfer --title user.p.para. -a 1.4 -n test -t 12qyocayNF7Lv6C9qW4avxs2E7U41fKSfv -k 4257D8692EF7FE13C68B65D6A52F03933DB2FA5CE8FAF210B5B8B80C721CED01)
echo "${hash}"
query_tx "${PARA_CLI}" "${hash}"
sleep 15
hash2=$(${CLI} send para asset_withdraw --title user.p.para. -a 0.7 -n test -t 12qyocayNF7Lv6C9qW4avxs2E7U41fKSfv -k 4257D8692EF7FE13C68B65D6A52F03933DB2FA5CE8FAF210B5B8B80C721CED01)
local times=200
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment