Commit fd136884 authored by yukang's avatar yukang Committed by vipwzw

Adjust notification to messageCommit for performance

parent 1bdb1f42
...@@ -28,6 +28,8 @@ type BlockSyncClient struct { ...@@ -28,6 +28,8 @@ type BlockSyncClient struct {
maxSyncErrCount int32 maxSyncErrCount int32
//maxCacheCount 本地缓冲的最大块数量 //maxCacheCount 本地缓冲的最大块数量
maxCacheCount int64 maxCacheCount int64
//isSyncCaughtUp 是否追赶上
isSyncCaughtUpAtom int32
} }
//NextActionType 定义每一轮可执行操作 //NextActionType 定义每一轮可执行操作
...@@ -56,7 +58,7 @@ const ( ...@@ -56,7 +58,7 @@ const (
//SyncHasCaughtUp 判断同步是否已追赶上,供发送层调用 //SyncHasCaughtUp 判断同步是否已追赶上,供发送层调用
func (client *BlockSyncClient) SyncHasCaughtUp() bool { func (client *BlockSyncClient) SyncHasCaughtUp() bool {
return client.getBlockSyncState() == BlockSyncStateFinished return atomic.LoadInt32(&client.isSyncCaughtUpAtom) == 1
} }
//NotifyLocalChange 下载状态通知,供下载层调用 //NotifyLocalChange 下载状态通知,供下载层调用
...@@ -106,11 +108,15 @@ func (client *BlockSyncClient) batchSyncBlocks() { ...@@ -106,11 +108,15 @@ func (client *BlockSyncClient) batchSyncBlocks() {
for { for {
//获取同步状态,在需要同步的情况下执行同步 //获取同步状态,在需要同步的情况下执行同步
curSyncCaughtState, err := client.syncBlocksIfNeed() curSyncCaughtState, err := client.syncBlocksIfNeed()
//统计连续出错数量
if err != nil { if err != nil {
errCount++ errCount++
client.printError(err) client.printError(err)
} else {
errCount = int32(0)
} }
//连续出错达到一定数量,退出循环,等待下一次通知
if errCount > client.maxSyncErrCount { if errCount > client.maxSyncErrCount {
client.printError(errors.New( client.printError(errors.New(
"para sync - sync has some errors,please check")) "para sync - sync has some errors,please check"))
...@@ -118,11 +124,12 @@ func (client *BlockSyncClient) batchSyncBlocks() { ...@@ -118,11 +124,12 @@ func (client *BlockSyncClient) batchSyncBlocks() {
return return
} }
//没有需要同步的块,清理本地数据库中localCacheCount前的块 //没有需要同步的块,清理本地数据库中localCacheCount前的块
if curSyncCaughtState { if err == nil && curSyncCaughtState {
_, err := client.clearLocalOldBlocks() _, err := client.clearLocalOldBlocks()
if err != nil { if err != nil {
client.printError(err) client.printError(err)
} }
client.setBlockSyncState(BlockSyncStateFinished) client.setBlockSyncState(BlockSyncStateFinished)
plog.Info("Para sync - finished") plog.Info("Para sync - finished")
return return
...@@ -196,14 +203,39 @@ func (client *BlockSyncClient) syncBlocksIfNeed() (bool, error) { ...@@ -196,14 +203,39 @@ func (client *BlockSyncClient) syncBlocksIfNeed() (bool, error) {
//1 db中后一高度区块的父hash等于已执行最新区块的hash //1 db中后一高度区块的父hash等于已执行最新区块的hash
plog.Info("Para sync - add block", plog.Info("Para sync - add block",
"lastBlock.Height", lastBlock.Height, "lastLocalHeight", lastLocalHeight) "lastBlock.Height", lastBlock.Height, "lastLocalHeight", lastLocalHeight)
return false, client.addBlock(lastBlock, localBlock)
err := client.addBlock(lastBlock, localBlock)
//通知发送层
if err == nil {
isSyncCaughtUp := lastBlock.Height+1 == lastLocalHeight
client.setSyncCaughtUp(isSyncCaughtUp)
if client.paraClient.authAccount != "" {
plog.Info("Para sync - add block commit", "isSyncCaughtUp", isSyncCaughtUp)
client.paraClient.commitMsgClient.updateChainHeight(lastBlock.Height+1, false)
}
}
return false, err
case NextActionRollback: case NextActionRollback:
//1 db中最新区块高度小于已执行最新区块高度 //1 db中最新区块高度小于已执行最新区块高度
//2 db中最新区块高度等于已执行最新区块高度并且hash不同 //2 db中最新区块高度等于已执行最新区块高度并且hash不同
//3 db中后一高度区块的父hash不等于已执行最新区块的hash //3 db中后一高度区块的父hash不等于已执行最新区块的hash
plog.Info("Para sync - rollback block", plog.Info("Para sync - rollback block",
"lastBlock.Height", lastBlock.Height, "lastLocalHeight", lastLocalHeight) "lastBlock.Height", lastBlock.Height, "lastLocalHeight", lastLocalHeight)
return false, client.rollbackBlock(lastBlock)
err := client.rollbackBlock(lastBlock)
//通知发送层
if err == nil {
client.setSyncCaughtUp(false)
if client.paraClient.authAccount != "" {
plog.Info("Para sync - rollback block commit", "isSyncCaughtUp", false)
client.paraClient.commitMsgClient.updateChainHeight(lastBlock.Height-1, true)
}
}
return false, err
default: //NextActionKeep default: //NextActionKeep
//1 已完成同步,没有需要同步的块 //1 已完成同步,没有需要同步的块
return true, nil return true, nil
...@@ -380,15 +412,11 @@ func (client *BlockSyncClient) rollbackBlock(block *types.Block) error { ...@@ -380,15 +412,11 @@ func (client *BlockSyncClient) rollbackBlock(block *types.Block) error {
return err return err
} }
if resp.GetData().(*types.Reply).IsOk { if !resp.GetData().(*types.Reply).IsOk {
if client.paraClient.authAccount != "" {
client.paraClient.commitMsgClient.updateChainHeight(blocks.Items[0].Block.Height-1, true)
}
} else {
reply := resp.GetData().(*types.Reply) reply := resp.GetData().(*types.Reply)
return errors.New(string(reply.GetMsg())) return errors.New(string(reply.GetMsg()))
} }
return nil return nil
} }
...@@ -414,10 +442,6 @@ func (client *BlockSyncClient) writeBlock(prev []byte, paraBlock *types.Block) e ...@@ -414,10 +442,6 @@ func (client *BlockSyncClient) writeBlock(prev []byte, paraBlock *types.Block) e
client.paraClient.SetCurrentBlock(blkdetail.Block) client.paraClient.SetCurrentBlock(blkdetail.Block)
if client.paraClient.authAccount != "" {
client.paraClient.commitMsgClient.updateChainHeight(blockDetail.Block.Height, false)
}
return nil return nil
} }
...@@ -431,6 +455,15 @@ func (client *BlockSyncClient) setBlockSyncState(state BlockSyncState) { ...@@ -431,6 +455,15 @@ func (client *BlockSyncClient) setBlockSyncState(state BlockSyncState) {
atomic.StoreInt32(&client.syncState, int32(state)) atomic.StoreInt32(&client.syncState, int32(state))
} }
//设置是否追赶上
func (client *BlockSyncClient) setSyncCaughtUp(isCaughtUp bool) {
if isCaughtUp {
atomic.StoreInt32(&client.isSyncCaughtUpAtom, 1)
} else {
atomic.StoreInt32(&client.isSyncCaughtUpAtom, 0)
}
}
//打印错误日志 //打印错误日志
func (client *BlockSyncClient) printError(err error) { func (client *BlockSyncClient) printError(err error) {
plog.Error(fmt.Sprintf("Para sync - sync block error:%v", err.Error())) plog.Error(fmt.Sprintf("Para sync - sync block error:%v", err.Error()))
...@@ -440,6 +473,7 @@ func (client *BlockSyncClient) printError(err error) { ...@@ -440,6 +473,7 @@ func (client *BlockSyncClient) printError(err error) {
func (client *BlockSyncClient) syncInit() { func (client *BlockSyncClient) syncInit() {
plog.Info("Para sync - init") plog.Info("Para sync - init")
client.setBlockSyncState(BlockSyncStateNone) client.setBlockSyncState(BlockSyncStateNone)
client.setSyncCaughtUp(false)
err := client.initFirstLocalHeightIfNeed() err := client.initFirstLocalHeightIfNeed()
if err != nil { if err != nil {
client.printError(err) client.printError(err)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment