Commit 83fa966c authored by yukang's avatar yukang Committed by vipwzw

correct code style

parent 5cd041d5
......@@ -63,7 +63,7 @@ type client struct {
execAPI api.ExecutorAPI
caughtUp int32
commitMsgClient *commitMsgClient
blockSyncClient *BlockSyncClient
blockSyncClient *blockSyncClient
authAccount string
privateKey crypto.PrivKey
wg sync.WaitGroup
......@@ -182,12 +182,12 @@ func New(cfg *types.Consensus, sub []byte) queue.Module {
para.commitMsgClient.consensStartHeight = subcfg.ParaConsensStartHeight - 1
}
para.blockSyncClient = &BlockSyncClient{
para.blockSyncClient = &blockSyncClient{
paraClient: para,
notifyChan: make(chan bool, 1),
quitChan: make(chan struct{}),
maxCacheCount: DefaultMaxCacheCount,
maxSyncErrCount: DefaultMaxSyncErrCount,
maxCacheCount: defaultMaxCacheCount,
maxSyncErrCount: defaultMaxSyncErrCount,
isPrintDebugInfo: false,
}
if subcfg.MaxCacheCount > 0 {
......@@ -229,7 +229,7 @@ func (client *client) SetQueueClient(c queue.Client) {
client.wg.Add(1)
go client.CreateBlock()
client.wg.Add(1)
go client.blockSyncClient.SyncBlocks()
go client.blockSyncClient.syncBlocks()
}
func (client *client) InitBlock() {
......@@ -258,7 +258,7 @@ func (client *client) InitBlock() {
tx := client.CreateGenesisTx()
newblock.Txs = tx
newblock.TxHash = merkle.CalcMerkleRoot(newblock.Txs)
err := client.blockSyncClient.CreateGenesisBlock(newblock)
err := client.blockSyncClient.createGenesisBlock(newblock)
if err != nil {
panic(fmt.Sprintf("para chain create genesis block,err=%s", err.Error()))
}
......
......@@ -204,7 +204,7 @@ func TestAddMinerTx(t *testing.T) {
para.commitMsgClient = new(commitMsgClient)
para.commitMsgClient.paraClient = para
para.blockSyncClient = new(BlockSyncClient)
para.blockSyncClient = new(blockSyncClient)
para.blockSyncClient.paraClient = para
para.blockSyncClient.addMinerTx(nil, block, localBlock)
assert.Equal(t, 1, len(block.Txs))
......
......@@ -314,8 +314,8 @@ func (client *commitMsgClient) isSync() bool {
return false
}
if !client.paraClient.blockSyncClient.SyncHasCaughtUp() {
plog.Info("para is not Sync", "syncCaughtUp", client.paraClient.blockSyncClient.SyncHasCaughtUp())
if !client.paraClient.blockSyncClient.syncHasCaughtUp() {
plog.Info("para is not Sync", "syncCaughtUp", client.paraClient.blockSyncClient.syncHasCaughtUp())
return false
}
......
......@@ -465,7 +465,7 @@ func (client *client) procLocalBlocks(mainBlocks *types.ParaTxDetails) error {
}
}
if notify {
client.blockSyncClient.NotifyLocalChange()
client.blockSyncClient.notifyLocalChange()
}
return nil
......
......@@ -16,14 +16,14 @@ import (
)
const (
//DefaultMaxCacheCount 默认local最大缓冲数
DefaultMaxCacheCount = int64(1000)
//DefaultMaxSyncErrCount 默认连续错误最大数量
DefaultMaxSyncErrCount = int32(100)
//defaultMaxCacheCount 默认local最大缓冲数
defaultMaxCacheCount = int64(1000)
//defaultMaxSyncErrCount 默认连续错误最大数量
defaultMaxSyncErrCount = int32(100)
)
//BlockSyncClient 区块同步控制和状态变量
type BlockSyncClient struct {
//blockSyncClient 区块同步控制和状态变量
type blockSyncClient struct {
paraClient *client
//notifyChan 下载通知通道
notifyChan chan bool
......@@ -41,52 +41,52 @@ type BlockSyncClient struct {
isPrintDebugInfo bool
}
//NextActionType 定义每一轮可执行操作
type NextActionType int8
//nextActionType 定义每一轮可执行操作
type nextActionType int8
const (
//NextActionKeep 保持
NextActionKeep NextActionType = iota
//NextActionRollback 回滚到前一区块
NextActionRollback
//NextActionAdd 增加一个新的区块
NextActionAdd
//nextActionKeep 保持
nextActionKeep nextActionType = iota
//nextActionRollback 回滚到前一区块
nextActionRollback
//nextActionAdd 增加一个新的区块
nextActionAdd
)
//BlockSyncState 定义当前区块同步状态
type BlockSyncState int32
//blockSyncState 定义当前区块同步状态
type blockSyncState int32
const (
//BlockSyncStateNone 未同步状态
BlockSyncStateNone BlockSyncState = iota
//BlockSyncStateSyncing 正在同步中
BlockSyncStateSyncing
//BlockSyncStateFinished 同步完成
BlockSyncStateFinished
//blockSyncStateNone 未同步状态
blockSyncStateNone blockSyncState = iota
//blockSyncStateSyncing 正在同步中
blockSyncStateSyncing
//blockSyncStateFinished 同步完成
blockSyncStateFinished
)
//SyncHasCaughtUp 判断同步是否已追赶上,供发送层调用
func (client *BlockSyncClient) SyncHasCaughtUp() bool {
//syncHasCaughtUp 判断同步是否已追赶上,供发送层调用
func (client *blockSyncClient) syncHasCaughtUp() bool {
return atomic.LoadInt32(&client.isSyncCaughtUpAtom) == 1
}
//NotifyLocalChange 下载状态通知,供下载层调用
func (client *BlockSyncClient) NotifyLocalChange() {
//notifyLocalChange 下载状态通知,供下载层调用
func (client *blockSyncClient) notifyLocalChange() {
client.printDebugInfo("Para sync - notify change")
if client.getBlockSyncState() != BlockSyncStateSyncing {
if client.getBlockSyncState() != blockSyncStateSyncing {
client.printDebugInfo("Para sync - notified change")
client.notifyChan <- true
}
}
//CreateGenesisBlock 创建创世区块
func (client *BlockSyncClient) CreateGenesisBlock(newblock *types.Block) error {
//createGenesisBlock 创建创世区块
func (client *blockSyncClient) createGenesisBlock(newblock *types.Block) error {
return client.writeBlock(zeroHash[:], newblock)
}
//SyncBlocks 区块执行线程
//syncBlocks 区块执行线程
//循环执行
func (client *BlockSyncClient) SyncBlocks() {
func (client *blockSyncClient) syncBlocks() {
client.syncInit()
//首次同步,不用等待通知
......@@ -109,8 +109,8 @@ out:
}
//批量执行同步区块
func (client *BlockSyncClient) batchSyncBlocks() {
client.setBlockSyncState(BlockSyncStateSyncing)
func (client *blockSyncClient) batchSyncBlocks() {
client.setBlockSyncState(blockSyncStateSyncing)
client.printDebugInfo("Para sync - syncing")
errCount := int32(0)
......@@ -129,7 +129,7 @@ func (client *BlockSyncClient) batchSyncBlocks() {
if errCount > client.maxSyncErrCount {
client.printError(errors.New(
"para sync - sync has some errors,please check"))
client.setBlockSyncState(BlockSyncStateNone)
client.setBlockSyncState(blockSyncStateNone)
return
}
//没有需要同步的块,清理本地数据库中localCacheCount前的块
......@@ -139,7 +139,7 @@ func (client *BlockSyncClient) batchSyncBlocks() {
client.printError(err)
}
client.setBlockSyncState(BlockSyncStateFinished)
client.setBlockSyncState(blockSyncStateFinished)
client.printDebugInfo("Para sync - finished")
return
}
......@@ -148,67 +148,67 @@ func (client *BlockSyncClient) batchSyncBlocks() {
}
//获取每一轮可执行状态
func (client *BlockSyncClient) getNextAction() (NextActionType, *types.Block, *pt.ParaLocalDbBlock, int64, error) {
func (client *blockSyncClient) getNextAction() (nextActionType, *types.Block, *pt.ParaLocalDbBlock, int64, error) {
lastBlock, err := client.paraClient.getLastBlockInfo()
if err != nil {
//取已执行最新区块发生错误,不做任何操作
return NextActionKeep, nil, nil, -1, err
return nextActionKeep, nil, nil, -1, err
}
lastLocalHeight, err := client.paraClient.getLastLocalHeight()
if err != nil {
//取db中最新高度区块发生错误,不做任何操作
return NextActionKeep, nil, nil, lastLocalHeight, err
return nextActionKeep, nil, nil, lastLocalHeight, err
}
if lastLocalHeight <= 0 {
//db中最新高度为0,不做任何操作(创世区块)
return NextActionKeep, nil, nil, lastLocalHeight, nil
return nextActionKeep, nil, nil, lastLocalHeight, nil
}
switch {
case lastLocalHeight < lastBlock.Height:
//db中最新区块高度小于已执行最新区块高度,回滚
return NextActionRollback, lastBlock, nil, lastLocalHeight, nil
return nextActionRollback, lastBlock, nil, lastLocalHeight, nil
case lastLocalHeight == lastBlock.Height:
localBlock, err := client.paraClient.getLocalBlockByHeight(lastBlock.Height)
if err != nil {
//取db中指定高度区块发生错误,不做任何操作
return NextActionKeep, nil, nil, lastLocalHeight, err
return nextActionKeep, nil, nil, lastLocalHeight, err
}
if common.ToHex(localBlock.MainHash) == common.ToHex(lastBlock.MainHash) {
//db中最新区块高度等于已执行最新区块高度并且hash相同,不做任何操作(已保持同步状态)
return NextActionKeep, nil, nil, lastLocalHeight, nil
return nextActionKeep, nil, nil, lastLocalHeight, nil
}
//db中最新区块高度等于已执行最新区块高度并且hash不同,回滚
return NextActionRollback, lastBlock, nil, lastLocalHeight, nil
return nextActionRollback, lastBlock, nil, lastLocalHeight, nil
default:
// lastLocalHeight > lastBlock.Height
localBlock, err := client.paraClient.getLocalBlockByHeight(lastBlock.Height + 1)
if err != nil {
//取db中后一高度区块发生错误,不做任何操作
return NextActionKeep, nil, nil, lastLocalHeight, err
return nextActionKeep, nil, nil, lastLocalHeight, err
}
if common.ToHex(localBlock.ParentMainHash) != common.ToHex(lastBlock.MainHash) {
//db中后一高度区块的父hash不等于已执行最新区块的hash,回滚
return NextActionRollback, lastBlock, nil, lastLocalHeight, nil
return nextActionRollback, lastBlock, nil, lastLocalHeight, nil
}
//db中后一高度区块的父hash等于已执行最新区块的hash,执行区块创建
return NextActionAdd, lastBlock, localBlock, lastLocalHeight, nil
return nextActionAdd, lastBlock, localBlock, lastLocalHeight, nil
}
}
//根据当前可执行状态执行区块操作
//返回参数
//bool 是否已完成同步
func (client *BlockSyncClient) syncBlocksIfNeed() (bool, error) {
func (client *blockSyncClient) syncBlocksIfNeed() (bool, error) {
nextAction, lastBlock, localBlock, lastLocalHeight, err := client.getNextAction()
if err != nil {
return false, err
}
switch nextAction {
case NextActionAdd:
case nextActionAdd:
//1 db中后一高度区块的父hash等于已执行最新区块的hash
plog.Info("Para sync - add block",
"lastBlock.Height", lastBlock.Height, "lastLocalHeight", lastLocalHeight)
......@@ -226,7 +226,7 @@ func (client *BlockSyncClient) syncBlocksIfNeed() (bool, error) {
}
return false, err
case NextActionRollback:
case nextActionRollback:
//1 db中最新区块高度小于已执行最新区块高度
//2 db中最新区块高度等于已执行最新区块高度并且hash不同
//3 db中后一高度区块的父hash不等于已执行最新区块的hash
......@@ -245,7 +245,7 @@ func (client *BlockSyncClient) syncBlocksIfNeed() (bool, error) {
}
return false, err
default: //NextActionKeep
default: //nextActionKeep
//1 已完成同步,没有需要同步的块
return true, nil
}
......@@ -253,7 +253,7 @@ func (client *BlockSyncClient) syncBlocksIfNeed() (bool, error) {
}
//批量删除下载层缓冲数据
func (client *BlockSyncClient) delLocalBlocks(startHeight int64, endHeight int64) error {
func (client *blockSyncClient) delLocalBlocks(startHeight int64, endHeight int64) error {
if startHeight > endHeight {
return errors.New("para sync - startHeight > endHeight,can't clear local blocks")
}
......@@ -282,7 +282,7 @@ func (client *BlockSyncClient) delLocalBlocks(startHeight int64, endHeight int64
}
//最低高度没有设置的时候设置一下最低高度
func (client *BlockSyncClient) initFirstLocalHeightIfNeed() error {
func (client *blockSyncClient) initFirstLocalHeightIfNeed() error {
height, err := client.getFirstLocalHeight()
if err != nil || height < 0 {
......@@ -298,7 +298,7 @@ func (client *BlockSyncClient) initFirstLocalHeightIfNeed() error {
}
//获取下载层缓冲数据的区块最低高度
func (client *BlockSyncClient) getFirstLocalHeight() (int64, error) {
func (client *blockSyncClient) getFirstLocalHeight() (int64, error) {
key := calcTitleFirstHeightKey(types.GetTitle())
set := &types.LocalDBGet{Keys: [][]byte{key}}
value, err := client.paraClient.getLocalDb(set, len(set.Keys))
......@@ -323,7 +323,7 @@ func (client *BlockSyncClient) getFirstLocalHeight() (int64, error) {
}
//清除指定数量(localCacheCount)以前的区块
func (client *BlockSyncClient) clearLocalOldBlocks() (bool, error) {
func (client *blockSyncClient) clearLocalOldBlocks() (bool, error) {
lastLocalHeight, err := client.paraClient.getLastLocalHeight()
if err != nil {
return false, err
......@@ -343,7 +343,7 @@ func (client *BlockSyncClient) clearLocalOldBlocks() (bool, error) {
}
// miner tx need all para node create, but not all node has auth account, here just not sign to keep align
func (client *BlockSyncClient) addMinerTx(preStateHash []byte, block *types.Block, localBlock *pt.ParaLocalDbBlock) error {
func (client *blockSyncClient) addMinerTx(preStateHash []byte, block *types.Block, localBlock *pt.ParaLocalDbBlock) error {
status := &pt.ParacrossNodeStatus{
Title: types.GetTitle(),
Height: block.Height,
......@@ -368,7 +368,7 @@ func (client *BlockSyncClient) addMinerTx(preStateHash []byte, block *types.Bloc
}
//添加一个区块
func (client *BlockSyncClient) addBlock(lastBlock *types.Block, localBlock *pt.ParaLocalDbBlock) error {
func (client *blockSyncClient) addBlock(lastBlock *types.Block, localBlock *pt.ParaLocalDbBlock) error {
var newBlock types.Block
client.printDebugInfo(fmt.Sprintf("Para sync - the len txs is: %v", len(localBlock.Txs)))
......@@ -396,7 +396,7 @@ func (client *BlockSyncClient) addBlock(lastBlock *types.Block, localBlock *pt.P
}
// 向blockchain删区块
func (client *BlockSyncClient) rollbackBlock(block *types.Block) error {
func (client *blockSyncClient) rollbackBlock(block *types.Block) error {
client.printDebugInfo("Para sync - delete block in parachain")
start := block.Height
......@@ -439,7 +439,7 @@ func (client *BlockSyncClient) rollbackBlock(block *types.Block) error {
}
// 向blockchain写区块
func (client *BlockSyncClient) writeBlock(prev []byte, paraBlock *types.Block) error {
func (client *blockSyncClient) writeBlock(prev []byte, paraBlock *types.Block) error {
//共识模块不执行block,统一由blockchain模块执行block并做去重的处理,返回执行后的blockdetail
blockDetail := &types.BlockDetail{Block: paraBlock}
......@@ -464,17 +464,17 @@ func (client *BlockSyncClient) writeBlock(prev []byte, paraBlock *types.Block) e
}
//获取同步状态
func (client *BlockSyncClient) getBlockSyncState() BlockSyncState {
return BlockSyncState(atomic.LoadInt32(&client.syncState))
func (client *blockSyncClient) getBlockSyncState() blockSyncState {
return blockSyncState(atomic.LoadInt32(&client.syncState))
}
//设置同步状态
func (client *BlockSyncClient) setBlockSyncState(state BlockSyncState) {
func (client *blockSyncClient) setBlockSyncState(state blockSyncState) {
atomic.StoreInt32(&client.syncState, int32(state))
}
//设置是否追赶上
func (client *BlockSyncClient) setSyncCaughtUp(isCaughtUp bool) {
func (client *blockSyncClient) setSyncCaughtUp(isCaughtUp bool) {
if isCaughtUp {
atomic.StoreInt32(&client.isSyncCaughtUpAtom, 1)
} else {
......@@ -483,21 +483,21 @@ func (client *BlockSyncClient) setSyncCaughtUp(isCaughtUp bool) {
}
//打印错误日志
func (client *BlockSyncClient) printError(err error) {
func (client *blockSyncClient) printError(err error) {
plog.Error(fmt.Sprintf("Para sync - sync block error:%v", err.Error()))
}
//打印调试信息
func (client *BlockSyncClient) printDebugInfo(msg string, ctx ...interface{}) {
func (client *blockSyncClient) printDebugInfo(msg string, ctx ...interface{}) {
if client.isPrintDebugInfo {
plog.Info(msg, ctx...)
}
}
//初始化
func (client *BlockSyncClient) syncInit() {
func (client *blockSyncClient) syncInit() {
client.printDebugInfo("Para sync - init")
client.setBlockSyncState(BlockSyncStateNone)
client.setBlockSyncState(blockSyncStateNone)
client.setSyncCaughtUp(false)
err := client.initFirstLocalHeightIfNeed()
if err != nil {
......
......@@ -68,7 +68,7 @@ func createParaTestInstance(t *testing.T, q queue.Queue) *client {
para.privateKey = priKey
//实例化BlockSyncClient
para.blockSyncClient = &BlockSyncClient{
para.blockSyncClient = &blockSyncClient{
paraClient: para,
notifyChan: make(chan bool),
quitChan: make(chan struct{}),
......@@ -333,7 +333,7 @@ func mockMessageReply(q queue.Queue) {
//测试创世区块写入
func testCreateGenesisBlock(t *testing.T, para *client, testLoopCount int32) {
genesisBlock := makeGenesisBlockInputTestData()
err := para.blockSyncClient.CreateGenesisBlock(genesisBlock)
err := para.blockSyncClient.createGenesisBlock(genesisBlock)
switch testLoopCount {
case 0:
......@@ -394,9 +394,9 @@ func testSyncBlocksIfNeed(t *testing.T, para *client, testLoopCount int32) {
//测试SyncHasCaughtUp
func testSyncHasCaughtUp(t *testing.T, para *client, testLoopCount int32) {
oldValue := para.blockSyncClient.SyncHasCaughtUp()
oldValue := para.blockSyncClient.syncHasCaughtUp()
para.blockSyncClient.setSyncCaughtUp(true)
isSyncHasCaughtUp := para.blockSyncClient.SyncHasCaughtUp()
isSyncHasCaughtUp := para.blockSyncClient.syncHasCaughtUp()
para.blockSyncClient.setSyncCaughtUp(oldValue)
assert.Equal(t, true, isSyncHasCaughtUp)
......@@ -405,11 +405,11 @@ func testSyncHasCaughtUp(t *testing.T, para *client, testLoopCount int32) {
//测试getBlockSyncState
func testGetBlockSyncState(t *testing.T, para *client, testLoopCount int32) {
oldValue := para.blockSyncClient.getBlockSyncState()
para.blockSyncClient.setBlockSyncState(BlockSyncStateFinished)
para.blockSyncClient.setBlockSyncState(blockSyncStateFinished)
syncState := para.blockSyncClient.getBlockSyncState()
para.blockSyncClient.setBlockSyncState(oldValue)
assert.Equal(t, true, syncState == BlockSyncStateFinished)
assert.Equal(t, true, syncState == blockSyncStateFinished)
}
//执行所有函数测试
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment