Commit 022584d9 authored by mdj33's avatar mdj33 Committed by vipwzw

add jump dld para block

parent 8b282c75
......@@ -108,6 +108,8 @@ mainBlockHashForkHeight=209186
mainForkParacrossCommitTx=2270000
#主链开启循环检查共识交易done的fork高度,需要和主链保持严格一致,不可修改,4320000是bityuan主链对应高度, ycc或其他按实际修改
mainLoopCheckCommitTxDoneForkHeight=4320000
#主链交易验证merkelRoot主链分叉高度,需和主链保持一致,不可修改
mainVerifyMerkleRootForkHeight=-1
#主链每隔几个没有相关平行链交易的区块,平行链上打包空区块,缺省从平行链blockHeight=0开始,依次增长,空块间隔不能为0
[[consensus.sub.para.emptyBlockInterval]]
blockHeight=0
......
......@@ -58,6 +58,7 @@ type client struct {
commitMsgClient *commitMsgClient
blockSyncClient *blockSyncClient
multiDldCli *multiDldClient
jumpDldCli *jumpDldClient
minerPrivateKey crypto.PrivKey
wg sync.WaitGroup
subCfg *subConfig
......@@ -71,27 +72,27 @@ type emptyBlockInterval struct {
}
type subConfig struct {
WriteBlockSeconds int64 `json:"writeBlockSeconds,omitempty"`
ParaRemoteGrpcClient string `json:"paraRemoteGrpcClient,omitempty"`
StartHeight int64 `json:"startHeight,omitempty"`
GenesisStartHeightSame bool `json:"genesisStartHeightSame,omitempty"`
EmptyBlockInterval []*emptyBlockInterval `json:"emptyBlockInterval,omitempty"`
AuthAccount string `json:"authAccount,omitempty"`
WaitBlocks4CommitMsg int32 `json:"waitBlocks4CommitMsg,omitempty"`
GenesisAmount int64 `json:"genesisAmount,omitempty"`
MainBlockHashForkHeight int64 `json:"mainBlockHashForkHeight,omitempty"`
WaitConsensStopTimes uint32 `json:"waitConsensStopTimes,omitempty"`
MaxCacheCount int64 `json:"maxCacheCount,omitempty"`
MaxSyncErrCount int32 `json:"maxSyncErrCount,omitempty"`
FetchFilterParaTxsClose bool `json:"fetchFilterParaTxsClose,omitempty"`
BatchFetchBlockCount int64 `json:"batchFetchBlockCount,omitempty"`
ParaConsensStartHeight int64 `json:"paraConsensStartHeight,omitempty"`
MultiDownloadOpen bool `json:"multiDownloadOpen,omitempty"`
MultiDownInvNumPerJob int64 `json:"multiDownInvNumPerJob,omitempty"`
MultiDownJobBuffNum uint32 `json:"multiDownJobBuffNum,omitempty"`
MultiDownServerRspTime uint32 `json:"multiDownServerRspTime,omitempty"`
RmCommitParamMainHeight int64 `json:"rmCommitParamMainHeight,omitempty"`
JumpDownloadClose bool `json:"jumpDownloadClose,omitempty"`
WriteBlockSeconds int64 `json:"writeBlockSeconds,omitempty"`
ParaRemoteGrpcClient string `json:"paraRemoteGrpcClient,omitempty"`
StartHeight int64 `json:"startHeight,omitempty"`
GenesisStartHeightSame bool `json:"genesisStartHeightSame,omitempty"`
EmptyBlockInterval []*emptyBlockInterval `json:"emptyBlockInterval,omitempty"`
AuthAccount string `json:"authAccount,omitempty"`
WaitBlocks4CommitMsg int32 `json:"waitBlocks4CommitMsg,omitempty"`
GenesisAmount int64 `json:"genesisAmount,omitempty"`
MainBlockHashForkHeight int64 `json:"mainBlockHashForkHeight,omitempty"`
MainVrfMerkleRootForkHeight int64 `json:"mainVrfMerkleRootForkHeight,omitempty"`
WaitConsensStopTimes uint32 `json:"waitConsensStopTimes,omitempty"`
MaxCacheCount int64 `json:"maxCacheCount,omitempty"`
MaxSyncErrCount int32 `json:"maxSyncErrCount,omitempty"`
BatchFetchBlockCount int64 `json:"batchFetchBlockCount,omitempty"`
ParaConsensStartHeight int64 `json:"paraConsensStartHeight,omitempty"`
MultiDownloadOpen bool `json:"multiDownloadOpen,omitempty"`
MultiDownInvNumPerJob int64 `json:"multiDownInvNumPerJob,omitempty"`
MultiDownJobBuffNum uint32 `json:"multiDownJobBuffNum,omitempty"`
MultiDownServerRspTime uint32 `json:"multiDownServerRspTime,omitempty"`
RmCommitParamMainHeight int64 `json:"rmCommitParamMainHeight,omitempty"`
JumpDownloadOpen bool `json:"jumpDownloadOpen,omitempty"`
}
// New function to init paracross env
......@@ -199,6 +200,13 @@ func New(cfg *types.Consensus, sub []byte) queue.Module {
if subcfg.MultiDownServerRspTime > 0 {
para.multiDldCli.serverTimeout = subcfg.MultiDownServerRspTime
}
para.jumpDldCli = &jumpDldClient{paraClient: para}
if subcfg.MainVrfMerkleRootForkHeight <= 0 {
subcfg.MainVrfMerkleRootForkHeight = types.MaxHeight
}
c.SetChild(para)
return para
}
......
......@@ -25,7 +25,7 @@ func (client *client) createLocalGenesisBlock(genesis *types.Block) error {
return client.alignLocalBlock2ChainBlock(genesis)
}
func (client *client) createLocalBlock(lastBlock *pt.ParaLocalDbBlock, txs []*types.Transaction, mainBlock *types.ParaTxDetail) error {
func getNewBlock(lastBlock *pt.ParaLocalDbBlock, txs []*types.Transaction, mainBlock *types.ParaTxDetail) *pt.ParaLocalDbBlock {
var newblock pt.ParaLocalDbBlock
newblock.Height = lastBlock.Height + 1
......@@ -33,10 +33,13 @@ func (client *client) createLocalBlock(lastBlock *pt.ParaLocalDbBlock, txs []*ty
newblock.MainHeight = mainBlock.Header.Height
newblock.ParentMainHash = lastBlock.MainHash
newblock.BlockTime = mainBlock.Header.BlockTime
newblock.Txs = txs
err := client.addLocalBlock(newblock.Height, &newblock)
return &newblock
}
func (client *client) createLocalBlock(lastBlock *pt.ParaLocalDbBlock, txs []*types.Transaction, mainBlock *types.ParaTxDetail) error {
err := client.addLocalBlock(getNewBlock(lastBlock, txs, mainBlock))
if err != nil {
return err
}
......@@ -67,11 +70,12 @@ func (client *client) alignLocalBlock2ChainBlock(chainBlock *types.Block) error
BlockTime: chainBlock.BlockTime,
}
return client.addLocalBlock(localBlock.Height, localBlock)
return client.addLocalBlock(localBlock)
}
//如果localdb里面没有信息,就从chain block返回,至少有创世区块,然后进入循环匹配切换场景
//如果localBlock被删除了,就从chain block获取,如果block能获取到,但远程seq获取不到,则返回当前块主链hash和错误的seq=0,
//然后请求交易校验不过会进入循环匹配切换场景
func (client *client) getLastLocalBlockSeq() (int64, []byte, error) {
height, err := client.getLastLocalHeight()
if err == nil {
......@@ -237,7 +241,7 @@ func (client *client) getBatchSeqCount(currSeq int64) (int64, error) {
} else {
atomic.StoreInt32(&client.caughtUp, 1)
}
if !client.subCfg.FetchFilterParaTxsClose && lastSeq-currSeq > client.subCfg.BatchFetchBlockCount {
if lastSeq-currSeq > client.subCfg.BatchFetchBlockCount {
return client.subCfg.BatchFetchBlockCount - 1, nil
}
return 0, nil
......@@ -353,11 +357,7 @@ func (client *client) requestFilterParaTxs(currSeq int64, count int64, preMainBl
}
func (client *client) RequestTx(currSeq int64, count int64, preMainBlockHash []byte) (*types.ParaTxDetails, error) {
if !client.subCfg.FetchFilterParaTxsClose {
return client.requestFilterParaTxs(currSeq, count, preMainBlockHash)
}
return client.requestTxsFromBlock(currSeq, preMainBlockHash)
return client.requestFilterParaTxs(currSeq, count, preMainBlockHash)
}
func (client *client) processHashNotMatchError(currSeq int64, lastSeqMainHash []byte, err error) (int64, []byte, error) {
......@@ -414,7 +414,6 @@ func (client *client) procLocalBlock(mainBlock *types.ParaTxDetail) (bool, error
plog.Info("Create empty block", "newHeight", lastBlock.Height+1)
}
return true, client.createLocalBlock(lastBlock, txs, mainBlock)
}
func (client *client) procLocalBlocks(mainBlocks *types.ParaTxDetails) error {
......@@ -435,10 +434,70 @@ func (client *client) procLocalBlocks(mainBlocks *types.ParaTxDetails) error {
return nil
}
func (client *client) procLocalAddBlock(mainBlock *types.ParaTxDetail, lastBlock *pt.ParaLocalDbBlock) *pt.ParaLocalDbBlock {
cfg := client.GetAPI().GetConfig()
curMainHeight := mainBlock.Header.Height
emptyInterval := client.getEmptyInterval(lastBlock)
txs := paraexec.FilterTxsForPara(cfg, mainBlock)
plog.Debug("Parachain process block", "lastBlockHeight", lastBlock.Height, "lastBlockMainHeight", lastBlock.MainHeight,
"lastBlockMainHash", common.ToHex(lastBlock.MainHash), "currMainHeight", curMainHeight,
"curMainHash", common.ToHex(mainBlock.Header.Hash), "emptyIntval", emptyInterval, "seqTy", mainBlock.Type)
if mainBlock.Type != types.AddBlock {
panic("para chain quick sync,not addBlock type")
}
//AddAct
if len(txs) == 0 {
if curMainHeight-lastBlock.MainHeight < emptyInterval {
return nil
}
plog.Debug("Create empty block", "newHeight", lastBlock.Height+1)
}
return getNewBlock(lastBlock, txs, mainBlock)
}
//只同步只有AddType的block,比如在当前高度1w高度前的主链blocks,默认没有分叉,只获取addType类型的,如果有分叉,也会在后面常规同步时候纠正过来
func (client *client) procLocalAddBlocks(mainBlocks *types.ParaTxDetails) error {
var blocks []*pt.ParaLocalDbBlock
lastBlock, err := client.getLastLocalBlock()
if err != nil {
plog.Error("procLocalAddBlocks getLastLocalBlock", "err", err)
return err
}
for _, main := range mainBlocks.Items {
b := client.procLocalAddBlock(main, lastBlock)
if b == nil {
continue
}
lastBlock = b
blocks = append(blocks, b)
}
err = client.saveBatchLocalBlocks(blocks)
if err != nil {
plog.Error("procLocalAddBlocks saveBatchLocalBlocks", "err", err)
panic(err)
}
plog.Info("procLocalAddBlocks.saveLocalBlocks", "start", blocks[0].Height, "end", blocks[len(blocks)-1].Height)
client.blockSyncClient.handleLocalChangedMsg()
return nil
}
func (client *client) CreateBlock() {
defer client.wg.Done()
client.multiDldCli.tryMultiServerDownload()
if client.subCfg.JumpDownloadOpen {
client.jumpDldCli.tryJumpDownload()
}
if client.subCfg.MultiDownloadOpen {
client.multiDldCli.tryMultiServerDownload()
}
lastSeq, lastSeqMainHash, err := client.getLastLocalBlockSeq()
if err != nil {
......
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package para
import (
"errors"
"github.com/33cn/chain33/types"
pt "github.com/33cn/plugin/plugin/dapp/paracross/types"
)
func (client *client) setLocalDb(set *types.LocalDBSet) error {
//如果追赶上主链了,则落盘
if client.isCaughtUp() {
set.Txid = 1
client.blockSyncClient.handleLocalCaughtUpMsg()
}
msg := client.GetQueueClient().NewMessage("blockchain", types.EventSetValueByKey, set)
err := client.GetQueueClient().Send(msg, true)
if err != nil {
return err
}
resp, err := client.GetQueueClient().Wait(msg)
if err != nil {
return err
}
if resp.GetData().(*types.Reply).IsOk {
return nil
}
return errors.New(string(resp.GetData().(*types.Reply).GetMsg()))
}
func (client *client) getLocalDb(set *types.LocalDBGet, count int) ([][]byte, error) {
msg := client.GetQueueClient().NewMessage("blockchain", types.EventGetValueByKey, set)
err := client.GetQueueClient().Send(msg, true)
if err != nil {
return nil, err
}
resp, err := client.GetQueueClient().Wait(msg)
if err != nil {
return nil, err
}
reply := resp.GetData().(*types.LocalReplyValue)
if len(reply.Values) != count {
plog.Error("Parachain getLocalDb count not match", "expert", count, "real", len(reply.Values))
return nil, types.ErrInvalidParam
}
return reply.Values, nil
}
func (client *client) addLocalBlock(block *pt.ParaLocalDbBlock) error {
cfg := client.GetAPI().GetConfig()
set := &types.LocalDBSet{}
key := calcTitleHeightKey(cfg.GetTitle(), block.Height)
kv := &types.KeyValue{Key: key, Value: types.Encode(block)}
set.KV = append(set.KV, kv)
//两个key原子操作
key = calcTitleLastHeightKey(cfg.GetTitle())
kv = &types.KeyValue{Key: key, Value: types.Encode(&types.Int64{Data: block.Height})}
set.KV = append(set.KV, kv)
return client.setLocalDb(set)
}
func (client *client) saveBatchLocalBlocks(blocks []*pt.ParaLocalDbBlock) error {
cfg := client.GetAPI().GetConfig()
set := &types.LocalDBSet{}
for _, block := range blocks {
key := calcTitleHeightKey(cfg.GetTitle(), block.Height)
kv := &types.KeyValue{Key: key, Value: types.Encode(block)}
set.KV = append(set.KV, kv)
}
//save lastHeight,两个key原子操作
key := calcTitleLastHeightKey(cfg.GetTitle())
kv := &types.KeyValue{Key: key, Value: types.Encode(&types.Int64{Data: blocks[len(blocks)-1].Height})}
set.KV = append(set.KV, kv)
return client.setLocalDb(set)
}
func (client *client) delLocalBlock(height int64) error {
cfg := client.GetAPI().GetConfig()
set := &types.LocalDBSet{}
key := calcTitleHeightKey(cfg.GetTitle(), height)
kv := &types.KeyValue{Key: key, Value: nil}
set.KV = append(set.KV, kv)
//两个key原子操作
key = calcTitleLastHeightKey(cfg.GetTitle())
kv = &types.KeyValue{Key: key, Value: types.Encode(&types.Int64{Data: height - 1})}
set.KV = append(set.KV, kv)
return client.setLocalDb(set)
}
// localblock 设置到当前高度,当前高度后面block会被新的区块覆盖
func (client *client) removeLocalBlocks(curHeight int64) error {
cfg := client.GetAPI().GetConfig()
set := &types.LocalDBSet{}
key := calcTitleLastHeightKey(cfg.GetTitle())
kv := &types.KeyValue{Key: key, Value: types.Encode(&types.Int64{Data: curHeight})}
set.KV = append(set.KV, kv)
return client.setLocalDb(set)
}
func (client *client) getLastLocalHeight() (int64, error) {
cfg := client.GetAPI().GetConfig()
key := calcTitleLastHeightKey(cfg.GetTitle())
set := &types.LocalDBGet{Keys: [][]byte{key}}
value, err := client.getLocalDb(set, len(set.Keys))
if err != nil {
return -1, err
}
if len(value) == 0 || value[0] == nil {
return -1, types.ErrNotFound
}
height := &types.Int64{}
err = types.Decode(value[0], height)
if err != nil {
return -1, err
}
return height.Data, nil
}
func (client *client) getLocalBlockByHeight(height int64) (*pt.ParaLocalDbBlock, error) {
cfg := client.GetAPI().GetConfig()
key := calcTitleHeightKey(cfg.GetTitle(), height)
set := &types.LocalDBGet{Keys: [][]byte{key}}
value, err := client.getLocalDb(set, len(set.Keys))
if err != nil {
return nil, err
}
if len(value) == 0 || value[0] == nil {
return nil, types.ErrNotFound
}
var block pt.ParaLocalDbBlock
err = types.Decode(value[0], &block)
if err != nil {
return nil, err
}
return &block, nil
}
func (client *client) saveMainBlock(height int64, block *types.ParaTxDetail) error {
cfg := client.GetAPI().GetConfig()
set := &types.LocalDBSet{}
key := calcTitleMainHeightKey(cfg.GetTitle(), height)
kv := &types.KeyValue{Key: key, Value: types.Encode(block)}
set.KV = append(set.KV, kv)
return client.setLocalDb(set)
}
func (client *client) saveBatchMainBlocks(txs *types.ParaTxDetails) error {
cfg := client.GetAPI().GetConfig()
set := &types.LocalDBSet{}
for _, block := range txs.Items {
key := calcTitleMainHeightKey(cfg.GetTitle(), block.Header.Height)
kv := &types.KeyValue{Key: key, Value: types.Encode(block)}
set.KV = append(set.KV, kv)
}
return client.setLocalDb(set)
}
func (client *client) rmvBatchMainBlocks(start, end int64) error {
cfg := client.GetAPI().GetConfig()
set := &types.LocalDBSet{}
for i := start; i < end; i++ {
key := calcTitleMainHeightKey(cfg.GetTitle(), i)
kv := &types.KeyValue{Key: key, Value: nil}
set.KV = append(set.KV, kv)
}
return client.setLocalDb(set)
}
func (client *client) getMainBlockFromDb(height int64) (*types.ParaTxDetail, error) {
cfg := client.GetAPI().GetConfig()
key := calcTitleMainHeightKey(cfg.GetTitle(), height)
set := &types.LocalDBGet{Keys: [][]byte{key}}
value, err := client.getLocalDb(set, len(set.Keys))
if err != nil {
return nil, err
}
if len(value) == 0 || value[0] == nil {
return nil, types.ErrNotFound
}
var tx types.ParaTxDetail
err = types.Decode(value[0], &tx)
if err != nil {
return nil, err
}
return &tx, nil
}
func (client *client) saveBatchMainHeaders(headers *types.Headers) error {
cfg := client.GetAPI().GetConfig()
set := &types.LocalDBSet{}
for _, header := range headers.Items {
key := calcTitleMainHeightKey(cfg.GetTitle(), header.Height)
kv := &types.KeyValue{Key: key, Value: types.Encode(&types.ParaTxDetail{Type: types.AddBlock, Header: header})}
set.KV = append(set.KV, kv)
}
return client.setLocalDb(set)
}
func (client *client) getBatchMainHeadersFromDb(start, end int64) (*types.ParaTxDetails, error) {
title := client.GetAPI().GetConfig().GetTitle()
set := &types.LocalDBGet{}
for i := start; i <= end; i++ {
key := calcTitleMainHeightKey(title, i)
set.Keys = append(set.Keys, key)
}
value, err := client.getLocalDb(set, len(set.Keys))
if err != nil {
return nil, err
}
if len(value) == 0 || value[0] == nil {
return nil, types.ErrNotFound
}
var txs types.ParaTxDetails
for _, v := range value {
var tx types.ParaTxDetail
err = types.Decode(v, &tx)
if err != nil {
return nil, err
}
txs.Items = append(txs.Items, &tx)
}
return &txs, nil
}
This diff is collapsed.
......@@ -159,10 +159,6 @@ func (m *multiDldClient) getConns(inv *inventory) error {
//缺省不打开,因为有些节点下载时间不稳定,容易超时出错,后面看怎么优化
func (m *multiDldClient) tryMultiServerDownload() {
if !m.paraClient.subCfg.MultiDownloadOpen {
return
}
curMainHeight, err := m.paraClient.GetLastHeightOnMainChain()
if err != nil {
plog.Error("tryMultiServerDownload getMain height", "err", err.Error())
......@@ -280,7 +276,7 @@ func (d *downloadJob) process() {
d.mDldCli.paraClient.blockSyncClient.handleLocalChangedMsg()
} else {
//block需要严格顺序执行,数据库错误,panic 重新来过
err := d.mDldCli.paraClient.procLocalBlocks(inv.txs)
err := d.mDldCli.paraClient.procLocalAddBlocks(inv.txs)
if err != nil {
panic(err)
}
......
......@@ -39,6 +39,8 @@ type blockSyncClient struct {
isSyncCaughtUpAtom int32
//isDownloadCaughtUpAtom 下载是否已经追赶上
isDownloadCaughtUpAtom int32
//isSyncFirstCaughtUp 系统启动后download 层和sync层第一次都追赶上的设置,后面如果因为回滚或节点切换不同步,则不考虑
isSyncFirstCaughtUp bool
}
//nextActionType 定义每一轮可执行操作
......@@ -471,7 +473,13 @@ func (client *blockSyncClient) rollbackBlock(block *types.Block) error {
func (client *blockSyncClient) writeBlock(prev []byte, paraBlock *types.Block) error {
//共识模块不执行block,统一由blockchain模块执行block并做去重的处理,返回执行后的blockdetail
blockDetail := &types.BlockDetail{Block: paraBlock}
paraBlockDetail := &types.ParaChainBlockDetail{Blockdetail: blockDetail, IsSync: client.downloadHasCaughtUp()}
//database刷盘设置,默认不刷盘,提高执行速度,系统启动后download 层和sync层第一次都追赶上设置为刷盘,后面如果有回滚或节点切换不同步,则不再改变,减少数据库损坏风险
if !client.isSyncFirstCaughtUp && client.downloadHasCaughtUp() && client.syncHasCaughtUp() {
client.isSyncFirstCaughtUp = true
plog.Info("Para sync - SyncFirstCaughtUp", "Height", paraBlock.Height)
}
paraBlockDetail := &types.ParaChainBlockDetail{Blockdetail: blockDetail, IsSync: client.isSyncFirstCaughtUp}
msg := client.paraClient.GetQueueClient().NewMessage("blockchain", types.EventAddParaChainBlockDetail, paraBlockDetail)
err := client.paraClient.GetQueueClient().Send(msg, true)
if err != nil {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment