Commit d0d26641 authored by mdj33's avatar mdj33 Committed by vipwzw

asset transfer fail rollbak

parent 1e556e29
......@@ -10,17 +10,13 @@ function dapp_test_rpc() {
if [ -d dapptest ]; then
cp "$DAPP_TEST_COMMON" dapptest/
cd dapptest || return
rm -f "retries.log"
rm -f "jobs.log"
dapps=$(find . -maxdepth 1 -type d ! -name dapptest ! -name . | sed 's/^\.\///' | sort)
echo "dapps list: $dapps"
set +e
parallel -k --joblog ./jobs.log 'echo tried {} >>./retries.log; ./{}/"'"${RPC_TESTFILE}"'" "'"$ip"'"' ::: "$dapps"
parallel -k --joblog ./jobs.log ./{}/"${RPC_TESTFILE}" "$ip" ::: "$dapps"
local ret=$?
# retries 3 times if one dapp fail
echo "============ # retried dapps log: ============="
cat ./retries.log
echo "============ # check dapps test log: ============="
cat ./jobs.log
set -e
......
......@@ -255,6 +255,7 @@ ForkParacrossCommitTx=0
ForkLoopCheckCommitTxDone=0
#平行链分阶段自共识支持合约配置,缺省是0
ForkParaSelfConsStages=0
ForkParaAssetTransferRbk=0
[fork.sub.evm]
Enable=0
......
This diff is collapsed.
......@@ -37,6 +37,7 @@ func ParcCmd() *cobra.Command {
paraConfigCmd(),
GetParaInfoCmd(),
GetParaListCmd(),
GetParaAssetTransCmd(),
IsSyncCmd(),
GetHeightCmd(),
GetBlockInfoCmd(),
......@@ -936,6 +937,35 @@ func getNodeGroupAddrsCmd() *cobra.Command {
return cmd
}
func addParaAssetTranCmdFlags(cmd *cobra.Command) {
cmd.Flags().StringP("hash", "s", "", "asset transfer tx hash")
cmd.MarkFlagRequired("hash")
}
func paraAssetTransfer(cmd *cobra.Command, args []string) {
rpcLaddr, _ := cmd.Flags().GetString("rpc_laddr")
hash, _ := cmd.Flags().GetString("hash")
params := types.ReqString{
Data: hash,
}
var res pt.ParacrossAssetRsp
ctx := jsonclient.NewRPCCtx(rpcLaddr, "paracross.GetAssetTxResult", params, &res)
ctx.Run()
}
// GetParaAssetTransCmd get para chain asset transfer info
func GetParaAssetTransCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "asset_tranfer",
Short: "Get para chain cross asset transfer info",
Run: paraAssetTransfer,
}
addParaAssetTranCmdFlags(cmd)
return cmd
}
func nodeGroup(cmd *cobra.Command, args []string) {
rpcLaddr, _ := cmd.Flags().GetString("rpc_laddr")
paraName, _ := cmd.Flags().GetString("paraName")
......
......@@ -762,7 +762,7 @@ func (a *action) execCrossTx(tx *types.TransactionDetail, crossTxHash []byte) (*
if payload.Ty == pt.ParacrossActionAssetWithdraw {
receiptWithdraw, err := a.assetWithdraw(payload.GetAssetWithdraw(), tx.Tx)
if err != nil {
clog.Crit("paracross.Commit Decode Tx failed", "error", err, "txHash", hex.EncodeToString(crossTxHash))
clog.Crit("paracross.Commit withdraw Tx failed", "error", err, "txHash", hex.EncodeToString(crossTxHash))
return nil, errors.Cause(err)
}
......@@ -773,6 +773,31 @@ func (a *action) execCrossTx(tx *types.TransactionDetail, crossTxHash []byte) (*
}
func (a *action) rollbackCrossTx(tx *types.TransactionDetail, crossTxHash []byte) (*types.Receipt, error) {
if !bytes.HasSuffix(tx.Tx.Execer, []byte(pt.ParaX)) {
return nil, nil
}
var payload pt.ParacrossAction
err := types.Decode(tx.Tx.Payload, &payload)
if err != nil {
clog.Crit("paracross.Commit.rollbackCrossTx Decode Tx failed", "error", err, "txHash", hex.EncodeToString(crossTxHash))
return nil, err
}
if payload.Ty == pt.ParacrossActionAssetTransfer {
receipt, err := a.assetTransferRollback(payload.GetAssetTransfer(), tx.Tx)
if err != nil {
clog.Crit("paracross.Commit rbk Tx failed", "error", err, "txHash", hex.EncodeToString(crossTxHash))
return nil, errors.Cause(err)
}
clog.Debug("paracross.Commit rollbackCrossTx", "txHash", hex.EncodeToString(crossTxHash), "mainHeight", a.height)
return receipt, nil
}
return nil, nil
}
func getCrossTxHashsByRst(api client.QueueProtocolAPI, status *pt.ParacrossNodeStatus) ([][]byte, []byte, error) {
//只获取跨链tx
cfg := api.GetConfig()
......@@ -781,10 +806,12 @@ func getCrossTxHashsByRst(api client.QueueProtocolAPI, status *pt.ParacrossNodeS
clog.Error("getCrossTxHashs decode rst", "CrossTxResult", string(status.TxResult), "paraHeight", status.Height)
return nil, nil, types.ErrInvalidParam
}
clog.Debug("getCrossTxHashsByRst", "height", status.Height, "txResult", string(status.TxResult))
//空块
if len(rst) == 0 {
return nil, nil, nil
if !cfg.IsDappFork(status.MainBlockHeight, pt.ParaX, pt.ForkParaAssetTransferRbk) {
if len(rst) == 0 {
return nil, nil, nil
}
}
blockDetail, err := GetBlock(api, status.MainBlockHash)
......@@ -800,6 +827,7 @@ func getCrossTxHashsByRst(api client.QueueProtocolAPI, status *pt.ParacrossNodeS
}
paraCrossHashs := FilterParaCrossTxHashes(paraAllTxs)
crossRst := util.CalcBitMapByBitMap(paraCrossHashs, baseHashs, rst)
clog.Debug("getCrossTxHashsByRst.crossRst", "height", status.Height, "txResult", hex.EncodeToString(crossRst), "len", len(paraCrossHashs))
return paraCrossHashs, crossRst, nil
......@@ -858,6 +886,9 @@ func getCrossTxHashs(api client.QueueProtocolAPI, status *pt.ParacrossNodeStatus
}
func (a *action) execCrossTxs(status *pt.ParacrossNodeStatus) (*types.Receipt, error) {
var receipt types.Receipt
......@@ -881,7 +912,7 @@ func (a *action) execCrossTxs(status *pt.ParacrossNodeStatus) (*types.Receipt, e
return nil, err
}
if tx == nil {
clog.Error("paracross.Commit Load Tx failed", "para title", title, "para height", status.Height,
clog.Error("paracross.Commit Load Tx nil", "para title", title, "para height", status.Height,
"para tx index", i, "error", err, "txHash", hex.EncodeToString(crossTxHashs[i]))
return nil, types.ErrHashNotExist
}
......@@ -900,6 +931,31 @@ func (a *action) execCrossTxs(status *pt.ParacrossNodeStatus) (*types.Receipt, e
clog.Error("paracross.Commit commitDone", "do cross number", i, "hash",
hex.EncodeToString(crossTxHashs[i]),
"para res", util.BitMapBit(crossTxResult, uint32(i)))
cfg := a.api.GetConfig()
if cfg.IsDappFork(a.height, pt.ParaX, pt.ForkParaAssetTransferRbk) {
tx, err := GetTx(a.api, crossTxHashs[i])
if err != nil {
clog.Crit("paracross.Commit rbk Load Tx failed", "para title", title, "para height", status.Height,
"para tx index", i, "error", err, "txHash", hex.EncodeToString(crossTxHashs[i]))
return nil, err
}
if tx == nil {
clog.Error("paracross.Commit rbk Load Tx nil", "para title", title, "para height", status.Height,
"para tx index", i, "error", err, "txHash", hex.EncodeToString(crossTxHashs[i]))
return nil, types.ErrHashNotExist
}
receiptCross, err := a.rollbackCrossTx(tx, crossTxHashs[i])
if err != nil {
clog.Error("paracross.Commit rollbackCrossTx", "para title", title, "para height", status.Height,
"para tx index", i, "error", err)
return nil, errors.Cause(err)
}
if receiptCross == nil {
continue
}
receipt.KV = append(receipt.KV, receiptCross.KV...)
receipt.Logs = append(receipt.Logs, receiptCross.Logs...)
}
}
}
......
......@@ -87,6 +87,24 @@ func (a *action) assetWithdraw(withdraw *types.AssetsWithdraw, withdrawTx *types
return assetWithdrawBalance(paraAcc, a.fromaddr, withdraw.Amount)
}
func (a *action) assetTransferRollback(transfer *types.AssetsTransfer, transferTx *types.Transaction) (*types.Receipt, error) {
cfg := a.api.GetConfig()
isPara := cfg.IsPara()
//主链处理分支
if !isPara {
accDB, err := createAccount(cfg, a.db, transfer.Cointoken)
if err != nil {
return nil, errors.Wrap(err, "assetTransferToken call account.NewAccountDB failed")
}
execAddr := address.ExecAddress(pt.ParaX)
fromAcc := address.ExecAddress(string(transferTx.Execer))
clog.Debug("paracross.AssetTransferRbk ", "execer", string(transferTx.Execer),
"transfer.txHash", hex.EncodeToString(transferTx.Hash()), "curTx", hex.EncodeToString(a.tx.Hash()))
return accDB.ExecTransfer(fromAcc, transferTx.From(), execAddr, transfer.Amount)
}
return nil, nil
}
func createAccount(cfg *types.Chain33Config, db db.KV, symbol string) (*account.DB, error) {
var accDB *account.DB
var err error
......
......@@ -228,6 +228,7 @@ func setMinerTxResultFork(cfg *types.Chain33Config, status *pt.ParacrossNodeStat
//主链自己过滤平行链tx, 对平行链执行失败的tx主链无法识别,主链和平行链需要获取相同的最初的tx map
//全部平行链tx结果
status.TxResult = []byte(hex.EncodeToString(util.CalcSingleBitMap(curTxHashs, receipts)))
clog.Debug("setMinerTxResultFork", "height", status.Height, "txResult", string(status.TxResult))
//ForkLoopCheckCommitTxDone 后只保留全部txreseult 结果
if !pt.IsParaForkHeight(cfg, status.MainBlockHeight, pt.ForkLoopCheckCommitTxDone) {
......
......@@ -218,11 +218,15 @@ func (p *Paracross) Query_GetDoneTitleHeight(in *pt.ReqParacrossTitleHeight) (ty
}
// Query_GetAssetTxResult query get asset tx reseult
func (p *Paracross) Query_GetAssetTxResult(in *types.ReqHash) (types.Message, error) {
if in == nil {
func (p *Paracross) Query_GetAssetTxResult(in *types.ReqString) (types.Message, error) {
if in == nil || in.Data == "" {
return nil, types.ErrInvalidParam
}
return p.paracrossGetAssetTxResult(in.Hash)
hash, err := common.FromHex(in.Data)
if err != nil {
return nil, errors.Wrap(err, "fromHex")
}
return p.paracrossGetAssetTxResult(hash)
}
// Query_GetMainBlockHash query get mainblockHash by tx
......@@ -296,7 +300,7 @@ func listLocalTitles(db dbm.KVDB) (types.Message, error) {
MostSameCommit: st.MostSameCommit,
Title: st.Title,
Height: st.Height,
TxResult: hex.EncodeToString(st.TxResult),
TxResult: string(st.TxResult),
}
resp.Titles = append(resp.Titles, rst)
......@@ -400,13 +404,34 @@ func (p *Paracross) paracrossGetAssetTxResult(hash []byte) (types.Message, error
return nil, err
}
var result pt.ParacrossAsset
err = types.Decode(value, &result)
var rst pt.ParacrossAsset
err = types.Decode(value, &rst)
if err != nil {
return nil, err
}
return &result, nil
rsp := &pt.ParacrossAssetRsp{
From: rst.From,
To: rst.To,
Amount: rst.Amount,
Exec: rst.Exec,
Symbol: rst.Symbol,
Height: rst.Height,
CommitDoneHeight: rst.CommitDoneHeight,
ParaHeight: rst.ParaHeight,
}
rsp.TxHash = common.ToHex(rst.TxHash)
rsp.IsWithdraw = "false"
if rst.IsWithdraw {
rsp.IsWithdraw = "true"
}
rsp.Success = "false"
if rst.Success {
rsp.Success = "true"
}
return rsp, nil
}
//Query_GetSelfConsStages get self consensus stages configed
......
......@@ -385,6 +385,23 @@ message ParacrossAsset {
bool success = 23;
}
message ParacrossAssetRsp {
// input
string from = 1;
string to = 2;
string isWithdraw = 3;
string txHash = 4;
int64 amount = 5;
string exec = 6;
string symbol = 7;
// 主链部分
int64 height = 10;
// 平行链部分
int64 commitDoneHeight = 21;
int64 paraHeight = 22;
string success = 23;
}
message ParaLocalDbBlock {
int64 height = 1;
bytes mainHash = 2;
......@@ -408,6 +425,6 @@ service paracross {
rpc ListTitles(ReqNil) returns (RespParacrossTitles) {}
rpc GetDoneTitleHeight(ReqParacrossTitleHeight) returns (RespParacrossDone) {}
rpc GetTitleHeight(ReqParacrossTitleHeight) returns (ParacrossHeightStatusRsp) {}
rpc GetAssetTxResult(ReqHash) returns (ParacrossAsset) {}
rpc GetAssetTxResult(ReqString) returns (ParacrossAssetRsp) {}
rpc IsSync(ReqNil) returns (IsCaughtUp) {}
}
\ No newline at end of file
......@@ -118,20 +118,20 @@ func (c *channelClient) GetDoneTitleHeight(ctx context.Context, req *pt.ReqParac
return nil, types.ErrDecode
}
func (c *channelClient) GetAssetTxResult(ctx context.Context, req *types.ReqHash) (*pt.ParacrossAsset, error) {
func (c *channelClient) GetAssetTxResult(ctx context.Context, req *types.ReqString) (*pt.ParacrossAssetRsp, error) {
cfg := c.GetConfig()
data, err := c.Query(pt.GetExecName(cfg), "GetAssetTxResult", req)
if err != nil {
return nil, err
}
if resp, ok := data.(*pt.ParacrossAsset); ok {
if resp, ok := data.(*pt.ParacrossAssetRsp); ok {
return resp, nil
}
return nil, types.ErrDecode
}
// GetAssetTxResult get asset tx result
func (c *Jrpc) GetAssetTxResult(req *types.ReqHash, result *interface{}) error {
func (c *Jrpc) GetAssetTxResult(req *types.ReqString, result *interface{}) error {
if req == nil {
return types.ErrInvalidParam
}
......
......@@ -124,8 +124,8 @@ func TestChannelClient_GetAssetTxResult(t *testing.T) {
api.On("GetConfig", mock.Anything).Return(cfg, nil)
client := newGrpc(api)
client.Init("paracross", nil, nil, nil)
req := &types.ReqHash{}
api.On("Query", pt.GetExecName(cfg), "GetAssetTxResult", req).Return(&pt.ParacrossAsset{}, nil)
req := &types.ReqString{}
api.On("Query", pt.GetExecName(cfg), "GetAssetTxResult", req).Return(&pt.ParacrossAssetRsp{}, nil)
_, err := client.GetAssetTxResult(context.Background(), req)
assert.Nil(t, err)
}
......@@ -135,9 +135,9 @@ func TestJrpc_GetAssetTxResult(t *testing.T) {
api := new(mocks.QueueProtocolAPI)
api.On("GetConfig", mock.Anything).Return(cfg, nil)
j := newJrpc(api)
req := &types.ReqHash{}
req := &types.ReqString{}
var result interface{}
api.On("Query", pt.GetExecName(cfg), "GetAssetTxResult", req).Return(&pt.ParacrossAsset{}, nil)
api.On("Query", pt.GetExecName(cfg), "GetAssetTxResult", req).Return(&pt.ParacrossAssetRsp{}, nil)
err := j.GetAssetTxResult(req, &result)
assert.Nil(t, err)
}
......
......@@ -26,6 +26,8 @@ var (
MainLoopCheckCommitTxDoneForkHeight = "mainLoopCheckCommitTxDoneForkHeight"
// ForkParaSelfConsStages 平行链自共识分阶段共识
ForkParaSelfConsStages = "ForkParaSelfConsStages"
// ForkParaAssetTransferRbk 平行链资产转移平行链失败主链回滚
ForkParaAssetTransferRbk = "ForkParaAssetTransferRbk"
// ParaConsSubConf sub
ParaConsSubConf = "consensus.sub.para"
......@@ -52,6 +54,8 @@ func InitFork(cfg *types.Chain33Config) {
cfg.RegisterDappFork(ParaX, "ForkParacrossWithdrawFromParachain", 1298600)
cfg.RegisterDappFork(ParaX, ForkCommitTx, 1850000)
cfg.RegisterDappFork(ParaX, ForkLoopCheckCommitTxDone, 3230000)
cfg.RegisterDappFork(ParaX, ForkParaAssetTransferRbk, 4500000)
//只在平行链启用
cfg.RegisterDappFork(ParaX, ForkParaSelfConsStages, types.MaxHeight)
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment