Commit d2ef4bda authored by hezhengjun's avatar hezhengjun Committed by vipwzw

correct statics

parent a55e9787
......@@ -3,6 +3,7 @@
# shellcheck source=/dev/null
source "./dockerRelayerTest.sh"
#source "./dockerRelayerTestInfinite.sh"
source "./paracrosstestcase.sh"
function cross2eth() {
......
......@@ -22,13 +22,14 @@ func StaticsCmd() *cobra.Command {
//ShowLockStaticsFlags ...
func ShowStaticsFlags(cmd *cobra.Command) {
cmd.Flags().StringP("symbol", "s", "", "token symbol")
_ = cmd.MarkFlagRequired("symbol")
cmd.Flags().StringP("symbol", "s", "", "token symbol(optional)")
cmd.Flags().Int32P("from", "f", 0, "source chain, 0=ethereum, and 1=chain33")
_ = cmd.MarkFlagRequired("from")
cmd.Flags().Int32P("operation", "o", 0, "operation type, 1=burn, and 2=lock")
_ = cmd.MarkFlagRequired("operation")
cmd.Flags().Int32P("status", "u", 0, "show with specified status, default to show all, 1=pending, 2=failed, 3=successful")
cmd.Flags().Int32P("status", "u", 0, "show with specified status, default to show all, 1=pending, 2=successful, 3=failed")
cmd.Flags().Int32P("count", "n", 0, "count to show, default to show all")
cmd.Flags().Int32P("index", "i", 0, "tx index(optional, exclude, default from 0)")
}
//ShowLockStatics ...
......@@ -38,6 +39,8 @@ func ShowStatics(cmd *cobra.Command, args []string) {
from, _ := cmd.Flags().GetInt32("from")
operation, _ := cmd.Flags().GetInt32("operation")
status, _ := cmd.Flags().GetInt32("status")
count, _ := cmd.Flags().GetInt32("count")
index, _ := cmd.Flags().GetInt32("index")
if from != 0 && 1 != from {
fmt.Println("Pls set correct source chain flag, 0=ethereum, and 1=chain33")
......@@ -50,15 +53,17 @@ func ShowStatics(cmd *cobra.Command, args []string) {
}
if status < 0 || status > 3 {
fmt.Println("Pls set correct status, default 0 to show all, 1=pending, 2=failed, 3=successful")
fmt.Println("Pls set correct status, default 0 to show all, 1=pending, 2=successful, 3=failed")
return
}
para := ebTypes.TokenStaticsRequest{
para := &ebTypes.TokenStaticsRequest{
Symbol: symbol,
From: from,
Operation: operation,
Status: status,
TxIndex: int64(index),
Count: count,
}
var res ebTypes.TokenStaticsResponse
ctx := jsonclient.NewRPCCtx(rpcLaddr, "Manager.ShowTokenStatics", para, &res)
......
syntax = "proto3";
package types;
option go_package = "../types";
message SyncTxConfig {
string chain33host = 1;
......
syntax = "proto3";
package types;
option go_package = "../types";
//以太坊账户信息
// privkey : 账户地址对应的私钥
......@@ -159,6 +160,7 @@ message Chain33ToEthereumStatics {
string amount = 8;
int64 nonce = 9;
int64 txIndex = 10;
string operationType = 11;
}
message Ethereum2Chain33Statics {
......@@ -174,6 +176,7 @@ message Ethereum2Chain33Statics {
string amount = 8;
int64 nonce = 9;
int64 txIndex = 10;
string operationType = 11;
}
message TokenAddress {
......@@ -192,6 +195,7 @@ message TokenStaticsRequest {
int32 operation = 3;
int32 status = 4;
int64 txIndex = 5;
int32 count = 6;
}
message TokenStaticsResponse {
......
......@@ -53,7 +53,7 @@ type Relayer4Chain33 struct {
bridgeBankEventBurnSig string
bridgeBankAbi abi.ABI
deployInfo *ebTypes.Deploy
totalTx4Chain33ToEth int64
totalTx4RelayEth2chai33 int64
//新增//
ethBridgeClaimChan <-chan *ebTypes.EthBridgeClaim
chain33MsgChan chan<- *events.Chain33Msg
......@@ -91,7 +91,7 @@ func StartChain33Relayer(startPara *Chain33StartPara) *Relayer4Chain33 {
bridgeRegistryAddr: startPara.BridgeRegistryAddr,
ethBridgeClaimChan: startPara.EthBridgeClaimChan,
chain33MsgChan: startPara.Chain33MsgChan,
totalTx4Chain33ToEth: 0,
totalTx4RelayEth2chai33: 0,
symbol2Addr: make(map[string]string),
}
......@@ -115,17 +115,18 @@ func StartChain33Relayer(startPara *Chain33StartPara) *Relayer4Chain33 {
//输入地址为空,且数据库中保存地址不为空,则直接使用数据库中的地址
chain33Relayer.bridgeRegistryAddr = registrAddrInDB
}
chain33Relayer.totalTx4RelayEth2chai33 = chain33Relayer.getTotalTxAmount()
if 0 == chain33Relayer.totalTx4RelayEth2chai33 {
statics := &ebTypes.Ethereum2Chain33Statics{}
data := chain33Types.Encode(statics)
chain33Relayer.setLastestRelay2Chain33TxStatics(0, int32(events.ClaimTypeLock), data)
chain33Relayer.setLastestRelay2Chain33TxStatics(0, int32(events.ClaimTypeBurn), data)
}
go chain33Relayer.syncProc(syncCfg)
return chain33Relayer
}
//QueryTxhashRelay2Eth ...
func (chain33Relayer *Relayer4Chain33) QueryTxhashRelay2Eth() ebTypes.Txhashes {
txhashs := utils.QueryTxhashes([]byte(eth2Chain33BurnLockTxStaticsPrefix), chain33Relayer.db)
return ebTypes.Txhashes{Txhash: txhashs}
}
func (chain33Relayer *Relayer4Chain33) syncProc(syncCfg *ebTypes.SyncTxReceiptConfig) {
_, _ = fmt.Fprintln(os.Stdout, "Pls unlock or import private key for Chain33 relayer")
<-chain33Relayer.unlockChan
......@@ -186,6 +187,7 @@ func (chain33Relayer *Relayer4Chain33) getCurrentHeight() int64 {
func (chain33Relayer *Relayer4Chain33) onNewHeightProc(currentHeight int64) {
//检查已经提交的交易结果
chain33Relayer.updateTxStatus()
//未达到足够的成熟度,不进行处理
// +++++++++||++++++++++++||++++++++++||
......@@ -368,8 +370,10 @@ func (chain33Relayer *Relayer4Chain33) relayLockBurnToChain33(claim *ebTypes.Eth
}
var tokenAddr string
operationType := ""
if int32(events.ClaimTypeBurn) == claim.ClaimType {
//burn 分支
operationType = "Burn"
if ebTypes.SYMBOL_BTY == claim.Symbol {
tokenAddr = ebTypes.BTYAddrChain33
} else {
......@@ -379,9 +383,9 @@ func (chain33Relayer *Relayer4Chain33) relayLockBurnToChain33(claim *ebTypes.Eth
return
}
}
} else {
//lock 分支
operationType = "Lock"
var exist bool
tokenAddr, exist = chain33Relayer.symbol2Addr[claim.Symbol]
if !exist {
......@@ -424,18 +428,17 @@ func (chain33Relayer *Relayer4Chain33) relayLockBurnToChain33(claim *ebTypes.Eth
claim.ChainName = chain33Relayer.chainName
txhash, err := relayEvmTx2Chain33(chain33Relayer.privateKey4Chain33, claim, parameter, chain33Relayer.rpcLaddr, chain33Relayer.oracleAddr)
if err != nil {
relayerLog.Error("relayLockBurnToChain33", "Failed to RelayEvmTx2Chain33 due to:", err.Error())
relayerLog.Error("relayLockBurnToChain33", "Failed to RelayEvmTx2Chain33 due to:", err.Error(), "EthereumTxhash", claim.EthTxHash)
return
}
relayerLog.Info("relayLockBurnToChain33", "tx is sent to relay lock or burn with hash", txhash)
//保存交易hash,方便查询
atomic.AddInt64(&chain33Relayer.totalTx4Chain33ToEth, 1)
txIndex := atomic.LoadInt64(&chain33Relayer.totalTx4Chain33ToEth)
//第一个有效的index从1开始,方便list
txIndex := atomic.AddInt64(&chain33Relayer.totalTx4RelayEth2chai33, 1)
if err = chain33Relayer.updateTotalTxAmount2Eth(txIndex); nil != err {
relayerLog.Error("relayLockBurnToChain33", "Failed to RelayEvmTx2Chain33 due to:", err.Error())
relayerLog.Error("relayLockBurnToChain33", "Failed to updateTotalTxAmount2Eth due to:", err.Error())
return
}
statics := &ebTypes.Ethereum2Chain33Statics{
Chain33Txstatus: ebTypes.Tx_Status_Pending,
Chain33Txhash: txhash,
......@@ -447,12 +450,22 @@ func (chain33Relayer *Relayer4Chain33) relayLockBurnToChain33(claim *ebTypes.Eth
Amount: claim.Amount,
Nonce: claim.Nonce,
TxIndex: txIndex,
OperationType: operationType,
}
data := chain33Types.Encode(statics)
if err = chain33Relayer.setLastestRelay2Chain33TxStatics(txIndex, claim.ClaimType, data); nil != err {
relayerLog.Error("relayLockBurnToChain33", "Failed to RelayEvmTx2Chain33 due to:", err.Error())
relayerLog.Error("relayLockBurnToChain33", "Failed to setLastestRelay2Chain33TxStatics due to:", err.Error())
return
}
relayerLog.Info("relayLockBurnToChain33::successful",
"txIndex", txIndex,
"Chain33Txhash", txhash,
"EthereumTxhash", claim.EthTxHash,
"type", operationType,
"Symbol", claim.Symbol,
"Amount", claim.Amount,
"EthereumSender", claim.EthereumSender,
"Chain33Receiver", claim.Chain33Receiver)
}
func (chain33Relayer *Relayer4Chain33) BurnAsyncFromChain33(ownerPrivateKey, tokenAddr, ethereumReceiver, amount string) (string, error) {
......@@ -476,10 +489,10 @@ func (chain33Relayer *Relayer4Chain33) ShowBridgeRegistryAddr() (string, error)
return chain33Relayer.bridgeRegistryAddr, nil
}
func (chain33Relayer *Relayer4Chain33) ShowStatics(request ebTypes.TokenStaticsRequest) (*ebTypes.TokenStaticsResponse, error) {
func (chain33Relayer *Relayer4Chain33) ShowStatics(request *ebTypes.TokenStaticsRequest) (*ebTypes.TokenStaticsResponse, error) {
res := &ebTypes.TokenStaticsResponse{}
datas, err := chain33Relayer.getStatics(request.Operation, request.TxIndex)
datas, err := chain33Relayer.getStatics(request.Operation, request.TxIndex, request.Count)
if nil != err {
return nil, err
}
......@@ -487,10 +500,11 @@ func (chain33Relayer *Relayer4Chain33) ShowStatics(request ebTypes.TokenStaticsR
for _, data := range datas {
var statics ebTypes.Ethereum2Chain33Statics
_ = chain33Types.Decode(data, &statics)
if request.Status != 0 {
if ebTypes.Tx_Status_Map[request.Status] != statics.Chain33Txstatus {
if request.Status != 0 && ebTypes.Tx_Status_Map[request.Status] != statics.Chain33Txstatus {
continue
}
if len(request.Symbol) > 0 && request.Symbol != statics.Symbol {
continue
}
res.E2Cstatics = append(res.E2Cstatics, &statics)
}
......@@ -504,30 +518,28 @@ func (chain33Relayer *Relayer4Chain33) updateTxStatus() {
func (chain33Relayer *Relayer4Chain33) updateSingleTxStatus(claimType events.ClaimType) {
txIndex := chain33Relayer.getChain33UpdateTxIndex(claimType)
if ebTypes.Invalid_Tx_Index == txIndex {
return
}
datas, _ := chain33Relayer.getStatics(int32(claimType), txIndex)
datas, _ := chain33Relayer.getStatics(int32(claimType), txIndex, 0)
if nil == datas {
return
}
for _, data := range datas {
var statics ebTypes.Chain33ToEthereumStatics
var statics ebTypes.Ethereum2Chain33Statics
_ = chain33Types.Decode(data, &statics)
result := getTxStatusByHashesRpc(statics.Chain33Txhash, chain33Relayer.rpcLaddr)
//当前处理机制比较简单,如果发现该笔交易未执行,就不再产寻后续交易的回执
if ebTypes.Invalid_Chain33Tx_Status == result {
relayerLog.Debug("chain33Relayer::updateSingleTxStatus", "no receipt for tx index", statics.TxIndex)
break
}
status := ebTypes.Tx_Status_Success
if result != chain33Types.ExecOk {
status = ebTypes.Tx_Status_Failed
}
statics.EthTxstatus = status
statics.Chain33Txstatus = status
dataNew := chain33Types.Encode(&statics)
_ = chain33Relayer.setLastestRelay2Chain33TxStatics(statics.TxIndex, int32(claimType), dataNew)
_ = chain33Relayer.setChain33UpdateTxIndex(statics.TxIndex, claimType)
relayerLog.Info("updateSingleTxStatus", "txHash", statics.Chain33Txhash, "updated status", status)
relayerLog.Debug("updateSingleTxStatus", "TxIndex", statics.TxIndex, "operationType", statics.OperationType, "txHash", statics.Chain33Txhash, "updated status", status)
}
}
......
......@@ -111,7 +111,7 @@ func newChain33Relayer(x2EthDeployInfo *ethtxs.X2EthDeployInfo, pushBind string)
deployInfo: cfg.Deploy,
ethBridgeClaimChan: ethBridgeClaimchan,
chain33MsgChan: chain33Msgchan,
totalTx4Chain33ToEth: 0,
totalTx4RelayEth2chai33: 0,
symbol2Addr: make(map[string]string),
oracleAddr: x2EthDeployInfo.Oracle.Address.String(),
bridgeBankAddr: x2EthDeployInfo.BridgeBank.Address.String(),
......
......@@ -3,7 +3,6 @@ package chain33
import (
"errors"
"fmt"
"sync/atomic"
dbm "github.com/33cn/chain33/common/db"
chain33Types "github.com/33cn/chain33/types"
......@@ -16,7 +15,8 @@ import (
var (
lastSyncHeightPrefix = []byte("chain33-lastSyncHeight:")
eth2Chain33BurnLockTxStaticsPrefix = "chain33-eth2chain33BurnLockStatics"
chain33ToEthBurnLockTxTotalAmount = []byte("chain33-chain33ToEthBurnLockTxTotalAmount")
eth2Chain33BurnLockTxFinished = "chain33-eth2Chain33BurnLockTxFinished"
relayEthBurnLockTxTotalAmount = []byte("chain33-relayEthBurnLockTxTotalAmount")
chain33BurnTxUpdateTxIndex = []byte("chain33-chain33BurnTxUpdateTxIndx")
chain33LockTxUpdateTxIndex = []byte("chain33-chain33LockTxUpdateTxIndex")
bridgeRegistryAddrOnChain33 = []byte("chain33-x2EthBridgeRegistryAddrOnChain33")
......@@ -32,20 +32,29 @@ func calcRelayFromEthStaticsKey(txindex int64, claimType int32) []byte {
return []byte(fmt.Sprintf("%s-%d-%012d", eth2Chain33BurnLockTxStaticsPrefix, claimType, txindex))
}
//未完成,处在pending状态
func calcRelayFromEthStaticsList(claimType int32) []byte {
return []byte(fmt.Sprintf("%s-%d-", eth2Chain33BurnLockTxStaticsPrefix, claimType))
}
func (chain33Relayer *Relayer4Chain33) updateTotalTxAmount2Eth(total int64) error {
func calcFromEthFinishedStaticsKey(txindex int64, claimType int32) []byte {
return []byte(fmt.Sprintf("%s-%d-%012d", eth2Chain33BurnLockTxFinished, claimType, txindex))
}
func calcFromEthFinishedStaticsList(claimType int32) []byte {
return []byte(fmt.Sprintf("%s-%d-", eth2Chain33BurnLockTxFinished, claimType))
}
func (chain33Relayer *Relayer4Chain33) updateTotalTxAmount2Eth(txIndex int64) error {
totalTx := &chain33Types.Int64{
Data: atomic.LoadInt64(&chain33Relayer.totalTx4Chain33ToEth),
Data: txIndex,
}
//更新成功见证的交易数
return chain33Relayer.db.Set(chain33ToEthBurnLockTxTotalAmount, chain33Types.Encode(totalTx))
return chain33Relayer.db.Set(relayEthBurnLockTxTotalAmount, chain33Types.Encode(totalTx))
}
func (chain33Relayer *Relayer4Chain33) getTotalTxAmount2Eth() int64 {
totalTx, _ := utils.LoadInt64FromDB(chain33ToEthBurnLockTxTotalAmount, chain33Relayer.db)
func (chain33Relayer *Relayer4Chain33) getTotalTxAmount() int64 {
totalTx, _ := utils.LoadInt64FromDB(relayEthBurnLockTxTotalAmount, chain33Relayer.db)
return totalTx
}
......@@ -54,18 +63,19 @@ func (chain33Relayer *Relayer4Chain33) setLastestRelay2Chain33TxStatics(txIndex
return chain33Relayer.db.Set(key, data)
}
func (chain33Relayer *Relayer4Chain33) getStatics(claimType int32, txIndex int64) ([][]byte, error) {
func (chain33Relayer *Relayer4Chain33) getStatics(claimType int32, txIndex int64, count int32) ([][]byte, error) {
//第一步:获取处在pending状态的
keyPrefix := calcRelayFromEthStaticsList(claimType)
keyFrom := calcRelayFromEthStaticsKey(txIndex, claimType)
helper := dbm.NewListHelper(chain33Relayer.db)
datas := helper.List(keyPrefix, keyFrom, 20, dbm.ListASC)
datas := helper.List(keyPrefix, keyFrom, count, dbm.ListASC)
if nil == datas {
return nil, errors.New("Not found")
}
return datas, nil
}
func (chain33Relayer *Relayer4Chain33) setChain33UpdateTxIndex(txindex int64, claimType events.ClaimType) error {
txIndexWrapper := &chain33Types.Int64{
Data: txindex,
......
......@@ -69,7 +69,7 @@ type Relayer4Ethereum struct {
x2EthContracts *ethtxs.X2EthContracts
ethBridgeClaimChan chan<- *ebTypes.EthBridgeClaim
chain33MsgChan <-chan *events.Chain33Msg
totalTx4Eth2Chain33 int64
totalTxRelayFromChain33 int64
symbol2Addr map[string]common.Address
symbol2LockAddr map[string]common.Address
mulSignAddr string
......@@ -109,7 +109,7 @@ func StartEthereumRelayer(startPara *EthereumStartPara) *Relayer4Ethereum {
fetchHeightPeriodMs: startPara.BlockInterval,
ethBridgeClaimChan: startPara.EthBridgeClaimChan,
chain33MsgChan: startPara.Chain33MsgChan,
totalTx4Eth2Chain33: 0,
totalTxRelayFromChain33: 0,
symbol2Addr: make(map[string]common.Address),
symbol2LockAddr: make(map[string]common.Address),
}
......@@ -142,6 +142,13 @@ func StartEthereumRelayer(startPara *EthereumStartPara) *Relayer4Ethereum {
panic(errinfo)
}
ethRelayer.clientChainID = clientChainID
ethRelayer.totalTxRelayFromChain33 = ethRelayer.getTotalTxAmount2Eth()
if 0 == ethRelayer.totalTxRelayFromChain33 {
statics := &ebTypes.Ethereum2Chain33Statics{}
data := chain33Types.Encode(statics)
_ = ethRelayer.setLastestStatics(int32(events.ClaimTypeLock), 0, data)
_ = ethRelayer.setLastestStatics(int32(events.ClaimTypeBurn), 0, data)
}
go ethRelayer.proc()
return ethRelayer
......@@ -466,7 +473,9 @@ func (ethRelayer *Relayer4Ethereum) handleChain33Msg(chain33Msg *events.Chain33M
prophecyClaim := ethtxs.Chain33MsgToProphecyClaim(*chain33Msg)
var tokenAddr common.Address
exist := false
operationType := ""
if chain33Msg.ClaimType == events.ClaimTypeLock {
operationType = "lock"
tokenAddr, exist = ethRelayer.symbol2Addr[prophecyClaim.Symbol]
if !exist {
relayerLog.Info("handleChain33Msg", "Query address from ethereum for symbol", prophecyClaim.Symbol)
......@@ -488,6 +497,7 @@ func (ethRelayer *Relayer4Ethereum) handleChain33Msg(chain33Msg *events.Chain33M
tokenAddr = common.HexToAddress(addr)
}
} else {
operationType = "burn"
tokenAddr, exist = ethRelayer.symbol2LockAddr[prophecyClaim.Symbol]
if !exist {
//因为是burn操作,必须从允许lock的token地址中进行查询
......@@ -516,8 +526,7 @@ func (ethRelayer *Relayer4Ethereum) handleChain33Msg(chain33Msg *events.Chain33M
relayerLog.Info("handleChain33Msg", "RelayOracleClaimToEthereum with tx hash", txhash)
//保存交易hash,方便查询
atomic.AddInt64(&ethRelayer.totalTx4Eth2Chain33, 1)
txIndex := atomic.LoadInt64(&ethRelayer.totalTx4Eth2Chain33)
txIndex := atomic.AddInt64(&ethRelayer.totalTxRelayFromChain33, 1)
if err = ethRelayer.updateTotalTxAmount2chain33(txIndex); nil != err {
relayerLog.Error("handleChain33Msg", "Failed to RelayLockToChain33 due to:", err.Error())
return
......@@ -533,12 +542,22 @@ func (ethRelayer *Relayer4Ethereum) handleChain33Msg(chain33Msg *events.Chain33M
Amount: chain33Msg.Amount.String(),
Nonce: chain33Msg.Nonce,
TxIndex: txIndex,
OperationType: operationType,
}
data := chain33Types.Encode(statics)
if err = ethRelayer.setLastestStatics(int32(chain33Msg.ClaimType), txIndex, data); nil != err {
relayerLog.Error("handleChain33Msg", "Failed to RelayLockToChain33 due to:", err.Error())
return
}
relayerLog.Info("RelayOracleClaimToEthereum::successful",
"txIndex", txIndex,
"Chain33Txhash", statics.Chain33Txhash,
"EthereumTxhash", statics.EthereumTxhash,
"type", operationType,
"Symbol", chain33Msg.Symbol,
"Amount", chain33Msg.Amount,
"EthereumReceiver", statics.EthereumReceiver,
"Chain33Sender", statics.Chain33Sender)
}
func (ethRelayer *Relayer4Ethereum) procNewHeight(ctx context.Context, continueFailCount *int32) {
......@@ -889,10 +908,10 @@ func (ethRelayer *Relayer4Ethereum) handleLogBurnEvent(clientChainID *big.Int, c
return nil
}
func (ethRelayer *Relayer4Ethereum) ShowStatics(request ebTypes.TokenStaticsRequest) (*ebTypes.TokenStaticsResponse, error) {
func (ethRelayer *Relayer4Ethereum) ShowStatics(request *ebTypes.TokenStaticsRequest) (*ebTypes.TokenStaticsResponse, error) {
res := &ebTypes.TokenStaticsResponse{}
datas, err := ethRelayer.getStatics(request.Operation, request.TxIndex)
datas, err := ethRelayer.getStatics(request.Operation, request.TxIndex, request.Count)
if nil != err {
return nil, err
}
......@@ -900,10 +919,11 @@ func (ethRelayer *Relayer4Ethereum) ShowStatics(request ebTypes.TokenStaticsRequ
for _, data := range datas {
var statics ebTypes.Chain33ToEthereumStatics
_ = chain33Types.Decode(data, &statics)
if request.Status != 0 {
if ebTypes.Tx_Status_Map[request.Status] != statics.EthTxstatus {
if request.Status != 0 && ebTypes.Tx_Status_Map[request.Status] != statics.EthTxstatus {
continue
}
if len(request.Symbol) > 0 && request.Symbol != statics.Symbol {
continue
}
res.C2Estatics = append(res.C2Estatics, &statics)
}
......@@ -917,11 +937,9 @@ func (ethRelayer *Relayer4Ethereum) updateTxStatus() {
func (ethRelayer *Relayer4Ethereum) updateSingleTxStatus(claimType events.ClaimType) {
txIndex := ethRelayer.getEthLockTxUpdateTxIndex(claimType)
if ebTypes.Invalid_Tx_Index == txIndex {
return
}
datas, _ := ethRelayer.getStatics(int32(claimType), txIndex)
datas, _ := ethRelayer.getStatics(int32(claimType), txIndex, 0)
if nil == datas {
relayerLog.Debug("ethRelayer::updateSingleTxStatus", "no new tx need to be update status for claimType", claimType, "from tx index", txIndex)
return
}
for _, data := range datas {
......
......@@ -412,7 +412,7 @@ func newEthRelayer(para *ethtxs.DeployPara, sim *ethinterface.SimExtend, x2EthCo
bridgeRegistryAddr: x2EthDeployInfo.BridgeRegistry.Address,
maturityDegree: cfg.EthMaturityDegree,
fetchHeightPeriodMs: cfg.EthBlockFetchPeriod,
totalTx4Eth2Chain33: 0,
totalTxRelayFromChain33: 0,
symbol2Addr: make(map[string]common.Address),
symbol2LockAddr: make(map[string]common.Address),
......@@ -447,6 +447,7 @@ func newEthRelayer(para *ethtxs.DeployPara, sim *ethinterface.SimExtend, x2EthCo
relayer.x2EthDeployInfo = x2EthDeployInfo
relayer.rwLock.Unlock()
relayer.totalTxRelayFromChain33 = relayer.getTotalTxAmount2Eth()
go relayer.proc()
return relayer
}
......
......@@ -4,7 +4,6 @@ import (
"encoding/json"
"errors"
"fmt"
"sync/atomic"
"github.com/ethereum/go-ethereum/common"
......@@ -50,12 +49,12 @@ func calcRelayFromChain33ListPrefix(claimType int32) []byte {
return []byte(fmt.Sprintf("%s-%d-", chain33ToEthStaticsPrefix, claimType))
}
func (ethRelayer *Relayer4Ethereum) getStatics(claimType int32, txIndex int64) ([][]byte, error) {
func (ethRelayer *Relayer4Ethereum) getStatics(claimType int32, txIndex int64, count int32) ([][]byte, error) {
keyPrefix := calcRelayFromChain33ListPrefix(claimType)
keyFrom := calcRelayFromChain33Key(claimType, txIndex)
helper := dbm.NewListHelper(ethRelayer.db)
datas := helper.List(keyPrefix, keyFrom, 20, dbm.ListASC)
datas := helper.List(keyPrefix, keyFrom, count, dbm.ListASC)
if nil == datas {
return nil, errors.New("Not found")
}
......@@ -106,14 +105,19 @@ func (ethRelayer *Relayer4Ethereum) getBridgeRegistryAddr() (string, error) {
return string(addr), nil
}
func (ethRelayer *Relayer4Ethereum) updateTotalTxAmount2chain33(total int64) error {
func (ethRelayer *Relayer4Ethereum) updateTotalTxAmount2chain33(totalIndex int64) error {
totalTx := &chain33Types.Int64{
Data: atomic.LoadInt64(&ethRelayer.totalTx4Eth2Chain33),
Data: totalIndex,
}
//更新成功见证的交易数
return ethRelayer.db.Set(chain33ToEthTxTotalAmount, chain33Types.Encode(totalTx))
}
func (ethRelayer *Relayer4Ethereum) getTotalTxAmount2Eth() int64 {
totalTx, _ := utils.LoadInt64FromDB(chain33ToEthTxTotalAmount, ethRelayer.db)
return totalTx
}
func (ethRelayer *Relayer4Ethereum) setLastestStatics(claimType int32, txIndex int64, data []byte) error {
key := calcRelayFromChain33Key(claimType, txIndex)
return ethRelayer.db.Set(key, data)
......
......@@ -848,7 +848,7 @@ func (manager *Manager) checkPermission() error {
}
// ShowTokenStatics ShowEthRelayer2Chain33Txs ...
func (manager *Manager) ShowTokenStatics(request relayerTypes.TokenStaticsRequest, result *interface{}) error {
func (manager *Manager) ShowTokenStatics(request *relayerTypes.TokenStaticsRequest, result *interface{}) error {
manager.mtx.Lock()
defer manager.mtx.Unlock()
if err := manager.checkPermission(); nil != err {
......
......@@ -16,7 +16,7 @@ const (
Tx_Status_Failed = "Failed"
Source_Chain_Ethereum = int32(0)
Source_Chain_Chain33 = int32(1)
Invalid_Tx_Index = int64(-1)
Invalid_Tx_Index = int64(0)
Invalid_Chain33Tx_Status = int32(-1)
)
......
This source diff could not be displayed because it is too large. You can view the blob instead.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment