Commit e5aad0ec authored by yukang's avatar yukang Committed by vipwzw

Optimize sync implement for performace

parent 313328a5
......@@ -48,7 +48,6 @@ var (
mainBlockHashForkHeight int64 = 209186 //calc block hash fork height in main chain
mainParaSelfConsensusForkHeight int64 = types.MaxHeight //para chain self consensus height switch, must >= ForkParacrossCommitTx of main
mainForkParacrossCommitTx int64 = types.MaxHeight //support paracross commit tx fork height in main chain: ForkParacrossCommitTx
localCacheCount int64 = 1000 // local cache block max count
batchFetchSeqEnable bool
batchFetchSeqNum int64 = 128
)
......@@ -60,16 +59,15 @@ func init() {
type client struct {
*drivers.BaseClient
grpcClient types.Chain33Client
execAPI api.ExecutorAPI
isCaughtUp int32
commitMsgClient *commitMsgClient
authAccount string
privateKey crypto.PrivKey
wg sync.WaitGroup
subCfg *subConfig
syncCaughtUpAtom int32
localChangeAtom int32
grpcClient types.Chain33Client
execAPI api.ExecutorAPI
isCaughtUp int32
commitMsgClient *commitMsgClient
blockSyncClient *BlockSyncClient
authAccount string
privateKey crypto.PrivKey
wg sync.WaitGroup
subCfg *subConfig
quitCreate chan struct{}
}
......@@ -86,7 +84,8 @@ type subConfig struct {
MainParaSelfConsensusForkHeight int64 `json:"mainParaSelfConsensusForkHeight,omitempty"`
MainForkParacrossCommitTx int64 `json:"mainForkParacrossCommitTx,omitempty"`
WaitConsensStopTimes uint32 `json:"waitConsensStopTimes,omitempty"`
LocalCacheCount int64 `json:"localCacheCount,omitempty"`
MaxCacheCount int64 `json:"maxCacheCount,omitempty"`
MaxSyncErrCount int32 `json:"maxSyncErrCount,omitempty"`
BatchFetchSeqEnable uint32 `json:"batchFetchSeqEnable,omitempty"`
BatchFetchSeqNum int64 `json:"batchFetchSeqNum,omitempty"`
}
......@@ -128,10 +127,6 @@ func New(cfg *types.Consensus, sub []byte) queue.Module {
mainForkParacrossCommitTx = subcfg.MainForkParacrossCommitTx
}
if subcfg.LocalCacheCount > 0 {
localCacheCount = subcfg.LocalCacheCount
}
if subcfg.BatchFetchSeqEnable > 0 {
batchFetchSeqEnable = true
}
......@@ -190,6 +185,20 @@ func New(cfg *types.Consensus, sub []byte) queue.Module {
sendingHeight: -1,
quit: make(chan struct{}),
}
para.blockSyncClient = &BlockSyncClient{
notifyChan: make(chan bool),
quitChan: make(chan struct{}),
maxCacheCount: 1000,
maxSyncErrCount: 100,
}
if subcfg.MaxCacheCount > 0 {
para.blockSyncClient.maxCacheCount = subcfg.MaxCacheCount
}
if subcfg.MaxSyncErrCount > 0 {
para.blockSyncClient.maxSyncErrCount = subcfg.MaxSyncErrCount
}
c.SetChild(para)
return para
}
......@@ -204,6 +213,7 @@ func (client *client) Close() {
client.BaseClient.Close()
close(client.commitMsgClient.quit)
close(client.quitCreate)
close(client.blockSyncClient.quitChan)
client.wg.Wait()
plog.Info("consensus para closed")
}
......
......@@ -13,11 +13,23 @@ import (
"github.com/33cn/chain33/common/merkle"
"github.com/33cn/chain33/common"
pt "github.com/33cn/plugin/plugin/dapp/paracross/types"
"time"
)
//BlockSyncClient 区块同步控制和状态变量
type BlockSyncClient struct {
//notifyChan 下载通知通道
notifyChan chan bool
//quitChan 线程退出通知通道
quitChan chan struct{}
//syncState 同步状态
syncState int32
//syncErrMaxCount 同步错误最大数量
maxSyncErrCount int32
//maxCacheCount 本地缓冲的最大块数量
maxCacheCount int64
}
//NextActionType 定义每一轮可执行状态
//NextActionType 定义每一轮可执行操作
type NextActionType int8
const (
//NextActionKeep 保持
......@@ -28,14 +40,29 @@ const (
NextActionAdd
)
//获取同步状态,供发送层调用
//BlockSyncState 定义当前区块同步状态
type BlockSyncState int32
const (
//BlockSyncStateNone 未同步状态
BlockSyncStateNone BlockSyncState = iota
//BlockSyncStateSyncing 正在同步中
BlockSyncStateSyncing
//BlockSyncStateFinished 同步完成
BlockSyncStateFinished
)
//判断同步是否已追赶上,供发送层调用
func (client *client) SyncHasCaughtUp() bool {
return atomic.LoadInt32(&client.syncCaughtUpAtom) == 1
return client.getBlockSyncState() == BlockSyncStateFinished
}
//下载状态通知,供下载层调用
func (client *client) NotifyLocalChange() {
atomic.StoreInt32(&client.localChangeAtom,1)
plog.Info("Para sync - notify change")
if client.getBlockSyncState() != BlockSyncStateSyncing {
plog.Info("Para sync - notified change")
client.blockSyncClient.notifyChan <- true
}
}
//创建创世区块
......@@ -48,34 +75,62 @@ func (client *client) CreateGenesisBlock(newblock *types.Block) error {
func (client *client) SyncBlocks() {
client.syncInit()
isSyncCaughtUp := false
//首次同步,不用等待通知
client.batchSyncBlocks()
//开始正常同步,需要等待通知信号触发
quited := false
for {
select {
case <- client.blockSyncClient.notifyChan:
client.batchSyncBlocks()
case <- client.blockSyncClient.quitChan:
quited = true
plog.Info("Para sync - quit notify")
}
if quited {
plog.Info("Para sync - quit goroutine")
break
}
}
}
//批量执行同步区块
func (client *client) batchSyncBlocks() {
client.setBlockSyncState(BlockSyncStateSyncing)
plog.Info("Para sync - syncing")
var errCount int32
errCount = 0
for {
//获取同步状态,在需要同步的情况下执行同步
curSyncCaughtState, err := client.syncBlocksIfNeed()
if err != nil {
errCount++
client.printError(err)
}
//同步状态改变,发出通知并保存新状态
if curSyncCaughtState != isSyncCaughtUp {
isSyncCaughtUp = curSyncCaughtState
client.setSyncCaughtUp(curSyncCaughtState)
if errCount > client.blockSyncClient.maxSyncErrCount {
client.printError(errors.New(
"para sync - sync has some errors,please check"))
client.setBlockSyncState(BlockSyncStateNone)
return
}
//没有需要同步的块,清理本地数据库中localCacheCount前的块
canCleanLocalBlocks := isSyncCaughtUp &&
!client.getAndFlipLocalChangeStateIfNeed()
if canCleanLocalBlocks {
cleanUpSomeBlocks, err := client.clearLocalOldBlocks()
if curSyncCaughtState {
_, err := client.clearLocalOldBlocks()
if err != nil {
client.printError(err)
}
if !cleanUpSomeBlocks {
time.Sleep(time.Second)
}
client.setBlockSyncState(BlockSyncStateFinished)
plog.Info("Para sync - finished")
return
}
}
}
//获取每一轮可执行状态
......@@ -141,14 +196,14 @@ func (client *client) syncBlocksIfNeed() (bool,error) {
switch nextAction {
case NextActionAdd:
//1 db中后一高度区块的父hash等于已执行最新区块的hash
plog.Info("Para sync add block",
plog.Info("Para sync - add block",
"lastBlock.Height",lastBlock.Height,"lastLocalHeight",lastLocalHeight)
return false,client.addBlock(lastBlock, localBlock)
case NextActionRollback:
//1 db中最新区块高度小于已执行最新区块高度
//2 db中最新区块高度等于已执行最新区块高度并且hash不同
//3 db中后一高度区块的父hash不等于已执行最新区块的hash
plog.Info("Para sync rollback block",
plog.Info("Para sync - rollback block",
"lastBlock.Height",lastBlock.Height,"lastLocalHeight",lastLocalHeight)
return false,client.rollbackBlock(lastBlock)
default: //NextActionKeep
......@@ -161,7 +216,7 @@ func (client *client) syncBlocksIfNeed() (bool,error) {
//批量删除下载层缓冲数据
func (client *client) delLocalBlocks(startHeight int64,endHeight int64) error {
if startHeight > endHeight {
return errors.New("startHeight > endHeight,can't clear local blocks")
return errors.New("para sync - startHeight > endHeight,can't clear local blocks")
}
index := startHeight
......@@ -182,7 +237,7 @@ func (client *client) delLocalBlocks(startHeight int64,endHeight int64) error {
kv := &types.KeyValue{Key: key, Value: types.Encode(&types.Int64{Data: endHeight+1})}
set.KV = append(set.KV, kv)
plog.Info("Para sync clear local blocks", "startHeight:",startHeight,"endHeight:",endHeight)
plog.Info("Para sync - clear local blocks", "startHeight:",startHeight,"endHeight:",endHeight)
return client.setLocalDb(set)
}
......@@ -235,8 +290,8 @@ func (client *client) clearLocalOldBlocks() (bool,error) {
return false,err
}
canDelCount := lastLocalHeight - firstLocalHeight - localCacheCount + 1
if canDelCount <= 0 {
canDelCount := lastLocalHeight - firstLocalHeight - client.blockSyncClient.maxCacheCount + 1
if canDelCount <= client.blockSyncClient.maxCacheCount {
return false,nil
}
......@@ -271,7 +326,7 @@ func (client *client) addMinerTx(preStateHash []byte, block *types.Block,localBl
//添加一个区块
func (client *client) addBlock(lastBlock *types.Block,localBlock *pt.ParaLocalDbBlock) error {
var newBlock types.Block
plog.Debug(fmt.Sprintf("the len txs is: %v", len(localBlock.Txs)))
plog.Debug(fmt.Sprintf("Para sync - the len txs is: %v", len(localBlock.Txs)))
newBlock.ParentHash = lastBlock.Hash()
newBlock.Height = lastBlock.Height + 1
......@@ -289,7 +344,7 @@ func (client *client) addBlock(lastBlock *types.Block,localBlock *pt.ParaLocalDb
err = client.writeBlock(lastBlock.StateHash, &newBlock)
plog.Debug("para create new Block", "newblock.ParentHash", common.ToHex(newBlock.ParentHash),
plog.Debug("Para sync - para create new Block", "newblock.ParentHash", common.ToHex(newBlock.ParentHash),
"newblock.Height", newBlock.Height, "newblock.TxHash", common.ToHex(newBlock.TxHash),
"newblock.BlockTime", newBlock.BlockTime)
......@@ -298,11 +353,11 @@ func (client *client) addBlock(lastBlock *types.Block,localBlock *pt.ParaLocalDb
// 向blockchain删区块
func (client *client) rollbackBlock(block *types.Block) error {
plog.Debug("delete block in parachain")
plog.Debug("Para sync - delete block in parachain")
start := block.Height
if start == 0 {
panic("Parachain attempt to Delete GenesisBlock !")
panic("Para sync - Parachain attempt to Delete GenesisBlock !")
}
msg := client.GetQueueClient().NewMessage("blockchain", types.EventGetBlocks, &types.ReqBlocks{Start: start, End: start, IsDetail: true, Pid: []string{""}})
......@@ -356,7 +411,7 @@ func (client *client) writeBlock(prev []byte, paraBlock *types.Block) error {
}
blkdetail := resp.GetData().(*types.BlockDetail)
if blkdetail == nil {
return errors.New("block detail is nil")
return errors.New("Para sync - block detail is nil")
}
client.SetCurrentBlock(blkdetail.Block)
......@@ -368,42 +423,25 @@ func (client *client) writeBlock(prev []byte, paraBlock *types.Block) error {
return nil
}
//设置同步状态,原子操作,线程访问安全,原则上只限于此线程单元使用
func (client *client) setSyncCaughtUp(isSyncCaughtUp bool) {
if isSyncCaughtUp {
atomic.StoreInt32(&client.syncCaughtUpAtom,1)
} else {
atomic.StoreInt32(&client.syncCaughtUpAtom,0)
}
//获取同步状态
func (client *client) getBlockSyncState() BlockSyncState {
return BlockSyncState(atomic.LoadInt32(&client.blockSyncClient.syncState))
}
//初始化下载状态,原则上只限于此线程单元使用
func (client *client) initLocalChangeState() {
atomic.StoreInt32(&client.localChangeAtom,0)
//设置同步状态
func (client *client) setBlockSyncState(state BlockSyncState) {
atomic.StoreInt32(&client.blockSyncClient.syncState,int32(state))
}
//获取当前是否有新的下载到来,获取一次,并马上把状态设置为没有新通知
//此函数原则上只限于此线程单元使用
func (client *client) getAndFlipLocalChangeStateIfNeed() bool {
hasLocalChange := atomic.LoadInt32(&client.localChangeAtom) == 1
if hasLocalChange {
atomic.StoreInt32(&client.localChangeAtom,0)
}
return hasLocalChange
}
//打印错误日志
func (client *client) printError(err error) {
plog.Error(fmt.Sprintf("----------------->Para Sync Block Error:%v", err.Error()))
plog.Error(fmt.Sprintf("Para sync - sync block error:%v", err.Error()))
}
//初始化
func (client *client) syncInit() {
client.setSyncCaughtUp(false)
client.initLocalChangeState()
//false
plog.Info("Para sync - init")
client.setBlockSyncState(BlockSyncStateNone)
err := client.initFirstLocalHeightIfNeed()
if err != nil {
client.printError(err)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment