Commit c37a551b authored by mdj33's avatar mdj33 Committed by vipwzw

add para support remote main node switch

parent 7af54530
...@@ -47,6 +47,7 @@ var ( ...@@ -47,6 +47,7 @@ var (
grpcRecSize = 30 * 1024 * 1024 //the size should be limited in server grpcRecSize = 30 * 1024 * 1024 //the size should be limited in server
//current miner tx take any privatekey for unify all nodes sign purpose, and para chain is free //current miner tx take any privatekey for unify all nodes sign purpose, and para chain is free
minerPrivateKey = "6da92a632ab7deb67d38c0f6560bcfed28167998f6496db64c258d5e8393a81b" minerPrivateKey = "6da92a632ab7deb67d38c0f6560bcfed28167998f6496db64c258d5e8393a81b"
searchHashMatchBlockDepth int64 = 100
) )
func init() { func init() {
...@@ -83,6 +84,10 @@ func New(cfg *types.Consensus, sub []byte) queue.Module { ...@@ -83,6 +84,10 @@ func New(cfg *types.Consensus, sub []byte) queue.Module {
emptyBlockInterval = cfg.EmptyBlockInterval emptyBlockInterval = cfg.EmptyBlockInterval
} }
if cfg.searchHashMatchBlockDepth > 0 {
searchHashMatchBlockDepth = cfg.searchHashMatchBlockDepth
}
pk, err := hex.DecodeString(minerPrivateKey) pk, err := hex.DecodeString(minerPrivateKey)
if err != nil { if err != nil {
panic(err) panic(err)
...@@ -310,6 +315,55 @@ func (client *client) GetBlockedSeq(hash []byte) (int64, error) { ...@@ -310,6 +315,55 @@ func (client *client) GetBlockedSeq(hash []byte) (int64, error) {
return -2, errors.New("Not an int64 data") return -2, errors.New("Not an int64 data")
} }
func (client *client) GetBlockByHeight(height int64) (*types.Block, error) {
//from blockchain db
msg := client.GetQueueClient().NewMessage("blockchain", types.EventGetBlocks, &types.ReqBlocks{Start: height, End: height})
client.GetQueueClient().Send(msg, true)
resp, err := client.GetQueueClient().Wait(msg)
if err != nil {
return nil, err
}
v := resp.GetData().(*types.BlockDetails)
if 1 != int64(len(v.Items)) {
plog.Error("paracommitmsg get node status block count fail")
return nil, err
}
return v.Items[0].Block, nil
}
func (client *client) getLastBlockInfo() (int64, *types.Block, []byte, int64, error) {
lastBlock, err := client.RequestLastBlock()
if err != nil {
plog.Error("Parachain RequestLastBlock fail", "err", err)
return -2, nil, nil, -2, err
}
blockedSeq, err := client.GetBlockedSeq(lastBlock.Hash())
if err != nil {
plog.Error("Parachain GetBlockedSeq fail", "err", err)
return -2, nil, nil, -2, err
}
if lastBlock.Height > 0 {
miner, err := getMinerTxInfo(lastBlock)
if err != nil {
return -2, nil, nil, -2, err
}
return blockedSeq, lastBlock, miner.MainBlockHash, miner.MainBlockHeight, nil
}
//sequence in main chain start from 0
seq := blockedSeq
if seq == -1 {
seq = 0
}
savedBlockOnMain, _, err := client.GetBlockOnMainBySeq(seq)
if err != nil {
return -2, nil, nil, -2, err
}
return blockedSeq, lastBlock, savedBlockOnMain.Block.Hash(), savedBlockOnMain.Block.Height, nil
}
func (client *client) GetLastSeqOnMainChain() (int64, error) { func (client *client) GetLastSeqOnMainChain() (int64, error) {
seq, err := client.grpcClient.GetLastBlockSequence(context.Background(), &types.ReqNil{}) seq, err := client.grpcClient.GetLastBlockSequence(context.Background(), &types.ReqNil{})
if err != nil { if err != nil {
...@@ -320,6 +374,26 @@ func (client *client) GetLastSeqOnMainChain() (int64, error) { ...@@ -320,6 +374,26 @@ func (client *client) GetLastSeqOnMainChain() (int64, error) {
return seq.Data, nil return seq.Data, nil
} }
func (client *client) GetSeqByHashOnMainChain(hash []byte) (int64, error) {
seq, err := client.grpcClient.GetSequenceByHash(context.Background(), &types.ReqHash{Hash: hash})
if err != nil {
plog.Error("GetLastSeqOnMainChain", "Error", err.Error())
return -1, err
}
//the reflect checked in grpcHandle
return seq.Data, nil
}
func (client *client) GetHashByHeightFromMainChain(height int64) ([]byte, error) {
req := &types.ReqInt{Height: height}
hash, err := client.grpcClient.GetBlockHash(context.Background(), req)
if err != nil {
plog.Error("GetBlocksByHashesFromMainChain", "Error", err.Error())
return nil, err
}
return hash, nil
}
func (client *client) GetBlocksByHashesFromMainChain(hashes [][]byte) (*types.BlockDetails, error) { func (client *client) GetBlocksByHashesFromMainChain(hashes [][]byte) (*types.BlockDetails, error) {
req := &types.ReqHashes{Hashes: hashes} req := &types.ReqHashes{Hashes: hashes}
blocks, err := client.grpcClient.GetBlockByHashes(context.Background(), req) blocks, err := client.grpcClient.GetBlockByHashes(context.Background(), req)
...@@ -365,94 +439,132 @@ func (client *client) GetBlockOnMainBySeq(seq int64) (*types.BlockDetail, int64, ...@@ -365,94 +439,132 @@ func (client *client) GetBlockOnMainBySeq(seq int64) (*types.BlockDetail, int64,
return blockDetails.Items[0], blockSeqs.Items[0].Type, nil return blockDetails.Items[0], blockSeqs.Items[0].Type, nil
} }
func (client *client) RequestTx(currSeq int64) ([]*types.Transaction, *types.Block, int64, error) { // preBlockHash to identify the same main node
func (client *client) RequestTx(seq *int64, lastBlock *types.Block, preMainBlockHash *[]byte) ([]*types.Transaction, *types.Block, int64, error) {
plog.Debug("Para consensus RequestTx") plog.Debug("Para consensus RequestTx")
currSeq := *seq
lastSeq, err := client.GetLastSeqOnMainChain() lastSeq, err := client.GetLastSeqOnMainChain()
if err != nil { if err != nil {
return nil, nil, -1, err return nil, nil, -1, err
} }
plog.Info("RequestTx", "LastSeq", lastSeq, "CurrSeq", currSeq) plog.Info("RequestTx", "LastSeq", lastSeq, "CurrSeq", currSeq)
if lastSeq >= currSeq { if lastSeq >= currSeq {
if lastSeq-currSeq > emptyBlockInterval {
client.isCatchingUp = true
} else {
client.isCatchingUp = false
}
blockDetail, seqTy, err := client.GetBlockOnMainBySeq(currSeq) blockDetail, seqTy, err := client.GetBlockOnMainBySeq(currSeq)
if err != nil { if err != nil {
return nil, nil, -1, err return nil, nil, -1, err
} }
//genesis block not check
if lastBlock.Height == 0 ||
(bytes.Equal(*preMainBlockHash, blockDetail.Block.ParentHash) && seqTy == addAct) ||
(bytes.Equal(*preMainBlockHash, blockDetail.Block.Hash()) && seqTy == delAct) {
txs := client.FilterTxsForPara(blockDetail) txs := client.FilterTxsForPara(blockDetail)
plog.Info("GetCurrentSeq", "Len of txs", len(txs), "seqTy", seqTy) plog.Info("GetCurrentSeq", "Len of txs", len(txs), "seqTy", seqTy)
if lastSeq-currSeq > emptyBlockInterval {
client.isCatchingUp = true
} else {
client.isCatchingUp = false
}
if client.authAccount != "" { if client.authAccount != "" {
client.commitMsgClient.onMainBlockAdded(blockDetail) client.commitMsgClient.onMainBlockAdded(blockDetail)
} }
return txs, blockDetail.Block, seqTy, nil return txs, blockDetail.Block, seqTy, nil
} }
//not consistent case be processed at below
}
//lastSeq < CurrSeq case:
//lastSeq = currSeq-1, main node not update
if lastSeq+1 == currSeq {
plog.Debug("Waiting new sequence from main chain") plog.Debug("Waiting new sequence from main chain")
time.Sleep(time.Second * time.Duration(blockSec*2)) time.Sleep(time.Second * time.Duration(blockSec*2))
return nil, nil, -1, errors.New("Waiting new sequence") return nil, nil, -1, errors.New("Waiting new sequence")
}
// 1. lastSeq < currSeq-1
// 2. lastSeq >= currSeq and seq not consistent or fork case
// to search base on para block but not preMainBlock, preMainBlock can not back tracking
// whether found matched block or not, return err to re-requestTx
client.findHashMatchedBlock(seq, lastBlock, preMainBlockHash)
return nil, nil, -1, errors.New("hash not matched")
} }
//正常情况下,打包交易 //
func (client *client) CreateBlock() { func (client *client) findHashMatchedBlock(currSeq *int64, lastBlock *types.Block, preMainBlockHash *[]byte) {
incSeqFlag := true findDepth := searchHashMatchBlockDepth
currSeq, err := client.GetLastSeq() for height := lastBlock.Height; height > 0 && findDepth > 0; height-- {
block, err := client.GetBlockByHeight(height)
if err != nil { if err != nil {
plog.Error("Parachain GetLastSeq fail", "err", err)
return return
} }
for { miner, err := getMinerTxInfo(block)
lastSeq, err := client.GetLastSeq()
if err != nil { if err != nil {
plog.Error("Parachain GetLastSeq fail", "err", err) return
time.Sleep(time.Second) }
mainSeq, err := client.GetSeqByHashOnMainChain(miner.MainBlockHash)
if err != nil {
findDepth--
if findDepth == 0 {
plog.Error("findHashMatchedBlock depth overflow", "last info:mainHeight", miner.MainBlockHeight,
"mainHash", string(miner.MainBlockHash), "search depth", searchHashMatchBlockDepth)
panic("findHashMatchedBlock depth overflow, restart and re-connect main node")
}
continue continue
} }
if incSeqFlag || currSeq == lastSeq { //try 100 times for remove fail case, if >100, panic to restart system to keep lastblock and sequence unify
currSeq++ for i := 0; i < 100; i++ {
err = client.removeBlocks(height)
if err == nil {
break
}
if i >= 100 {
panic("findHashMatchedBlock del blocks fail, restart and retry")
}
} }
txs, blockOnMain, seqTy, err := client.RequestTx(currSeq) *currSeq = mainSeq + 1
if err != nil { *preMainBlockHash = miner.MainBlockHash
incSeqFlag = false return
time.Sleep(time.Second)
continue
} }
}
lastBlock, err := client.RequestLastBlock() //正常情况下,打包交易
func (client *client) CreateBlock() {
incSeqFlag := true
currSeq, _, lastSeqMainHash, _, err := client.getLastBlockInfo()
if err != nil { if err != nil {
plog.Error("Parachain RequestLastBlock fail", "err", err) plog.Error("Parachain GetLastSeq fail", "err", err)
incSeqFlag = false return
time.Sleep(time.Second)
continue
} }
blockedSeq, err := client.GetBlockedSeq(lastBlock.Hash()) for {
lastSeq, lastBlock, _, lastBlockMainHeight, err := client.getLastBlockInfo()
if err != nil { if err != nil {
plog.Error("Parachain GetBlockedSeq fail", "err", err) plog.Error("Parachain GetLastSeq fail", "err", err)
incSeqFlag = false
time.Sleep(time.Second) time.Sleep(time.Second)
continue continue
} }
//sequence in main chain start from 0
if blockedSeq == -1 { if incSeqFlag || currSeq == lastSeq {
blockedSeq = 0 currSeq++
} }
savedBlockOnMain, _, err := client.GetBlockOnMainBySeq(blockedSeq)
txs, blockOnMain, seqTy, err := client.RequestTx(&currSeq, lastBlock, &lastSeqMainHash)
if err != nil { if err != nil {
incSeqFlag = false incSeqFlag = false
time.Sleep(time.Second) time.Sleep(time.Second)
continue continue
} }
plog.Info("Parachain process block", "blockedSeq", blockedSeq, "blockOnMain.Height", blockOnMain.Height, "savedBlockOnMain.Height", savedBlockOnMain.Block.Height) lastSeqMainHash = blockOnMain.Hash()
lastSeqMainHeight := blockOnMain.Height
plog.Info("Parachain process block", "lastSeq", lastSeq, "curSeq", currSeq, "lastSeqMainHeight", lastSeqMainHeight, "lastBlockMainHeight", lastBlockMainHeight)
if seqTy == delAct { if seqTy == delAct {
if len(txs) == 0 { if len(txs) == 0 {
if blockOnMain.Height > savedBlockOnMain.Block.Height { if lastSeqMainHeight > lastBlockMainHeight {
incSeqFlag = true incSeqFlag = true
continue continue
} }
...@@ -465,7 +577,7 @@ func (client *client) CreateBlock() { ...@@ -465,7 +577,7 @@ func (client *client) CreateBlock() {
} }
} else if seqTy == addAct { } else if seqTy == addAct {
if len(txs) == 0 { if len(txs) == 0 {
if blockOnMain.Height-savedBlockOnMain.Block.Height < emptyBlockInterval { if lastSeqMainHeight-lastBlockMainHeight < emptyBlockInterval {
incSeqFlag = true incSeqFlag = true
continue continue
} }
...@@ -558,6 +670,30 @@ func (client *client) WriteBlock(prev []byte, paraBlock *types.Block, seq int64) ...@@ -558,6 +670,30 @@ func (client *client) WriteBlock(prev []byte, paraBlock *types.Block, seq int64)
return nil return nil
} }
func (client *client) removeBlocks(endHeight int64) error {
for {
lastBlock, err := client.RequestLastBlock()
if err != nil {
plog.Error("Parachain RequestLastBlock fail", "err", err)
return err
}
if lastBlock.Height == endHeight {
return nil
}
blockedSeq, err := client.GetBlockedSeq(lastBlock.Hash())
if err != nil {
plog.Error("Parachain GetBlockedSeq fail", "err", err)
return err
}
err = client.DelBlock(lastBlock, blockedSeq)
if err != nil {
plog.Error("Parachain GetBlockedSeq fail", "err", err)
return err
}
}
}
// 向blockchain删区块 // 向blockchain删区块
func (client *client) DelBlock(block *types.Block, seq int64) error { func (client *client) DelBlock(block *types.Block, seq int64) error {
plog.Debug("delete block in parachain") plog.Debug("delete block in parachain")
...@@ -591,3 +727,45 @@ func (client *client) DelBlock(block *types.Block, seq int64) error { ...@@ -591,3 +727,45 @@ func (client *client) DelBlock(block *types.Block, seq int64) error {
} }
return nil return nil
} }
func checkMinerTx(current *types.BlockDetail) error {
//检查第一个笔交易的execs, 以及执行状态
if len(current.Block.Txs) == 0 {
return types.ErrEmptyTx
}
baseTx := current.Block.Txs[0]
//判断交易类型和执行情况
var action paracross.ParacrossAction
err := types.Decode(baseTx.GetPayload(), &action)
if err != nil {
return err
}
if action.GetTy() != paracross.ParacrossActionMiner {
return paracross.ErrParaMinerTxType
}
//判断交易执行是否OK
if action.GetMiner() == nil {
return paracross.ErrParaEmptyMinerTx
}
//判断exec 是否成功
if current.Receipts[0].Ty != types.ExecOk {
return paracross.ErrParaMinerExecErr
}
return nil
}
func getMinerTxInfo(block *types.Block) (*paracross.ParacrossNodeStatus, error) {
baseTx := block.Txs[0]
//判断交易类型和执行情况
var action paracross.ParacrossAction
err := types.Decode(baseTx.GetPayload(), &action)
if err != nil {
return nil, err
}
if action.GetTy() != paracross.ParacrossActionMiner {
return nil, paracross.ErrParaMinerTxType
}
return action.GetMiner().Status, nil
}
...@@ -543,30 +543,3 @@ out: ...@@ -543,30 +543,3 @@ out:
} }
} }
func checkMinerTx(current *types.BlockDetail) error {
//检查第一个笔交易的execs, 以及执行状态
if len(current.Block.Txs) == 0 {
return types.ErrEmptyTx
}
baseTx := current.Block.Txs[0]
//判断交易类型和执行情况
var action paracross.ParacrossAction
err := types.Decode(baseTx.GetPayload(), &action)
if err != nil {
return err
}
if action.GetTy() != paracross.ParacrossActionMiner {
return paracross.ErrParaMinerTxType
}
//判断交易执行是否OK
if action.GetMiner() == nil {
return paracross.ErrParaEmptyMinerTx
}
//判断exec 是否成功
if current.Receipts[0].Ty != types.ExecOk {
return paracross.ErrParaMinerExecErr
}
return nil
}
...@@ -83,6 +83,7 @@ emptyBlockInterval=50 ...@@ -83,6 +83,7 @@ emptyBlockInterval=50
authAccount="" authAccount=""
#等待平行链共识消息在主链上链并成功的块数,超出会重发共识消息,最小是2 #等待平行链共识消息在主链上链并成功的块数,超出会重发共识消息,最小是2
waitBlocks4CommitMsg=2 waitBlocks4CommitMsg=2
searchHashMatchedBlockDepth=100
[mver.consensus] [mver.consensus]
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment