Commit c7ef3f51 authored by mdj33's avatar mdj33 Committed by vipwzw

move filtertx to paracross

parent ae8d97aa
......@@ -25,9 +25,8 @@ import (
drivers "github.com/33cn/chain33/system/consensus"
cty "github.com/33cn/chain33/system/dapp/coins/types"
"github.com/33cn/chain33/types"
"github.com/33cn/chain33/util"
paracross "github.com/33cn/plugin/plugin/dapp/paracross/types"
pt "github.com/33cn/plugin/plugin/dapp/paracross/types"
paraexec "github.com/33cn/plugin/plugin/dapp/paracross/executor"
)
const (
......@@ -427,7 +426,7 @@ func (client *client) RequestTx(currSeq int64, preMainBlockHash []byte) ([]*type
(bytes.Equal(preMainBlockHash, blockSeq.Detail.Block.ParentHash) && blockSeq.Seq.Type == addAct) ||
(bytes.Equal(preMainBlockHash, blockSeq.Seq.Hash) && blockSeq.Seq.Type == delAct) {
txs := util.FilterTxsForPara(types.GetTitle(), blockSeq.Detail)
txs := paraexec.FilterTxsForPara(types.GetTitle(), blockSeq.Detail)
plog.Info("GetCurrentSeq", "Len of txs", len(txs), "seqTy", blockSeq.Seq.Type)
client.mtx.Lock()
......@@ -447,18 +446,18 @@ func (client *client) RequestTx(currSeq int64, preMainBlockHash []byte) ([]*type
//not consistent case be processed at below
plog.Error("RequestTx", "preMainHash", hex.EncodeToString(preMainBlockHash), "currSeq preMainHash", hex.EncodeToString(blockSeq.Detail.Block.ParentHash),
"currSeq mainHash", hex.EncodeToString(blockSeq.Seq.Hash), "curr seq", currSeq, "ty", blockSeq.Seq.Type, "currSeq Mainheight", blockSeq.Detail.Block.Height)
return nil, nil, paracross.ErrParaCurHashNotMatch
return nil, nil, pt.ErrParaCurHashNotMatch
}
//lastSeq < CurrSeq case:
//lastSeq = currSeq-1, main node not update
if lastSeq+1 == currSeq {
plog.Debug("Waiting new sequence from main chain")
return nil, nil, paracross.ErrParaWaitingNewSeq
return nil, nil, pt.ErrParaWaitingNewSeq
}
// 1. lastSeq < currSeq-1
// 2. lastSeq >= currSeq and seq not consistent or fork case
return nil, nil, paracross.ErrParaCurHashNotMatch
return nil, nil, pt.ErrParaCurHashNotMatch
}
//genesis block scenario, new main node's blockHash as preMainHash, genesis sequence+1 as currSeq
......@@ -527,7 +526,7 @@ func (client *client) switchHashMatchedBlock(currSeq int64) (int64, []byte, erro
"new currSeq", mainSeq+1, "new preMainBlockHash", hex.EncodeToString(block.MainHash))
return mainSeq + 1, block.MainHash, nil
}
return -2, nil, paracross.ErrParaCurHashNotMatch
return -2, nil, pt.ErrParaCurHashNotMatch
}
func (client *client) removeBlocks(endHeight int64) error {
......@@ -582,7 +581,7 @@ func (client *client) CreateBlock() {
txs, blockOnMain, err := client.RequestTx(currSeq, lastSeqMainHash)
if err != nil {
incSeqFlag = false
if err == paracross.ErrParaCurHashNotMatch {
if err == pt.ErrParaCurHashNotMatch {
newSeq, newSeqMainHash, err := client.switchHashMatchedBlock(currSeq)
if err == nil {
currSeq = newSeq
......@@ -664,11 +663,11 @@ func (client *client) addMinerTx(preStateHash []byte, block *types.Block, main *
for _, tx := range txs {
status.TxHashs = append(status.TxHashs, tx.Hash())
}
txHashs := util.FilterParaCrossTxHashes(types.GetTitle(), txs)
txHashs := paraexec.FilterParaCrossTxHashes(types.GetTitle(), txs)
status.CrossTxHashs = append(status.CrossTxHashs, txHashs...)
}
tx, err := paracross.CreateRawMinerTx(&pt.ParacrossMinerAction{
tx, err := pt.CreateRawMinerTx(&pt.ParacrossMinerAction{
Status: status,
IsSelfConsensus: isParaSelfConsensusForked(status.MainBlockHeight),
})
......@@ -794,22 +793,22 @@ func checkMinerTx(current *types.BlockDetail) error {
}
baseTx := current.Block.Txs[0]
//判断交易类型和执行情况
var action paracross.ParacrossAction
var action pt.ParacrossAction
err := types.Decode(baseTx.GetPayload(), &action)
if err != nil {
return err
}
if action.GetTy() != paracross.ParacrossActionMiner {
return paracross.ErrParaMinerTxType
if action.GetTy() != pt.ParacrossActionMiner {
return pt.ErrParaMinerTxType
}
//判断交易执行是否OK
if action.GetMiner() == nil {
return paracross.ErrParaEmptyMinerTx
return pt.ErrParaEmptyMinerTx
}
//判断exec 是否成功
if current.Receipts[0].Ty != types.ExecOk {
return paracross.ErrParaMinerExecErr
return pt.ErrParaMinerExecErr
}
return nil
}
......
......@@ -15,9 +15,9 @@ import (
"github.com/33cn/chain33/common/address"
"github.com/33cn/chain33/types"
typesmocks "github.com/33cn/chain33/types/mocks"
"github.com/33cn/chain33/util"
pt "github.com/33cn/plugin/plugin/dapp/paracross/types"
"github.com/stretchr/testify/mock"
paraexec "github.com/33cn/plugin/plugin/dapp/paracross/executor"
)
var (
......@@ -121,7 +121,7 @@ func TestFilterTxsForPara(t *testing.T) {
Receipts: receipts,
}
rst := util.FilterTxsForPara(Title, detail)
rst := paraexec.FilterTxsForPara(Title, detail)
filterTxs := []*types.Transaction{tx3, tx4, tx5, tx6, txA, txB, txC}
assert.Equal(t, filterTxs, rst)
......
......@@ -454,14 +454,14 @@ func getCrossTxHashs(api client.QueueProtocolAPI, commit *pt.ParacrossCommitActi
return nil, nil, err
}
//校验
paraBaseTxs := util.FilterTxsForPara(commit.Status.Title, blockDetail)
paraCrossHashs := util.FilterParaCrossTxHashes(commit.Status.Title, paraBaseTxs)
paraBaseTxs := FilterTxsForPara(commit.Status.Title, blockDetail)
paraCrossHashs := FilterParaCrossTxHashes(commit.Status.Title, paraBaseTxs)
var baseHashs [][]byte
for _, tx := range paraBaseTxs {
baseHashs = append(baseHashs, tx.Hash())
}
baseCheckTxHash := util.CalcTxHashsHash(baseHashs)
crossCheckHash := util.CalcTxHashsHash(paraCrossHashs)
baseCheckTxHash := CalcTxHashsHash(baseHashs)
crossCheckHash := CalcTxHashsHash(paraCrossHashs)
if !bytes.Equal(commit.Status.CrossTxHashs[0], crossCheckHash) {
clog.Error("getCrossTxHashs para hash not equal", "main.crossHash", hex.EncodeToString(crossCheckHash),
"commit.crossHash", hex.EncodeToString(commit.Status.CrossTxHashs[0]),
......
......@@ -110,7 +110,7 @@ func setMinerTxResult(payload *pt.ParacrossMinerAction, txs []*types.Transaction
paraTxHashs = append(paraTxHashs, hash)
}
}
crossTxHashs := util.FilterParaMainCrossTxHashes(types.GetTitle(), txs)
crossTxHashs := FilterParaMainCrossTxHashes(types.GetTitle(), txs)
payload.Status.TxHashs = paraTxHashs
payload.Status.TxResult = util.CalcSubBitMap(curTxHashs, paraTxHashs, receipts)
payload.Status.CrossTxHashs = crossTxHashs
......@@ -133,8 +133,8 @@ func setMinerTxResultFork(payload *pt.ParacrossMinerAction, txs []*types.Transac
//跨链tx结果
payload.Status.CrossTxResult = util.CalcBitMap(baseCrossTxHashs, curTxHashs, receipts)
payload.Status.TxHashs = [][]byte{util.CalcTxHashsHash(baseTxHashs)}
payload.Status.CrossTxHashs = [][]byte{util.CalcTxHashsHash(baseCrossTxHashs)}
payload.Status.TxHashs = [][]byte{CalcTxHashsHash(baseTxHashs)}
payload.Status.CrossTxHashs = [][]byte{CalcTxHashsHash(baseCrossTxHashs)}
}
//ExecLocal_Miner miner tx local db process
......
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package executor
import (
"bytes"
"github.com/33cn/chain33/common"
"github.com/33cn/chain33/types"
pt "github.com/33cn/plugin/plugin/dapp/paracross/types"
)
//1,如果全部是paracross的,主链成功会ExecOk,如果有一个不成功,全部回退回PACK,后面检查TyLogErr,OK意味着全部成功
//2,如果是paracross+other, other的是PACK,如果有一个是OK,那意味着全部OK,如果全部是PACK,检查TyLogErr
//3,如果是全部other,全部是PACK
func checkReceiptExecOk(receipt *types.ReceiptData) bool {
if receipt.Ty == types.ExecOk {
return true
}
//如果主链allow 平行链tx 主链执行出错场景 比如paracross
for _, log := range receipt.Logs {
if log.Ty == types.TyLogErr {
return false
}
}
return true
}
//1. 如果涉及跨链合约,如果有超过两条平行链的交易被判定为失败,交易组会执行不成功,也不PACK。(这样的情况下,主链交易一定会执行不成功,最终也不会进到block里面)
//2. 跨链合约交易组,要么是paracross+user.p.xx.paracross组合,要么全是user.p.xx.paracross组合,后面是资产转移
//3. 如果交易组有一个ExecOk,主链上的交易都是ok的,可以全部打包
//4. 不论是否涉及跨链合约, 不同用途的tx打到一个group里面,如果主链交易有失败,平行链也不会执行,也需要排除掉
//5. 如果全部是ExecPack,有两种情况,一是交易组所有交易都是平行链交易,另一是主链有交易失败而打包了的交易,需要检查LogErr,如果有错,全部不打包
//经para filter之后, 交易组会存在如下几种tx:
// 1, 主链 paracross + 平行链 user.p.xx.paracross 跨链兑换合约
// 2, 主链 paracross + 平行链 user.p.xx.other 混合交易组合
// 3, 主链 other + 平行链 user.p.xx.paracross 混合交易组合约
// 4, 主链 other + 平行链 user.p.xx.other 混合交易组合
// 5, 主链+平行链 user.p.xx.paracross 交易组 混合跨链资产转移
// 6, 平行链 user.p.xx.paracross + user.p.xx.other 混合平行链组合
// 7, 平行链 all user.p.xx.other 混合平行链组合
///// 分叉以后只考虑平行链交易组全部是平行链tx,没有主链tx
//经para filter之后, 交易组会存在如下几种tx:
// 1, 主链+平行链 user.p.xx.paracross 交易组 混合跨链资产转移 paracross主链执行成功
// 2, 平行链 user.p.xx.paracross + user.p.xx.other 混合平行链组合 paracross主链执行成功
// 3, 平行链 user.p.xx.other 交易组 混合平行链组合 other主链pack
func filterParaTxGroup(title string, tx *types.Transaction, main *types.BlockDetail, index int) ([]*types.Transaction, int) {
var headIdx int
for i := index; i >= 0; i-- {
if bytes.Equal(tx.Header, main.Block.Txs[i].Hash()) {
headIdx = i
break
}
}
endIdx := headIdx + int(tx.GroupCount)
for i := headIdx; i < endIdx; i++ {
if !checkReceiptExecOk(main.Receipts[i]) {
return nil, endIdx
}
}
//全部是平行链交易 或平行链在主链执行成功的tx
return main.Block.Txs[headIdx:endIdx], endIdx
}
//FilterTxsForPara include some main tx in tx group before ForkParacrossCommitTx
func FilterTxsForPara(title string, main *types.BlockDetail) []*types.Transaction {
var txs []*types.Transaction
for i := 0; i < len(main.Block.Txs); i++ {
tx := main.Block.Txs[i]
if types.IsSpecificParaExecName(title, string(tx.Execer)) {
if tx.GroupCount >= 2 {
mainTxs, endIdx := filterParaTxGroup(title, tx, main, i)
txs = append(txs, mainTxs...)
i = endIdx - 1
continue
}
//单独的paracross跨链合约 如果主链执行失败也要排除
if bytes.HasSuffix(tx.Execer, []byte(pt.ParaX)) && !checkReceiptExecOk(main.Receipts[i]) {
continue
}
txs = append(txs, tx)
}
}
return txs
}
// FilterParaCrossTxHashes only all para chain cross txs like xx.paracross exec
func FilterParaCrossTxHashes(title string, txs []*types.Transaction) [][]byte {
var txHashs [][]byte
for _, tx := range txs {
if types.IsSpecificParaExecName(title, string(tx.Execer)) && bytes.HasSuffix(tx.Execer, []byte(pt.ParaX)) {
txHashs = append(txHashs, tx.Hash())
}
}
return txHashs
}
//经para filter之后, 交易组会存在如下几种tx:
// 1, 主链 paracross + 平行链 user.p.xx.paracross 跨链兑换合约
// 2, 主链 paracross + 平行链 user.p.xx.other 混合交易组合
// 3, 主链 other + 平行链 user.p.xx.paracross 混合交易组合
// 4, 主链 other + 平行链 user.p.xx.other 混合交易组合
// 5, 主链+平行链 user.p.xx.paracross 交易组 混合跨链资产转移
// 6, 平行链 user.p.xx.paracross + user.p.xx.other 混合平行链组合
// 7, 平行链 all user.p.xx.other 混合平行链组合
// 这里只取跨链兑换和任何有user.p.xx.paracross的资产转移交易,资产兑换可能主链会需要查看平行链执行结果再对主链的paracross合约做后续处理
func crossTxGroupProc(title string, txs []*types.Transaction, index int) ([]*types.Transaction, int32) {
var headIdx, endIdx int32
for i := index; i >= 0; i-- {
if bytes.Equal(txs[index].Header, txs[i].Hash()) {
headIdx = int32(i)
break
}
}
//cross mix tx, contain main and para tx, main prefix with pt.paraX
//最初设计是主链平行链跨链交换,都在paracross合约处理,平行链在主链共识结束后主链做unfreeze操作,但是那样出错时候回滚不好处理
//目前只设计跨链转移场景,转移到平行链通过trade交换
endIdx = headIdx + txs[index].GroupCount
for i := headIdx; i < endIdx; i++ {
if bytes.HasPrefix(txs[i].Execer, []byte(pt.ParaX)) {
return txs[headIdx:endIdx], endIdx
}
}
//cross asset transfer in tx group
var transfers []*types.Transaction
for i := headIdx; i < endIdx; i++ {
if types.IsSpecificParaExecName(title, string(txs[i].Execer)) && bytes.HasSuffix(txs[i].Execer, []byte(pt.ParaX)) {
transfers = append(transfers, txs[i])
}
}
return transfers, endIdx
}
//FilterParaMainCrossTxHashes ForkParacrossCommitTx之前允许txgroup里面有main chain tx的跨链
func FilterParaMainCrossTxHashes(title string, txs []*types.Transaction) [][]byte {
var crossTxHashs [][]byte
//跨链tx 必须是paracross合约且user.p.打头, user.p.xx.的非paracross合约不是跨链
for i := 0; i < len(txs); i++ {
tx := txs[i]
if tx.GroupCount > 1 {
groupTxs, end := crossTxGroupProc(title, txs, i)
for _, tx := range groupTxs {
crossTxHashs = append(crossTxHashs, tx.Hash())
}
i = int(end) - 1
continue
}
if types.IsSpecificParaExecName(title, string(tx.Execer)) && bytes.HasSuffix(tx.Execer, []byte(pt.ParaX)) {
crossTxHashs = append(crossTxHashs, tx.Hash())
}
}
return crossTxHashs
}
//CalcTxHashsHash 计算几个txhash的hash值 作校验使用
func CalcTxHashsHash(txHashs [][]byte) []byte {
if len(txHashs) == 0 {
return nil
}
totalTxHash := &types.ReqHashes{}
totalTxHash.Hashes = append(totalTxHash.Hashes, txHashs...)
data := types.Encode(totalTxHash)
return common.Sha256(data)
}
......@@ -360,6 +360,85 @@ func TestCrossLimits(t *testing.T) {
exec.SetEnv(0, 0, 0)
exec.SetAPI(api)
func (s *VoteTestSuite) TestFilterTxsForPara() {
tx1, err := createAssetTransferTx(s.Suite, PrivKeyA, nil)
s.Nil(err)
tx2, err := createParaNormalTx(s.Suite, PrivKeyB, nil)
s.Nil(err)
tx3, err := createParaNormalTx(s.Suite,PrivKeyA,[]byte("toA"))
s.Nil(err)
tx4, err := createCrossParaTx(s.Suite, []byte("toB"))
s.Nil(err)
tx5, err := createParaNormalTx(s.Suite,PrivKeyA,[]byte("toB"))
s.Nil(err)
tx345 := []*types.Transaction{tx3, tx4,tx5}
txGroup345, err := createTxsGroup(s.Suite, tx345)
s.Nil(err)
tx6, err := createCrossParaTx(s.Suite, nil)
s.Nil(err)
tx7, err := createCrossParaTx(s.Suite, nil)
s.Nil(err)
tx67 := []*types.Transaction{tx6, tx7}
txGroup67, err := createTxsGroup(s.Suite, tx67)
s.Nil(err)
tx71, err := createParaNormalTx(s.Suite,PrivKeyA,[]byte("toA"))
s.Nil(err)
tx72, err := createCrossParaTx(s.Suite, []byte("toB"))
s.Nil(err)
tx73, err := createParaNormalTx(s.Suite,PrivKeyA,[]byte("toB"))
s.Nil(err)
tx777 := []*types.Transaction{tx71, tx72,tx73}
txGroup777, err := createTxsGroup(s.Suite, tx777)
s.Nil(err)
tx8, err := createAssetTransferTx(s.Suite, PrivKeyA, nil)
s.Nil(err)
tx9, err := createAssetTransferTx(s.Suite, PrivKeyC, nil)
s.Nil(err)
txs := []*types.Transaction{tx1, tx2}
txs = append(txs, txGroup345...)
txs = append(txs, txGroup67...)
txs = append(txs, txGroup777...)
txs = append(txs, tx8)
txs = append(txs, tx9)
errlog := &types.ReceiptLog{Ty: types.TyLogErr, Log: []byte("")}
feelog := &types.Receipt{}
feelog.Logs = append(feelog.Logs, errlog)
recpt1 := &types.ReceiptData{Ty: types.ExecPack,Logs:feelog.Logs}
recpt2 := &types.ReceiptData{Ty: types.ExecPack}
recpt3 := &types.ReceiptData{Ty: types.ExecOk}
recpt4 := &types.ReceiptData{Ty: types.ExecOk}
recpt5 := &types.ReceiptData{Ty: types.ExecOk}
recpt6 := &types.ReceiptData{Ty: types.ExecPack,Logs:feelog.Logs}
recpt7 := &types.ReceiptData{Ty: types.ExecPack}
recpt71 := &types.ReceiptData{Ty: types.ExecPack}
recpt72 := &types.ReceiptData{Ty: types.ExecPack}
recpt73 := &types.ReceiptData{Ty: types.ExecPack}
recpt8 := &types.ReceiptData{Ty: types.ExecPack,Logs:feelog.Logs}
recpt9 := &types.ReceiptData{Ty: types.ExecOk}
receipts := []*types.ReceiptData{recpt1, recpt2, recpt3, recpt4, recpt5, recpt6, recpt7, recpt71,recpt72, recpt73, recpt8,recpt9}
block := &types.Block{Txs: txs}
detail := &types.BlockDetail{
Block: block,
Receipts: receipts,
}
rst := FilterTxsForPara(Title, detail)
filterTxs := []*types.Transaction{ tx2,tx3, tx4, tx5,tx71,tx72,tx73,tx9}
s.Equal( filterTxs, rst)
}
tx := &types.Transaction{Execer: []byte("p.user.test.paracross")}
res := exec.CrossLimits(tx, 1)
......@@ -492,11 +571,11 @@ func (s *VoteTestSuite) TestVoteTxFork() {
for _, tx := range txs {
status.TxHashs = append(status.TxHashs, tx.Hash())
}
txHashs := util.FilterParaCrossTxHashes(Title, txs)
txHashs := FilterParaCrossTxHashes(Title, txs)
status.CrossTxHashs = append(status.CrossTxHashs, txHashs...)
baseCheckTxHash := util.CalcTxHashsHash(status.TxHashs)
baseCrossTxHash := util.CalcTxHashsHash(status.CrossTxHashs)
baseCheckTxHash := CalcTxHashsHash(status.TxHashs)
baseCrossTxHash := CalcTxHashsHash(status.CrossTxHashs)
tx, err := s.createVoteTx(status, PrivKeyA)
s.Nil(err)
......@@ -668,3 +747,5 @@ func createParaNormalTx(s suite.Suite, privFrom string, to []byte) (*types.Trans
return tx, nil
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment