Commit 0459fa83 authored by yukang's avatar yukang Committed by vipwzw

add batch sync options for performance

parent aa5c7591
...@@ -190,7 +190,6 @@ func New(cfg *types.Consensus, sub []byte) queue.Module { ...@@ -190,7 +190,6 @@ func New(cfg *types.Consensus, sub []byte) queue.Module {
quitChan: make(chan struct{}), quitChan: make(chan struct{}),
maxCacheCount: defaultMaxCacheCount, maxCacheCount: defaultMaxCacheCount,
maxSyncErrCount: defaultMaxSyncErrCount, maxSyncErrCount: defaultMaxSyncErrCount,
isPrintDebugInfo: false,
} }
if subcfg.MaxCacheCount > 0 { if subcfg.MaxCacheCount > 0 {
para.blockSyncClient.maxCacheCount = subcfg.MaxCacheCount para.blockSyncClient.maxCacheCount = subcfg.MaxCacheCount
......
...@@ -500,7 +500,7 @@ func (client *client) procLocalBlocks(mainBlocks *types.ParaTxDetails) error { ...@@ -500,7 +500,7 @@ func (client *client) procLocalBlocks(mainBlocks *types.ParaTxDetails) error {
} }
} }
if notify { if notify {
client.blockSyncClient.notifyLocalChange() client.blockSyncClient.handleLocalChangedMsg()
} }
return nil return nil
......
...@@ -18,6 +18,7 @@ func (client *client) setLocalDb(set *types.LocalDBSet) error { ...@@ -18,6 +18,7 @@ func (client *client) setLocalDb(set *types.LocalDBSet) error {
//如果追赶上主链了,则落盘 //如果追赶上主链了,则落盘
if client.isCaughtUp() { if client.isCaughtUp() {
set.Txid = 1 set.Txid = 1
client.blockSyncClient.handleLocalCaughtUpMsg()
} }
msg := client.GetQueueClient().NewMessage("blockchain", types.EventSetValueByKey, set) msg := client.GetQueueClient().NewMessage("blockchain", types.EventSetValueByKey, set)
......
...@@ -37,8 +37,8 @@ type blockSyncClient struct { ...@@ -37,8 +37,8 @@ type blockSyncClient struct {
maxCacheCount int64 maxCacheCount int64
//isSyncCaughtUp 是否追赶上 //isSyncCaughtUp 是否追赶上
isSyncCaughtUpAtom int32 isSyncCaughtUpAtom int32
//printDebugInfo 打印Debug信息 //isDownloadCaughtUpAtom 下载是否已经追赶上
isPrintDebugInfo bool isDownloadCaughtUpAtom int32
} }
//nextActionType 定义每一轮可执行操作 //nextActionType 定义每一轮可执行操作
...@@ -70,12 +70,21 @@ func (client *blockSyncClient) syncHasCaughtUp() bool { ...@@ -70,12 +70,21 @@ func (client *blockSyncClient) syncHasCaughtUp() bool {
return atomic.LoadInt32(&client.isSyncCaughtUpAtom) == 1 return atomic.LoadInt32(&client.isSyncCaughtUpAtom) == 1
} }
//notifyLocalChange 下载状态通知,供下载层调用 //handleLocalChangedMsg 处理下载通知消息,供下载层调用
func (client *blockSyncClient) notifyLocalChange() { func (client *blockSyncClient) handleLocalChangedMsg() {
client.printDebugInfo("Para sync - notify change") client.printDebugInfo("Para sync - notify change")
if client.getBlockSyncState() != blockSyncStateSyncing { if client.getBlockSyncState() == blockSyncStateSyncing || client.paraClient.isCancel() {
return
}
client.printDebugInfo("Para sync - notified change") client.printDebugInfo("Para sync - notified change")
client.notifyChan <- true client.notifyChan <- true
}
//handleLocalCaughtUpMsg 处理下载已追赶上消息,供下载层调用
func (client *blockSyncClient) handleLocalCaughtUpMsg() {
client.printDebugInfo("Para sync -notify download has caughtUp")
if !client.downloadHasCaughtUp() {
client.setDownloadHasCaughtUp(true)
} }
} }
...@@ -115,6 +124,9 @@ func (client *blockSyncClient) batchSyncBlocks() { ...@@ -115,6 +124,9 @@ func (client *blockSyncClient) batchSyncBlocks() {
errCount := int32(0) errCount := int32(0)
for { for {
if client.paraClient.isCancel() {
return
}
//获取同步状态,在需要同步的情况下执行同步 //获取同步状态,在需要同步的情况下执行同步
curSyncCaughtState, err := client.syncBlocksIfNeed() curSyncCaughtState, err := client.syncBlocksIfNeed()
...@@ -442,7 +454,7 @@ func (client *blockSyncClient) rollbackBlock(block *types.Block) error { ...@@ -442,7 +454,7 @@ func (client *blockSyncClient) rollbackBlock(block *types.Block) error {
func (client *blockSyncClient) writeBlock(prev []byte, paraBlock *types.Block) error { func (client *blockSyncClient) writeBlock(prev []byte, paraBlock *types.Block) error {
//共识模块不执行block,统一由blockchain模块执行block并做去重的处理,返回执行后的blockdetail //共识模块不执行block,统一由blockchain模块执行block并做去重的处理,返回执行后的blockdetail
blockDetail := &types.BlockDetail{Block: paraBlock} blockDetail := &types.BlockDetail{Block: paraBlock}
paraBlockDetail := &types.ParaChainBlockDetail{Blockdetail: blockDetail} paraBlockDetail := &types.ParaChainBlockDetail{Blockdetail: blockDetail, IsSync: client.downloadHasCaughtUp()}
msg := client.paraClient.GetQueueClient().NewMessage("blockchain", types.EventAddParaChainBlockDetail, paraBlockDetail) msg := client.paraClient.GetQueueClient().NewMessage("blockchain", types.EventAddParaChainBlockDetail, paraBlockDetail)
err := client.paraClient.GetQueueClient().Send(msg, true) err := client.paraClient.GetQueueClient().Send(msg, true)
if err != nil { if err != nil {
...@@ -483,6 +495,20 @@ func (client *blockSyncClient) setSyncCaughtUp(isCaughtUp bool) { ...@@ -483,6 +495,20 @@ func (client *blockSyncClient) setSyncCaughtUp(isCaughtUp bool) {
} }
} }
//下载是否已经追赶上
func (client *blockSyncClient) downloadHasCaughtUp() bool {
return atomic.LoadInt32(&client.isDownloadCaughtUpAtom) == 1
}
//设置下载同步追赶状态
func (client *blockSyncClient) setDownloadHasCaughtUp(isCaughtUp bool) {
if isCaughtUp {
atomic.CompareAndSwapInt32(&client.isDownloadCaughtUpAtom, 0, 1)
} else {
atomic.CompareAndSwapInt32(&client.isDownloadCaughtUpAtom, 1, 0)
}
}
//打印错误日志 //打印错误日志
func (client *blockSyncClient) printError(err error) { func (client *blockSyncClient) printError(err error) {
plog.Error(fmt.Sprintf("Para sync - sync block error:%v", err.Error())) plog.Error(fmt.Sprintf("Para sync - sync block error:%v", err.Error()))
...@@ -490,9 +516,7 @@ func (client *blockSyncClient) printError(err error) { ...@@ -490,9 +516,7 @@ func (client *blockSyncClient) printError(err error) {
//打印调试信息 //打印调试信息
func (client *blockSyncClient) printDebugInfo(msg string, ctx ...interface{}) { func (client *blockSyncClient) printDebugInfo(msg string, ctx ...interface{}) {
if client.isPrintDebugInfo { plog.Debug(msg, ctx...)
plog.Info(msg, ctx...)
}
} }
//初始化 //初始化
...@@ -500,6 +524,7 @@ func (client *blockSyncClient) syncInit() { ...@@ -500,6 +524,7 @@ func (client *blockSyncClient) syncInit() {
client.printDebugInfo("Para sync - init") client.printDebugInfo("Para sync - init")
client.setBlockSyncState(blockSyncStateNone) client.setBlockSyncState(blockSyncStateNone)
client.setSyncCaughtUp(false) client.setSyncCaughtUp(false)
client.setDownloadHasCaughtUp(false)
err := client.initFirstLocalHeightIfNeed() err := client.initFirstLocalHeightIfNeed()
if err != nil { if err != nil {
client.printError(err) client.printError(err)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment