Commit 408ddd34 authored by mdj33's avatar mdj33 Committed by vipwzw

proc filter tx

parent 93acdf5c
......@@ -8,7 +8,6 @@ import (
"github.com/stretchr/testify/assert"
//"github.com/stretchr/testify/mock"
"encoding/hex"
"errors"
"math/rand"
"testing"
"time"
......@@ -116,20 +115,6 @@ func createTxsGroup(txs []*types.Transaction) ([]*types.Transaction, error) {
return group.Txs, nil
}
func TestGetBlockHashForkHeightOnMainChain(t *testing.T) {
para := new(client)
grpcClient := &typesmocks.Chain33Client{}
grpcClient.On("GetFork", mock.Anything, &types.ReqKey{Key: []byte("ForkBlockHash")}).Return(&types.Int64{Data: 1}, errors.New("err")).Once()
para.grpcClient = grpcClient
_, err := para.GetForkHeightOnMainChain("ForkBlockHash")
assert.NotNil(t, err)
grpcClient.On("GetFork", mock.Anything, &types.ReqKey{Key: []byte("ForkBlockHash")}).Return(&types.Int64{Data: 1}, nil).Once()
ret, err := para.GetForkHeightOnMainChain("ForkBlockHash")
assert.Nil(t, err)
assert.Equal(t, int64(1), ret)
}
func createTestTxs(t *testing.T) (*types.BlockDetail, []*types.Transaction, []*types.Transaction) {
//all para tx group
tx5, err := createCrossParaTx("toB", 5)
......
......@@ -5,7 +5,6 @@
package para
import (
"errors"
"time"
"encoding/hex"
......@@ -20,47 +19,6 @@ import (
pt "github.com/33cn/plugin/plugin/dapp/paracross/types"
)
func (client *client) setLocalDb(set *types.LocalDBSet) error {
//如果追赶上主链了,则落盘
if atomic.LoadInt32(&client.isCaughtUp) == 1 {
set.Txid = 1
}
msg := client.GetQueueClient().NewMessage("blockchain", types.EventSetValueByKey, set)
err := client.GetQueueClient().Send(msg, true)
if err != nil {
return err
}
resp, err := client.GetQueueClient().Wait(msg)
if err != nil {
return err
}
if resp.GetData().(*types.Reply).IsOk {
return nil
}
return errors.New(string(resp.GetData().(*types.Reply).GetMsg()))
}
func (client *client) getLocalDb(set *types.LocalDBGet, count int) ([][]byte, error) {
msg := client.GetQueueClient().NewMessage("blockchain", types.EventGetValueByKey, set)
err := client.GetQueueClient().Send(msg, true)
if err != nil {
return nil, err
}
resp, err := client.GetQueueClient().Wait(msg)
if err != nil {
return nil, err
}
reply := resp.GetData().(*types.LocalReplyValue)
if len(reply.Values) != count {
plog.Error("Parachain getLocalDb count not match", "expert", count, "real", len(reply.Values))
return nil, types.ErrInvalidParam
}
return reply.Values, nil
}
func (client *client) addLocalBlock(height int64, block *pt.ParaLocalDbBlock) error {
set := &types.LocalDBSet{}
......@@ -367,7 +325,7 @@ func (client *client) switchLocalHashMatchedBlock() (int64, []byte, error) {
return -2, nil, pt.ErrParaCurHashNotMatch
}
func (client *client) getBatchFetchSeqCount(currSeq int64) (int64, error) {
func (client *client) getBatchSeqCount(currSeq int64) (int64, error) {
lastSeq, err := client.GetLastSeqOnMainChain()
if err != nil {
return 0, err
......@@ -400,82 +358,21 @@ func (client *client) getBatchFetchSeqCount(currSeq int64) (int64, error) {
}
// preBlockHash to identify the same main node
func (client *client) RequestTxOld(currSeq int64, preMainBlockHash []byte) ([]*types.Transaction, *types.BlockSeq, error) {
plog.Debug("Para consensus RequestTx")
lastSeq, err := client.GetLastSeqOnMainChain()
if err != nil {
return nil, nil, err
}
plog.Info("RequestTx", "LastMainSeq", lastSeq, "CurrSeq", currSeq)
if lastSeq >= currSeq {
blockSeq, err := client.GetBlockOnMainBySeq(currSeq)
if err != nil {
return nil, nil, err
}
if (bytes.Equal(preMainBlockHash, blockSeq.Detail.Block.ParentHash) && blockSeq.Seq.Type == addAct) ||
(bytes.Equal(preMainBlockHash, blockSeq.Seq.Hash) && blockSeq.Seq.Type == delAct) {
txs := paraexec.FilterTxsForPara(types.GetTitle(), blockSeq.Detail)
plog.Info("GetCurrentSeq", "Len of txs", len(txs), "seqTy", blockSeq.Seq.Type)
if lastSeq-currSeq > emptyBlockInterval {
atomic.StoreInt32(&client.isCaughtUp, 0)
} else {
atomic.StoreInt32(&client.isCaughtUp, 1)
}
return txs, blockSeq, nil
}
//not consistent case be processed at below
plog.Error("RequestTx", "preMainHash", hex.EncodeToString(preMainBlockHash), "currSeq preMainHash", hex.EncodeToString(blockSeq.Detail.Block.ParentHash),
"currSeq mainHash", hex.EncodeToString(blockSeq.Seq.Hash), "curr seq", currSeq, "ty", blockSeq.Seq.Type, "currSeq Mainheight", blockSeq.Detail.Block.Height)
return nil, nil, pt.ErrParaCurHashNotMatch
}
//lastSeq < CurrSeq case:
//lastSeq = currSeq-1, main node not update
if lastSeq+1 == currSeq {
plog.Debug("Waiting new sequence from main chain")
return nil, nil, pt.ErrParaWaitingNewSeq
}
// 1. lastSeq < currSeq-1
// 2. lastSeq >= currSeq and seq not consistent or fork case
return nil, nil, pt.ErrParaCurHashNotMatch
}
func (client *client) RequestTxOldVer(currSeq int64, preMainBlockHash []byte) (*pt.ParaTxDetails, error) {
blockSeq, err := client.GetBlockOnMainBySeq(currSeq)
if err != nil {
return nil, err
}
txDetail := paraexec.BlockDetail2ParaTxs(blockSeq.Seq.Type, blockSeq.Seq.Hash, blockSeq.Detail)
err = verifyTxDetailsHash(preMainBlockHash, txDetail)
if err != nil {
plog.Error("RequestTxOldVer", "curr seq", currSeq, "preMainBlockHash", hex.EncodeToString(preMainBlockHash))
return nil, err
}
return &pt.ParaTxDetails{Items: []*pt.ParaTxDetail{txDetail}}, nil
}
func verifyTxDetailsHash(preMainBlockHash []byte, mainBlock *pt.ParaTxDetail) error {
func verifyMainBlockHash(preMainBlockHash []byte, mainBlock *pt.ParaTxDetail) error {
if (bytes.Equal(preMainBlockHash, mainBlock.Header.ParentHash) && mainBlock.Type == addAct) ||
(bytes.Equal(preMainBlockHash, mainBlock.Header.Hash) && mainBlock.Type == delAct) {
return nil
}
plog.Error("verifyTxDetailsHash", "preMainBlockHash", hex.EncodeToString(preMainBlockHash),
plog.Error("verifyMainBlockHash", "preMainBlockHash", hex.EncodeToString(preMainBlockHash),
"mainParentHash", hex.EncodeToString(mainBlock.Header.ParentHash), "mainHash", hex.EncodeToString(mainBlock.Header.Hash),
"type", mainBlock.Type, "height", mainBlock.Header.Height)
return pt.ErrParaCurHashNotMatch
}
func verifyTxDetails(preMainBlockHash []byte, mainBlocks *pt.ParaTxDetails) error {
func verifyMainBlocks(preMainBlockHash []byte, mainBlocks *pt.ParaTxDetails) error {
pre := preMainBlockHash
for _, block := range mainBlocks.Items {
err := verifyTxDetailsHash(pre, block)
err := verifyMainBlockHash(pre, block)
if err != nil {
return err
}
......@@ -484,17 +381,33 @@ func verifyTxDetails(preMainBlockHash []byte, mainBlocks *pt.ParaTxDetails) erro
return nil
}
func (client *client) RequestTxBatch(currSeq int64, count int64, preMainBlockHash []byte) (*pt.ParaTxDetails, error) {
func (client *client) requestAllMainTxs(currSeq int64, preMainBlockHash []byte) (*pt.ParaTxDetails, error) {
blockSeq, err := client.GetBlockOnMainBySeq(currSeq)
if err != nil {
return nil, err
}
txDetail := paraexec.BlockDetail2ParaTxs(blockSeq.Seq.Type, blockSeq.Seq.Hash, blockSeq.Detail)
err = verifyMainBlockHash(preMainBlockHash, txDetail)
if err != nil {
plog.Error("requestAllMainTxs", "curr seq", currSeq, "preMainBlockHash", hex.EncodeToString(preMainBlockHash))
return nil, err
}
return &pt.ParaTxDetails{Items: []*pt.ParaTxDetail{txDetail}}, nil
}
func (client *client) requestParaTxs(currSeq int64, count int64, preMainBlockHash []byte) (*pt.ParaTxDetails, error) {
//req := &pt.ReqParaTxByTitle{Start: currSeq, End: currSeq + count, Title: types.GetTitle()}
//items, err := client.GetBlockOnMainBySeq(req)
//details, err := client.GetParaTxByTitle(req)
//if err != nil {
// return nil, nil, err
// return nil, err
//}
details := &pt.ParaTxDetails{}
err := verifyTxDetails(preMainBlockHash, details)
err := verifyMainBlocks(preMainBlockHash, details)
if err != nil {
plog.Error("RequestTxBatch", "curr seq", currSeq, "count", count, "preMainBlockHash", hex.EncodeToString(preMainBlockHash))
plog.Error("requestParaTxs", "curSeq", currSeq, "count", count, "preMainBlockHash", hex.EncodeToString(preMainBlockHash))
return nil, err
}
return details, nil
......@@ -502,10 +415,10 @@ func (client *client) RequestTxBatch(currSeq int64, count int64, preMainBlockHas
func (client *client) RequestTx(currSeq int64, count int64, preMainBlockHash []byte) (*pt.ParaTxDetails, error) {
if !batchFetchSeqEnable {
return client.RequestTxOldVer(currSeq, preMainBlockHash)
return client.requestAllMainTxs(currSeq, preMainBlockHash)
}
return client.RequestTxBatch(currSeq, count, preMainBlockHash)
return client.requestParaTxs(currSeq, count, preMainBlockHash)
}
......@@ -519,50 +432,59 @@ func (client *client) processHashNotMatchError(currSeq int64, lastSeqMainHash []
return currSeq, lastSeqMainHash, err
}
func (client *client) procLocalBlock(mainBlock *pt.ParaTxDetail) error {
func (client *client) procLocalBlock(mainBlock *pt.ParaTxDetail) (bool, error) {
lastSeqMainHeight := mainBlock.Header.Height
lastBlock, err := client.getLastLocalBlock()
if err != nil {
plog.Error("Parachain getLastLocalBlock", "err", err)
return err
return false, err
}
txs := paraexec.FilterTxsForParaPlus(types.GetTitle(), mainBlock)
txs := paraexec.FilterTxsForPara(types.GetTitle(), mainBlock)
plog.Info("Parachain process block", "lastBlockHeight", lastBlock.Height, "currSeqMainHeight", lastSeqMainHeight,
"lastBlockMainHeight", lastBlock.MainHeight, "lastBlockMainHash", common.ToHex(lastBlock.MainHash), "seqTy", mainBlock.Type)
plog.Info("Parachain process block", "lastBlockHeight", lastBlock.Height, "lastBlockMainHeight", lastBlock.MainHeight,
"lastBlockMainHash", common.ToHex(lastBlock.MainHash), "currMainHeight", lastSeqMainHeight,
"curMainHash", common.ToHex(mainBlock.Header.Hash), "seqTy", mainBlock.Type)
if mainBlock.Type == delAct {
if len(txs) == 0 {
if lastSeqMainHeight > lastBlock.MainHeight {
return nil
return false, nil
}
plog.Info("Delete empty block")
}
return client.delLocalBlock(lastBlock.Height)
return true, client.delLocalBlock(lastBlock.Height)
} else if mainBlock.Type == addAct {
if len(txs) == 0 {
if lastSeqMainHeight-lastBlock.MainHeight < emptyBlockInterval {
return nil
return false, nil
}
plog.Info("Create empty block")
}
return client.createLocalBlock(lastBlock, txs, mainBlock)
return true, client.createLocalBlock(lastBlock, txs, mainBlock)
}
return types.ErrInvalidParam
return false, types.ErrInvalidParam
}
func (client *client) procLocalBlocks(mainBlocks *pt.ParaTxDetails) error {
var notify bool
for _, main := range mainBlocks.Items {
err := client.procLocalBlock(main)
if nil != err {
changed, err := client.procLocalBlock(main)
if err != nil {
return err
}
if changed {
notify = true
}
}
if notify {
client.NotifyLocalChange()
}
return nil
}
......@@ -580,7 +502,7 @@ out:
case <-client.quitCreate:
break out
default:
count, err := client.getBatchFetchSeqCount(currSeq)
count, err := client.getBatchSeqCount(currSeq)
if err != nil {
currSeq, lastSeqMainHash, err = client.processHashNotMatchError(currSeq, lastSeqMainHash, err)
if err == nil {
......
......@@ -8,19 +8,53 @@ import (
"bytes"
"context"
"encoding/hex"
"errors"
"sync/atomic"
"github.com/33cn/chain33/common"
"github.com/33cn/chain33/types"
)
func (client *client) GetBlockedSeq(hash []byte) (int64, error) {
//from blockchain db
blockedSeq, err := client.GetAPI().GetMainSequenceByHash(&types.ReqHash{Hash: hash})
func (client *client) setLocalDb(set *types.LocalDBSet) error {
//如果追赶上主链了,则落盘
if atomic.LoadInt32(&client.isCaughtUp) == 1 {
set.Txid = 1
}
msg := client.GetQueueClient().NewMessage("blockchain", types.EventSetValueByKey, set)
err := client.GetQueueClient().Send(msg, true)
if err != nil {
return err
}
resp, err := client.GetQueueClient().Wait(msg)
if err != nil {
return err
}
if resp.GetData().(*types.Reply).IsOk {
return nil
}
return errors.New(string(resp.GetData().(*types.Reply).GetMsg()))
}
func (client *client) getLocalDb(set *types.LocalDBGet, count int) ([][]byte, error) {
msg := client.GetQueueClient().NewMessage("blockchain", types.EventGetValueByKey, set)
err := client.GetQueueClient().Send(msg, true)
if err != nil {
return nil, err
}
resp, err := client.GetQueueClient().Wait(msg)
if err != nil {
return -2, err
return nil, err
}
return blockedSeq.Data, nil
reply := resp.GetData().(*types.LocalReplyValue)
if len(reply.Values) != count {
plog.Error("Parachain getLocalDb count not match", "expert", count, "real", len(reply.Values))
return nil, types.ErrInvalidParam
}
return reply.Values, nil
}
func (client *client) GetBlockByHeight(height int64) (*types.Block, error) {
......@@ -62,16 +96,6 @@ func (client *client) getLastBlockInfo() (*types.Block, error) {
return lastBlock, nil
}
func (client *client) GetForkHeightOnMainChain(key string) (int64, error) {
ret, err := client.grpcClient.GetFork(context.Background(), &types.ReqKey{Key: []byte(key)})
if err != nil {
plog.Error("para get rpc ForkHeight fail", "key", key, "err", err.Error())
return types.MaxHeight, err
}
return ret.Data, nil
}
func (client *client) GetLastHeightOnMainChain() (int64, error) {
header, err := client.grpcClient.GetLastHeader(context.Background(), &types.ReqNil{})
if err != nil {
......@@ -91,15 +115,6 @@ func (client *client) GetLastSeqOnMainChain() (int64, error) {
return seq.Data, nil
}
func (client *client) GetSeqByHeightOnMainChain(height int64) (int64, []byte, error) {
hash, err := client.GetHashByHeightOnMainChain(height)
if err != nil {
return -1, nil, err
}
seq, err := client.GetSeqByHashOnMainChain(hash)
return seq, hash, err
}
func (client *client) GetHashByHeightOnMainChain(height int64) ([]byte, error) {
reply, err := client.grpcClient.GetBlockHash(context.Background(), &types.ReqInt{Height: height})
if err != nil {
......@@ -136,15 +151,15 @@ func (client *client) GetBlockOnMainBySeq(seq int64) (*types.BlockSeq, error) {
return blockSeq, nil
}
func (client *client) GetBlockOnMainByHash(hash []byte) (*types.Block, error) {
blocks, err := client.grpcClient.GetBlockByHashes(context.Background(), &types.ReqHashes{Hashes: [][]byte{hash}})
if err != nil || blocks.Items[0] == nil {
plog.Error("GetBlockOnMainByHash Not found", "blockhash", common.ToHex(hash))
return nil, err
}
return blocks.Items[0].Block, nil
}
//func (client *client) GetParaTxByTitle(req *pt.ReqParaTxByTitle) (*pt.ParaTxDetails, error) {
// txDetails, err := client.grpcClient.GetParaTxByTitle(context.Background(), req)
// if err != nil {
// plog.Error("GetParaTxByTitle wrong", "err", err.Error(),"start",req.Start,"end",req.End)
// return nil, err
// }
//
// return txDetails, nil
//}
func (client *client) QueryTxOnMainByHash(hash []byte) (*types.TransactionDetail, error) {
detail, err := client.grpcClient.QueryTransaction(context.Background(), &types.ReqHash{Hash: hash})
......
......@@ -759,7 +759,7 @@ func getCrossTxHashsByRst(api client.QueueProtocolAPI, status *pt.ParacrossNodeS
}
//抽取平行链交易和跨链交易
paraAllTxs := FilterTxsForPara(status.Title, blockDetail)
paraAllTxs := FilterTxsForParaByBlock(status.Title, blockDetail)
var baseHashs [][]byte
for _, tx := range paraAllTxs {
baseHashs = append(baseHashs, tx.Hash())
......@@ -790,7 +790,7 @@ func getCrossTxHashs(api client.QueueProtocolAPI, status *pt.ParacrossNodeStatus
return nil, nil, err
}
//校验
paraBaseTxs := FilterTxsForPara(status.Title, blockDetail)
paraBaseTxs := FilterTxsForParaByBlock(status.Title, blockDetail)
paraCrossHashs := FilterParaCrossTxHashes(status.Title, paraBaseTxs)
var baseHashs [][]byte
for _, tx := range paraBaseTxs {
......
......@@ -46,33 +46,7 @@ func checkReceiptExecOk(receipt *types.ReceiptData) bool {
// 1, 主链+平行链 user.p.xx.paracross 交易组 混合跨链资产转移 paracross主链执行成功
// 2, 平行链 user.p.xx.paracross + user.p.xx.other 混合平行链组合 paracross主链执行成功
// 3, 平行链 user.p.xx.other 交易组 混合平行链组合 other主链pack
func filterParaTxGroup(title string, tx *types.Transaction, main *types.BlockDetail, index int, forkHeight int64) ([]*types.Transaction, int) {
var headIdx int
for i := index; i >= 0; i-- {
if bytes.Equal(tx.Header, main.Block.Txs[i].Hash()) {
headIdx = i
break
}
}
endIdx := headIdx + int(tx.GroupCount)
for i := headIdx; i < endIdx; i++ {
if types.IsPara() && main.Block.Height < forkHeight {
if types.IsSpecificParaExecName(title, string(main.Block.Txs[i].Execer)) {
continue
}
}
if !checkReceiptExecOk(main.Receipts[i]) {
return nil, endIdx
}
}
//全部是平行链交易 或平行链在主链执行成功的tx
return main.Block.Txs[headIdx:endIdx], endIdx
}
func filterParaTxGroupPlus(title string, tx *types.Transaction, allTxs []*pt.TxDetail, index int, blockHeight, forkHeight int64) ([]*types.Transaction, int) {
func filterParaTxGroup(title string, tx *types.Transaction, allTxs []*pt.TxDetail, index int, blockHeight, forkHeight int64) ([]*types.Transaction, int) {
var headIdx int
for i := index; i >= 0; i-- {
......@@ -103,38 +77,14 @@ func filterParaTxGroupPlus(title string, tx *types.Transaction, allTxs []*pt.TxD
}
//FilterTxsForPara include some main tx in tx group before ForkParacrossCommitTx
func FilterTxsForPara(title string, main *types.BlockDetail) []*types.Transaction {
var txs []*types.Transaction
forkHeight := pt.GetDappForkHeight(pt.ForkCommitTx)
for i := 0; i < len(main.Block.Txs); i++ {
tx := main.Block.Txs[i]
if types.IsSpecificParaExecName(title, string(tx.Execer)) {
if tx.GroupCount >= 2 {
mainTxs, endIdx := filterParaTxGroup(title, tx, main, i, forkHeight)
txs = append(txs, mainTxs...)
i = endIdx - 1
continue
}
//单独的paracross tx 如果主链执行失败也要排除, 6.2fork原因 没有排除 非user.p.xx.paracross的平行链交易
if main.Block.Height >= forkHeight && bytes.HasSuffix(tx.Execer, []byte(pt.ParaX)) && !checkReceiptExecOk(main.Receipts[i]) {
continue
}
txs = append(txs, tx)
}
}
return txs
}
//FilterTxsForParaPlus include some main tx in tx group before ForkParacrossCommitTx
func FilterTxsForParaPlus(title string, main *pt.ParaTxDetail) []*types.Transaction {
func FilterTxsForPara(title string, main *pt.ParaTxDetail) []*types.Transaction {
var txs []*types.Transaction
forkHeight := pt.GetDappForkHeight(pt.ForkCommitTx)
for i := 0; i < len(main.TxDetails); i++ {
tx := main.TxDetails[i].Tx
if types.IsSpecificParaExecName(title, string(tx.Execer)) {
if tx.GroupCount >= 2 {
mainTxs, endIdx := filterParaTxGroupPlus(title, tx, main.TxDetails, i, main.Header.Height, forkHeight)
mainTxs, endIdx := filterParaTxGroup(title, tx, main.TxDetails, i, main.Header.Height, forkHeight)
txs = append(txs, mainTxs...)
i = endIdx - 1
continue
......@@ -150,6 +100,12 @@ func FilterTxsForParaPlus(title string, main *pt.ParaTxDetail) []*types.Transact
return txs
}
//FilterTxsForParaByBlock include some main tx in tx group before ForkParacrossCommitTx
func FilterTxsForParaByBlock(title string, main *types.BlockDetail) []*types.Transaction {
txDetail := BlockDetail2ParaTxs(0, main.Block.Hash(), main)
return FilterTxsForPara(title, txDetail)
}
// FilterParaCrossTxHashes only all para chain cross txs like xx.paracross exec
func FilterParaCrossTxHashes(title string, txs []*types.Transaction) [][]byte {
var txHashs [][]byte
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment