Commit ce619cf7 authored by mdj33's avatar mdj33 Committed by vipwzw

support batch fetch tx

parent 7174ecf7
...@@ -49,6 +49,8 @@ var ( ...@@ -49,6 +49,8 @@ var (
mainParaSelfConsensusForkHeight int64 = types.MaxHeight //para chain self consensus height switch, must >= ForkParacrossCommitTx of main mainParaSelfConsensusForkHeight int64 = types.MaxHeight //para chain self consensus height switch, must >= ForkParacrossCommitTx of main
mainForkParacrossCommitTx int64 = types.MaxHeight //support paracross commit tx fork height in main chain: ForkParacrossCommitTx mainForkParacrossCommitTx int64 = types.MaxHeight //support paracross commit tx fork height in main chain: ForkParacrossCommitTx
localCacheCount int64 = 1000 // local cache block max count localCacheCount int64 = 1000 // local cache block max count
batchFetchSeqEnable bool
batchFetchSeqNum int64 = 128
) )
func init() { func init() {
...@@ -86,6 +88,8 @@ type subConfig struct { ...@@ -86,6 +88,8 @@ type subConfig struct {
MainForkParacrossCommitTx int64 `json:"mainForkParacrossCommitTx,omitempty"` MainForkParacrossCommitTx int64 `json:"mainForkParacrossCommitTx,omitempty"`
WaitConsensStopTimes uint32 `json:"waitConsensStopTimes,omitempty"` WaitConsensStopTimes uint32 `json:"waitConsensStopTimes,omitempty"`
LocalCacheCount int64 `json:"localCacheCount,omitempty"` LocalCacheCount int64 `json:"localCacheCount,omitempty"`
BatchFetchSeqEnable uint32 `json:"batchFetchSeqEnable,omitempty"`
BatchFetchSeqNum int64 `json:"batchFetchSeqNum,omitempty"`
} }
// New function to init paracross env // New function to init paracross env
...@@ -129,6 +133,14 @@ func New(cfg *types.Consensus, sub []byte) queue.Module { ...@@ -129,6 +133,14 @@ func New(cfg *types.Consensus, sub []byte) queue.Module {
localCacheCount = subcfg.LocalCacheCount localCacheCount = subcfg.LocalCacheCount
} }
if subcfg.BatchFetchSeqEnable > 0 {
batchFetchSeqEnable = true
}
if subcfg.BatchFetchSeqNum > 0 {
batchFetchSeqNum = subcfg.BatchFetchSeqNum
}
pk, err := hex.DecodeString(minerPrivateKey) pk, err := hex.DecodeString(minerPrivateKey)
if err != nil { if err != nil {
panic(err) panic(err)
......
...@@ -76,16 +76,24 @@ func (client *client) addLocalBlock(height int64, block *pt.ParaLocalDbBlock) er ...@@ -76,16 +76,24 @@ func (client *client) addLocalBlock(height int64, block *pt.ParaLocalDbBlock) er
return client.setLocalDb(set) return client.setLocalDb(set)
} }
func (client *client) checkCommitTxSuccess(detail *types.BlockDetail) { func (client *client) checkCommitTxSuccess(txs []*pt.TxDetail) {
if atomic.LoadInt32(&client.isCaughtUp) != 1 || !client.commitMsgClient.isSendingCommitMsg() { if atomic.LoadInt32(&client.isCaughtUp) != 1 || !client.commitMsgClient.isSendingCommitMsg() {
return return
} }
txMap := make(map[string]bool) txMap := make(map[string]bool)
curTx := client.commitMsgClient.getCurrentTx()
for i, tx := range detail.Block.Txs { if types.IsParaExecName(string(curTx.Execer)) {
if bytes.HasSuffix(tx.Execer, []byte(pt.ParaX)) && detail.Receipts[i].Ty == types.ExecOk { for _, tx := range txs {
txMap[string(tx.Hash())] = true if bytes.HasSuffix(tx.Tx.Execer, []byte(pt.ParaX)) && tx.Receipt.Ty == types.ExecOk {
txMap[string(tx.Tx.Hash())] = true
}
}
} else {
//去主链查询
receipt, _ := client.QueryTxOnMainByHash(curTx.Hash())
if receipt != nil && receipt.Receipt.Ty == types.ExecOk {
txMap[string(curTx.Hash())] = true
} }
} }
...@@ -93,14 +101,14 @@ func (client *client) checkCommitTxSuccess(detail *types.BlockDetail) { ...@@ -93,14 +101,14 @@ func (client *client) checkCommitTxSuccess(detail *types.BlockDetail) {
} }
func (client *client) createLocalBlock(lastBlock *pt.ParaLocalDbBlock, txs []*types.Transaction, mainBlock *types.BlockSeq) error { func (client *client) createLocalBlock(lastBlock *pt.ParaLocalDbBlock, txs []*types.Transaction, mainBlock *pt.ParaTxDetail) error {
var newblock pt.ParaLocalDbBlock var newblock pt.ParaLocalDbBlock
newblock.Height = lastBlock.Height + 1 newblock.Height = lastBlock.Height + 1
newblock.MainHash = mainBlock.Seq.Hash newblock.MainHash = mainBlock.Header.Hash
newblock.MainHeight = mainBlock.Detail.Block.Height newblock.MainHeight = mainBlock.Header.Height
newblock.ParentMainHash = lastBlock.MainHash newblock.ParentMainHash = lastBlock.MainHash
newblock.BlockTime = mainBlock.Detail.Block.BlockTime newblock.BlockTime = mainBlock.Header.BlockTime
newblock.Txs = txs newblock.Txs = txs
...@@ -108,7 +116,7 @@ func (client *client) createLocalBlock(lastBlock *pt.ParaLocalDbBlock, txs []*ty ...@@ -108,7 +116,7 @@ func (client *client) createLocalBlock(lastBlock *pt.ParaLocalDbBlock, txs []*ty
if err != nil { if err != nil {
return err return err
} }
client.checkCommitTxSuccess(mainBlock.Detail) client.checkCommitTxSuccess(mainBlock.TxDetails)
//err = client.createBlockTemp(txs, mainBlock) //err = client.createBlockTemp(txs, mainBlock)
return err return err
} }
...@@ -365,8 +373,41 @@ func (client *client) switchLocalHashMatchedBlock() (int64, []byte, error) { ...@@ -365,8 +373,41 @@ func (client *client) switchLocalHashMatchedBlock() (int64, []byte, error) {
return -2, nil, pt.ErrParaCurHashNotMatch return -2, nil, pt.ErrParaCurHashNotMatch
} }
func (client *client) getBatchFetchSeqCount(currSeq int64) (int64, error) {
lastSeq, err := client.GetLastSeqOnMainChain()
if err != nil {
return 0, err
}
if lastSeq > currSeq {
if lastSeq-currSeq > emptyBlockInterval {
atomic.StoreInt32(&client.isCaughtUp, 0)
} else {
atomic.StoreInt32(&client.isCaughtUp, 1)
}
if batchFetchSeqEnable && lastSeq-currSeq > batchFetchSeqNum {
return batchFetchSeqNum, nil
}
return 0, nil
}
if lastSeq == currSeq {
return 0, nil
}
// lastSeq = currSeq -1
if lastSeq+1 == currSeq {
plog.Debug("Waiting new sequence from main chain")
return 0, pt.ErrParaWaitingNewSeq
}
// lastSeq < currSeq-1
return 0, pt.ErrParaCurHashNotMatch
}
// preBlockHash to identify the same main node // preBlockHash to identify the same main node
func (client *client) RequestTx(currSeq int64, preMainBlockHash []byte) ([]*types.Transaction, *types.BlockSeq, error) { func (client *client) RequestTxOld(currSeq int64, preMainBlockHash []byte) ([]*types.Transaction, *types.BlockSeq, error) {
plog.Debug("Para consensus RequestTx") plog.Debug("Para consensus RequestTx")
lastSeq, err := client.GetLastSeqOnMainChain() lastSeq, err := client.GetLastSeqOnMainChain()
if err != nil { if err != nil {
...@@ -410,6 +451,128 @@ func (client *client) RequestTx(currSeq int64, preMainBlockHash []byte) ([]*type ...@@ -410,6 +451,128 @@ func (client *client) RequestTx(currSeq int64, preMainBlockHash []byte) ([]*type
return nil, nil, pt.ErrParaCurHashNotMatch return nil, nil, pt.ErrParaCurHashNotMatch
} }
func (client *client) RequestTxOldVer(currSeq int64, preMainBlockHash []byte) (*pt.ParaTxDetails, error) {
blockSeq, err := client.GetBlockOnMainBySeq(currSeq)
if err != nil {
return nil, err
}
txDetail := paraexec.BlockDetail2ParaTxs(blockSeq.Seq.Type, blockSeq.Seq.Hash, blockSeq.Detail)
err = verifyTxDetailsHash(preMainBlockHash, txDetail)
if err != nil {
plog.Error("RequestTxOldVer", "curr seq", currSeq, "preMainBlockHash", hex.EncodeToString(preMainBlockHash))
return nil, err
}
return &pt.ParaTxDetails{Items: []*pt.ParaTxDetail{txDetail}}, nil
}
func verifyTxDetailsHash(preMainBlockHash []byte, mainBlock *pt.ParaTxDetail) error {
if (bytes.Equal(preMainBlockHash, mainBlock.Header.ParentHash) && mainBlock.Type == addAct) ||
(bytes.Equal(preMainBlockHash, mainBlock.Header.Hash) && mainBlock.Type == delAct) {
return nil
}
plog.Error("verifyTxDetailsHash", "preMainBlockHash", hex.EncodeToString(preMainBlockHash),
"mainParentHash", hex.EncodeToString(mainBlock.Header.ParentHash), "mainHash", hex.EncodeToString(mainBlock.Header.Hash),
"type", mainBlock.Type, "height", mainBlock.Header.Height)
return pt.ErrParaCurHashNotMatch
}
func verifyTxDetails(preMainBlockHash []byte, mainBlocks *pt.ParaTxDetails) error {
pre := preMainBlockHash
for _, block := range mainBlocks.Items {
err := verifyTxDetailsHash(pre, block)
if err != nil {
return err
}
pre = block.Header.Hash
}
return nil
}
func (client *client) RequestTxBatch(currSeq int64, count int64, preMainBlockHash []byte) (*pt.ParaTxDetails, error) {
//req := &pt.ReqParaTxByTitle{Start: currSeq, End: currSeq + count, Title: types.GetTitle()}
//items, err := client.GetBlockOnMainBySeq(req)
//if err != nil {
// return nil, nil, err
//}
details := &pt.ParaTxDetails{}
err := verifyTxDetails(preMainBlockHash, details)
if err != nil {
plog.Error("RequestTxBatch", "curr seq", currSeq, "count", count, "preMainBlockHash", hex.EncodeToString(preMainBlockHash))
return nil, err
}
return details, nil
}
func (client *client) RequestTx(currSeq int64, count int64, preMainBlockHash []byte) (*pt.ParaTxDetails, error) {
if !batchFetchSeqEnable {
return client.RequestTxOldVer(currSeq, preMainBlockHash)
}
return client.RequestTxBatch(currSeq, count, preMainBlockHash)
}
func (client *client) processHashNotMatchError(currSeq int64, lastSeqMainHash []byte, err error) (int64, []byte, error) {
if err == pt.ErrParaCurHashNotMatch {
preSeq, preSeqMainHash, err := client.switchHashMatchedBlock()
if err == nil {
return preSeq + 1, preSeqMainHash, nil
}
}
return currSeq, lastSeqMainHash, err
}
func (client *client) procLocalBlock(mainBlock *pt.ParaTxDetail) error {
lastSeqMainHeight := mainBlock.Header.Height
lastBlock, err := client.getLastLocalBlock()
if err != nil {
plog.Error("Parachain getLastLocalBlock", "err", err)
return err
}
txs := paraexec.FilterTxsForParaPlus(types.GetTitle(), mainBlock)
plog.Info("Parachain process block", "lastBlockHeight", lastBlock.Height, "currSeqMainHeight", lastSeqMainHeight,
"lastBlockMainHeight", lastBlock.MainHeight, "lastBlockMainHash", common.ToHex(lastBlock.MainHash), "seqTy", mainBlock.Type)
if mainBlock.Type == delAct {
if len(txs) == 0 {
if lastSeqMainHeight > lastBlock.MainHeight {
return nil
}
plog.Info("Delete empty block")
}
//client.DelBlock(lastBlock.Height, 0)
return client.delLocalBlock(lastBlock.Height)
} else if mainBlock.Type == addAct {
if len(txs) == 0 {
if lastSeqMainHeight-lastBlock.MainHeight < emptyBlockInterval {
return nil
}
plog.Info("Create empty block")
}
return client.createLocalBlock(lastBlock, txs, mainBlock)
}
return types.ErrInvalidParam
}
func (client *client) procLocalBlocks(mainBlocks *pt.ParaTxDetails) error {
for _, main := range mainBlocks.Items {
err := client.procLocalBlock(main)
if nil != err {
return err
}
}
return nil
}
func (client *client) CreateBlock() { func (client *client) CreateBlock() {
lastSeq, lastSeqMainHash, err := client.getLastLocalBlockSeq() lastSeq, lastSeqMainHash, err := client.getLastLocalBlockSeq()
if err != nil { if err != nil {
...@@ -418,70 +581,42 @@ func (client *client) CreateBlock() { ...@@ -418,70 +581,42 @@ func (client *client) CreateBlock() {
} }
currSeq := lastSeq + 1 currSeq := lastSeq + 1
for { for {
txs, mainBlock, err := client.RequestTx(currSeq, lastSeqMainHash) count, err := client.getBatchFetchSeqCount(currSeq)
if err != nil { if err != nil {
if err == pt.ErrParaCurHashNotMatch { currSeq, lastSeqMainHash, err = client.processHashNotMatchError(currSeq, lastSeqMainHash, err)
preSeq, preSeqMainHash, err := client.switchHashMatchedBlock() if err == nil {
if err == nil { continue
currSeq = preSeq + 1
lastSeqMainHash = preSeqMainHash
continue
}
} }
time.Sleep(time.Second * time.Duration(blockSec)) time.Sleep(time.Second * time.Duration(blockSec))
continue continue
} }
lastSeqMainHeight := mainBlock.Detail.Block.Height plog.Info("Parachain CreateBlock", "curSeq", currSeq, "count", count, "lastSeqMainHash", common.ToHex(lastSeqMainHash))
lastSeqMainHash = mainBlock.Seq.Hash paraTxs, err := client.RequestTx(currSeq, count, lastSeqMainHash)
if mainBlock.Seq.Type == delAct {
lastSeqMainHash = mainBlock.Detail.Block.ParentHash
}
lastBlock, err := client.getLastLocalBlock()
if err != nil { if err != nil {
plog.Error("Parachain getLastLocalBlock", "err", err) currSeq, lastSeqMainHash, err = client.processHashNotMatchError(currSeq, lastSeqMainHash, err)
time.Sleep(time.Second)
continue continue
} }
plog.Info("Parachain process block", "curSeq", currSeq, "lastBlockHeight", lastBlock.Height, if count+1 != int64(len(paraTxs.Items)) {
"currSeqMainHeight", lastSeqMainHeight, "currSeqMainHash", common.ToHex(lastSeqMainHash), plog.Error("para CreateBlock count not match", "count", count+1, "items", len(paraTxs.Items))
"lastBlockMainHeight", lastBlock.MainHeight, "lastBlockMainHash", common.ToHex(lastBlock.MainHash), "seqTy", mainBlock.Seq.Type) continue
if mainBlock.Seq.Type == delAct {
if len(txs) == 0 {
if lastSeqMainHeight > lastBlock.MainHeight {
currSeq++
continue
}
plog.Info("Delete empty block")
}
err = client.delLocalBlock(lastBlock.Height)
client.NotifyLocalChange()
} else if mainBlock.Seq.Type == addAct {
if len(txs) == 0 {
if lastSeqMainHeight-lastBlock.MainHeight < emptyBlockInterval {
currSeq++
continue
}
plog.Info("Create empty block")
}
err = client.createLocalBlock(lastBlock, txs, mainBlock)
client.NotifyLocalChange()
} else {
err = types.ErrInvalidParam
} }
err = client.procLocalBlocks(paraTxs)
if err != nil { if err != nil {
plog.Error("para CreateBlock", "type", mainBlock.Seq.Type, "err", err.Error()) //根据localblock,重新搜索匹配
time.Sleep(time.Second) lastSeqMainHash = nil
plog.Error("para CreateBlock.procLocalBlocks", "err", err.Error())
continue continue
} }
currSeq++
//重新设定seq和lastSeqMainHash
lastSeqMainHash = paraTxs.Items[count].Header.Hash
if paraTxs.Items[count].Type == delAct {
lastSeqMainHash = paraTxs.Items[count].Header.ParentHash
}
currSeq = currSeq + count + 1
} }
} }
...@@ -145,3 +145,13 @@ func (client *client) GetBlockOnMainByHash(hash []byte) (*types.Block, error) { ...@@ -145,3 +145,13 @@ func (client *client) GetBlockOnMainByHash(hash []byte) (*types.Block, error) {
return blocks.Items[0].Block, nil return blocks.Items[0].Block, nil
} }
func (client *client) QueryTxOnMainByHash(hash []byte) (*types.TransactionDetail, error) {
detail, err := client.grpcClient.QueryTransaction(context.Background(), &types.ReqHash{Hash: hash})
if err != nil {
plog.Error("QueryTxOnMainByHash Not found", "txhash", common.ToHex(hash))
return nil, err
}
return detail, nil
}
...@@ -72,6 +72,36 @@ func filterParaTxGroup(title string, tx *types.Transaction, main *types.BlockDet ...@@ -72,6 +72,36 @@ func filterParaTxGroup(title string, tx *types.Transaction, main *types.BlockDet
return main.Block.Txs[headIdx:endIdx], endIdx return main.Block.Txs[headIdx:endIdx], endIdx
} }
func filterParaTxGroupPlus(title string, tx *types.Transaction, allTxs []*pt.TxDetail, index int, blockHeight, forkHeight int64) ([]*types.Transaction, int) {
var headIdx int
for i := index; i >= 0; i-- {
if bytes.Equal(tx.Header, allTxs[i].Tx.Hash()) {
headIdx = i
break
}
}
endIdx := headIdx + int(tx.GroupCount)
for i := headIdx; i < endIdx; i++ {
if types.IsPara() && blockHeight < forkHeight {
if types.IsSpecificParaExecName(title, string(allTxs[i].Tx.Execer)) {
continue
}
}
if !checkReceiptExecOk(allTxs[i].Receipt) {
return nil, endIdx
}
}
//全部是平行链交易 或平行链在主链执行成功的tx
var retTxs []*types.Transaction
for _, retTx := range allTxs[headIdx:endIdx] {
retTxs = append(retTxs, retTx.Tx)
}
return retTxs, endIdx
}
//FilterTxsForPara include some main tx in tx group before ForkParacrossCommitTx //FilterTxsForPara include some main tx in tx group before ForkParacrossCommitTx
func FilterTxsForPara(title string, main *types.BlockDetail) []*types.Transaction { func FilterTxsForPara(title string, main *types.BlockDetail) []*types.Transaction {
var txs []*types.Transaction var txs []*types.Transaction
...@@ -96,6 +126,30 @@ func FilterTxsForPara(title string, main *types.BlockDetail) []*types.Transactio ...@@ -96,6 +126,30 @@ func FilterTxsForPara(title string, main *types.BlockDetail) []*types.Transactio
return txs return txs
} }
//FilterTxsForPara include some main tx in tx group before ForkParacrossCommitTx
func FilterTxsForParaPlus(title string, main *pt.ParaTxDetail) []*types.Transaction {
var txs []*types.Transaction
forkHeight := pt.GetDappForkHeight(pt.ForkCommitTx)
for i := 0; i < len(main.TxDetails); i++ {
tx := main.TxDetails[i].Tx
if types.IsSpecificParaExecName(title, string(tx.Execer)) {
if tx.GroupCount >= 2 {
mainTxs, endIdx := filterParaTxGroupPlus(title, tx, main.TxDetails, i, main.Header.Height, forkHeight)
txs = append(txs, mainTxs...)
i = endIdx - 1
continue
}
//单独的paracross tx 如果主链执行失败也要排除, 6.2fork原因 没有排除 非user.p.xx.paracross的平行链交易
if main.Header.Height >= forkHeight && bytes.HasSuffix(tx.Execer, []byte(pt.ParaX)) && !checkReceiptExecOk(main.TxDetails[i].Receipt) {
continue
}
txs = append(txs, tx)
}
}
return txs
}
// FilterParaCrossTxHashes only all para chain cross txs like xx.paracross exec // FilterParaCrossTxHashes only all para chain cross txs like xx.paracross exec
func FilterParaCrossTxHashes(title string, txs []*types.Transaction) [][]byte { func FilterParaCrossTxHashes(title string, txs []*types.Transaction) [][]byte {
var txHashs [][]byte var txHashs [][]byte
...@@ -179,3 +233,32 @@ func CalcTxHashsHash(txHashs [][]byte) []byte { ...@@ -179,3 +233,32 @@ func CalcTxHashsHash(txHashs [][]byte) []byte {
data := types.Encode(totalTxHash) data := types.Encode(totalTxHash)
return common.Sha256(data) return common.Sha256(data)
} }
//BlockDetail2ParaTxs blockDetail transfer to paraTxDetail
func BlockDetail2ParaTxs(seqType int64, blockHash []byte, blockDetail *types.BlockDetail) *pt.ParaTxDetail {
header := &types.Header{
Version: blockDetail.Block.Version,
ParentHash: blockDetail.Block.ParentHash,
TxHash: blockDetail.Block.TxHash,
StateHash: blockDetail.Block.StateHash,
Height: blockDetail.Block.Height,
BlockTime: blockDetail.Block.BlockTime,
Difficulty: blockDetail.Block.Difficulty,
Signature: blockDetail.Block.Signature,
}
header.Hash = blockHash
txDetail := &pt.ParaTxDetail{
Type: seqType,
Header: header,
}
for i, tx := range blockDetail.Block.Txs {
detail := &pt.TxDetail{
Tx: tx,
Receipt: blockDetail.Receipts[i],
}
txDetail.TxDetails = append(txDetail.TxDetails, detail)
}
return txDetail
}
...@@ -310,6 +310,30 @@ message ParaLocalDbBlock { ...@@ -310,6 +310,30 @@ message ParaLocalDbBlock {
repeated Transaction txs = 6; repeated Transaction txs = 6;
} }
message ReqParaTxByTitle{
int64 start = 1;
int64 end = 2;
string title = 3;
}
message TxDetail {
uint32 index = 1;
Transaction tx = 2;
ReceiptData receipt = 3;
repeated bytes proofs = 4;
}
message ParaTxDetail{
int64 type = 1;
Header header = 2;
repeated TxDetail txDetails = 3;
}
message ParaTxDetails{
repeated ParaTxDetail items = 1;
}
service paracross { service paracross {
rpc GetTitle(ReqString) returns (ParacrossConsensusStatus) {} rpc GetTitle(ReqString) returns (ParacrossConsensusStatus) {}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment