Commit e7ab702d authored by vipwzw's avatar vipwzw

auto ci

parent 646855e3
......@@ -20,4 +20,4 @@ func calcTitleLastHeightKey(title string) []byte {
func calcTitleFirstHeightKey(title string) []byte {
return []byte(fmt.Sprintf("%s-TFH-%s", types.ConsensusParaTxsPrefix, title))
}
\ No newline at end of file
}
......@@ -48,7 +48,7 @@ var (
mainBlockHashForkHeight int64 = 209186 //calc block hash fork height in main chain
mainParaSelfConsensusForkHeight int64 = types.MaxHeight //para chain self consensus height switch, must >= ForkParacrossCommitTx of main
mainForkParacrossCommitTx int64 = types.MaxHeight //support paracross commit tx fork height in main chain: ForkParacrossCommitTx
localCacheCount int64 = 1000 // local cache block max count
localCacheCount int64 = 1000 // local cache block max count
batchFetchSeqEnable bool
batchFetchSeqNum int64 = 128
)
......@@ -71,7 +71,7 @@ type client struct {
mtx sync.Mutex
syncCaughtUpAtom int32
localChangeAtom int32
localChangeAtom int32
}
type subConfig struct {
......@@ -87,7 +87,7 @@ type subConfig struct {
MainParaSelfConsensusForkHeight int64 `json:"mainParaSelfConsensusForkHeight,omitempty"`
MainForkParacrossCommitTx int64 `json:"mainForkParacrossCommitTx,omitempty"`
WaitConsensStopTimes uint32 `json:"waitConsensStopTimes,omitempty"`
LocalCacheCount int64 `json:"localCacheCount,omitempty"`
LocalCacheCount int64 `json:"localCacheCount,omitempty"`
BatchFetchSeqEnable uint32 `json:"batchFetchSeqEnable,omitempty"`
BatchFetchSeqNum int64 `json:"batchFetchSeqNum,omitempty"`
}
......@@ -260,7 +260,7 @@ func (client *client) InitBlock() {
}
// GetStartMainHash get StartMainHash in mainchain
func (client *client) GetStartMainHash(height int64) ([]byte) {
func (client *client) GetStartMainHash(height int64) []byte {
if height <= 0 {
panic(fmt.Sprintf("startHeight(%d) should be more than 0 in mainchain", height))
}
......
......@@ -5,20 +5,21 @@
package para
import (
"fmt"
"errors"
"fmt"
"sync/atomic"
"github.com/33cn/chain33/types"
"github.com/33cn/chain33/common/merkle"
"time"
"github.com/33cn/chain33/common"
"github.com/33cn/chain33/common/merkle"
"github.com/33cn/chain33/types"
pt "github.com/33cn/plugin/plugin/dapp/paracross/types"
"time"
)
//NextActionType 定义每一轮可执行状态
type NextActionType int8
const (
_ NextActionType = iota
//NextActionRollback 回滚到前一区块
......@@ -31,12 +32,12 @@ const (
//获取同步状态,供发送层调用
func (client *client) SyncHasCaughtUp() bool {
return atomic.LoadInt32(&client.syncCaughtUpAtom) == 1
return atomic.LoadInt32(&client.syncCaughtUpAtom) == 1
}
//下载状态通知,供下载层调用
func (client *client) NotifyLocalChange() {
atomic.StoreInt32(&client.localChangeAtom,1)
atomic.StoreInt32(&client.localChangeAtom, 1)
}
//创建创世区块
......@@ -80,86 +81,86 @@ func (client *client) SyncBlocks() {
}
//获取每一轮可执行状态
func (client *client) getNextAction() (NextActionType,*types.Block,*pt.ParaLocalDbBlock,int64,error) {
func (client *client) getNextAction() (NextActionType, *types.Block, *pt.ParaLocalDbBlock, int64, error) {
lastBlock, err := client.getLastBlockInfo()
if err != nil {
//取已执行最新区块发生错误,不做任何操作
return NextActionKeep,nil,nil,-1,err
if err != nil {
//取已执行最新区块发生错误,不做任何操作
return NextActionKeep, nil, nil, -1, err
}
lastLocalHeight, err := client.getLastLocalHeight()
if err != nil {
if err != nil {
//取db中最新高度区块发生错误,不做任何操作
return NextActionKeep,nil,nil,lastLocalHeight,err
return NextActionKeep, nil, nil, lastLocalHeight, err
}
if lastLocalHeight <= 0 {
if lastLocalHeight <= 0 {
//db中最新高度为0,不做任何操作(创世区块)
return NextActionKeep,nil,nil,lastLocalHeight,err
return NextActionKeep, nil, nil, lastLocalHeight, err
} else if lastLocalHeight < lastBlock.Height {
//db中最新区块高度小于已执行最新区块高度,回滚
return NextActionRollback,lastBlock,nil,lastLocalHeight,err
return NextActionRollback, lastBlock, nil, lastLocalHeight, err
} else if lastLocalHeight == lastBlock.Height {
localBlock, err := client.getLocalBlockByHeight(lastBlock.Height)
if err != nil {
if err != nil {
//取db中指定高度区块发生错误,不做任何操作
return NextActionKeep,nil,nil,lastLocalHeight,err
return NextActionKeep, nil, nil, lastLocalHeight, err
}
if common.ToHex(localBlock.MainHash) == common.ToHex(lastBlock.MainHash) {
if common.ToHex(localBlock.MainHash) == common.ToHex(lastBlock.MainHash) {
//db中最新区块高度等于已执行最新区块高度并且hash相同,不做任何操作(已保持同步状态)
return NextActionKeep,nil,nil,lastLocalHeight,err
return NextActionKeep, nil, nil, lastLocalHeight, err
}
//db中最新区块高度等于已执行最新区块高度并且hash不同,回滚
return NextActionRollback,lastBlock,nil,lastLocalHeight,err
return NextActionRollback, lastBlock, nil, lastLocalHeight, err
}
// lastLocalHeight > lastBlock.Height
localBlock, err := client.getLocalBlockByHeight(lastBlock.Height+1)
if err != nil {
localBlock, err := client.getLocalBlockByHeight(lastBlock.Height + 1)
if err != nil {
//取db中后一高度区块发生错误,不做任何操作
return NextActionKeep,nil,nil,lastLocalHeight,err
return NextActionKeep, nil, nil, lastLocalHeight, err
}
if common.ToHex(localBlock.ParentMainHash) != common.ToHex(lastBlock.MainHash) {
if common.ToHex(localBlock.ParentMainHash) != common.ToHex(lastBlock.MainHash) {
//db中后一高度区块的父hash不等于已执行最新区块的hash,回滚
return NextActionRollback,lastBlock,nil,lastLocalHeight,err
return NextActionRollback, lastBlock, nil, lastLocalHeight, err
}
//db中后一高度区块的父hash等于已执行最新区块的hash,执行区块创建
return NextActionAdd,lastBlock,localBlock,lastLocalHeight,err
return NextActionAdd, lastBlock, localBlock, lastLocalHeight, err
}
//根据当前可执行状态执行区块操作
//返回参数
//bool 是否已完成同步
func (client *client) syncBlocksIfNeed() (bool,error) {
nextAction, lastBlock, localBlock,lastLocalHeight, err := client.getNextAction()
func (client *client) syncBlocksIfNeed() (bool, error) {
nextAction, lastBlock, localBlock, lastLocalHeight, err := client.getNextAction()
if err != nil {
return false,err
return false, err
}
switch nextAction {
case NextActionAdd:
//1 db中后一高度区块的父hash等于已执行最新区块的hash
plog.Info("Para sync add block",
"lastBlock.Height",lastBlock.Height,"lastLocalHeight",lastLocalHeight)
return false,client.addBlock(lastBlock, localBlock)
"lastBlock.Height", lastBlock.Height, "lastLocalHeight", lastLocalHeight)
return false, client.addBlock(lastBlock, localBlock)
case NextActionRollback:
//1 db中最新区块高度小于已执行最新区块高度
//2 db中最新区块高度等于已执行最新区块高度并且hash不同
//3 db中后一高度区块的父hash不等于已执行最新区块的hash
plog.Info("Para sync rollback block",
"lastBlock.Height",lastBlock.Height,"lastLocalHeight",lastLocalHeight)
return false,client.rollbackBlock(lastBlock)
"lastBlock.Height", lastBlock.Height, "lastLocalHeight", lastLocalHeight)
return false, client.rollbackBlock(lastBlock)
default: //NextActionKeep
//1 已完成同步,没有需要同步的块
return true,err
//1 已完成同步,没有需要同步的块
return true, err
}
}
//批量删除下载层缓冲数据
func (client *client) delLocalBlocks(startHeight int64,endHeight int64) error {
func (client *client) delLocalBlocks(startHeight int64, endHeight int64) error {
if startHeight > endHeight {
return errors.New("startHeight > endHeight,can't clear local blocks")
return errors.New("startHeight > endHeight,can't clear local blocks")
}
index := startHeight
......@@ -177,17 +178,17 @@ func (client *client) delLocalBlocks(startHeight int64,endHeight int64) error {
}
key := calcTitleFirstHeightKey(types.GetTitle())
kv := &types.KeyValue{Key: key, Value: types.Encode(&types.Int64{Data: endHeight+1})}
kv := &types.KeyValue{Key: key, Value: types.Encode(&types.Int64{Data: endHeight + 1})}
set.KV = append(set.KV, kv)
plog.Info("Para sync clear local blocks", "startHeight:",startHeight,"endHeight:",endHeight)
plog.Info("Para sync clear local blocks", "startHeight:", startHeight, "endHeight:", endHeight)
return client.setLocalDb(set)
}
//最低高度没有设置的时候设置一下最低高度
func (client *client) initFirstLocalHeightIfNeed() error {
height,err := client.getFirstLocalHeight()
height, err := client.getFirstLocalHeight()
if err != nil || height < 0 {
set := &types.LocalDBSet{}
......@@ -202,7 +203,7 @@ func (client *client) initFirstLocalHeightIfNeed() error {
}
//获取下载层缓冲数据的区块最低高度
func (client *client) getFirstLocalHeight() (int64,error) {
func (client *client) getFirstLocalHeight() (int64, error) {
key := calcTitleFirstHeightKey(types.GetTitle())
set := &types.LocalDBGet{Keys: [][]byte{key}}
value, err := client.getLocalDb(set, len(set.Keys))
......@@ -222,27 +223,27 @@ func (client *client) getFirstLocalHeight() (int64,error) {
}
//清除指定数量(localCacheCount)以前的区块
func (client *client) clearLocalOldBlocks() (bool,error) {
func (client *client) clearLocalOldBlocks() (bool, error) {
lastLocalHeight, err := client.getLastLocalHeight()
if err != nil {
return false,err
return false, err
}
firstLocalHeight,err := client.getFirstLocalHeight()
firstLocalHeight, err := client.getFirstLocalHeight()
if err != nil {
return false,err
return false, err
}
canDelCount := lastLocalHeight - firstLocalHeight - localCacheCount + 1
if canDelCount <= 0 {
return false,nil
return false, nil
}
return true,client.delLocalBlocks(firstLocalHeight,firstLocalHeight +canDelCount- 1)
return true, client.delLocalBlocks(firstLocalHeight, firstLocalHeight+canDelCount-1)
}
// miner tx need all para node create, but not all node has auth account, here just not sign to keep align
func (client *client) addMinerTx(preStateHash []byte, block *types.Block,localBlock *pt.ParaLocalDbBlock) error {
func (client *client) addMinerTx(preStateHash []byte, block *types.Block, localBlock *pt.ParaLocalDbBlock) error {
status := &pt.ParacrossNodeStatus{
Title: types.GetTitle(),
Height: block.Height,
......@@ -267,7 +268,7 @@ func (client *client) addMinerTx(preStateHash []byte, block *types.Block,localBl
}
//添加一个区块
func (client *client) addBlock(lastBlock *types.Block,localBlock *pt.ParaLocalDbBlock ) error {
func (client *client) addBlock(lastBlock *types.Block, localBlock *pt.ParaLocalDbBlock) error {
var newBlock types.Block
plog.Debug(fmt.Sprintf("the len txs is: %v", len(localBlock.Txs)))
......@@ -369,15 +370,15 @@ func (client *client) writeBlock(prev []byte, paraBlock *types.Block) error {
//设置同步状态,原子操作,线程访问安全,原则上只限于此线程单元使用
func (client *client) setSyncCaughtUp(isSyncCaughtUp bool) {
if isSyncCaughtUp {
atomic.StoreInt32(&client.syncCaughtUpAtom,1)
atomic.StoreInt32(&client.syncCaughtUpAtom, 1)
} else {
atomic.StoreInt32(&client.syncCaughtUpAtom,0)
atomic.StoreInt32(&client.syncCaughtUpAtom, 0)
}
}
//初始化下载状态,原则上只限于此线程单元使用
func (client *client) initLocalChangeState() {
atomic.StoreInt32(&client.localChangeAtom,0)
atomic.StoreInt32(&client.localChangeAtom, 0)
}
//获取当前是否有新的下载到来,获取一次,并马上把状态设置为没有新通知
......@@ -385,18 +386,16 @@ func (client *client) initLocalChangeState() {
func (client *client) getAndFlipLocalChangeStateIfNeed() bool {
hasLocalChange := atomic.LoadInt32(&client.localChangeAtom) == 1
if hasLocalChange {
atomic.StoreInt32(&client.localChangeAtom,0)
atomic.StoreInt32(&client.localChangeAtom, 0)
}
return hasLocalChange
}
//打印错误日志
func (client *client) printError(err error) {
plog.Error(fmt.Sprintf("----------------->Para Sync Block Error:%v", err.Error()))
}
//初始化
func (client *client) syncInit() {
client.setSyncCaughtUp(false)
......@@ -407,11 +406,3 @@ func (client *client) syncInit() {
client.printError(err)
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment