Commit 34a9a7f9 authored by yukang's avatar yukang Committed by vipwzw

Optimize some code for review

parent 408ddd34
...@@ -187,6 +187,7 @@ func New(cfg *types.Consensus, sub []byte) queue.Module { ...@@ -187,6 +187,7 @@ func New(cfg *types.Consensus, sub []byte) queue.Module {
} }
para.blockSyncClient = &BlockSyncClient{ para.blockSyncClient = &BlockSyncClient{
paraClient: para,
notifyChan: make(chan bool), notifyChan: make(chan bool),
quitChan: make(chan struct{}), quitChan: make(chan struct{}),
maxCacheCount: 1000, maxCacheCount: 1000,
...@@ -228,7 +229,8 @@ func (client *client) SetQueueClient(c queue.Client) { ...@@ -228,7 +229,8 @@ func (client *client) SetQueueClient(c queue.Client) {
go client.commitMsgClient.handler() go client.commitMsgClient.handler()
client.wg.Add(1) client.wg.Add(1)
go client.CreateBlock() go client.CreateBlock()
go client.SyncBlocks() client.wg.Add(1)
go client.blockSyncClient.SyncBlocks()
} }
func (client *client) InitBlock() { func (client *client) InitBlock() {
...@@ -257,7 +259,7 @@ func (client *client) InitBlock() { ...@@ -257,7 +259,7 @@ func (client *client) InitBlock() {
tx := client.CreateGenesisTx() tx := client.CreateGenesisTx()
newblock.Txs = tx newblock.Txs = tx
newblock.TxHash = merkle.CalcMerkleRoot(newblock.Txs) newblock.TxHash = merkle.CalcMerkleRoot(newblock.Txs)
err := client.CreateGenesisBlock(newblock) err := client.blockSyncClient.CreateGenesisBlock(newblock)
if err != nil { if err != nil {
panic(fmt.Sprintf("para chain create genesis block,err=%s", err.Error())) panic(fmt.Sprintf("para chain create genesis block,err=%s", err.Error()))
} }
......
...@@ -170,8 +170,8 @@ func (client *commitMsgClient) isSync() bool { ...@@ -170,8 +170,8 @@ func (client *commitMsgClient) isSync() bool {
return false return false
} }
if !client.paraClient.SyncHasCaughtUp() { if !client.paraClient.blockSyncClient.SyncHasCaughtUp() {
plog.Info("para is not Sync", "syncCaughtUp", client.paraClient.SyncHasCaughtUp()) plog.Info("para is not Sync", "syncCaughtUp", client.paraClient.blockSyncClient.SyncHasCaughtUp())
return false return false
} }
......
...@@ -17,6 +17,7 @@ import ( ...@@ -17,6 +17,7 @@ import (
//BlockSyncClient 区块同步控制和状态变量 //BlockSyncClient 区块同步控制和状态变量
type BlockSyncClient struct { type BlockSyncClient struct {
paraClient *client
//notifyChan 下载通知通道 //notifyChan 下载通知通道
notifyChan chan bool notifyChan chan bool
//quitChan 线程退出通知通道 //quitChan 线程退出通知通道
...@@ -53,55 +54,50 @@ const ( ...@@ -53,55 +54,50 @@ const (
BlockSyncStateFinished BlockSyncStateFinished
) )
//判断同步是否已追赶上,供发送层调用 //SyncHasCaughtUp 判断同步是否已追赶上,供发送层调用
func (client *client) SyncHasCaughtUp() bool { func (client *BlockSyncClient) SyncHasCaughtUp() bool {
return client.getBlockSyncState() == BlockSyncStateFinished return client.getBlockSyncState() == BlockSyncStateFinished
} }
//下载状态通知,供下载层调用 //NotifyLocalChange 下载状态通知,供下载层调用
func (client *client) NotifyLocalChange() { func (client *BlockSyncClient) NotifyLocalChange() {
plog.Info("Para sync - notify change") plog.Info("Para sync - notify change")
if client.getBlockSyncState() != BlockSyncStateSyncing { if client.getBlockSyncState() != BlockSyncStateSyncing {
plog.Info("Para sync - notified change") plog.Info("Para sync - notified change")
client.blockSyncClient.notifyChan <- true client.notifyChan <- true
} }
} }
//创建创世区块 //CreateGenesisBlock 创建创世区块
func (client *client) CreateGenesisBlock(newblock *types.Block) error { func (client *BlockSyncClient) CreateGenesisBlock(newblock *types.Block) error {
return client.writeBlock(zeroHash[:], newblock) return client.writeBlock(zeroHash[:], newblock)
} }
//区块执行线程 //SyncBlocks 区块执行线程
//循环执行 //循环执行
func (client *client) SyncBlocks() { func (client *BlockSyncClient) SyncBlocks() {
client.syncInit() client.syncInit()
//首次同步,不用等待通知 //首次同步,不用等待通知
client.batchSyncBlocks() client.batchSyncBlocks()
//开始正常同步,需要等待通知信号触发 //开始正常同步,需要等待通知信号触发
quited := false out:
for { for {
select { select {
case <-client.blockSyncClient.notifyChan: case <-client.notifyChan:
client.batchSyncBlocks() client.batchSyncBlocks()
case <-client.blockSyncClient.quitChan: case <-client.quitChan:
break out
quited = true
plog.Info("Para sync - quit notify")
}
if quited {
plog.Info("Para sync - quit goroutine")
break
} }
} }
client.paraClient.wg.Done()
} }
//批量执行同步区块 //批量执行同步区块
func (client *client) batchSyncBlocks() { func (client *BlockSyncClient) batchSyncBlocks() {
client.setBlockSyncState(BlockSyncStateSyncing) client.setBlockSyncState(BlockSyncStateSyncing)
plog.Info("Para sync - syncing") plog.Info("Para sync - syncing")
...@@ -114,7 +110,7 @@ func (client *client) batchSyncBlocks() { ...@@ -114,7 +110,7 @@ func (client *client) batchSyncBlocks() {
client.printError(err) client.printError(err)
} }
if errCount > client.blockSyncClient.maxSyncErrCount { if errCount > client.maxSyncErrCount {
client.printError(errors.New( client.printError(errors.New(
"para sync - sync has some errors,please check")) "para sync - sync has some errors,please check"))
client.setBlockSyncState(BlockSyncStateNone) client.setBlockSyncState(BlockSyncStateNone)
...@@ -135,14 +131,14 @@ func (client *client) batchSyncBlocks() { ...@@ -135,14 +131,14 @@ func (client *client) batchSyncBlocks() {
} }
//获取每一轮可执行状态 //获取每一轮可执行状态
func (client *client) getNextAction() (NextActionType, *types.Block, *pt.ParaLocalDbBlock, int64, error) { func (client *BlockSyncClient) getNextAction() (NextActionType, *types.Block, *pt.ParaLocalDbBlock, int64, error) {
lastBlock, err := client.getLastBlockInfo() lastBlock, err := client.paraClient.getLastBlockInfo()
if err != nil { if err != nil {
//取已执行最新区块发生错误,不做任何操作 //取已执行最新区块发生错误,不做任何操作
return NextActionKeep, nil, nil, -1, err return NextActionKeep, nil, nil, -1, err
} }
lastLocalHeight, err := client.getLastLocalHeight() lastLocalHeight, err := client.paraClient.getLastLocalHeight()
if err != nil { if err != nil {
//取db中最新高度区块发生错误,不做任何操作 //取db中最新高度区块发生错误,不做任何操作
return NextActionKeep, nil, nil, lastLocalHeight, err return NextActionKeep, nil, nil, lastLocalHeight, err
...@@ -150,45 +146,45 @@ func (client *client) getNextAction() (NextActionType, *types.Block, *pt.ParaLoc ...@@ -150,45 +146,45 @@ func (client *client) getNextAction() (NextActionType, *types.Block, *pt.ParaLoc
if lastLocalHeight <= 0 { if lastLocalHeight <= 0 {
//db中最新高度为0,不做任何操作(创世区块) //db中最新高度为0,不做任何操作(创世区块)
return NextActionKeep, nil, nil, lastLocalHeight, err return NextActionKeep, nil, nil, lastLocalHeight, nil
} }
switch { switch {
case lastLocalHeight < lastBlock.Height: case lastLocalHeight < lastBlock.Height:
//db中最新区块高度小于已执行最新区块高度,回滚 //db中最新区块高度小于已执行最新区块高度,回滚
return NextActionRollback, lastBlock, nil, lastLocalHeight, err return NextActionRollback, lastBlock, nil, lastLocalHeight, nil
case lastLocalHeight == lastBlock.Height: case lastLocalHeight == lastBlock.Height:
localBlock, err := client.getLocalBlockByHeight(lastBlock.Height) localBlock, err := client.paraClient.getLocalBlockByHeight(lastBlock.Height)
if err != nil { if err != nil {
//取db中指定高度区块发生错误,不做任何操作 //取db中指定高度区块发生错误,不做任何操作
return NextActionKeep, nil, nil, lastLocalHeight, err return NextActionKeep, nil, nil, lastLocalHeight, err
} }
if common.ToHex(localBlock.MainHash) == common.ToHex(lastBlock.MainHash) { if common.ToHex(localBlock.MainHash) == common.ToHex(lastBlock.MainHash) {
//db中最新区块高度等于已执行最新区块高度并且hash相同,不做任何操作(已保持同步状态) //db中最新区块高度等于已执行最新区块高度并且hash相同,不做任何操作(已保持同步状态)
return NextActionKeep, nil, nil, lastLocalHeight, err return NextActionKeep, nil, nil, lastLocalHeight, nil
} }
//db中最新区块高度等于已执行最新区块高度并且hash不同,回滚 //db中最新区块高度等于已执行最新区块高度并且hash不同,回滚
return NextActionRollback, lastBlock, nil, lastLocalHeight, err return NextActionRollback, lastBlock, nil, lastLocalHeight, nil
default: default:
// lastLocalHeight > lastBlock.Height // lastLocalHeight > lastBlock.Height
localBlock, err := client.getLocalBlockByHeight(lastBlock.Height + 1) localBlock, err := client.paraClient.getLocalBlockByHeight(lastBlock.Height + 1)
if err != nil { if err != nil {
//取db中后一高度区块发生错误,不做任何操作 //取db中后一高度区块发生错误,不做任何操作
return NextActionKeep, nil, nil, lastLocalHeight, err return NextActionKeep, nil, nil, lastLocalHeight, err
} }
if common.ToHex(localBlock.ParentMainHash) != common.ToHex(lastBlock.MainHash) { if common.ToHex(localBlock.ParentMainHash) != common.ToHex(lastBlock.MainHash) {
//db中后一高度区块的父hash不等于已执行最新区块的hash,回滚 //db中后一高度区块的父hash不等于已执行最新区块的hash,回滚
return NextActionRollback, lastBlock, nil, lastLocalHeight, err return NextActionRollback, lastBlock, nil, lastLocalHeight, nil
} }
//db中后一高度区块的父hash等于已执行最新区块的hash,执行区块创建 //db中后一高度区块的父hash等于已执行最新区块的hash,执行区块创建
return NextActionAdd, lastBlock, localBlock, lastLocalHeight, err return NextActionAdd, lastBlock, localBlock, lastLocalHeight, nil
} }
} }
//根据当前可执行状态执行区块操作 //根据当前可执行状态执行区块操作
//返回参数 //返回参数
//bool 是否已完成同步 //bool 是否已完成同步
func (client *client) syncBlocksIfNeed() (bool, error) { func (client *BlockSyncClient) syncBlocksIfNeed() (bool, error) {
nextAction, lastBlock, localBlock, lastLocalHeight, err := client.getNextAction() nextAction, lastBlock, localBlock, lastLocalHeight, err := client.getNextAction()
if err != nil { if err != nil {
return false, err return false, err
...@@ -209,13 +205,13 @@ func (client *client) syncBlocksIfNeed() (bool, error) { ...@@ -209,13 +205,13 @@ func (client *client) syncBlocksIfNeed() (bool, error) {
return false, client.rollbackBlock(lastBlock) return false, client.rollbackBlock(lastBlock)
default: //NextActionKeep default: //NextActionKeep
//1 已完成同步,没有需要同步的块 //1 已完成同步,没有需要同步的块
return true, err return true, nil
} }
} }
//批量删除下载层缓冲数据 //批量删除下载层缓冲数据
func (client *client) delLocalBlocks(startHeight int64, endHeight int64) error { func (client *BlockSyncClient) delLocalBlocks(startHeight int64, endHeight int64) error {
if startHeight > endHeight { if startHeight > endHeight {
return errors.New("para sync - startHeight > endHeight,can't clear local blocks") return errors.New("para sync - startHeight > endHeight,can't clear local blocks")
} }
...@@ -240,11 +236,11 @@ func (client *client) delLocalBlocks(startHeight int64, endHeight int64) error { ...@@ -240,11 +236,11 @@ func (client *client) delLocalBlocks(startHeight int64, endHeight int64) error {
plog.Info("Para sync - clear local blocks", "startHeight:", startHeight, "endHeight:", endHeight) plog.Info("Para sync - clear local blocks", "startHeight:", startHeight, "endHeight:", endHeight)
return client.setLocalDb(set) return client.paraClient.setLocalDb(set)
} }
//最低高度没有设置的时候设置一下最低高度 //最低高度没有设置的时候设置一下最低高度
func (client *client) initFirstLocalHeightIfNeed() error { func (client *BlockSyncClient) initFirstLocalHeightIfNeed() error {
height, err := client.getFirstLocalHeight() height, err := client.getFirstLocalHeight()
if err != nil || height < 0 { if err != nil || height < 0 {
...@@ -253,17 +249,17 @@ func (client *client) initFirstLocalHeightIfNeed() error { ...@@ -253,17 +249,17 @@ func (client *client) initFirstLocalHeightIfNeed() error {
kv := &types.KeyValue{Key: key, Value: types.Encode(&types.Int64{Data: 0})} kv := &types.KeyValue{Key: key, Value: types.Encode(&types.Int64{Data: 0})}
set.KV = append(set.KV, kv) set.KV = append(set.KV, kv)
return client.setLocalDb(set) return client.paraClient.setLocalDb(set)
} }
return err return err
} }
//获取下载层缓冲数据的区块最低高度 //获取下载层缓冲数据的区块最低高度
func (client *client) getFirstLocalHeight() (int64, error) { func (client *BlockSyncClient) getFirstLocalHeight() (int64, error) {
key := calcTitleFirstHeightKey(types.GetTitle()) key := calcTitleFirstHeightKey(types.GetTitle())
set := &types.LocalDBGet{Keys: [][]byte{key}} set := &types.LocalDBGet{Keys: [][]byte{key}}
value, err := client.getLocalDb(set, len(set.Keys)) value, err := client.paraClient.getLocalDb(set, len(set.Keys))
if err != nil { if err != nil {
return -1, err return -1, err
} }
...@@ -280,8 +276,8 @@ func (client *client) getFirstLocalHeight() (int64, error) { ...@@ -280,8 +276,8 @@ func (client *client) getFirstLocalHeight() (int64, error) {
} }
//清除指定数量(localCacheCount)以前的区块 //清除指定数量(localCacheCount)以前的区块
func (client *client) clearLocalOldBlocks() (bool, error) { func (client *BlockSyncClient) clearLocalOldBlocks() (bool, error) {
lastLocalHeight, err := client.getLastLocalHeight() lastLocalHeight, err := client.paraClient.getLastLocalHeight()
if err != nil { if err != nil {
return false, err return false, err
} }
...@@ -291,8 +287,8 @@ func (client *client) clearLocalOldBlocks() (bool, error) { ...@@ -291,8 +287,8 @@ func (client *client) clearLocalOldBlocks() (bool, error) {
return false, err return false, err
} }
canDelCount := lastLocalHeight - firstLocalHeight - client.blockSyncClient.maxCacheCount + 1 canDelCount := lastLocalHeight - firstLocalHeight - client.maxCacheCount + 1
if canDelCount <= client.blockSyncClient.maxCacheCount { if canDelCount <= client.maxCacheCount {
return false, nil return false, nil
} }
...@@ -300,7 +296,7 @@ func (client *client) clearLocalOldBlocks() (bool, error) { ...@@ -300,7 +296,7 @@ func (client *client) clearLocalOldBlocks() (bool, error) {
} }
// miner tx need all para node create, but not all node has auth account, here just not sign to keep align // miner tx need all para node create, but not all node has auth account, here just not sign to keep align
func (client *client) addMinerTx(preStateHash []byte, block *types.Block, localBlock *pt.ParaLocalDbBlock) error { func (client *BlockSyncClient) addMinerTx(preStateHash []byte, block *types.Block, localBlock *pt.ParaLocalDbBlock) error {
status := &pt.ParacrossNodeStatus{ status := &pt.ParacrossNodeStatus{
Title: types.GetTitle(), Title: types.GetTitle(),
Height: block.Height, Height: block.Height,
...@@ -318,14 +314,14 @@ func (client *client) addMinerTx(preStateHash []byte, block *types.Block, localB ...@@ -318,14 +314,14 @@ func (client *client) addMinerTx(preStateHash []byte, block *types.Block, localB
return err return err
} }
tx.Sign(types.SECP256K1, client.privateKey) tx.Sign(types.SECP256K1, client.paraClient.privateKey)
block.Txs = append([]*types.Transaction{tx}, block.Txs...) block.Txs = append([]*types.Transaction{tx}, block.Txs...)
return nil return nil
} }
//添加一个区块 //添加一个区块
func (client *client) addBlock(lastBlock *types.Block, localBlock *pt.ParaLocalDbBlock) error { func (client *BlockSyncClient) addBlock(lastBlock *types.Block, localBlock *pt.ParaLocalDbBlock) error {
var newBlock types.Block var newBlock types.Block
plog.Debug(fmt.Sprintf("Para sync - the len txs is: %v", len(localBlock.Txs))) plog.Debug(fmt.Sprintf("Para sync - the len txs is: %v", len(localBlock.Txs)))
...@@ -353,7 +349,7 @@ func (client *client) addBlock(lastBlock *types.Block, localBlock *pt.ParaLocalD ...@@ -353,7 +349,7 @@ func (client *client) addBlock(lastBlock *types.Block, localBlock *pt.ParaLocalD
} }
// 向blockchain删区块 // 向blockchain删区块
func (client *client) rollbackBlock(block *types.Block) error { func (client *BlockSyncClient) rollbackBlock(block *types.Block) error {
plog.Debug("Para sync - delete block in parachain") plog.Debug("Para sync - delete block in parachain")
start := block.Height start := block.Height
...@@ -361,31 +357,31 @@ func (client *client) rollbackBlock(block *types.Block) error { ...@@ -361,31 +357,31 @@ func (client *client) rollbackBlock(block *types.Block) error {
panic("Para sync - Parachain attempt to Delete GenesisBlock !") panic("Para sync - Parachain attempt to Delete GenesisBlock !")
} }
msg := client.GetQueueClient().NewMessage("blockchain", types.EventGetBlocks, &types.ReqBlocks{Start: start, End: start, IsDetail: true, Pid: []string{""}}) msg := client.paraClient.GetQueueClient().NewMessage("blockchain", types.EventGetBlocks, &types.ReqBlocks{Start: start, End: start, IsDetail: true, Pid: []string{""}})
err := client.GetQueueClient().Send(msg, true) err := client.paraClient.GetQueueClient().Send(msg, true)
if err != nil { if err != nil {
return err return err
} }
resp, err := client.GetQueueClient().Wait(msg) resp, err := client.paraClient.GetQueueClient().Wait(msg)
if err != nil { if err != nil {
return err return err
} }
blocks := resp.GetData().(*types.BlockDetails) blocks := resp.GetData().(*types.BlockDetails)
parablockDetail := &types.ParaChainBlockDetail{Blockdetail: blocks.Items[0]} parablockDetail := &types.ParaChainBlockDetail{Blockdetail: blocks.Items[0]}
msg = client.GetQueueClient().NewMessage("blockchain", types.EventDelParaChainBlockDetail, parablockDetail) msg = client.paraClient.GetQueueClient().NewMessage("blockchain", types.EventDelParaChainBlockDetail, parablockDetail)
err = client.GetQueueClient().Send(msg, true) err = client.paraClient.GetQueueClient().Send(msg, true)
if err != nil { if err != nil {
return err return err
} }
resp, err = client.GetQueueClient().Wait(msg) resp, err = client.paraClient.GetQueueClient().Wait(msg)
if err != nil { if err != nil {
return err return err
} }
if resp.GetData().(*types.Reply).IsOk { if resp.GetData().(*types.Reply).IsOk {
if client.authAccount != "" { if client.paraClient.authAccount != "" {
client.commitMsgClient.updateChainHeight(blocks.Items[0].Block.Height, true) client.paraClient.commitMsgClient.updateChainHeight(blocks.Items[0].Block.Height, true)
} }
} else { } else {
...@@ -396,17 +392,17 @@ func (client *client) rollbackBlock(block *types.Block) error { ...@@ -396,17 +392,17 @@ func (client *client) rollbackBlock(block *types.Block) error {
} }
// 向blockchain写区块 // 向blockchain写区块
func (client *client) writeBlock(prev []byte, paraBlock *types.Block) error { func (client *BlockSyncClient) writeBlock(prev []byte, paraBlock *types.Block) error {
//共识模块不执行block,统一由blockchain模块执行block并做去重的处理,返回执行后的blockdetail //共识模块不执行block,统一由blockchain模块执行block并做去重的处理,返回执行后的blockdetail
blockDetail := &types.BlockDetail{Block: paraBlock} blockDetail := &types.BlockDetail{Block: paraBlock}
parablockDetail := &types.ParaChainBlockDetail{Blockdetail: blockDetail} parablockDetail := &types.ParaChainBlockDetail{Blockdetail: blockDetail}
msg := client.GetQueueClient().NewMessage("blockchain", types.EventAddParaChainBlockDetail, parablockDetail) msg := client.paraClient.GetQueueClient().NewMessage("blockchain", types.EventAddParaChainBlockDetail, parablockDetail)
err := client.GetQueueClient().Send(msg, true) err := client.paraClient.GetQueueClient().Send(msg, true)
if err != nil { if err != nil {
return err return err
} }
resp, err := client.GetQueueClient().Wait(msg) resp, err := client.paraClient.GetQueueClient().Wait(msg)
if err != nil { if err != nil {
return err return err
} }
...@@ -415,32 +411,32 @@ func (client *client) writeBlock(prev []byte, paraBlock *types.Block) error { ...@@ -415,32 +411,32 @@ func (client *client) writeBlock(prev []byte, paraBlock *types.Block) error {
return errors.New("Para sync - block detail is nil") return errors.New("Para sync - block detail is nil")
} }
client.SetCurrentBlock(blkdetail.Block) client.paraClient.SetCurrentBlock(blkdetail.Block)
if client.authAccount != "" { if client.paraClient.authAccount != "" {
client.commitMsgClient.updateChainHeight(blockDetail.Block.Height, false) client.paraClient.commitMsgClient.updateChainHeight(blockDetail.Block.Height, false)
} }
return nil return nil
} }
//获取同步状态 //获取同步状态
func (client *client) getBlockSyncState() BlockSyncState { func (client *BlockSyncClient) getBlockSyncState() BlockSyncState {
return BlockSyncState(atomic.LoadInt32(&client.blockSyncClient.syncState)) return BlockSyncState(atomic.LoadInt32(&client.syncState))
} }
//设置同步状态 //设置同步状态
func (client *client) setBlockSyncState(state BlockSyncState) { func (client *BlockSyncClient) setBlockSyncState(state BlockSyncState) {
atomic.StoreInt32(&client.blockSyncClient.syncState, int32(state)) atomic.StoreInt32(&client.syncState, int32(state))
} }
//打印错误日志 //打印错误日志
func (client *client) printError(err error) { func (client *BlockSyncClient) printError(err error) {
plog.Error(fmt.Sprintf("Para sync - sync block error:%v", err.Error())) plog.Error(fmt.Sprintf("Para sync - sync block error:%v", err.Error()))
} }
//初始化 //初始化
func (client *client) syncInit() { func (client *BlockSyncClient) syncInit() {
plog.Info("Para sync - init") plog.Info("Para sync - init")
client.setBlockSyncState(BlockSyncStateNone) client.setBlockSyncState(BlockSyncStateNone)
err := client.initFirstLocalHeightIfNeed() err := client.initFirstLocalHeightIfNeed()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment