Commit a5774732 authored by harrylee's avatar harrylee

update fisco

parent 17bbdbd8
...@@ -39,6 +39,12 @@ type Client struct { ...@@ -39,6 +39,12 @@ type Client struct {
client *fiscoClient.Client client *fiscoClient.Client
session *broker.BrokerSession session *broker.BrokerSession
eventC chan *pb.IBTP eventC chan *pb.IBTP
appchainID string
name string
serviceMeta map[string]*pb.Interchain
ticker *time.Ticker
done chan bool
timeoutHeight int64
} }
var ( var (
...@@ -66,6 +72,7 @@ func (c *Client) Bind(kern plugins.Kernel) { ...@@ -66,6 +72,7 @@ func (c *Client) Bind(kern plugins.Kernel) {
} }
func (c *Client) Start() error { func (c *Client) Start() error {
go c.polling()
return c.StartConsumer() return c.StartConsumer()
} }
...@@ -100,12 +107,73 @@ func (c *Client) Initialize(configPath, appchainID string, extra []byte) error { ...@@ -100,12 +107,73 @@ func (c *Client) Initialize(configPath, appchainID string, extra []byte) error {
c.eventC = make(chan *pb.IBTP, 1024) c.eventC = make(chan *pb.IBTP, 1024)
c.client = client c.client = client
c.session = brokerSession c.session = brokerSession
//c.abi = ab c.ticker = time.NewTicker(2*time.Second)
c.ctx, c.cancel = context.WithCancel(context.Background()) c.ctx, c.cancel = context.WithCancel(context.Background())
return nil return nil
} }
// polling event from broker
// 从应用链broker合约拉取跨链事件(出去的)
func (c *Client) polling() {
for {
select {
case <-c.ticker.C:
// 应用链broker合约需要适配一个查询接口,用户获取outMeta(跨到其他应用链上统计累计交易数量的索引)
outMeta, err := c.GetOutMeta()
if err != nil {
continue
}
for servicePair, index := range outMeta {
srcChainServiceID, dstChainServiceID, err := parseServicePair(servicePair)
if err != nil {
logger.Error("Polling out invalid service pair",
"servicePair", servicePair,
"index", index,
"error", err.Error())
continue
}
// 用于本地记录本应用链与其他目标链之间发送得跨链数据包及收到跨链数据包之间的统计
meta, ok := c.serviceMeta[srcChainServiceID]
if !ok {
meta = &pb.Interchain{
ID: srcChainServiceID,
InterchainCounter: make(map[string]uint64),
ReceiptCounter: make(map[string]uint64),
// SourceInterchainCounter: make(map[string]uint64),
SourceReceiptCounter: make(map[string]uint64),
}
//FIXME 临时修复无法从本地DB加载处理日志的bug
callBackMeta,err:=c.GetCallbackMeta()
if err !=nil {
logger.Error("get callback meta",
"error", err.Error())
}
meta.InterchainCounter[dstChainServiceID]=callBackMeta[servicePair]
c.serviceMeta[srcChainServiceID] = meta
}
for i := meta.InterchainCounter[dstChainServiceID] + 1; i <= index; i++ {
// 根据索引去查询要跨到目标链上面的具体信息,并封装成ibtp协议包
ibtp, err := c.GetOutMessage(servicePair, i)
if err != nil {
logger.Error("Polling out message",
"servicePair", servicePair,
"index", i,
"error", err.Error())
continue
}
// 将ibtp协议包发送给eventC通道,进行处理,同时跨链计数器到目标链项累计加1
c.eventC <- ibtp
meta.InterchainCounter[dstChainServiceID]++
}
}
case <-c.done:
logger.Info("Stop long polling")
return
}
}
}
func (c *Client) GetIBTP() chan *pb.IBTP { func (c *Client) GetIBTP() chan *pb.IBTP {
return c.eventC return c.eventC
} }
...@@ -137,8 +205,41 @@ func (c *Client) SubmitIBTP(ibtp *pb.IBTP) (*pb.SubmitIBTPResponse, error) { ...@@ -137,8 +205,41 @@ func (c *Client) SubmitIBTP(ibtp *pb.IBTP) (*pb.SubmitIBTPResponse, error) {
return nil, fmt.Errorf("ibtp payload unmarshal: %w", err) return nil, fmt.Errorf("ibtp payload unmarshal: %w", err)
} }
if ibtp.Category() == pb.IBTP_UNKNOWN {
return nil, fmt.Errorf("invalid ibtp category")
}
_,_,serviceID,err:=parseChainServiceID(ibtp.GetTo())
if err !=nil {
return ret, err
}
var result [][]byte
if ibtp.Category() == pb.IBTP_RESPONSE && content.Func == "" || ibtp.Type == pb.IBTP_ROLLBACK {
logger.Info("InvokeIndexUpdate", "ibtp", ibtp.ID())
receipt, err := c.invokeReceipt(serviceID, ibtp.GetFrom(), ibtp.GetNonce(), uint64(ibtp.Type), nil, uint64(1), nil)
if err != nil {
ret.Status = false
ret.Message = err.Error()
return ret, nil
}
if receipt.Status != types.Success {
ret.Status = false
ret.Message = fmt.Sprintf("SubmitReceipt tx execution failed")
}else{
ret.Status =true
ret.Message = receipt.GetTransactionHash()
}
if ibtp.Type == pb.IBTP_ROLLBACK {
ret.Result, err = c.generateCallback(ibtp, content.GetArgsRb(),ret.Status)
if err != nil {
return nil, err
}
}
return ret, nil
}
//TODO 参数有可能有问题 //TODO 参数有可能有问题
receipt, err := c.invokeInterchain(ibtp.GetFrom(), ibtp.GetNonce(), ibtp.GetTo(), uint64(ibtp.Type), content.Func, content.Args, uint64(0), nil, false) receipt, err := c.invokeInterchain(ibtp.GetFrom(), ibtp.GetNonce(), serviceID, uint64(ibtp.Type), content.Func, content.Args, uint64(0), nil, false)
if err != nil { if err != nil {
ret.Status = false ret.Status = false
ret.Message = err.Error() ret.Message = err.Error()
...@@ -150,6 +251,20 @@ func (c *Client) SubmitIBTP(ibtp *pb.IBTP) (*pb.SubmitIBTPResponse, error) { ...@@ -150,6 +251,20 @@ func (c *Client) SubmitIBTP(ibtp *pb.IBTP) (*pb.SubmitIBTPResponse, error) {
ret.Message = fmt.Sprintf("SubmitIBTP tx execution failed") ret.Message = fmt.Sprintf("SubmitIBTP tx execution failed")
return ret, nil return ret, nil
} }
// If is response IBTP, then simply return
if ibtp.Category() == pb.IBTP_RESPONSE {
return ret, nil
}
//
//proof, err := c.getProof(*chResp)
//if err != nil {
// return ret, err
//}
ret.Result, err = c.generateCallback(ibtp, result, ret.Status)
if err != nil {
return ret, err
}
return nil, nil return nil, nil
} }
...@@ -183,6 +298,7 @@ func (c *Client) invokeInterchain(srcFullID string, index uint64, destAddr strin ...@@ -183,6 +298,7 @@ func (c *Client) invokeInterchain(srcFullID string, index uint64, destAddr strin
var tx *types.Transaction var tx *types.Transaction
var receipt *types.Receipt var receipt *types.Receipt
var txErr error var txErr error
if err := retry.Retry(func(attempt uint) error { if err := retry.Retry(func(attempt uint) error {
tx, receipt, txErr = c.session.InvokeInterchain(srcFullID, common.HexToAddress(destAddr), index, reqType, callFunc, args, txStatus, multiSign, encrypt) tx, receipt, txErr = c.session.InvokeInterchain(srcFullID, common.HexToAddress(destAddr), index, reqType, callFunc, args, txStatus, multiSign, encrypt)
if txErr != nil { if txErr != nil {
...@@ -223,12 +339,12 @@ func (c *Client) invokeInterchain(srcFullID string, index uint64, destAddr strin ...@@ -223,12 +339,12 @@ func (c *Client) invokeInterchain(srcFullID string, index uint64, destAddr strin
return c.waitForConfirmed(tx.Hash()), nil return c.waitForConfirmed(tx.Hash()), nil
} }
func (c *Client) invokeReceipt(srcAddr string, dstFullID string, index uint64, reqType uint64, result [][]byte, txStatus uint64, multiSign [][]byte) (*types.Receipt, error) { func (c *Client) invokeReceipt(srcAddr string, dstFullID string, index uint64, reqType uint64, result [][]byte, txStatus uint64, multiSign [][]byte) (*types.Receipt, error) {
var tx *types.Transaction var tx *types.Transaction
var receipt *types.Receipt var receipt *types.Receipt
var txErr error var txErr error
if err := retry.Retry(func(attempt uint) error { if err := retry.Retry(func(attempt uint) error {
tx, receipt, txErr = c.session.InvokeReceipt(common.HexToAddress(srcAddr), dstFullID, index, reqType, result, txStatus, multiSign) tx, receipt, txErr = c.session.InvokeReceipt(common.HexToAddress(srcAddr), dstFullID, index, reqType, result, txStatus, multiSign)
if txErr != nil { if txErr != nil {
logger.Warn("Call InvokeReceipt failed", logger.Warn("Call InvokeReceipt failed",
...@@ -267,6 +383,9 @@ func (c *Client) invokeReceipt(srcAddr string, dstFullID string, index uint64, r ...@@ -267,6 +383,9 @@ func (c *Client) invokeReceipt(srcAddr string, dstFullID string, index uint64, r
return c.waitForConfirmed(tx.Hash()), nil return c.waitForConfirmed(tx.Hash()), nil
} }
//func (c *Client) InvokeUpdateIndex(){
//}
// GetOutMessage gets crosschain tx by `to` address and index // GetOutMessage gets crosschain tx by `to` address and index
func (c *Client) GetOutMessage(servicePair string, idx uint64) (*pb.IBTP, error) { func (c *Client) GetOutMessage(servicePair string, idx uint64) (*pb.IBTP, error) {
srcService, dstService, err := pb.ParseServicePair(servicePair) srcService, dstService, err := pb.ParseServicePair(servicePair)
...@@ -429,6 +548,16 @@ func (c *Client) GetAppchainInfo(chainID string) (string, []byte, string, error) ...@@ -429,6 +548,16 @@ func (c *Client) GetAppchainInfo(chainID string) (string, []byte, string, error)
return broker, trustRoot, ruleAddr.String(), nil return broker, trustRoot, ruleAddr.String(), nil
} }
func parseChainServiceID(id string) (string, string, string, error) {
splits := strings.Split(id, ":")
if len(splits) != 3 {
return "", "", "", fmt.Errorf("invalid chain service ID: %s", id)
}
return splits[0], splits[1], splits[2], nil
}
func parseServicePair(servicePair string) (string, string, error) { func parseServicePair(servicePair string) (string, string, error) {
splits := strings.Split(servicePair, "-") splits := strings.Split(servicePair, "-")
if len(splits) != 2 { if len(splits) != 2 {
......
package main package main
import ( import (
//"github.com/ethereum/go-ethereum/crypto" "fmt"
"gitlab.33.cn/link33/sidecar/model/pb" "gitlab.33.cn/link33/sidecar/model/pb"
) )
func (c *Client) generateCallback(original *pb.IBTP, data [][]byte, typ uint64) (result *pb.IBTP, err error) { //func (c *Client) generateCallback(original *pb.IBTP, data [][]byte, typ uint64) (result *pb.IBTP, err error) {
payload := &pb.Payload{} // payload := &pb.Payload{}
if err := payload.Unmarshal(original.Payload); err != nil { // if err := payload.Unmarshal(original.Payload); err != nil {
return nil, err // return nil, err
} // }
//
return generateReceipt(original.From, original.To, original.Nonce, data, typ, payload.Encrypted) // return generateReceipt(original.From, original.To, original.Nonce, data, typ, payload.Encrypted)
} //}
//
func generateReceipt(from, to string, idx uint64, data [][]byte, typ uint64, encrypt bool) (*pb.IBTP, error) { func generateReceipt(from, to string, idx uint64, data [][]byte, typ uint64, encrypt bool) (*pb.IBTP, error) {
result := &pb.Result{Data:data} result := &pb.Result{Data:data}
content, err := result.Marshal() content, err := result.Marshal()
...@@ -52,3 +52,59 @@ func generateReceipt(from, to string, idx uint64, data [][]byte, typ uint64, enc ...@@ -52,3 +52,59 @@ func generateReceipt(from, to string, idx uint64, data [][]byte, typ uint64, enc
func unpackToBytesArray(data []byte) ([][]byte, error) { func unpackToBytesArray(data []byte) ([][]byte, error) {
return nil, nil return nil, nil
} }
func (c *Client) generateCallback(original *pb.IBTP, args [][]byte, status bool) (result *pb.IBTP, err error) {
if original == nil {
return nil, fmt.Errorf("got nil ibtp To generate receipt: %w", err)
}
pd := &pb.Payload{}
if err := pd.Unmarshal(original.Payload); err != nil {
return nil, fmt.Errorf("ibtp payload unmarshal: %w", err)
}
originalContent := &pb.Content{}
if err := originalContent.Unmarshal(pd.Content); err != nil {
return nil, fmt.Errorf("ibtp payload unmarshal: %w", err)
}
content := &pb.Content{}
typ := pb.IBTP_RECEIPT_SUCCESS
if status {
content.Func = originalContent.Callback
content.Args = append(originalContent.ArgsCb, args...)
} else {
content.Func = originalContent.Rollback
content.Args = originalContent.ArgsRb
typ = pb.IBTP_RECEIPT_FAILURE
}
// TODO
if original.Type == pb.IBTP_ROLLBACK {
typ = pb.IBTP_RECEIPT_ROLLBACK
}
b, err := content.Marshal()
if err != nil {
return nil, err
}
retPd := &pb.Payload{
Content: b,
}
pdb, err := retPd.Marshal()
if err != nil {
return nil, err
}
return &pb.IBTP{
From: original.From,
To: original.To,
Nonce: original.Nonce,
Type: typ,
Proof: original.Proof,
Payload: pdb,
Version: original.Version,
}, nil
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment