Commit f6f7ee68 authored by mdj33's avatar mdj33 Committed by vipwzw

improve

parent b56786e6
...@@ -5,8 +5,6 @@ ...@@ -5,8 +5,6 @@
package para package para
import ( import (
"bytes"
"context"
"errors" "errors"
"fmt" "fmt"
"sync" "sync"
...@@ -25,7 +23,6 @@ import ( ...@@ -25,7 +23,6 @@ import (
drivers "github.com/33cn/chain33/system/consensus" drivers "github.com/33cn/chain33/system/consensus"
cty "github.com/33cn/chain33/system/dapp/coins/types" cty "github.com/33cn/chain33/system/dapp/coins/types"
"github.com/33cn/chain33/types" "github.com/33cn/chain33/types"
paraexec "github.com/33cn/plugin/plugin/dapp/paracross/executor"
paracross "github.com/33cn/plugin/plugin/dapp/paracross/types" paracross "github.com/33cn/plugin/plugin/dapp/paracross/types"
pt "github.com/33cn/plugin/plugin/dapp/paracross/types" pt "github.com/33cn/plugin/plugin/dapp/paracross/types"
) )
...@@ -223,12 +220,12 @@ func (client *client) InitBlock() { ...@@ -223,12 +220,12 @@ func (client *client) InitBlock() {
newblock.Txs = tx newblock.Txs = tx
newblock.TxHash = merkle.CalcMerkleRoot(newblock.Txs) newblock.TxHash = merkle.CalcMerkleRoot(newblock.Txs)
err := client.WriteBlock(zeroHash[:], newblock, startSeq) err := client.WriteBlock(zeroHash[:], newblock, startSeq)
if err != nil{ if err != nil {
panic(fmt.Sprintf("para chain create genesis block,err=%s",err.Error())) panic(fmt.Sprintf("para chain create genesis block,err=%s", err.Error()))
} }
err = client.createLocalGenesisBlock(newblock) err = client.createLocalGenesisBlock(newblock)
if err != nil{ if err != nil {
panic(fmt.Sprintf("para chain create local genesis block,err=%s",err.Error())) panic(fmt.Sprintf("para chain create local genesis block,err=%s", err.Error()))
} }
} else { } else {
...@@ -295,269 +292,6 @@ func (client *client) ProcEvent(msg *queue.Message) bool { ...@@ -295,269 +292,6 @@ func (client *client) ProcEvent(msg *queue.Message) bool {
return false return false
} }
//get the last sequence in parachain
func (client *client) GetLastSeq() (int64, error) {
blockedSeq, err := client.GetAPI().GetLastBlockMainSequence()
if err != nil {
return -2, err
}
return blockedSeq.Data, nil
}
func (client *client) GetBlockedSeq(hash []byte) (int64, error) {
//from blockchain db
blockedSeq, err := client.GetAPI().GetMainSequenceByHash(&types.ReqHash{Hash: hash})
if err != nil {
return -2, err
}
return blockedSeq.Data, nil
}
func (client *client) GetBlockByHeight(height int64) (*types.Block, error) {
//from blockchain db
blockDetails, err := client.GetAPI().GetBlocks(&types.ReqBlocks{Start: height, End: height})
if err != nil {
plog.Error("paracommitmsg get node status block count fail")
return nil, err
}
if 1 != int64(len(blockDetails.Items)) {
plog.Error("paracommitmsg get node status block count fail")
return nil, types.ErrInvalidParam
}
return blockDetails.Items[0].Block, nil
}
// 获取当前平行链block对应主链seq,hash信息
// 对于云端主链节点,创世区块记录seq在不同主链节点上差异很大,通过记录的主链hash获取真实seq使用
func (client *client) getLastBlockMainInfo() (int64, *types.Block, error) {
lastBlock, err := client.getLastBlockInfo()
if err != nil {
return -2, nil, err
}
mainSeq, err := client.GetSeqByHashOnMainChain(lastBlock.MainHash)
if err != nil {
return client.reqMatchedBlockOnChain(0)
}
return mainSeq, lastBlock, nil
}
func (client *client) getLastBlockInfo() (*types.Block, error) {
lastBlock, err := client.RequestLastBlock()
if err != nil {
plog.Error("Parachain RequestLastBlock fail", "err", err)
return nil, err
}
return lastBlock, nil
}
func (client *client) GetForkHeightOnMainChain(key string) (int64, error) {
ret, err := client.grpcClient.GetFork(context.Background(), &types.ReqKey{Key: []byte(key)})
if err != nil {
plog.Error("para get rpc ForkHeight fail", "key", key, "err", err.Error())
return types.MaxHeight, err
}
return ret.Data, nil
}
func (client *client) GetLastHeightOnMainChain() (int64, error) {
header, err := client.grpcClient.GetLastHeader(context.Background(), &types.ReqNil{})
if err != nil {
plog.Error("GetLastHeightOnMainChain", "Error", err.Error())
return -1, err
}
return header.Height, nil
}
func (client *client) GetLastSeqOnMainChain() (int64, error) {
seq, err := client.grpcClient.GetLastBlockSequence(context.Background(), &types.ReqNil{})
if err != nil {
plog.Error("GetLastSeqOnMainChain", "Error", err.Error())
return -1, err
}
//the reflect checked in grpcHandle
return seq.Data, nil
}
func (client *client) GetSeqByHeightOnMainChain(height int64) (int64, []byte, error) {
hash, err := client.GetHashByHeightOnMainChain(height)
if err != nil {
return -1, nil, err
}
seq, err := client.GetSeqByHashOnMainChain(hash)
return seq, hash, err
}
func (client *client) GetHashByHeightOnMainChain(height int64) ([]byte, error) {
reply, err := client.grpcClient.GetBlockHash(context.Background(), &types.ReqInt{Height: height})
if err != nil {
plog.Error("GetHashByHeightOnMainChain", "Error", err.Error())
return nil, err
}
return reply.Hash, nil
}
func (client *client) GetSeqByHashOnMainChain(hash []byte) (int64, error) {
seq, err := client.grpcClient.GetSequenceByHash(context.Background(), &types.ReqHash{Hash: hash})
if err != nil {
plog.Error("GetSeqByHashOnMainChain", "Error", err.Error(), "hash", hex.EncodeToString(hash))
return -1, err
}
//the reflect checked in grpcHandle
return seq.Data, nil
}
func (client *client) GetBlockOnMainBySeq(seq int64) (*types.BlockSeq, error) {
blockSeq, err := client.grpcClient.GetBlockBySeq(context.Background(), &types.Int64{Data: seq})
if err != nil {
plog.Error("Not found block on main", "seq", seq)
return nil, err
}
hash := blockSeq.Detail.Block.HashByForkHeight(mainBlockHashForkHeight)
if !bytes.Equal(blockSeq.Seq.Hash, hash) {
plog.Error("para compare ForkBlockHash fail", "forkHeight", mainBlockHashForkHeight,
"seqHash", hex.EncodeToString(blockSeq.Seq.Hash), "calcHash", hex.EncodeToString(hash))
return nil, types.ErrBlockHashNoMatch
}
return blockSeq, nil
}
func (client *client) GetBlockOnMainByHash(hash []byte) (*types.Block, error) {
blocks, err := client.grpcClient.GetBlockByHashes(context.Background(), &types.ReqHashes{Hashes: [][]byte{hash}})
if err != nil || blocks.Items[0] == nil {
plog.Error("GetBlockOnMainByHash Not found", "blockhash", common.ToHex(hash))
return nil, err
}
return blocks.Items[0].Block, nil
}
// preBlockHash to identify the same main node
func (client *client) RequestTx(currSeq int64, preMainBlockHash []byte) ([]*types.Transaction, *types.BlockSeq, error) {
plog.Debug("Para consensus RequestTx")
lastSeq, err := client.GetLastSeqOnMainChain()
if err != nil {
return nil, nil, err
}
plog.Info("RequestTx", "LastMainSeq", lastSeq, "CurrSeq", currSeq)
if lastSeq >= currSeq {
blockSeq, err := client.GetBlockOnMainBySeq(currSeq)
if err != nil {
return nil, nil, err
}
//genesis block start with seq=-1 not check
if currSeq == 0 ||
(bytes.Equal(preMainBlockHash, blockSeq.Detail.Block.ParentHash) && blockSeq.Seq.Type == addAct) ||
(bytes.Equal(preMainBlockHash, blockSeq.Seq.Hash) && blockSeq.Seq.Type == delAct) {
txs := paraexec.FilterTxsForPara(types.GetTitle(), blockSeq.Detail)
plog.Info("GetCurrentSeq", "Len of txs", len(txs), "seqTy", blockSeq.Seq.Type)
client.mtx.Lock()
if lastSeq-currSeq > emptyBlockInterval {
client.isCaughtUp = false
} else {
client.isCaughtUp = true
}
client.mtx.Unlock()
if client.authAccount != "" {
client.commitMsgClient.onMainBlockAdded(blockSeq.Detail)
}
return txs, blockSeq, nil
}
//not consistent case be processed at below
plog.Error("RequestTx", "preMainHash", hex.EncodeToString(preMainBlockHash), "currSeq preMainHash", hex.EncodeToString(blockSeq.Detail.Block.ParentHash),
"currSeq mainHash", hex.EncodeToString(blockSeq.Seq.Hash), "curr seq", currSeq, "ty", blockSeq.Seq.Type, "currSeq Mainheight", blockSeq.Detail.Block.Height)
return nil, nil, paracross.ErrParaCurHashNotMatch
}
//lastSeq < CurrSeq case:
//lastSeq = currSeq-1, main node not update
if lastSeq+1 == currSeq {
plog.Debug("Waiting new sequence from main chain")
return nil, nil, paracross.ErrParaWaitingNewSeq
}
// 1. lastSeq < currSeq-1
// 2. lastSeq >= currSeq and seq not consistent or fork case
return nil, nil, paracross.ErrParaCurHashNotMatch
}
//genesis block scenario, new main node's blockHash as preMainHash, genesis sequence+1 as currSeq
// for genesis seq=-1 scenario, mainHash not care, as the 0 seq instead of -1
// not seq=-1 scenario, mainHash needed
func (client *client) syncFromGenesisBlock() (int64, *types.Block, error) {
lastSeq, lastBlock, err := client.getLastBlockMainInfo()
if err != nil {
plog.Error("Parachain getLastBlockInfo fail", "err", err)
return -2, nil, err
}
plog.Info("syncFromGenesisBlock sync from height 0")
return lastSeq, lastBlock, nil
}
// search base on para block but not last MainBlockHash, last MainBlockHash can not back tracing
func (client *client) switchHashMatchedBlock(currSeq int64) (int64, *types.Block, error) {
lastBlock, err := client.RequestLastBlock()
if err != nil {
plog.Error("Parachain RequestLastBlock fail", "err", err)
return -2, nil, err
}
if lastBlock.Height == 0 {
return client.syncFromGenesisBlock()
}
depth := searchHashMatchDepth
for height := lastBlock.Height; height > 0 && depth > 0; height-- {
block, err := client.GetBlockByHeight(height)
if err != nil {
return -2, nil, err
}
//当前block结构已经有mainHash和MainHeight但是从blockchain获取的block还没有写入,以后如果获取到,可以替换从minerTx获取
plog.Info("switchHashMatchedBlock", "lastParaBlockHeight", height, "mainHeight",
block.MainHeight, "mainHash", hex.EncodeToString(block.MainHash))
mainSeq, err := client.GetSeqByHashOnMainChain(block.MainHash)
if err != nil {
depth--
if depth == 0 {
plog.Error("switchHashMatchedBlock depth overflow", "last info:mainHeight", block.MainHeight,
"mainHash", hex.EncodeToString(block.MainHash), "search startHeight", lastBlock.Height, "curHeight", height,
"search depth", searchHashMatchDepth)
panic("search HashMatchedBlock overflow, re-setting search depth and restart to try")
}
if height == 1 {
plog.Error("switchHashMatchedBlock search to height=1 not found", "lastBlockHeight", lastBlock.Height,
"height1 mainHash", hex.EncodeToString(block.MainHash))
err = client.removeBlocks(0)
if err != nil {
return currSeq, nil, nil
}
return client.syncFromGenesisBlock()
}
continue
}
//remove fail, the para chain may be remove part, set the preMainBlockHash to nil, to match nothing, force to search from last
err = client.removeBlocks(height)
if err != nil {
return currSeq, nil, nil
}
plog.Info("switchHashMatchedBlock succ", "currHeight", height, "initHeight", lastBlock.Height,
"new currSeq", mainSeq+1, "new preMainBlockHash", hex.EncodeToString(block.MainHash))
return mainSeq, block, nil
}
return -2, nil, paracross.ErrParaCurHashNotMatch
}
func (client *client) removeBlocks(endHeight int64) error { func (client *client) removeBlocks(endHeight int64) error {
for { for {
lastBlock, err := client.RequestLastBlock() lastBlock, err := client.RequestLastBlock()
...@@ -584,100 +318,6 @@ func (client *client) removeBlocks(endHeight int64) error { ...@@ -584,100 +318,6 @@ func (client *client) removeBlocks(endHeight int64) error {
} }
} }
//正常情况下,打包交易
func (client *client) CreateBlock() {
incSeqFlag := true
//system startup, take the last added block's seq is ok
currSeq, lastSeqMainHash, err := client.getLastBlockMainInfo()
if err != nil {
plog.Error("Parachain CreateBlock getLastBlockMainInfo fail", "err", err.Error())
return
}
for {
//should be lastSeq but not LastBlockSeq as del block case the seq is not equal
lastSeq, err := client.GetLastSeq()
if err != nil {
plog.Error("Parachain GetLastSeq fail", "err", err.Error())
time.Sleep(time.Second)
continue
}
if incSeqFlag || currSeq == lastSeq {
currSeq++
}
txs, blockOnMain, err := client.RequestTx(currSeq, lastSeqMainHash)
if err != nil {
if err == paracross.ErrParaCurHashNotMatch {
newSeq, newSeqMainHash, err := client.switchHashMatchedBlock(currSeq)
if err == nil {
incSeqFlag = true
currSeq = newSeq
lastSeqMainHash = newSeqMainHash
continue
}
}
incSeqFlag = false
time.Sleep(time.Second * time.Duration(blockSec))
continue
}
lastSeqMainHeight := blockOnMain.Detail.Block.Height
lastSeqMainHash = blockOnMain.Seq.Hash
if blockOnMain.Seq.Type == delAct {
lastSeqMainHash = blockOnMain.Detail.Block.ParentHash
}
lastBlock, err := client.getLastBlockInfo()
if err != nil {
plog.Error("Parachain getLastBlockInfo fail", "err", err)
time.Sleep(time.Second)
continue
}
plog.Info("Parachain process block", "lastSeq", lastSeq, "curSeq", currSeq,
"lastBlockHeight", lastBlock.Height,
"currSeqMainHeight", lastSeqMainHeight, "currSeqMainHash", common.ToHex(lastSeqMainHash),
"lastBlockMainHeight", lastBlock.MainHeight, "lastBlockMainHash", common.ToHex(lastBlock.MainHash), "seqTy", blockOnMain.Seq.Type)
if blockOnMain.Seq.Type == delAct {
if len(txs) == 0 {
if lastSeqMainHeight > lastBlock.MainHeight {
incSeqFlag = true
continue
}
plog.Info("Delete empty block")
}
err := client.DelBlock(lastBlock, currSeq)
incSeqFlag = false
if err != nil {
plog.Error(fmt.Sprintf("********************err:%v", err.Error()))
}
} else if blockOnMain.Seq.Type == addAct {
if len(txs) == 0 {
if lastSeqMainHeight-lastBlock.MainHeight < emptyBlockInterval {
incSeqFlag = true
continue
}
plog.Info("Create empty block")
}
err := client.createBlock(lastBlock, txs, currSeq, blockOnMain)
incSeqFlag = false
if err != nil {
plog.Error(fmt.Sprintf("********************err:%v", err.Error()))
}
} else {
plog.Error("Incorrect sequence type")
incSeqFlag = false
}
if client.isCaughtUp {
time.Sleep(time.Second * time.Duration(blockSec))
}
}
}
// miner tx need all para node create, but not all node has auth account, here just not sign to keep align // miner tx need all para node create, but not all node has auth account, here just not sign to keep align
func (client *client) addMinerTx(preStateHash []byte, block *types.Block, main *types.BlockSeq, txs []*types.Transaction) error { func (client *client) addMinerTx(preStateHash []byte, block *types.Block, main *types.BlockSeq, txs []*types.Transaction) error {
status := &pt.ParacrossNodeStatus{ status := &pt.ParacrossNodeStatus{
......
...@@ -559,7 +559,7 @@ out: ...@@ -559,7 +559,7 @@ out:
continue continue
} }
_, block, err := client.paraClient.getLastBlockInfo() block, err := client.paraClient.getLastBlockInfo()
if err != nil { if err != nil {
continue continue
} }
......
...@@ -10,12 +10,15 @@ import ( ...@@ -10,12 +10,15 @@ import (
"encoding/hex" "encoding/hex"
"bytes"
"github.com/33cn/chain33/common" "github.com/33cn/chain33/common"
"github.com/33cn/chain33/types" "github.com/33cn/chain33/types"
paraexec "github.com/33cn/plugin/plugin/dapp/paracross/executor"
paracross "github.com/33cn/plugin/plugin/dapp/paracross/types" paracross "github.com/33cn/plugin/plugin/dapp/paracross/types"
) )
func (client *client) setLocalBlock(set *types.LocalDBSet) error { func (client *client) setLocalDb(set *types.LocalDBSet) error {
//如果追赶上主链了,则落盘 //如果追赶上主链了,则落盘
if client.isCaughtUp { if client.isCaughtUp {
set.Txid = 1 set.Txid = 1
...@@ -36,6 +39,26 @@ func (client *client) setLocalBlock(set *types.LocalDBSet) error { ...@@ -36,6 +39,26 @@ func (client *client) setLocalBlock(set *types.LocalDBSet) error {
return errors.New(string(resp.GetData().(*types.Reply).GetMsg())) return errors.New(string(resp.GetData().(*types.Reply).GetMsg()))
} }
func (client *client) getLocalDb(set *types.LocalDBGet, count int) ([][]byte, error) {
msg := client.GetQueueClient().NewMessage("blockchain", types.EventGetValueByKey, set)
err := client.GetQueueClient().Send(msg, true)
if err != nil {
return nil, err
}
resp, err := client.GetQueueClient().Wait(msg)
if err != nil {
return nil, err
}
reply := resp.GetData().(*types.LocalReplyValue)
if len(reply.Values) != count {
plog.Error("Parachain getLocalDb count not match", "expert", count, "real", len(reply.Values))
return nil, types.ErrInvalidParam
}
return reply.Values, nil
}
func (client *client) addLocalBlock(height int64, block *paracross.ParaLocalDbBlock) error { func (client *client) addLocalBlock(height int64, block *paracross.ParaLocalDbBlock) error {
set := &types.LocalDBSet{} set := &types.LocalDBSet{}
...@@ -48,7 +71,7 @@ func (client *client) addLocalBlock(height int64, block *paracross.ParaLocalDbBl ...@@ -48,7 +71,7 @@ func (client *client) addLocalBlock(height int64, block *paracross.ParaLocalDbBl
kv = &types.KeyValue{Key: key, Value: types.Encode(&types.Int64{Data: height})} kv = &types.KeyValue{Key: key, Value: types.Encode(&types.Int64{Data: height})}
set.KV = append(set.KV, kv) set.KV = append(set.KV, kv)
return client.setLocalBlock(set) return client.setLocalDb(set)
} }
func (client *client) createLocalBlock(lastBlock *paracross.ParaLocalDbBlock, txs []*types.Transaction, mainBlock *types.BlockSeq) error { func (client *client) createLocalBlock(lastBlock *paracross.ParaLocalDbBlock, txs []*types.Transaction, mainBlock *types.BlockSeq) error {
...@@ -66,10 +89,9 @@ func (client *client) createLocalBlock(lastBlock *paracross.ParaLocalDbBlock, tx ...@@ -66,10 +89,9 @@ func (client *client) createLocalBlock(lastBlock *paracross.ParaLocalDbBlock, tx
} }
func (client *client) createLocalGenesisBlock(genesis *types.Block) error { func (client *client) createLocalGenesisBlock(genesis *types.Block) error {
return client.setLocalBlockByChainBlock(genesis) return client.alignLocalBlock2ChainBlock(genesis)
} }
func (client *client) delLocalBlock(height int64) error { func (client *client) delLocalBlock(height int64) error {
set := &types.LocalDBSet{} set := &types.LocalDBSet{}
key := calcTitleHeightKey(types.GetTitle(), height) key := calcTitleHeightKey(types.GetTitle(), height)
...@@ -81,50 +103,24 @@ func (client *client) delLocalBlock(height int64) error { ...@@ -81,50 +103,24 @@ func (client *client) delLocalBlock(height int64) error {
kv = &types.KeyValue{Key: key, Value: types.Encode(&types.Int64{Data: height - 1})} kv = &types.KeyValue{Key: key, Value: types.Encode(&types.Int64{Data: height - 1})}
set.KV = append(set.KV, kv) set.KV = append(set.KV, kv)
return client.setLocalBlock(set) return client.setLocalDb(set)
} }
// localblock 最小高度不为0,如果minHeight=0,则把localblocks 清空,只设置lastHeight key // localblock 设置到当前高度,当前高度后面block会被新的区块覆盖
func (client *client) removeLocalBlocks(minHeight int64) error { func (client *client) removeLocalBlocks(curHeight int64) error {
set := &types.LocalDBSet{} set := &types.LocalDBSet{}
key := calcTitleLastHeightKey(types.GetTitle()) key := calcTitleLastHeightKey(types.GetTitle())
if minHeight > 0 { kv := &types.KeyValue{Key: key, Value: types.Encode(&types.Int64{Data: curHeight})}
kv := &types.KeyValue{Key: key, Value: types.Encode(&types.Int64{Data: minHeight})}
set.KV = append(set.KV, kv) set.KV = append(set.KV, kv)
} else {
kv := &types.KeyValue{Key: key, Value: nil}
set.KV = append(set.KV, kv)
}
return client.setLocalBlock(set)
}
func (client *client) getFromLocalDb(set *types.LocalDBGet, count int) ([][]byte, error) {
msg := client.GetQueueClient().NewMessage("blockchain", types.EventGetValueByKey, set)
err := client.GetQueueClient().Send(msg, true)
if err != nil {
return nil, err
}
resp, err := client.GetQueueClient().Wait(msg)
if err != nil {
return nil, err
}
reply := resp.GetData().(*types.LocalReplyValue)
if len(reply.Values) != count {
plog.Error("Parachain getFromLocalDb count not match", "expert", count, "real", len(reply.Values))
return nil, types.ErrInvalidParam
}
return reply.Values, nil
return client.setLocalDb(set)
} }
func (client *client) getLastLocalHeight() (int64, error) { func (client *client) getLastLocalHeight() (int64, error) {
key := calcTitleLastHeightKey(types.GetTitle()) key := calcTitleLastHeightKey(types.GetTitle())
set := &types.LocalDBGet{Keys: [][]byte{key}} set := &types.LocalDBGet{Keys: [][]byte{key}}
value, err := client.getFromLocalDb(set, len(set.Keys)) value, err := client.getLocalDb(set, len(set.Keys))
if err != nil { if err != nil {
return -1, err return -1, err
} }
...@@ -145,7 +141,7 @@ func (client *client) getLocalBlockByHeight(height int64) (*paracross.ParaLocalD ...@@ -145,7 +141,7 @@ func (client *client) getLocalBlockByHeight(height int64) (*paracross.ParaLocalD
key := calcTitleHeightKey(types.GetTitle(), height) key := calcTitleHeightKey(types.GetTitle(), height)
set := &types.LocalDBGet{Keys: [][]byte{key}} set := &types.LocalDBGet{Keys: [][]byte{key}}
value, err := client.getFromLocalDb(set, len(set.Keys)) value, err := client.getLocalDb(set, len(set.Keys))
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -168,16 +164,17 @@ func (client *client) getLocalBlockSeq(height int64) (int64, []byte, error) { ...@@ -168,16 +164,17 @@ func (client *client) getLocalBlockSeq(height int64) (int64, []byte, error) {
return -2, nil, err return -2, nil, err
} }
//如果当前mainHash对应seq获取不到,返回0 seq,和当前hash,去switchLocalHashMatchedBlock里面回溯查找
mainSeq, err := client.GetSeqByHashOnMainChain(lastBlock.MainHash) mainSeq, err := client.GetSeqByHashOnMainChain(lastBlock.MainHash)
if err != nil { if err != nil {
return -2, nil, err return 0, lastBlock.MainHash, nil
} }
return mainSeq, lastBlock.MainHash, nil return mainSeq, lastBlock.MainHash, nil
} }
func (client *client) setLocalBlockByChainBlock(chainBlock *types.Block) error { //根据匹配上的chainblock,设置当前localdb block
//根据匹配上的chainblock,设置当前localdb block func (client *client) alignLocalBlock2ChainBlock(chainBlock *types.Block) error {
localBlock := &paracross.ParaLocalDbBlock{ localBlock := &paracross.ParaLocalDbBlock{
Height: chainBlock.Height, Height: chainBlock.Height,
MainHeight: chainBlock.MainHeight, MainHeight: chainBlock.MainHeight,
...@@ -190,7 +187,7 @@ func (client *client) setLocalBlockByChainBlock(chainBlock *types.Block) error { ...@@ -190,7 +187,7 @@ func (client *client) setLocalBlockByChainBlock(chainBlock *types.Block) error {
} }
//如果localdb里面没有信息,就从chain block返回,至少有创世区块,然后进入循环匹配切换场景 //如果localdb里面没有信息,就从chain block返回,至少有创世区块,然后进入循环匹配切换场景
func (client *client) getLastLocalBlockInfo() (int64, []byte, error) { func (client *client) getLastLocalBlockSeq() (int64, []byte, error) {
height, err := client.getLastLocalHeight() height, err := client.getLastLocalHeight()
if err == nil { if err == nil {
mainSeq, mainHash, err := client.getLocalBlockSeq(height) mainSeq, mainHash, err := client.getLocalBlockSeq(height)
...@@ -199,12 +196,14 @@ func (client *client) getLastLocalBlockInfo() (int64, []byte, error) { ...@@ -199,12 +196,14 @@ func (client *client) getLastLocalBlockInfo() (int64, []byte, error) {
} }
} }
//说明localDb获取存在错误,从chain获取
mainSeq, chainBlock, err := client.getLastBlockMainInfo() mainSeq, chainBlock, err := client.getLastBlockMainInfo()
if err != nil { if err != nil {
return -2, nil, err return -2, nil, err
} }
err = client.setLocalBlockByChainBlock(chainBlock) //chain block中获取成功,设置last local block和找到的chainBlock main高度和mainhash对齐
err = client.alignLocalBlock2ChainBlock(chainBlock)
if err != nil { if err != nil {
return -2, nil, err return -2, nil, err
} }
...@@ -212,7 +211,7 @@ func (client *client) getLastLocalBlockInfo() (int64, []byte, error) { ...@@ -212,7 +211,7 @@ func (client *client) getLastLocalBlockInfo() (int64, []byte, error) {
} }
func (client *client) getLastDbBlock() (*paracross.ParaLocalDbBlock, error) { func (client *client) getLastLocalBlock() (*paracross.ParaLocalDbBlock, error) {
height, err := client.getLastLocalHeight() height, err := client.getLastLocalHeight()
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -221,7 +220,18 @@ func (client *client) getLastDbBlock() (*paracross.ParaLocalDbBlock, error) { ...@@ -221,7 +220,18 @@ func (client *client) getLastDbBlock() (*paracross.ParaLocalDbBlock, error) {
return client.getLocalBlockByHeight(height) return client.getLocalBlockByHeight(height)
} }
func (client *client) reqMatchedBlockOnChain(startHeight int64) (int64, *types.Block, error) { //genesis block scenario
func (client *client) syncFromGenesisBlock() (int64, *types.Block, error) {
lastSeq, lastBlock, err := client.getLastBlockMainInfo()
if err != nil {
plog.Error("Parachain getLastBlockInfo fail", "err", err)
return -2, nil, err
}
plog.Info("syncFromGenesisBlock sync from height 0")
return lastSeq, lastBlock, nil
}
func (client *client) getMatchedBlockOnChain(startHeight int64) (int64, *types.Block, error) {
lastBlock, err := client.RequestLastBlock() lastBlock, err := client.RequestLastBlock()
if err != nil { if err != nil {
plog.Error("Parachain RequestLastBlock fail", "err", err) plog.Error("Parachain RequestLastBlock fail", "err", err)
...@@ -263,7 +273,7 @@ func (client *client) reqMatchedBlockOnChain(startHeight int64) (int64, *types.B ...@@ -263,7 +273,7 @@ func (client *client) reqMatchedBlockOnChain(startHeight int64) (int64, *types.B
continue continue
} }
plog.Info("reqMatchedBlockOnChain succ", "currHeight", height, "initHeight", lastBlock.Height, plog.Info("getMatchedBlockOnChain succ", "currHeight", height, "initHeight", lastBlock.Height,
"new currSeq", mainSeq, "new preMainBlockHash", hex.EncodeToString(block.MainHash)) "new currSeq", mainSeq, "new preMainBlockHash", hex.EncodeToString(block.MainHash))
return mainSeq, block, nil return mainSeq, block, nil
} }
...@@ -271,44 +281,41 @@ func (client *client) reqMatchedBlockOnChain(startHeight int64) (int64, *types.B ...@@ -271,44 +281,41 @@ func (client *client) reqMatchedBlockOnChain(startHeight int64) (int64, *types.B
} }
func (client *client) switchMatchedBlockOnChain(startHeight int64) (int64, []byte, error) { func (client *client) switchMatchedBlockOnChain(startHeight int64) (int64, []byte, error) {
mainSeq, chainBlock, err := client.reqMatchedBlockOnChain(startHeight) mainSeq, chainBlock, err := client.getMatchedBlockOnChain(startHeight)
if err != nil { if err != nil {
return -2, nil, err return -2, nil, err
} }
err = client.setLocalBlockByChainBlock(chainBlock) //chain block中获取成功,设置last local block和找到的chainBlock main高度和mainhash对齐
err = client.alignLocalBlock2ChainBlock(chainBlock)
if err != nil { if err != nil {
return -2, nil, err return -2, nil, err
} }
return mainSeq, chainBlock.MainHash, nil return mainSeq, chainBlock.MainHash, nil
} }
// search base on para block but not last MainBlockHash, last MainBlockHash can not back tracing func (client *client) switchHashMatchedBlock() (int64, []byte, error) {
func (client *client) switchLocalHashMatchedBlock(currSeq int64) (int64, []byte, error) { mainSeq, mainHash, err := client.switchLocalHashMatchedBlock()
lastBlock, err := client.getLastDbBlock()
if err != nil { if err != nil {
if err == types.ErrNotFound {
return client.switchMatchedBlockOnChain(0) return client.switchMatchedBlockOnChain(0)
} }
return mainSeq, mainHash, nil
}
//
func (client *client) switchLocalHashMatchedBlock() (int64, []byte, error) {
lastBlock, err := client.getLastLocalBlock()
if err != nil {
plog.Error("Parachain RequestLastBlock fail", "err", err) plog.Error("Parachain RequestLastBlock fail", "err", err)
return -2, nil, err return -2, nil, err
} }
for height := lastBlock.Height; height > 0; height-- { for height := lastBlock.Height; height >= 0; height-- {
block, err := client.getLocalBlockByHeight(height) block, err := client.getLocalBlockByHeight(height)
if err != nil { if err != nil {
if err == types.ErrNotFound {
plog.Error("switchLocalHashMatchedBlock search not found", "lastBlockHeight", height)
err = client.removeLocalBlocks(height)
if err != nil {
return -2, nil, err
}
return client.switchMatchedBlockOnChain(height)
}
return -2, nil, err return -2, nil, err
} }
//当前block结构已经有mainHash和MainHeight但是从blockchain获取的block还没有写入,以后如果获取到,可以替换从minerTx获取 //当前block结构已经有mainHash和MainHeight但是从blockchain获取的block还没有写入,以后如果获取到,可以替换从minerTx获取
plog.Info("switchLocalHashMatchedBlock", "lastlocalBlockHeight", height, "mainHeight", plog.Info("switchLocalHashMatchedBlock", "height", height, "mainHeight", block.MainHeight, "mainHash", hex.EncodeToString(block.MainHash))
block.MainHeight, "mainHash", hex.EncodeToString(block.MainHash))
mainSeq, err := client.GetSeqByHashOnMainChain(block.MainHash) mainSeq, err := client.GetSeqByHashOnMainChain(block.MainHash)
if err != nil { if err != nil {
continue continue
...@@ -327,10 +334,61 @@ func (client *client) switchLocalHashMatchedBlock(currSeq int64) (int64, []byte, ...@@ -327,10 +334,61 @@ func (client *client) switchLocalHashMatchedBlock(currSeq int64) (int64, []byte,
return -2, nil, paracross.ErrParaCurHashNotMatch return -2, nil, paracross.ErrParaCurHashNotMatch
} }
func (client *client) downloadBlocks() { // preBlockHash to identify the same main node
lastSeq, lastSeqMainHash, err := client.getLastLocalBlockInfo() func (client *client) RequestTx(currSeq int64, preMainBlockHash []byte) ([]*types.Transaction, *types.BlockSeq, error) {
plog.Debug("Para consensus RequestTx")
lastSeq, err := client.GetLastSeqOnMainChain()
if err != nil {
return nil, nil, err
}
plog.Info("RequestTx", "LastMainSeq", lastSeq, "CurrSeq", currSeq)
if lastSeq >= currSeq {
blockSeq, err := client.GetBlockOnMainBySeq(currSeq)
if err != nil { if err != nil {
plog.Error("Parachain CreateBlock getLastLocalBlockInfo fail", "err", err.Error()) return nil, nil, err
}
//genesis block start with seq=-1 not check
if (bytes.Equal(preMainBlockHash, blockSeq.Detail.Block.ParentHash) && blockSeq.Seq.Type == addAct) ||
(bytes.Equal(preMainBlockHash, blockSeq.Seq.Hash) && blockSeq.Seq.Type == delAct) {
txs := paraexec.FilterTxsForPara(types.GetTitle(), blockSeq.Detail)
plog.Info("GetCurrentSeq", "Len of txs", len(txs), "seqTy", blockSeq.Seq.Type)
client.mtx.Lock()
if lastSeq-currSeq > emptyBlockInterval {
client.isCaughtUp = false
} else {
client.isCaughtUp = true
}
client.mtx.Unlock()
if client.authAccount != "" {
client.commitMsgClient.onMainBlockAdded(blockSeq.Detail)
}
return txs, blockSeq, nil
}
//not consistent case be processed at below
plog.Error("RequestTx", "preMainHash", hex.EncodeToString(preMainBlockHash), "currSeq preMainHash", hex.EncodeToString(blockSeq.Detail.Block.ParentHash),
"currSeq mainHash", hex.EncodeToString(blockSeq.Seq.Hash), "curr seq", currSeq, "ty", blockSeq.Seq.Type, "currSeq Mainheight", blockSeq.Detail.Block.Height)
return nil, nil, paracross.ErrParaCurHashNotMatch
}
//lastSeq < CurrSeq case:
//lastSeq = currSeq-1, main node not update
if lastSeq+1 == currSeq {
plog.Debug("Waiting new sequence from main chain")
return nil, nil, paracross.ErrParaWaitingNewSeq
}
// 1. lastSeq < currSeq-1
// 2. lastSeq >= currSeq and seq not consistent or fork case
return nil, nil, paracross.ErrParaCurHashNotMatch
}
func (client *client) CreateBlock() {
lastSeq, lastSeqMainHash, err := client.getLastLocalBlockSeq()
if err != nil {
plog.Error("Parachain CreateBlock getLastLocalBlockSeq fail", "err", err.Error())
return return
} }
currSeq := lastSeq + 1 currSeq := lastSeq + 1
...@@ -338,7 +396,7 @@ func (client *client) downloadBlocks() { ...@@ -338,7 +396,7 @@ func (client *client) downloadBlocks() {
txs, mainBlock, err := client.RequestTx(currSeq, lastSeqMainHash) txs, mainBlock, err := client.RequestTx(currSeq, lastSeqMainHash)
if err != nil { if err != nil {
if err == paracross.ErrParaCurHashNotMatch { if err == paracross.ErrParaCurHashNotMatch {
preSeq, preSeqMainHash, err := client.switchLocalHashMatchedBlock(currSeq) preSeq, preSeqMainHash, err := client.switchHashMatchedBlock()
if err == nil { if err == nil {
currSeq = preSeq + 1 currSeq = preSeq + 1
lastSeqMainHash = preSeqMainHash lastSeqMainHash = preSeqMainHash
...@@ -355,9 +413,9 @@ func (client *client) downloadBlocks() { ...@@ -355,9 +413,9 @@ func (client *client) downloadBlocks() {
lastSeqMainHash = mainBlock.Detail.Block.ParentHash lastSeqMainHash = mainBlock.Detail.Block.ParentHash
} }
lastBlock, err := client.getLastDbBlock() lastBlock, err := client.getLastLocalBlock()
if err != nil && err != types.ErrNotFound { if err != nil {
plog.Error("Parachain getLastDbBlock", "err", err) plog.Error("Parachain getLastLocalBlock", "err", err)
time.Sleep(time.Second) time.Sleep(time.Second)
continue continue
} }
......
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package para
import (
"bytes"
"context"
"encoding/hex"
"github.com/33cn/chain33/common"
"github.com/33cn/chain33/types"
)
func (client *client) GetBlockedSeq(hash []byte) (int64, error) {
//from blockchain db
blockedSeq, err := client.GetAPI().GetMainSequenceByHash(&types.ReqHash{Hash: hash})
if err != nil {
return -2, err
}
return blockedSeq.Data, nil
}
func (client *client) GetBlockByHeight(height int64) (*types.Block, error) {
//from blockchain db
blockDetails, err := client.GetAPI().GetBlocks(&types.ReqBlocks{Start: height, End: height})
if err != nil {
plog.Error("paracommitmsg get node status block count fail")
return nil, err
}
if 1 != int64(len(blockDetails.Items)) {
plog.Error("paracommitmsg get node status block count fail")
return nil, types.ErrInvalidParam
}
return blockDetails.Items[0].Block, nil
}
// 获取当前平行链block对应主链seq,hash信息
// 对于云端主链节点,创世区块记录seq在不同主链节点上差异很大,通过记录的主链hash获取真实seq使用
func (client *client) getLastBlockMainInfo() (int64, *types.Block, error) {
lastBlock, err := client.getLastBlockInfo()
if err != nil {
return -2, nil, err
}
//如果在云端节点获取不到对应MainHash,切换到switchLocalHashMatchedBlock 去循环查找
mainSeq, err := client.GetSeqByHashOnMainChain(lastBlock.MainHash)
if err != nil {
return 0, lastBlock, nil
}
return mainSeq, lastBlock, nil
}
func (client *client) getLastBlockInfo() (*types.Block, error) {
lastBlock, err := client.RequestLastBlock()
if err != nil {
plog.Error("Parachain RequestLastBlock fail", "err", err)
return nil, err
}
return lastBlock, nil
}
func (client *client) GetForkHeightOnMainChain(key string) (int64, error) {
ret, err := client.grpcClient.GetFork(context.Background(), &types.ReqKey{Key: []byte(key)})
if err != nil {
plog.Error("para get rpc ForkHeight fail", "key", key, "err", err.Error())
return types.MaxHeight, err
}
return ret.Data, nil
}
func (client *client) GetLastHeightOnMainChain() (int64, error) {
header, err := client.grpcClient.GetLastHeader(context.Background(), &types.ReqNil{})
if err != nil {
plog.Error("GetLastHeightOnMainChain", "Error", err.Error())
return -1, err
}
return header.Height, nil
}
func (client *client) GetLastSeqOnMainChain() (int64, error) {
seq, err := client.grpcClient.GetLastBlockSequence(context.Background(), &types.ReqNil{})
if err != nil {
plog.Error("GetLastSeqOnMainChain", "Error", err.Error())
return -1, err
}
//the reflect checked in grpcHandle
return seq.Data, nil
}
func (client *client) GetSeqByHeightOnMainChain(height int64) (int64, []byte, error) {
hash, err := client.GetHashByHeightOnMainChain(height)
if err != nil {
return -1, nil, err
}
seq, err := client.GetSeqByHashOnMainChain(hash)
return seq, hash, err
}
func (client *client) GetHashByHeightOnMainChain(height int64) ([]byte, error) {
reply, err := client.grpcClient.GetBlockHash(context.Background(), &types.ReqInt{Height: height})
if err != nil {
plog.Error("GetHashByHeightOnMainChain", "Error", err.Error())
return nil, err
}
return reply.Hash, nil
}
func (client *client) GetSeqByHashOnMainChain(hash []byte) (int64, error) {
seq, err := client.grpcClient.GetSequenceByHash(context.Background(), &types.ReqHash{Hash: hash})
if err != nil {
plog.Error("GetSeqByHashOnMainChain", "Error", err.Error(), "hash", hex.EncodeToString(hash))
return -1, err
}
//the reflect checked in grpcHandle
return seq.Data, nil
}
func (client *client) GetBlockOnMainBySeq(seq int64) (*types.BlockSeq, error) {
blockSeq, err := client.grpcClient.GetBlockBySeq(context.Background(), &types.Int64{Data: seq})
if err != nil {
plog.Error("Not found block on main", "seq", seq)
return nil, err
}
hash := blockSeq.Detail.Block.HashByForkHeight(mainBlockHashForkHeight)
if !bytes.Equal(blockSeq.Seq.Hash, hash) {
plog.Error("para compare ForkBlockHash fail", "forkHeight", mainBlockHashForkHeight,
"seqHash", hex.EncodeToString(blockSeq.Seq.Hash), "calcHash", hex.EncodeToString(hash))
return nil, types.ErrBlockHashNoMatch
}
return blockSeq, nil
}
func (client *client) GetBlockOnMainByHash(hash []byte) (*types.Block, error) {
blocks, err := client.grpcClient.GetBlockByHashes(context.Background(), &types.ReqHashes{Hashes: [][]byte{hash}})
if err != nil || blocks.Items[0] == nil {
plog.Error("GetBlockOnMainByHash Not found", "blockhash", common.ToHex(hash))
return nil, err
}
return blocks.Items[0].Block, nil
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment