Commit 493afe81 authored by mdj33's avatar mdj33 Committed by vipwzw

para download improve

parent 7fdd37ad
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package para
import (
"fmt"
)
func calcTitleHeightKey(title string,height int64) []byte {
return []byte(fmt.Sprintf("TH-%s-%d", title,height))
}
func calcTitleLastHeightKey(title string) []byte {
return []byte(fmt.Sprintf("LH-%s", title))
}
......@@ -222,7 +222,7 @@ func (client *client) InitBlock() {
tx := client.CreateGenesisTx()
newblock.Txs = tx
newblock.TxHash = merkle.CalcMerkleRoot(newblock.Txs)
client.WriteBlock(zeroHash[:], newblock, startSeq-1)
client.WriteBlock(zeroHash[:], newblock, startSeq)
} else {
client.SetCurrentBlock(block)
}
......@@ -234,8 +234,8 @@ func (client *client) InitBlock() {
// GetStartSeq get startSeq in mainchain
func (client *client) GetStartSeq(height int64) (int64, []byte) {
if height == 0 {
return 0, nil
if height <= 0 {
panic(fmt.Sprintf("startHeight(%d) should be more than 0 in mainchain", height))
}
lastHeight, err := client.GetLastHeightOnMainChain()
......@@ -262,7 +262,7 @@ func (client *client) GetStartSeq(height int64) (int64, []byte) {
hint.Stop()
plog.Info(fmt.Sprintf("lastHeight more than %d blocks after startHeight", minBlockNum), "lastHeight", lastHeight, "startHeight", height)
seq, hash, err := client.GetSeqByHeightOnMainChain(height)
seq, hash, err := client.GetSeqByHeightOnMainChain(height-1)
if err != nil {
panic(err)
}
......@@ -320,43 +320,29 @@ func (client *client) GetBlockByHeight(height int64) (*types.Block, error) {
return blockDetails.Items[0].Block, nil
}
// 获取上一个平行链对应主链seq,hash信息
// 对于平行链创世区块特殊场景:
// 1,创世区块seq从-1开始,也就是从主链0高度同步区块,主链seq从0开始,平行链对seq=0的区块校验时候做特殊处理,不校验parentHash
// 2,创世区块seq不是-1, 也就是从主链seq=n高度同步区块,此时创世区块记录了起始高度对应的主链hash,通过hash获取当前seq,然后创世区块需要倒退一个seq,lastSeq=n-1,
// 因为对于云端主链节点,创世区块记录seq在不同主链节点上差异很大,通过记录的主链hash获取的真实seq-1来使用,主链hash使用对应区块的parenthash做校验目的
func (client *client) getLastBlockMainInfo() (int64, []byte, error) {
lastSeq, lastBlock, err := client.getLastBlockInfo()
// 获取当前平行链block对应主链seq,hash信息
// 对于云端主链节点,创世区块记录seq在不同主链节点上差异很大,通过记录的主链hash获取真实seq使用
func (client *client) getLastBlockMainInfo() (int64, *types.Block, error) {
lastBlock, err := client.getLastBlockInfo()
if err != nil {
return -2, nil, err
}
if lastBlock.Height == 0 && lastSeq > -1 {
mainBlock, err := client.GetBlockOnMainByHash(lastBlock.MainHash)
if err != nil {
return -2, nil, err
}
mainSeq, err := client.GetSeqByHashOnMainChain(lastBlock.MainHash)
if err != nil {
return -2, nil, err
return client.reqChainMatchedBlock(0)
}
return mainSeq - 1, mainBlock.ParentHash, nil
}
return lastSeq, lastBlock.MainHash, nil
return mainSeq, lastBlock, nil
}
func (client *client) getLastBlockInfo() (int64, *types.Block, error) {
func (client *client) getLastBlockInfo() (*types.Block, error) {
lastBlock, err := client.RequestLastBlock()
if err != nil {
plog.Error("Parachain RequestLastBlock fail", "err", err)
return -2, nil, err
}
blockedSeq, err := client.GetBlockedSeq(lastBlock.Hash())
if err != nil {
plog.Error("Parachain GetBlockedSeq fail", "err", err)
return -2, nil, err
return nil, err
}
return blockedSeq, lastBlock, nil
return lastBlock, nil
}
func (client *client) GetForkHeightOnMainChain(key string) (int64, error) {
......@@ -409,7 +395,7 @@ func (client *client) GetHashByHeightOnMainChain(height int64) ([]byte, error) {
func (client *client) GetSeqByHashOnMainChain(hash []byte) (int64, error) {
seq, err := client.grpcClient.GetSequenceByHash(context.Background(), &types.ReqHash{Hash: hash})
if err != nil {
plog.Error("GetSeqByHashOnMainChain", "Error", err.Error())
plog.Error("GetSeqByHashOnMainChain", "Error", err.Error(),"hash",hex.EncodeToString(hash))
return -1, err
}
//the reflect checked in grpcHandle
......@@ -498,18 +484,18 @@ func (client *client) RequestTx(currSeq int64, preMainBlockHash []byte) ([]*type
//genesis block scenario, new main node's blockHash as preMainHash, genesis sequence+1 as currSeq
// for genesis seq=-1 scenario, mainHash not care, as the 0 seq instead of -1
// not seq=-1 scenario, mainHash needed
func (client *client) syncFromGenesisBlock() (int64, []byte, error) {
lastSeq, lastMainHash, err := client.getLastBlockMainInfo()
func (client *client) syncFromGenesisBlock() (int64, *types.Block, error) {
lastSeq, lastBlock, err := client.getLastBlockMainInfo()
if err != nil {
plog.Error("Parachain getLastBlockInfo fail", "err", err)
return -2, nil, err
}
plog.Info("syncFromGenesisBlock sync from height 0")
return lastSeq + 1, lastMainHash, nil
return lastSeq, lastBlock, nil
}
// search base on para block but not last MainBlockHash, last MainBlockHash can not back tracing
func (client *client) switchHashMatchedBlock(currSeq int64) (int64, []byte, error) {
func (client *client) switchHashMatchedBlock(currSeq int64) (int64, *types.Block, error) {
lastBlock, err := client.RequestLastBlock()
if err != nil {
plog.Error("Parachain RequestLastBlock fail", "err", err)
......@@ -559,7 +545,7 @@ func (client *client) switchHashMatchedBlock(currSeq int64) (int64, []byte, erro
plog.Info("switchHashMatchedBlock succ", "currHeight", height, "initHeight", lastBlock.Height,
"new currSeq", mainSeq+1, "new preMainBlockHash", hex.EncodeToString(block.MainHash))
return mainSeq + 1, block.MainHash, nil
return mainSeq, block, nil
}
return -2, nil, paracross.ErrParaCurHashNotMatch
}
......@@ -615,15 +601,17 @@ func (client *client) CreateBlock() {
txs, blockOnMain, err := client.RequestTx(currSeq, lastSeqMainHash)
if err != nil {
incSeqFlag = false
if err == paracross.ErrParaCurHashNotMatch {
newSeq, newSeqMainHash, err := client.switchHashMatchedBlock(currSeq)
if err == nil {
incSeqFlag = true
currSeq = newSeq
lastSeqMainHash = newSeqMainHash
continue
}
}
incSeqFlag = false
time.Sleep(time.Second * time.Duration(blockSec))
continue
}
......@@ -634,7 +622,7 @@ func (client *client) CreateBlock() {
lastSeqMainHash = blockOnMain.Detail.Block.ParentHash
}
lastBlockSeq, lastBlock, err := client.getLastBlockInfo()
lastBlock, err := client.getLastBlockInfo()
if err != nil {
plog.Error("Parachain getLastBlockInfo fail", "err", err)
time.Sleep(time.Second)
......@@ -642,7 +630,7 @@ func (client *client) CreateBlock() {
}
plog.Info("Parachain process block", "lastSeq", lastSeq, "curSeq", currSeq,
"lastBlockHeight", lastBlock.Height, "lastBlockSeq", lastBlockSeq,
"lastBlockHeight", lastBlock.Height,
"currSeqMainHeight", lastSeqMainHeight, "currSeqMainHash", common.ToHex(lastSeqMainHash),
"lastBlockMainHeight", lastBlock.MainHeight, "lastBlockMainHash", common.ToHex(lastBlock.MainHash), "seqTy", blockOnMain.Seq.Type)
......
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package para
import (
"time"
"errors"
"github.com/33cn/chain33/common"
paracross "github.com/33cn/plugin/plugin/dapp/paracross/types"
"github.com/33cn/chain33/types"
"encoding/hex"
)
func (client *client) setLocalBlock(set *types.LocalDBSet) (error) {
//如果追赶上主链了,则落盘
if client.isCaughtUp{
set.Txid = 1
}
msg := client.GetQueueClient().NewMessage("blockchain", types.EventSetValueByKey, set)
err := client.GetQueueClient().Send(msg, true)
if err != nil {
return err
}
resp, err := client.GetQueueClient().Wait(msg)
if err != nil {
return err
}
if resp.GetData().(*types.Reply).IsOk{
return nil
}
return errors.New(string(resp.GetData().(*types.Reply).GetMsg()))
}
func (client *client) addLocalBlock(height int64, block *paracross.ParaLocalDbBlock) (error) {
set := &types.LocalDBSet{}
key := calcTitleHeightKey(types.GetTitle(),height)
kv := &types.KeyValue{Key: key, Value: types.Encode(block)}
set.KV = append(set.KV,kv)
//两个key原子操作
key = calcTitleLastHeightKey(types.GetTitle())
kv = &types.KeyValue{Key: key, Value: types.Encode(&types.Int64{Data: height})}
set.KV = append(set.KV,kv)
return client.setLocalBlock(set)
}
func (client *client) createLocalBlock(lastBlock *paracross.ParaLocalDbBlock, txs []*types.Transaction, mainBlock *types.BlockSeq) error {
var newblock paracross.ParaLocalDbBlock
newblock.Height = lastBlock.Height + 1
newblock.Txs = txs
newblock.BlockTime = mainBlock.Detail.Block.BlockTime
newblock.MainHash = mainBlock.Seq.Hash
newblock.MainHeight = mainBlock.Detail.Block.Height
return client.addLocalBlock(newblock.Height, &newblock)
}
func (client *client) delLocalBlock(height int64) (error) {
set := &types.LocalDBSet{}
key := calcTitleHeightKey(types.GetTitle(),height)
kv := &types.KeyValue{Key: key, Value: nil}
set.KV = append(set.KV,kv)
//两个key原子操作
key = calcTitleLastHeightKey(types.GetTitle())
kv = &types.KeyValue{Key: key, Value: types.Encode(&types.Int64{Data: height-1})}
set.KV = append(set.KV,kv)
return client.setLocalBlock(set)
}
// localblock 最小高度不为0,如果minHeight=0,则把localblocks 清空,只设置lastHeight key
func (client *client) removeLocalBlocks(minHeight int64) error {
set := &types.LocalDBSet{}
key := calcTitleLastHeightKey(types.GetTitle())
if minHeight >0{
kv := &types.KeyValue{Key: key, Value: types.Encode(&types.Int64{Data: minHeight})}
set.KV = append(set.KV,kv)
}else {
kv := &types.KeyValue{Key: key, Value: nil}
set.KV = append(set.KV,kv)
}
return client.setLocalBlock(set)
}
func (client *client) getFromLocalDb(set *types.LocalDBGet, count int) ([][]byte, error) {
msg := client.GetQueueClient().NewMessage("blockchain", types.EventGetValueByKey, set)
err := client.GetQueueClient().Send(msg, true)
if err != nil {
return nil,err
}
resp, err := client.GetQueueClient().Wait(msg)
if err != nil {
return nil,err
}
reply := resp.GetData().(*types.LocalReplyValue)
if len(reply.Values) != count{
plog.Error("Parachain getFromLocalDb count not match", "expert", count,"real",len(reply.Values))
return nil, types.ErrInvalidParam
}
return reply.Values,nil
}
func (client *client) getLastLocalHeight() (int64, error) {
key := calcTitleLastHeightKey(types.GetTitle())
set := &types.LocalDBGet{Keys:[][]byte{key}}
value,err := client.getFromLocalDb(set,len(set.Keys))
if err != nil{
return -1, err
}
if value[0] == nil{
return -1, types.ErrNotFound
}
height := &types.Int64{}
err = types.Decode(value[0],height)
if err != nil{
return -1,err
}
return height.Data,nil
}
func (client *client) getLocalBlockByHeight(height int64) (*paracross.ParaLocalDbBlock, error) {
key := calcTitleHeightKey(types.GetTitle(),height)
set := &types.LocalDBGet{Keys:[][]byte{key}}
value,err := client.getFromLocalDb(set,len(set.Keys))
if err != nil{
return nil, err
}
if value[0] == nil{
return nil, types.ErrNotFound
}
var block paracross.ParaLocalDbBlock
err = types.Decode(value[0],&block)
if err != nil{
return nil,err
}
return &block,nil
}
//TODO 是否考虑mainHash获取不到,回溯查找?
func (client *client) getLocalBlockInfoByHeight(height int64) (int64, []byte, error) {
lastBlock, err := client.getLocalBlockByHeight(height)
if err != nil {
return -2, nil, err
}
mainSeq, err := client.GetSeqByHashOnMainChain(lastBlock.MainHash)
if err != nil {
return -2, nil, err
}
return mainSeq, lastBlock.MainHash, nil
}
func (client *client) setLocalBlockByChainBlock(chainBlock *types.Block) (error) {
//根据匹配上的chainblock,设置当前localdb block
localBlock := &paracross.ParaLocalDbBlock{
Height:chainBlock.Height,
MainHeight:chainBlock.MainHeight,
MainHash:chainBlock.MainHash,
BlockTime:chainBlock.BlockTime,
}
return client.addLocalBlock(localBlock.Height,localBlock)
}
//如果localdb里面没有信息,就从chain block返回,至少有创世区块,然后进入循环匹配切换场景
func (client *client) getLastLocalBlockInfo() (int64, []byte, error) {
height,err := client.getLastLocalHeight()
if err == nil{
mainSeq,mainHash,err := client.getLocalBlockInfoByHeight(height)
if err == nil{
return mainSeq, mainHash,nil
}
}
mainSeq,chainBlock,err := client.getLastBlockMainInfo()
if err != nil{
return -2,nil,err
}
err = client.setLocalBlockByChainBlock(chainBlock)
if err != nil{
return -2,nil,err
}
return mainSeq,chainBlock.MainHash,nil
}
func (client *client) getLastDbBlock() (*paracross.ParaLocalDbBlock, error) {
height,err := client.getLastLocalHeight()
if err != nil {
return nil, err
}
return client.getLocalBlockByHeight(height)
}
func (client *client) reqChainMatchedBlock(startHeight int64) (int64, *types.Block, error) {
lastBlock, err := client.RequestLastBlock()
if err != nil {
plog.Error("Parachain RequestLastBlock fail", "err", err)
return -2, nil, err
}
if lastBlock.Height == 0 {
return client.syncFromGenesisBlock()
}
if startHeight == 0 || startHeight > lastBlock.Height{
startHeight = lastBlock.Height
}
depth := searchHashMatchDepth
for height := startHeight; height > 0 && depth > 0; height-- {
block, err := client.GetBlockByHeight(height)
if err != nil {
return -2, nil, err
}
//当前block结构已经有mainHash和MainHeight但是从blockchain获取的block还没有写入,以后如果获取到,可以替换从minerTx获取
plog.Info("switchHashMatchedBlock", "lastParaBlockHeight", height, "mainHeight",
block.MainHeight, "mainHash", hex.EncodeToString(block.MainHash))
mainSeq, err := client.GetSeqByHashOnMainChain(block.MainHash)
if err != nil {
depth--
if depth == 0 {
plog.Error("switchHashMatchedBlock depth overflow", "last info:mainHeight", block.MainHeight,
"mainHash", hex.EncodeToString(block.MainHash), "search startHeight", lastBlock.Height, "curHeight", height,
"search depth", searchHashMatchDepth)
panic("search HashMatchedBlock overflow, re-setting search depth and restart to try")
}
if height == 1 {
plog.Error("switchHashMatchedBlock search to height=1 not found", "lastBlockHeight", lastBlock.Height,
"height1 mainHash", hex.EncodeToString(block.MainHash))
return client.syncFromGenesisBlock()
}
continue
}
plog.Info("reqChainMatchedBlock succ", "currHeight", height, "initHeight", lastBlock.Height,
"new currSeq", mainSeq, "new preMainBlockHash", hex.EncodeToString(block.MainHash))
return mainSeq, block, nil
}
return -2, nil, paracross.ErrParaCurHashNotMatch
}
func (client *client) switchChainMatchedBlock(startHeight int64) (int64, []byte, error) {
mainSeq, chainBlock,err := client.reqChainMatchedBlock(startHeight)
if err != nil{
return -2,nil,err
}
err = client.setLocalBlockByChainBlock(chainBlock)
if err != nil{
return -2,nil,err
}
return mainSeq,chainBlock.MainHash,nil
}
// search base on para block but not last MainBlockHash, last MainBlockHash can not back tracing
func (client *client) switchLocalHashMatchedBlock(currSeq int64) (int64, []byte, error) {
lastBlock, err := client.getLastDbBlock()
if err != nil {
if err == types.ErrNotFound{
//TODO 或者通知执行层去切换
return client.switchChainMatchedBlock(0)
}
plog.Error("Parachain RequestLastBlock fail", "err", err)
return -2, nil, err
}
for height := lastBlock.Height; height > 0 ; height-- {
block, err := client.getLocalBlockByHeight(height)
if err != nil {
if err == types.ErrNotFound{
plog.Error("switchLocalHashMatchedBlock search not found", "lastBlockHeight", height)
err = client.removeLocalBlocks(height)
if err != nil{
return -2, nil, err
}
return client.switchChainMatchedBlock(height)
}
return -2, nil, err
}
//当前block结构已经有mainHash和MainHeight但是从blockchain获取的block还没有写入,以后如果获取到,可以替换从minerTx获取
plog.Info("switchLocalHashMatchedBlock", "lastlocalBlockHeight", height, "mainHeight",
block.MainHeight, "mainHash", hex.EncodeToString(block.MainHash))
mainSeq, err := client.GetSeqByHashOnMainChain(block.MainHash)
if err != nil {
continue
}
//remove fail, the para chain may be remove part, set the preMainBlockHash to nil, to match nothing, force to search from last
err = client.removeLocalBlocks(height)
if err != nil {
return -2, nil, err
}
plog.Info("switchLocalHashMatchedBlock succ", "currHeight", height, "initHeight", lastBlock.Height,
"currSeq", mainSeq, "currMainBlockHash", hex.EncodeToString(block.MainHash))
return mainSeq, block.MainHash, nil
}
return -2, nil, paracross.ErrParaCurHashNotMatch
}
func (client *client) downloadBlocks() {
lastSeq, lastSeqMainHash, err := client.getLastLocalBlockInfo()
if err != nil {
plog.Error("Parachain CreateBlock getLastLocalBlockInfo fail", "err", err.Error())
return
}
currSeq := lastSeq+1
for {
txs, mainBlock, err := client.RequestTx(currSeq, lastSeqMainHash)
if err != nil {
if err == paracross.ErrParaCurHashNotMatch {
preSeq, preSeqMainHash, err := client.switchLocalHashMatchedBlock(currSeq)
if err == nil {
currSeq = preSeq+1
lastSeqMainHash = preSeqMainHash
continue
}
}
time.Sleep(time.Second * time.Duration(blockSec))
continue
}
lastSeqMainHeight := mainBlock.Detail.Block.Height
lastSeqMainHash = mainBlock.Seq.Hash
if mainBlock.Seq.Type == delAct {
lastSeqMainHash = mainBlock.Detail.Block.ParentHash
}
lastBlock, err := client.getLastDbBlock()
if err != nil && err != types.ErrNotFound{
plog.Error("Parachain getLastDbBlock", "err", err)
time.Sleep(time.Second)
continue
}
plog.Info("Parachain process block", "curSeq", currSeq,"lastBlockHeight", lastBlock.Height,
"currSeqMainHeight", lastSeqMainHeight, "currSeqMainHash", common.ToHex(lastSeqMainHash),
"lastBlockMainHeight", lastBlock.MainHeight, "lastBlockMainHash", common.ToHex(lastBlock.MainHash), "seqTy", mainBlock.Seq.Type)
if mainBlock.Seq.Type == delAct {
if len(txs) == 0 {
if lastSeqMainHeight > lastBlock.MainHeight {
currSeq++
continue
}
plog.Info("Delete empty block")
}
err = client.delLocalBlock(lastBlock.Height)
} else if mainBlock.Seq.Type == addAct {
if len(txs) == 0 {
if lastSeqMainHeight-lastBlock.MainHeight < emptyBlockInterval {
currSeq++
continue
}
plog.Info("Create empty block")
}
err = client.createLocalBlock(lastBlock, txs, mainBlock)
} else {
err = types.ErrInvalidParam
}
if err != nil{
plog.Error("para DownloadBlocks", "type",mainBlock.Seq.Type,"err",err.Error())
time.Sleep(time.Second)
continue
}
currSeq++
}
}
......@@ -301,6 +301,14 @@ message ParacrossAsset {
bool success = 23;
}
message ParaLocalDbBlock {
int64 height = 1;
bytes mainHash = 2;
int64 mainHeight = 3;
bytes parentMainHash = 4;
int64 blockTime = 5;
repeated Transaction txs = 6;
}
service paracross {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment