Commit 9186443e authored by mdj33's avatar mdj33 Committed by vipwzw

add isCaughtup

parent fd50c684
...@@ -14,6 +14,8 @@ import ( ...@@ -14,6 +14,8 @@ import (
log "github.com/33cn/chain33/common/log/log15" log "github.com/33cn/chain33/common/log/log15"
"sync/atomic"
"github.com/33cn/chain33/client/api" "github.com/33cn/chain33/client/api"
"github.com/33cn/chain33/common" "github.com/33cn/chain33/common"
"github.com/33cn/chain33/common/crypto" "github.com/33cn/chain33/common/crypto"
...@@ -59,7 +61,7 @@ type client struct { ...@@ -59,7 +61,7 @@ type client struct {
*drivers.BaseClient *drivers.BaseClient
grpcClient types.Chain33Client grpcClient types.Chain33Client
execAPI api.ExecutorAPI execAPI api.ExecutorAPI
isCaughtUp bool isCaughtUp int32
commitMsgClient *commitMsgClient commitMsgClient *commitMsgClient
authAccount string authAccount string
privateKey crypto.PrivKey privateKey crypto.PrivKey
...@@ -455,9 +457,11 @@ func (client *client) Query_IsCaughtUp(req *types.ReqNil) (types.Message, error) ...@@ -455,9 +457,11 @@ func (client *client) Query_IsCaughtUp(req *types.ReqNil) (types.Message, error)
if client == nil { if client == nil {
return nil, fmt.Errorf("%s", "client not bind message queue.") return nil, fmt.Errorf("%s", "client not bind message queue.")
} }
client.mtx.Lock()
caughtUp := client.isCaughtUp caughtUp := false
client.mtx.Unlock() if atomic.LoadInt32(&client.isCaughtUp) == 1 {
caughtUp = true
}
return &types.IsCaughtUp{Iscaughtup: caughtUp}, nil return &types.IsCaughtUp{Iscaughtup: caughtUp}, nil
} }
......
...@@ -161,6 +161,12 @@ func (client *commitMsgClient) isSync() bool { ...@@ -161,6 +161,12 @@ func (client *commitMsgClient) isSync() bool {
} }
if atomic.LoadInt32(&client.isRollBack) == 1 { if atomic.LoadInt32(&client.isRollBack) == 1 {
plog.Info("para is not Sync", "isRollBack", atomic.LoadInt32(&client.isRollBack))
return false
}
if atomic.LoadInt32(&client.paraClient.isCaughtUp) != 1 {
plog.Info("para is not Sync", "isCaughtUp", atomic.LoadInt32(&client.paraClient.isCaughtUp))
return false return false
} }
...@@ -533,14 +539,6 @@ out: ...@@ -533,14 +539,6 @@ out:
isSync = true isSync = true
} }
client.paraClient.mtx.Lock()
isCaughtUp := client.paraClient.isCaughtUp
client.paraClient.mtx.Unlock()
if !isCaughtUp {
plog.Debug("getConsensusHeight para is CatchingUp")
continue
}
block, err := client.paraClient.getLastBlockInfo() block, err := client.paraClient.getLastBlockInfo()
if err != nil { if err != nil {
continue continue
......
...@@ -12,6 +12,8 @@ import ( ...@@ -12,6 +12,8 @@ import (
"bytes" "bytes"
"sync/atomic"
"github.com/33cn/chain33/common" "github.com/33cn/chain33/common"
"github.com/33cn/chain33/types" "github.com/33cn/chain33/types"
paraexec "github.com/33cn/plugin/plugin/dapp/paracross/executor" paraexec "github.com/33cn/plugin/plugin/dapp/paracross/executor"
...@@ -20,7 +22,7 @@ import ( ...@@ -20,7 +22,7 @@ import (
func (client *client) setLocalDb(set *types.LocalDBSet) error { func (client *client) setLocalDb(set *types.LocalDBSet) error {
//如果追赶上主链了,则落盘 //如果追赶上主链了,则落盘
if client.isCaughtUp { if atomic.LoadInt32(&client.isCaughtUp) == 1 {
set.Txid = 1 set.Txid = 1
} }
...@@ -75,7 +77,7 @@ func (client *client) addLocalBlock(height int64, block *pt.ParaLocalDbBlock) er ...@@ -75,7 +77,7 @@ func (client *client) addLocalBlock(height int64, block *pt.ParaLocalDbBlock) er
} }
func (client *client) checkCommitTxSuccess(detail *types.BlockDetail) { func (client *client) checkCommitTxSuccess(detail *types.BlockDetail) {
if !client.isCaughtUp || !client.commitMsgClient.isSendingCommitMsg() { if atomic.LoadInt32(&client.isCaughtUp) != 1 || !client.commitMsgClient.isSendingCommitMsg() {
return return
} }
...@@ -376,20 +378,18 @@ func (client *client) RequestTx(currSeq int64, preMainBlockHash []byte) ([]*type ...@@ -376,20 +378,18 @@ func (client *client) RequestTx(currSeq int64, preMainBlockHash []byte) ([]*type
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
//genesis block start with seq=-1 not check
if (bytes.Equal(preMainBlockHash, blockSeq.Detail.Block.ParentHash) && blockSeq.Seq.Type == addAct) || if (bytes.Equal(preMainBlockHash, blockSeq.Detail.Block.ParentHash) && blockSeq.Seq.Type == addAct) ||
(bytes.Equal(preMainBlockHash, blockSeq.Seq.Hash) && blockSeq.Seq.Type == delAct) { (bytes.Equal(preMainBlockHash, blockSeq.Seq.Hash) && blockSeq.Seq.Type == delAct) {
txs := paraexec.FilterTxsForPara(types.GetTitle(), blockSeq.Detail) txs := paraexec.FilterTxsForPara(types.GetTitle(), blockSeq.Detail)
plog.Info("GetCurrentSeq", "Len of txs", len(txs), "seqTy", blockSeq.Seq.Type) plog.Info("GetCurrentSeq", "Len of txs", len(txs), "seqTy", blockSeq.Seq.Type)
client.mtx.Lock()
if lastSeq-currSeq > emptyBlockInterval { if lastSeq-currSeq > emptyBlockInterval {
client.isCaughtUp = false atomic.StoreInt32(&client.isCaughtUp, 0)
} else { } else {
client.isCaughtUp = true atomic.StoreInt32(&client.isCaughtUp, 1)
} }
client.mtx.Unlock()
return txs, blockSeq, nil return txs, blockSeq, nil
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment