Commit 36ce6853 authored by mdj33's avatar mdj33 Committed by vipwzw

correct comments

parent 74c65969
...@@ -292,43 +292,35 @@ func (client *client) FilterTxsForPara(main *types.BlockDetail) []*types.Transac ...@@ -292,43 +292,35 @@ func (client *client) FilterTxsForPara(main *types.BlockDetail) []*types.Transac
//get the last sequence in parachain //get the last sequence in parachain
func (client *client) GetLastSeq() (int64, error) { func (client *client) GetLastSeq() (int64, error) {
msg := client.GetQueueClient().NewMessage("blockchain", types.EventGetLastBlockSequence, "") blockedSeq, err := client.GetAPI().GetLastBlockSequence()
client.GetQueueClient().Send(msg, true)
resp, err := client.GetQueueClient().Wait(msg)
if err != nil { if err != nil {
return -2, err return -2, err
} }
if lastSeq, ok := resp.GetData().(*types.Int64); ok { return blockedSeq.Data, nil
return lastSeq.Data, nil
}
return -2, errors.New("Not an int64 data")
} }
func (client *client) GetBlockedSeq(hash []byte) (int64, error) { func (client *client) GetBlockedSeq(hash []byte) (int64, error) {
//from blockchain db //from blockchain db
msg := client.GetQueueClient().NewMessage("blockchain", types.EventGetSeqByHash, &types.ReqHash{Hash: hash}) blockedSeq, err := client.GetAPI().GetSequenceByHash(&types.ReqHash{Hash: hash})
client.GetQueueClient().Send(msg, true) if err != nil {
resp, _ := client.GetQueueClient().Wait(msg) return -2, err
if blockedSeq, ok := resp.GetData().(*types.Int64); ok {
return blockedSeq.Data, nil
} }
return -2, errors.New("Not an int64 data") return blockedSeq.Data, nil
} }
func (client *client) GetBlockByHeight(height int64) (*types.Block, error) { func (client *client) GetBlockByHeight(height int64) (*types.Block, error) {
//from blockchain db //from blockchain db
msg := client.GetQueueClient().NewMessage("blockchain", types.EventGetBlocks, &types.ReqBlocks{Start: height, End: height}) blockDetails, err := client.GetAPI().GetBlocks(&types.ReqBlocks{Start: height, End: height})
client.GetQueueClient().Send(msg, true)
resp, err := client.GetQueueClient().Wait(msg)
if err != nil { if err != nil {
plog.Error("paracommitmsg get node status block count fail")
return nil, err return nil, err
} }
v := resp.GetData().(*types.BlockDetails) if 1 != int64(len(blockDetails.Items)) {
if 1 != int64(len(v.Items)) {
plog.Error("paracommitmsg get node status block count fail") plog.Error("paracommitmsg get node status block count fail")
return nil, err return nil, types.ErrInvalidParam
} }
return v.Items[0].Block, nil return blockDetails.Items[0].Block, nil
} }
func (client *client) getLastBlockInfo() (int64, *types.Block, []byte, int64, error) { func (client *client) getLastBlockInfo() (int64, *types.Block, []byte, int64, error) {
...@@ -430,9 +422,8 @@ func (client *client) GetBlockOnMainBySeq(seq int64) (*types.BlockDetail, int64, ...@@ -430,9 +422,8 @@ func (client *client) GetBlockOnMainBySeq(seq int64) (*types.BlockDetail, int64,
} }
// preBlockHash to identify the same main node // preBlockHash to identify the same main node
func (client *client) RequestTx(seq *int64, preMainBlockHash *[]byte) ([]*types.Transaction, *types.Block, int64, error) { func (client *client) RequestTx(currSeq int64, preMainBlockHash []byte) ([]*types.Transaction, *types.Block, int64, error) {
plog.Debug("Para consensus RequestTx") plog.Debug("Para consensus RequestTx")
currSeq := *seq
lastSeq, err := client.GetLastSeqOnMainChain() lastSeq, err := client.GetLastSeqOnMainChain()
if err != nil { if err != nil {
return nil, nil, -1, err return nil, nil, -1, err
...@@ -446,8 +437,8 @@ func (client *client) RequestTx(seq *int64, preMainBlockHash *[]byte) ([]*types. ...@@ -446,8 +437,8 @@ func (client *client) RequestTx(seq *int64, preMainBlockHash *[]byte) ([]*types.
//genesis block with seq=-1 not check //genesis block with seq=-1 not check
if currSeq == 0 || if currSeq == 0 ||
(bytes.Equal(*preMainBlockHash, blockDetail.Block.ParentHash) && seqTy == addAct) || (bytes.Equal(preMainBlockHash, blockDetail.Block.ParentHash) && seqTy == addAct) ||
(bytes.Equal(*preMainBlockHash, blockDetail.Block.Hash()) && seqTy == delAct) { (bytes.Equal(preMainBlockHash, blockDetail.Block.Hash()) && seqTy == delAct) {
txs := client.FilterTxsForPara(blockDetail) txs := client.FilterTxsForPara(blockDetail)
plog.Info("GetCurrentSeq", "Len of txs", len(txs), "seqTy", seqTy) plog.Info("GetCurrentSeq", "Len of txs", len(txs), "seqTy", seqTy)
...@@ -465,58 +456,54 @@ func (client *client) RequestTx(seq *int64, preMainBlockHash *[]byte) ([]*types. ...@@ -465,58 +456,54 @@ func (client *client) RequestTx(seq *int64, preMainBlockHash *[]byte) ([]*types.
return txs, blockDetail.Block, seqTy, nil return txs, blockDetail.Block, seqTy, nil
} }
//not consistent case be processed at below //not consistent case be processed at below
plog.Error("RequestTx", "preMainHash", common.Bytes2Hex(*preMainBlockHash), "currSeq preMainHash", common.Bytes2Hex(blockDetail.Block.ParentHash), plog.Error("RequestTx", "preMainHash", common.Bytes2Hex(preMainBlockHash), "currSeq preMainHash", common.Bytes2Hex(blockDetail.Block.ParentHash),
"currSeq mainHash", common.Bytes2Hex(blockDetail.Block.Hash()), "curr seq", currSeq, "ty", seqTy, "currSeq Mainheight", blockDetail.Block.Height) "currSeq mainHash", common.Bytes2Hex(blockDetail.Block.Hash()), "curr seq", currSeq, "ty", seqTy, "currSeq Mainheight", blockDetail.Block.Height)
return nil, nil, -1, paracross.ErrParaCurHashNotMatch
} }
//lastSeq < CurrSeq case: //lastSeq < CurrSeq case:
//lastSeq = currSeq-1, main node not update //lastSeq = currSeq-1, main node not update
if lastSeq+1 == currSeq { if lastSeq+1 == currSeq {
plog.Debug("Waiting new sequence from main chain") plog.Debug("Waiting new sequence from main chain")
time.Sleep(time.Second * time.Duration(blockSec*2)) time.Sleep(time.Second * time.Duration(blockSec*2))
return nil, nil, -1, errors.New("Waiting new sequence") return nil, nil, -1, paracross.ErrParaWaitingNewSeq
} }
// 1. lastSeq < currSeq-1 // 1. lastSeq < currSeq-1
// 2. lastSeq >= currSeq and seq not consistent or fork case // 2. lastSeq >= currSeq and seq not consistent or fork case
// whether found matched block or not, return err to re-requestTx return nil, nil, -1, paracross.ErrParaCurHashNotMatch
client.switchHashMatchedBlock(seq, preMainBlockHash)
return nil, nil, -1, errors.New("hash not matched")
} }
func (client *client) syncFromGenesisBlock(currSeq *int64, preMainBlockHash *[]byte) { func (client *client) syncFromGenesisBlock() (int64, []byte, error) {
lastSeq, _, lastSeqMainHash, _, err := client.getLastBlockInfo() lastSeq, _, lastSeqMainHash, _, err := client.getLastBlockInfo()
if err != nil { if err != nil {
plog.Error("Parachain getLastBlockInfo fail", "err", err) plog.Error("Parachain getLastBlockInfo fail", "err", err)
return return -2, nil, err
} }
*currSeq = lastSeq + 1
*preMainBlockHash = lastSeqMainHash
plog.Info("syncFromGenesisBlock sync from height 0") plog.Info("syncFromGenesisBlock sync from height 0")
return lastSeq + 1, lastSeqMainHash, nil
} }
// search base on para block but not last MainBlockHash, last MainBlockHash can not back tracing // search base on para block but not last MainBlockHash, last MainBlockHash can not back tracing
func (client *client) switchHashMatchedBlock(currSeq *int64, preMainBlockHash *[]byte) { func (client *client) switchHashMatchedBlock(currSeq int64, preMainBlockHash []byte) (int64, []byte, error) {
lastBlock, err := client.RequestLastBlock() lastBlock, err := client.RequestLastBlock()
if err != nil { if err != nil {
plog.Error("Parachain RequestLastBlock fail", "err", err) plog.Error("Parachain RequestLastBlock fail", "err", err)
return return -2, nil, err
} }
//genesis block scenario, get new main node's blockHash as preMainHash, genesis sequence as currSeq //genesis block scenario, get new main node's blockHash as preMainHash, genesis sequence as currSeq
if lastBlock.Height == 0 { if lastBlock.Height == 0 {
client.syncFromGenesisBlock(currSeq, preMainBlockHash) return client.syncFromGenesisBlock()
return
} }
depth := searchHashMatchDepth depth := searchHashMatchDepth
for height := lastBlock.Height; height > 0 && depth > 0; height-- { for height := lastBlock.Height; height > 0 && depth > 0; height-- {
block, err := client.GetBlockByHeight(height) block, err := client.GetBlockByHeight(height)
if err != nil { if err != nil {
return return -2, nil, err
} }
miner, err := getMinerTxInfo(block) miner, err := getMinerTxInfo(block)
if err != nil { if err != nil {
return return -2, nil, err
} }
plog.Info("switchHashMatchedBlock", "lastParaBlock height", miner.Height, "mainHeight", plog.Info("switchHashMatchedBlock", "lastParaBlock height", miner.Height, "mainHeight",
miner.MainBlockHeight, "mainHash", common.Bytes2Hex(miner.MainBlockHash)) miner.MainBlockHeight, "mainHash", common.Bytes2Hex(miner.MainBlockHash))
...@@ -534,27 +521,24 @@ func (client *client) switchHashMatchedBlock(currSeq *int64, preMainBlockHash *[ ...@@ -534,27 +521,24 @@ func (client *client) switchHashMatchedBlock(currSeq *int64, preMainBlockHash *[
"height1 mainHash", common.Bytes2Hex(miner.MainBlockHash)) "height1 mainHash", common.Bytes2Hex(miner.MainBlockHash))
err = client.removeBlocks(0) err = client.removeBlocks(0)
if err != nil { if err != nil {
*preMainBlockHash = nil return currSeq, nil, nil
return
} }
client.syncFromGenesisBlock(currSeq, preMainBlockHash) return client.syncFromGenesisBlock()
return
} }
continue continue
} }
//remove fail, set the preMainBlockHash to nil, to match nothing, force to search again //remove fail, the para chain may be remove part, set the preMainBlockHash to nil, to match nothing, force to search from last
err = client.removeBlocks(height) err = client.removeBlocks(height)
if err != nil { if err != nil {
*preMainBlockHash = nil return currSeq, nil, nil
return
} }
*currSeq = mainSeq + 1 plog.Info("switchHashMatchedBlock succ", "currHeight", height, "initHeight", lastBlock.Height, "set new currSeq", currSeq, "new preMainBlockHash", common.Bytes2Hex(preMainBlockHash))
*preMainBlockHash = miner.MainBlockHash return mainSeq + 1, miner.MainBlockHash, nil
plog.Info("switchHashMatchedBlock succ", "currHeight", height, "initHeight", lastBlock.Height, "set new currSeq", *currSeq, "new preMainBlockHash", common.Bytes2Hex(*preMainBlockHash))
return
} }
return -2, nil, paracross.ErrParaCurHashNotMatch
} }
func (client *client) removeBlocks(endHeight int64) error { func (client *client) removeBlocks(endHeight int64) error {
...@@ -605,8 +589,15 @@ func (client *client) CreateBlock() { ...@@ -605,8 +589,15 @@ func (client *client) CreateBlock() {
currSeq++ currSeq++
} }
txs, blockOnMain, seqTy, err := client.RequestTx(&currSeq, &lastSeqMainHash) txs, blockOnMain, seqTy, err := client.RequestTx(currSeq, lastSeqMainHash)
if err != nil { if err != nil {
if err == paracross.ErrParaCurHashNotMatch {
newSeq, newSeqMainHash, err := client.switchHashMatchedBlock(currSeq, lastSeqMainHash)
if err == nil {
currSeq = newSeq
lastSeqMainHash = newSeqMainHash
}
}
incSeqFlag = false incSeqFlag = false
time.Sleep(time.Second) time.Sleep(time.Second)
continue continue
......
...@@ -342,14 +342,10 @@ func (client *commitMsgClient) getNodeStatus(start, end int64) ([]*pt.ParacrossN ...@@ -342,14 +342,10 @@ func (client *commitMsgClient) getNodeStatus(start, end int64) ([]*pt.ParacrossN
keys.Keys = append(keys.Keys, key) keys.Keys = append(keys.Keys, key)
} }
msg := client.paraClient.GetQueueClient().NewMessage("blockchain", types.EventLocalGet, keys) r, err := client.paraClient.GetAPI().LocalGet(keys)
client.paraClient.GetQueueClient().Send(msg, true)
resp, err := client.paraClient.GetQueueClient().Wait(msg)
if err != nil { if err != nil {
return nil, err return nil, err
} }
r := resp.GetData().(*types.LocalReplyValue)
if count != int64(len(r.Values)) { if count != int64(len(r.Values)) {
plog.Error("paracommitmsg get node status key", "expect count", count, "actual count", len(r.Values)) plog.Error("paracommitmsg get node status key", "expect count", count, "actual count", len(r.Values))
return nil, err return nil, err
...@@ -375,13 +371,10 @@ func (client *commitMsgClient) getNodeStatus(start, end int64) ([]*pt.ParacrossN ...@@ -375,13 +371,10 @@ func (client *commitMsgClient) getNodeStatus(start, end int64) ([]*pt.ParacrossN
} }
} }
msg = client.paraClient.GetQueueClient().NewMessage("blockchain", types.EventGetBlocks, req) v, err := client.paraClient.GetAPI().GetBlocks(req)
client.paraClient.GetQueueClient().Send(msg, true)
resp, err = client.paraClient.GetQueueClient().Wait(msg)
if err != nil { if err != nil {
return nil, err return nil, err
} }
v := resp.GetData().(*types.BlockDetails)
if count != int64(len(v.Items)) { if count != int64(len(v.Items)) {
plog.Error("paracommitmsg get node status block", "expect count", count, "actual count", len(v.Items)) plog.Error("paracommitmsg get node status block", "expect count", count, "actual count", len(v.Items))
return nil, err return nil, err
...@@ -405,13 +398,10 @@ func (client *commitMsgClient) getNodeStatus(start, end int64) ([]*pt.ParacrossN ...@@ -405,13 +398,10 @@ func (client *commitMsgClient) getNodeStatus(start, end int64) ([]*pt.ParacrossN
func (client *commitMsgClient) getGenesisNodeStatus() (*pt.ParacrossNodeStatus, error) { func (client *commitMsgClient) getGenesisNodeStatus() (*pt.ParacrossNodeStatus, error) {
var status pt.ParacrossNodeStatus var status pt.ParacrossNodeStatus
req := &types.ReqBlocks{Start: 0, End: 0} req := &types.ReqBlocks{Start: 0, End: 0}
msg := client.paraClient.GetQueueClient().NewMessage("blockchain", types.EventGetBlocks, req) v, err := client.paraClient.GetAPI().GetBlocks(req)
client.paraClient.GetQueueClient().Send(msg, true)
resp, err := client.paraClient.GetQueueClient().Wait(msg)
if err != nil { if err != nil {
return nil, err return nil, err
} }
v := resp.GetData().(*types.BlockDetails)
block := v.Items[0].Block block := v.Items[0].Block
if block.Height != 0 { if block.Height != 0 {
return nil, errors.New("block chain not return 0 height block") return nil, errors.New("block chain not return 0 height block")
......
...@@ -23,4 +23,8 @@ var ( ...@@ -23,4 +23,8 @@ var (
ErrParaEmptyMinerTx = errors.New("ErrParaEmptyMinerTx") ErrParaEmptyMinerTx = errors.New("ErrParaEmptyMinerTx")
// ErrParaMinerExecErr miner tx exec error // ErrParaMinerExecErr miner tx exec error
ErrParaMinerExecErr = errors.New("ErrParaMinerExecErr") ErrParaMinerExecErr = errors.New("ErrParaMinerExecErr")
// ErrParaWaitingNewSeq para waiting main node new seq coming
ErrParaWaitingNewSeq = errors.New("ErrParaWaitingNewSeq")
// ErrParaCurHashNotMatch para curr main hash not match with pre, main node may switched
ErrParaCurHashNotMatch = errors.New("ErrParaCurHashNotMatch")
) )
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment