Commit a4ef67b0 authored by mdj33's avatar mdj33 Committed by vipwzw

adapt new blockchain filter para txs

parent 4593674c
......@@ -48,8 +48,8 @@ var (
mainBlockHashForkHeight int64 = 209186 //calc block hash fork height in main chain
mainParaSelfConsensusForkHeight int64 = types.MaxHeight //para chain self consensus height switch, must >= ForkParacrossCommitTx of main
mainForkParacrossCommitTx int64 = types.MaxHeight //support paracross commit tx fork height in main chain: ForkParacrossCommitTx
batchFetchSeqEnable bool
batchFetchSeqNum int64 = 128
fetchFilterParaTxsEnable bool
batchFetchBlockCount int64 = 128
)
func init() {
......@@ -86,8 +86,8 @@ type subConfig struct {
WaitConsensStopTimes uint32 `json:"waitConsensStopTimes,omitempty"`
MaxCacheCount int64 `json:"maxCacheCount,omitempty"`
MaxSyncErrCount int32 `json:"maxSyncErrCount,omitempty"`
BatchFetchSeqEnable uint32 `json:"batchFetchSeqEnable,omitempty"`
BatchFetchSeqNum int64 `json:"batchFetchSeqNum,omitempty"`
FetchFilterParaTxsEnable uint32 `json:"fetchFilterParaTxsEnable,omitempty"`
BatchFetchBlockCount int64 `json:"batchFetchBlockCount,omitempty"`
ParaConsensStartHeight int64 `json:"paraConsensStartHeight,omitempty"`
}
......@@ -128,12 +128,12 @@ func New(cfg *types.Consensus, sub []byte) queue.Module {
mainForkParacrossCommitTx = subcfg.MainForkParacrossCommitTx
}
if subcfg.BatchFetchSeqEnable > 0 {
batchFetchSeqEnable = true
if subcfg.FetchFilterParaTxsEnable > 0 {
fetchFilterParaTxsEnable = true
}
if subcfg.BatchFetchSeqNum > 0 {
batchFetchSeqNum = subcfg.BatchFetchSeqNum
if subcfg.BatchFetchBlockCount > 0 {
batchFetchBlockCount = subcfg.BatchFetchBlockCount
}
pk, err := hex.DecodeString(minerPrivateKey)
......
......@@ -36,7 +36,7 @@ func TestFilterTxsForPara(t *testing.T) {
types.Init(Title, cfg)
detail, filterTxs, _ := createTestTxs(t)
rst := paraexec.FilterTxsForParaByBlock(Title, detail)
rst := paraexec.FilterTxsForPara(detail.FilterParaTxsByTitle(Title))
assert.Equal(t, filterTxs, rst)
......
......@@ -117,7 +117,7 @@ func (client *commitMsgClient) resetNotify() {
}
//新的区块产生,检查是否有commitTx正在发送入口
func (client *commitMsgClient) commitTxCheckNotify(txs []*pt.TxDetail) {
func (client *commitMsgClient) commitTxCheckNotify(txs []*types.TxDetail) {
if client.checkCommitTxSuccess(txs) {
client.sendCommitTx()
}
......@@ -203,7 +203,7 @@ func (client *commitMsgClient) verifyTx(curTx *types.Transaction, verifyTxs map[
}
func (client *commitMsgClient) checkCommitTxSuccess(txs []*pt.TxDetail) bool {
func (client *commitMsgClient) checkCommitTxSuccess(txs []*types.TxDetail) bool {
client.mutex.Lock()
defer client.mutex.Unlock()
......
......@@ -34,7 +34,7 @@ func (client *client) addLocalBlock(height int64, block *pt.ParaLocalDbBlock) er
return client.setLocalDb(set)
}
func (client *client) createLocalBlock(lastBlock *pt.ParaLocalDbBlock, txs []*types.Transaction, mainBlock *pt.ParaTxDetail) error {
func (client *client) createLocalBlock(lastBlock *pt.ParaLocalDbBlock, txs []*types.Transaction, mainBlock *types.ParaTxDetail) error {
var newblock pt.ParaLocalDbBlock
newblock.Height = lastBlock.Height + 1
......@@ -312,8 +312,8 @@ func (client *client) getBatchSeqCount(currSeq int64) (int64, error) {
} else {
atomic.StoreInt32(&client.caughtUp, 1)
}
if batchFetchSeqEnable && lastSeq-currSeq > batchFetchSeqNum {
return batchFetchSeqNum, nil
if fetchFilterParaTxsEnable && lastSeq-currSeq > batchFetchBlockCount {
return batchFetchBlockCount, nil
}
return 0, nil
}
......@@ -333,7 +333,7 @@ func (client *client) getBatchSeqCount(currSeq int64) (int64, error) {
}
func verifyMainBlockHash(preMainBlockHash []byte, mainBlock *pt.ParaTxDetail) error {
func verifyMainBlockHash(preMainBlockHash []byte, mainBlock *types.ParaTxDetail) error {
if (bytes.Equal(preMainBlockHash, mainBlock.Header.ParentHash) && mainBlock.Type == addAct) ||
(bytes.Equal(preMainBlockHash, mainBlock.Header.Hash) && mainBlock.Type == delAct) {
return nil
......@@ -344,7 +344,7 @@ func verifyMainBlockHash(preMainBlockHash []byte, mainBlock *pt.ParaTxDetail) er
return pt.ErrParaCurHashNotMatch
}
func verifyMainBlocks(preMainBlockHash []byte, mainBlocks *pt.ParaTxDetails) error {
func verifyMainBlocks(preMainBlockHash []byte, mainBlocks *types.ParaTxDetails) error {
pre := preMainBlockHash
for _, block := range mainBlocks.Items {
err := verifyMainBlockHash(pre, block)
......@@ -361,45 +361,44 @@ func verifyMainBlocks(preMainBlockHash []byte, mainBlocks *pt.ParaTxDetails) err
return nil
}
func (client *client) requestAllMainTxs(currSeq int64, preMainBlockHash []byte) (*pt.ParaTxDetails, error) {
func (client *client) requestTxsFromBlock(currSeq int64, preMainBlockHash []byte) (*types.ParaTxDetails, error) {
blockSeq, err := client.GetBlockOnMainBySeq(currSeq)
if err != nil {
return nil, err
}
txDetail := paraexec.BlockDetail2ParaTxs(blockSeq.Seq.Type, blockSeq.Seq.Hash, blockSeq.Detail)
txDetail := blockSeq.Detail.FilterParaTxsByTitle(types.GetTitle())
txDetail.Type = blockSeq.Seq.Type
err = verifyMainBlockHash(preMainBlockHash, txDetail)
if err != nil {
plog.Error("requestAllMainTxs", "curr seq", currSeq, "preMainBlockHash", hex.EncodeToString(preMainBlockHash))
plog.Error("requestTxsFromBlock", "curr seq", currSeq, "preMainBlockHash", hex.EncodeToString(preMainBlockHash))
return nil, err
}
return &pt.ParaTxDetails{Items: []*pt.ParaTxDetail{txDetail}}, nil
return &types.ParaTxDetails{Items: []*types.ParaTxDetail{txDetail}}, nil
}
func (client *client) requestParaTxs(currSeq int64, count int64, preMainBlockHash []byte) (*pt.ParaTxDetails, error) {
//req := &pt.ReqParaTxByTitle{Start: currSeq, End: currSeq + count, Title: types.GetTitle()}
//details, err := client.GetParaTxByTitle(req)
//if err != nil {
// return nil, err
//}
func (client *client) requestFilterParaTxs(currSeq int64, count int64, preMainBlockHash []byte) (*types.ParaTxDetails, error) {
req := &types.ReqParaTxByTitle{Start: currSeq, End: currSeq + count, Title: types.GetTitle()}
details, err := client.GetParaTxByTitle(req)
if err != nil {
return nil, err
}
details := &pt.ParaTxDetails{}
err := verifyMainBlocks(preMainBlockHash, details)
err = verifyMainBlocks(preMainBlockHash, details)
if err != nil {
plog.Error("requestParaTxs", "curSeq", currSeq, "count", count, "preMainBlockHash", hex.EncodeToString(preMainBlockHash))
plog.Error("requestTxsOnlyPara", "curSeq", currSeq, "count", count, "preMainBlockHash", hex.EncodeToString(preMainBlockHash))
return nil, err
}
return details, nil
}
func (client *client) RequestTx(currSeq int64, count int64, preMainBlockHash []byte) (*pt.ParaTxDetails, error) {
if !batchFetchSeqEnable {
return client.requestAllMainTxs(currSeq, preMainBlockHash)
func (client *client) RequestTx(currSeq int64, count int64, preMainBlockHash []byte) (*types.ParaTxDetails, error) {
if fetchFilterParaTxsEnable {
return client.requestFilterParaTxs(currSeq, count, preMainBlockHash)
}
return client.requestParaTxs(currSeq, count, preMainBlockHash)
return client.requestTxsFromBlock(currSeq, preMainBlockHash)
}
func (client *client) processHashNotMatchError(currSeq int64, lastSeqMainHash []byte, err error) (int64, []byte, error) {
......@@ -412,7 +411,7 @@ func (client *client) processHashNotMatchError(currSeq int64, lastSeqMainHash []
return currSeq, lastSeqMainHash, err
}
func (client *client) procLocalBlock(mainBlock *pt.ParaTxDetail) (bool, error) {
func (client *client) procLocalBlock(mainBlock *types.ParaTxDetail) (bool, error) {
lastSeqMainHeight := mainBlock.Header.Height
lastBlock, err := client.getLastLocalBlock()
......@@ -421,7 +420,7 @@ func (client *client) procLocalBlock(mainBlock *pt.ParaTxDetail) (bool, error) {
return false, err
}
txs := paraexec.FilterTxsForPara(types.GetTitle(), mainBlock)
txs := paraexec.FilterTxsForPara(mainBlock)
plog.Info("Parachain process block", "lastBlockHeight", lastBlock.Height, "lastBlockMainHeight", lastBlock.MainHeight,
"lastBlockMainHash", common.ToHex(lastBlock.MainHash), "currMainHeight", lastSeqMainHeight,
......@@ -450,7 +449,7 @@ func (client *client) procLocalBlock(mainBlock *pt.ParaTxDetail) (bool, error) {
}
func (client *client) procLocalBlocks(mainBlocks *pt.ParaTxDetails) error {
func (client *client) procLocalBlocks(mainBlocks *types.ParaTxDetails) error {
var notify bool
for _, main := range mainBlocks.Items {
changed, err := client.procLocalBlock(main)
......
......@@ -149,15 +149,15 @@ func (client *client) GetBlockOnMainBySeq(seq int64) (*types.BlockSeq, error) {
return blockSeq, nil
}
//func (client *client) GetParaTxByTitle(req *pt.ReqParaTxByTitle) (*pt.ParaTxDetails, error) {
// txDetails, err := client.grpcClient.GetParaTxByTitle(context.Background(), req)
// if err != nil {
// plog.Error("GetParaTxByTitle wrong", "err", err.Error(),"start",req.Start,"end",req.End)
// return nil, err
// }
//
// return txDetails, nil
//}
func (client *client) GetParaTxByTitle(req *types.ReqParaTxByTitle) (*types.ParaTxDetails, error) {
txDetails, err := client.grpcClient.GetParaTxByTitle(context.Background(), req)
if err != nil {
plog.Error("GetParaTxByTitle wrong", "err", err.Error(), "start", req.Start, "end", req.End)
return nil, err
}
return txDetails, nil
}
func (client *client) QueryTxOnMainByHash(hash []byte) (*types.TransactionDetail, error) {
detail, err := client.grpcClient.QueryTransaction(context.Background(), &types.ReqHash{Hash: hash})
......
......@@ -152,7 +152,7 @@ func TestVerifyMainBlocks(t *testing.T) {
ParentHash: hash0,
Hash: hash1,
}
block1 := &pt.ParaTxDetail{
block1 := &types.ParaTxDetail{
Type: addAct,
Header: header1,
}
......@@ -161,7 +161,7 @@ func TestVerifyMainBlocks(t *testing.T) {
ParentHash: hash1,
Hash: hash2,
}
block2 := &pt.ParaTxDetail{
block2 := &types.ParaTxDetail{
Type: addAct,
Header: header2,
}
......@@ -170,7 +170,7 @@ func TestVerifyMainBlocks(t *testing.T) {
ParentHash: hash2,
Hash: hash3,
}
block3 := &pt.ParaTxDetail{
block3 := &types.ParaTxDetail{
Type: addAct,
Header: header3,
}
......@@ -180,7 +180,7 @@ func TestVerifyMainBlocks(t *testing.T) {
ParentHash: hash2,
Hash: hash3,
}
block4 := &pt.ParaTxDetail{
block4 := &types.ParaTxDetail{
Type: delAct,
Header: header4,
}
......@@ -189,7 +189,7 @@ func TestVerifyMainBlocks(t *testing.T) {
ParentHash: hash1,
Hash: hash2,
}
block5 := &pt.ParaTxDetail{
block5 := &types.ParaTxDetail{
Type: delAct,
Header: header5,
}
......@@ -198,13 +198,13 @@ func TestVerifyMainBlocks(t *testing.T) {
ParentHash: hash1,
Hash: hash6,
}
block6 := &pt.ParaTxDetail{
block6 := &types.ParaTxDetail{
Type: addAct,
Header: header6,
}
mainBlocks := &pt.ParaTxDetails{
Items: []*pt.ParaTxDetail{block1, block2, block3, block4, block5, block6},
mainBlocks := &types.ParaTxDetails{
Items: []*types.ParaTxDetail{block1, block2, block3, block4, block5, block6},
}
err := verifyMainBlocks(hash0, mainBlocks)
......
......@@ -774,12 +774,12 @@ func getCrossTxHashsByRst(api client.QueueProtocolAPI, status *pt.ParacrossNodeS
}
//抽取平行链交易和跨链交易
paraAllTxs := FilterTxsForParaByBlock(status.Title, blockDetail)
paraAllTxs := FilterTxsForPara(blockDetail.FilterParaTxsByTitle(status.Title))
var baseHashs [][]byte
for _, tx := range paraAllTxs {
baseHashs = append(baseHashs, tx.Hash())
}
paraCrossHashs := FilterParaCrossTxHashes(status.Title, paraAllTxs)
paraCrossHashs := FilterParaCrossTxHashes(paraAllTxs)
crossRst := util.CalcBitMapByBitMap(paraCrossHashs, baseHashs, rst)
return paraCrossHashs, crossRst, nil
......@@ -805,8 +805,8 @@ func getCrossTxHashs(api client.QueueProtocolAPI, status *pt.ParacrossNodeStatus
return nil, nil, err
}
//校验
paraBaseTxs := FilterTxsForParaByBlock(status.Title, blockDetail)
paraCrossHashs := FilterParaCrossTxHashes(status.Title, paraBaseTxs)
paraBaseTxs := FilterTxsForPara(blockDetail.FilterParaTxsByTitle(status.Title))
paraCrossHashs := FilterParaCrossTxHashes(paraBaseTxs)
var baseHashs [][]byte
for _, tx := range paraBaseTxs {
baseHashs = append(baseHashs, tx.Hash())
......
......@@ -230,7 +230,7 @@ func setMinerTxResultFork(status *pt.ParacrossNodeStatus, txs []*types.Transacti
//ForkLoopCheckCommitTxDone 后只保留全部txreseult 结果
if !pt.IsParaForkHeight(status.MainBlockHeight, pt.ForkLoopCheckCommitTxDone) {
//跨链tx结果
crossTxHashs := FilterParaCrossTxHashes(types.GetTitle(), txs)
crossTxHashs := FilterParaCrossTxHashes(txs)
status.CrossTxResult = []byte(hex.EncodeToString(util.CalcBitMap(crossTxHashs, curTxHashs, receipts)))
status.TxHashs = [][]byte{CalcTxHashsHash(curTxHashs)}
status.CrossTxHashs = [][]byte{CalcTxHashsHash(crossTxHashs)}
......
......@@ -46,7 +46,7 @@ func checkReceiptExecOk(receipt *types.ReceiptData) bool {
// 1, 主链+平行链 user.p.xx.paracross 交易组 混合跨链资产转移 paracross主链执行成功
// 2, 平行链 user.p.xx.paracross + user.p.xx.other 混合平行链组合 paracross主链执行成功
// 3, 平行链 user.p.xx.other 交易组 混合平行链组合 other主链pack
func filterParaTxGroup(title string, tx *types.Transaction, allTxs []*pt.TxDetail, index int, blockHeight, forkHeight int64) ([]*types.Transaction, int) {
func filterParaTxGroup(tx *types.Transaction, allTxs []*types.TxDetail, index int, blockHeight, forkHeight int64) ([]*types.Transaction, int) {
var headIdx int
for i := index; i >= 0; i-- {
......@@ -59,7 +59,7 @@ func filterParaTxGroup(title string, tx *types.Transaction, allTxs []*pt.TxDetai
endIdx := headIdx + int(tx.GroupCount)
for i := headIdx; i < endIdx; i++ {
if types.IsPara() && blockHeight < forkHeight {
if types.IsSpecificParaExecName(title, string(allTxs[i].Tx.Execer)) {
if types.IsParaExecName(string(allTxs[i].Tx.Execer)) {
continue
}
}
......@@ -77,40 +77,32 @@ func filterParaTxGroup(title string, tx *types.Transaction, allTxs []*pt.TxDetai
}
//FilterTxsForPara include some main tx in tx group before ForkParacrossCommitTx
func FilterTxsForPara(title string, main *pt.ParaTxDetail) []*types.Transaction {
func FilterTxsForPara(main *types.ParaTxDetail) []*types.Transaction {
var txs []*types.Transaction
forkHeight := pt.GetDappForkHeight(pt.ForkCommitTx)
for i := 0; i < len(main.TxDetails); i++ {
tx := main.TxDetails[i].Tx
if types.IsSpecificParaExecName(title, string(tx.Execer)) {
if tx.GroupCount >= 2 {
mainTxs, endIdx := filterParaTxGroup(title, tx, main.TxDetails, i, main.Header.Height, forkHeight)
txs = append(txs, mainTxs...)
i = endIdx - 1
continue
}
//单独的paracross tx 如果主链执行失败也要排除, 6.2fork原因 没有排除 非user.p.xx.paracross的平行链交易
if main.Header.Height >= forkHeight && bytes.HasSuffix(tx.Execer, []byte(pt.ParaX)) && !checkReceiptExecOk(main.TxDetails[i].Receipt) {
continue
}
txs = append(txs, tx)
if tx.GroupCount >= 2 {
mainTxs, endIdx := filterParaTxGroup(tx, main.TxDetails, i, main.Header.Height, forkHeight)
txs = append(txs, mainTxs...)
i = endIdx - 1
continue
}
//单独的paracross tx 如果主链执行失败也要排除, 6.2fork原因 没有排除 非user.p.xx.paracross的平行链交易
if main.Header.Height >= forkHeight && bytes.HasSuffix(tx.Execer, []byte(pt.ParaX)) && !checkReceiptExecOk(main.TxDetails[i].Receipt) {
continue
}
txs = append(txs, tx)
}
return txs
}
//FilterTxsForParaByBlock include some main tx in tx group before ForkParacrossCommitTx
func FilterTxsForParaByBlock(title string, main *types.BlockDetail) []*types.Transaction {
txDetail := BlockDetail2ParaTxs(0, main.Block.Hash(), main)
return FilterTxsForPara(title, txDetail)
}
// FilterParaCrossTxHashes only all para chain cross txs like xx.paracross exec
func FilterParaCrossTxHashes(title string, txs []*types.Transaction) [][]byte {
func FilterParaCrossTxHashes(txs []*types.Transaction) [][]byte {
var txHashs [][]byte
for _, tx := range txs {
if types.IsSpecificParaExecName(title, string(tx.Execer)) && bytes.HasSuffix(tx.Execer, []byte(pt.ParaX)) {
if types.IsParaExecName(string(tx.Execer)) && bytes.HasSuffix(tx.Execer, []byte(pt.ParaX)) {
txHashs = append(txHashs, tx.Hash())
}
}
......@@ -189,32 +181,3 @@ func CalcTxHashsHash(txHashs [][]byte) []byte {
data := types.Encode(totalTxHash)
return common.Sha256(data)
}
//BlockDetail2ParaTxs blockDetail transfer to paraTxDetail
func BlockDetail2ParaTxs(seqType int64, blockHash []byte, blockDetail *types.BlockDetail) *pt.ParaTxDetail {
header := &types.Header{
Version: blockDetail.Block.Version,
ParentHash: blockDetail.Block.ParentHash,
TxHash: blockDetail.Block.TxHash,
StateHash: blockDetail.Block.StateHash,
Height: blockDetail.Block.Height,
BlockTime: blockDetail.Block.BlockTime,
Difficulty: blockDetail.Block.Difficulty,
Signature: blockDetail.Block.Signature,
}
header.Hash = blockHash
txDetail := &pt.ParaTxDetail{
Type: seqType,
Header: header,
}
for i, tx := range blockDetail.Block.Txs {
detail := &pt.TxDetail{
Tx: tx,
Receipt: blockDetail.Receipts[i],
}
txDetail.TxDetails = append(txDetail.TxDetails, detail)
}
return txDetail
}
......@@ -580,7 +580,7 @@ func (s *VoteTestSuite) TestVoteTxFork() {
for _, tx := range txs {
status.TxHashs = append(status.TxHashs, tx.Hash())
}
txHashs := FilterParaCrossTxHashes(Title, txs)
txHashs := FilterParaCrossTxHashes(txs)
status.CrossTxHashs = append(status.CrossTxHashs, txHashs...)
baseCheckTxHash := CalcTxHashsHash(status.TxHashs)
......
......@@ -310,30 +310,6 @@ message ParaLocalDbBlock {
repeated Transaction txs = 6;
}
message ReqParaTxByTitle{
int64 start = 1;
int64 end = 2;
string title = 3;
}
message TxDetail {
uint32 index = 1;
Transaction tx = 2;
ReceiptData receipt = 3;
repeated bytes proofs = 4;
}
message ParaTxDetail{
int64 type = 1;
Header header = 2;
repeated TxDetail txDetails = 3;
}
message ParaTxDetails{
repeated ParaTxDetail items = 1;
}
service paracross {
rpc GetTitle(ReqString) returns (ParacrossConsensusStatus) {}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment