Commit 448268f9 authored by suyanlong's avatar suyanlong

remove exchanger

parent 6c138fef
Pipeline #7970 failed with stages
...@@ -41,8 +41,27 @@ func (a *appChain) Send(msg *pb.Message) (*pb.Message, error) { ...@@ -41,8 +41,27 @@ func (a *appChain) Send(msg *pb.Message) (*pb.Message, error) {
return nil, err return nil, err
} }
if ibtp == nil { //TODO if ibtp == nil { //TODO
return nil, nil return nil, errors.New("ibtp wrong")
} }
if ibtp.Type == pb.IBTP_RECEIPT_FAILURE || pb.IBTP_RECEIPT_SUCCESS == ibtp.Type {
//
//TODO 默认路由
ix := pb.IBTPX{
Ibtp: ibtp,
Mode: "",
RouteSign: nil,
RouteMethod: "",
RouteMethodArg: nil,
IsValid: false,
}
d, err := ix.Marshal()
if err != nil {
return nil, err
}
m := pb.Msg(pb.Message_IBTP_RECEIPT_SEND, true, d)
return m, nil
}
return nil, nil return nil, nil
default: default:
return nil, errors.New("msg error") return nil, errors.New("msg error")
......
package exchanger
import (
"github.com/link33/sidecar/api"
"github.com/link33/sidecar/internal/checker"
"github.com/link33/sidecar/internal/peermgr"
"github.com/link33/sidecar/internal/router"
"github.com/link33/sidecar/internal/syncer"
"github.com/meshplus/bitxhub-kit/storage"
"github.com/sirupsen/logrus"
)
type Config struct {
checker checker.Checker
store storage.Storage
peerMgr peermgr.PeerManager
router router.Router
syncer syncer.Syncer
apiServer *api.Server
logger logrus.FieldLogger
}
type Option func(*Config)
func WithChecker(checker checker.Checker) Option {
return func(config *Config) {
config.checker = checker
}
}
func WithPeerMgr(mgr peermgr.PeerManager) Option {
return func(config *Config) {
config.peerMgr = mgr
}
}
func WithRouter(router router.Router) Option {
return func(config *Config) {
config.router = router
}
}
func WithSyncer(syncer syncer.Syncer) Option {
return func(config *Config) {
config.syncer = syncer
}
}
func WithAPIServer(apiServer *api.Server) Option {
return func(config *Config) {
config.apiServer = apiServer
}
}
func WithStorage(store storage.Storage) Option {
return func(config *Config) {
config.store = store
}
}
func WithLogger(logger logrus.FieldLogger) Option {
return func(config *Config) {
config.logger = logger
}
}
func GenerateConfig(opts ...Option) *Config {
config := &Config{}
for _, opt := range opts {
opt(config)
}
return config
}
package exchanger
import (
"encoding/json"
"fmt"
"github.com/Rican7/retry"
"github.com/Rican7/retry/strategy"
"github.com/link33/sidecar/internal/port"
"github.com/link33/sidecar/model/pb"
"time"
)
func (ex *Exchanger) feedIBTP(wIbtp *pb.IBTPX) {
var pool *Pool
ibtp := wIbtp.Ibtp
act, loaded := ex.ibtps.Load(ibtp.From)
if !loaded {
pool = NewPool()
ex.ibtps.Store(ibtp.From, pool)
} else {
pool = act.(*Pool)
}
pool.feed(wIbtp)
if !loaded {
go func(pool *Pool) {
defer func() {
if e := recover(); e != nil {
ex.logger.Error(fmt.Errorf("%v", e))
}
}()
inMeta := ex.exec.QueryInterchainMeta()
for wIbtp := range pool.ch {
ibtp := wIbtp.Ibtp
idx := inMeta[ibtp.From]
if ibtp.Index <= idx {
pool.delete(ibtp.Index)
ex.logger.Warnf("ignore ibtp with invalid index: %d", ibtp.Index)
continue
}
if idx+1 == ibtp.Index {
ex.processIBTP(wIbtp)
pool.delete(ibtp.Index)
index := ibtp.Index + 1
wIbtp := pool.get(index)
for wIbtp != nil {
ex.processIBTP(wIbtp)
pool.delete(wIbtp.Ibtp.Index)
index++
wIbtp = pool.get(index)
}
} else {
pool.put(wIbtp)
}
}
}(pool)
}
}
// 直连
func (ex *Exchanger) processIBTP(wIbtp *pb.IBTPX) {
receipt, err := ex.exec.ExecuteIBTP(wIbtp)
if err != nil {
ex.logger.Errorf("Execute ibtp error: %s", err.Error())
return
}
ex.postHandleIBTP(wIbtp.Ibtp.From, receipt)
ex.sendIBTPCounter.Inc()
}
// 直连
func (ex *Exchanger) feedReceipt(receipt *pb.IBTP) {
var pool *Pool
act, loaded := ex.ibtps.Load(receipt.To)
if !loaded {
pool = NewPool()
ex.ibtps.Store(receipt.To, pool)
} else {
pool = act.(*Pool)
}
pool.feed(&pb.IBTPX{Ibtp: receipt, IsValid: true})
if !loaded {
go func(pool *Pool) {
defer func() {
if e := recover(); e != nil {
ex.logger.Error(fmt.Errorf("%v", e))
}
}()
callbackMeta := ex.exec.QueryCallbackMeta()
for wIbtp := range pool.ch {
ibtp := wIbtp.Ibtp
if ibtp.Index <= callbackMeta[ibtp.To] {
pool.delete(ibtp.Index)
ex.logger.Warn("ignore ibtp with invalid index")
continue
}
if callbackMeta[ibtp.To]+1 == ibtp.Index {
ex.processIBTP(wIbtp)
pool.delete(ibtp.Index)
index := ibtp.Index + 1
wIbtp := pool.get(index)
for wIbtp != nil {
ibtp := wIbtp.Ibtp
receipt, _ := ex.exec.ExecuteIBTP(wIbtp)
ex.postHandleIBTP(ibtp.From, receipt)
pool.delete(ibtp.Index)
index++
wIbtp = pool.get(index)
}
} else {
pool.put(wIbtp)
}
}
}(pool)
}
}
func (ex *Exchanger) postHandleIBTP(from string, receipt *pb.IBTP) {
if receipt == nil {
retMsg := pb.Msg(pb.Message_IBTP_RECEIPT_SEND, true, nil)
err := ex.peerMgr.AsyncSendByID(from, retMsg)
if err != nil {
ex.logger.Errorf("SendByID back empty ibtp receipt: %s", err.Error())
}
return
}
data, _ := receipt.Marshal()
retMsg := pb.Msg(pb.Message_IBTP_RECEIPT_SEND, true, data)
if err := ex.peerMgr.AsyncSendByID(from, retMsg); err != nil {
ex.logger.Errorf("SendByID back ibtp receipt: %s", err.Error())
}
}
//直链模式
func (ex *Exchanger) handleSendIBTPMessage(p port.Port, msg *pb.Message) {
ex.ch <- struct{}{}
go func(msg *pb.Message) {
wIbtp := &pb.IBTPX{}
if err := json.Unmarshal(msg.Payload.Data, wIbtp); err != nil {
ex.logger.Errorf("Unmarshal ibtp: %s", err.Error())
return
}
defer ex.timeCost()()
err := ex.checker.Check(wIbtp.Ibtp)
if err != nil {
ex.logger.Error("check ibtp: %w", err)
return
}
ex.feedIBTP(wIbtp)
<-ex.ch
}(msg)
}
//直链模式
func (ex *Exchanger) handleSendIBTPReceiptMessage(p port.Port, msg *pb.Message) {
if msg.Payload.Data == nil {
return
}
receipt := &pb.IBTP{}
if err := receipt.Unmarshal(msg.Payload.Data); err != nil {
ex.logger.Error("unmarshal ibtp: %w", err)
return
}
// ignore msg for receipt type
if receipt.Type == pb.IBTP_RECEIPT_SUCCESS || receipt.Type == pb.IBTP_RECEIPT_FAILURE {
//ex.logger.Warn("ignore receipt ibtp")
return
}
err := ex.checker.Check(receipt)
if err != nil {
ex.logger.Error("check ibtp: %w", err)
return
}
ex.feedReceipt(receipt)
ex.logger.Info("Receive ibtp receipt from other sidecar")
}
// 直连
func (ex *Exchanger) handleGetIBTPMessage(p port.Port, msg *pb.Message) {
ibtpID := string(msg.Payload.Data)
ibtp, err := ex.mnt.QueryIBTP(ibtpID)
if err != nil {
ex.logger.Error("Get wrong ibtp id")
return
}
data, err := ibtp.Marshal()
if err != nil {
return
}
retMsg := pb.Msg(pb.Message_ACK, true, data)
err = ex.peerMgr.AsyncSendWithPort(p, retMsg)
if err != nil {
ex.logger.Error(err)
}
}
// 直连
func (ex *Exchanger) handleNewConnection(dstSidecarID string) {
appchainMethod := []byte(ex.appchainDID)
msg := pb.Msg(pb.Message_INTERCHAIN_META_GET, true, appchainMethod)
indices := &struct {
InterchainIndex uint64 `json:"interchain_index"`
ReceiptIndex uint64 `json:"receipt_index"`
}{}
loop := func() error {
interchainMeta, err := ex.peerMgr.SendByID(dstSidecarID, msg)
if err != nil {
return err
}
if !interchainMeta.Payload.Ok {
return fmt.Errorf("interchain meta message payload is false")
}
if err = json.Unmarshal(interchainMeta.Payload.Data, indices); err != nil {
return err
}
return nil
}
if err := retry.Retry(func(attempt uint) error {
return loop()
}, strategy.Wait(1*time.Second)); err != nil {
ex.logger.Panic(err)
}
}
//直链模式
func (ex *Exchanger) handleGetInterchainMessage(p port.Port, msg *pb.Message) {
mntMeta := ex.mnt.QueryOuterMeta()
execMeta := ex.exec.QueryInterchainMeta()
indices := &struct {
InterchainIndex uint64 `json:"interchain_index"`
ReceiptIndex uint64 `json:"receipt_index"`
}{}
execLoad, ok := execMeta[string(msg.Payload.Data)]
if ok {
indices.InterchainIndex = execLoad
}
mntLoad, ok := mntMeta[string(msg.Payload.Data)]
if ok {
indices.InterchainIndex = mntLoad
}
data, err := json.Marshal(indices)
if err != nil {
panic(err)
}
retMsg := pb.Msg(pb.Message_ACK, true, data)
if err := ex.peerMgr.AsyncSendWithPort(p, retMsg); err != nil {
ex.logger.Error(err)
return
}
}
package exchanger
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/Rican7/retry"
"github.com/Rican7/retry/backoff"
"github.com/Rican7/retry/strategy"
"github.com/link33/sidecar/api"
"github.com/link33/sidecar/internal/checker"
"github.com/link33/sidecar/internal/peermgr"
"github.com/link33/sidecar/internal/repo"
"github.com/link33/sidecar/internal/router"
"github.com/link33/sidecar/internal/syncer"
"github.com/link33/sidecar/model/pb"
"github.com/meshplus/bitxhub-kit/storage"
"github.com/sirupsen/logrus"
"go.uber.org/atomic"
)
type Exchanger struct {
mode string
appchainDID string
store storage.Storage
syncer syncer.Syncer // WrapperSyncer represents the necessary data for sync tx wrappers from bitxhub
router router.Router // 占时不使用
apiServer *api.Server //直连模式使用
peerMgr peermgr.PeerManager
checker checker.Checker
sendIBTPCounter atomic.Uint64
sendIBTPTimer atomic.Duration
ch chan struct{} //control the concurrent count
ibtps sync.Map
receipts sync.Map
logger logrus.FieldLogger
ctx context.Context
cancel context.CancelFunc
}
func New(typ, appchainDID string, meta *pb.Interchain, opts ...Option) (*Exchanger, error) {
config := GenerateConfig(opts...)
ctx, cancel := context.WithCancel(context.Background())
return &Exchanger{
checker: config.checker,
apiServer: config.apiServer,
peerMgr: config.peerMgr,
syncer: config.syncer,
store: config.store,
router: config.router,
logger: config.logger,
ch: make(chan struct{}, 100),
mode: typ,
appchainDID: appchainDID,
ctx: ctx,
cancel: cancel,
}, nil
}
func (ex *Exchanger) Start() error {
ex.logger.Info("Exchanger started")
return nil
}
//直链模式
func (ex *Exchanger) startWithDirectMode() error {
if err := ex.apiServer.Start(); err != nil {
return fmt.Errorf("peerMgr start: %w", err)
}
if err := ex.peerMgr.RegisterConnectHandler(ex.handleNewConnection); err != nil {
return fmt.Errorf("register on connection handler: %w", err)
}
if err := ex.peerMgr.RegisterMsgHandler(pb.Message_INTERCHAIN_META_GET, ex.handleGetInterchainMessage); err != nil {
return fmt.Errorf("register query interchain msg handler: %w", err)
}
// API调用
if err := ex.peerMgr.RegisterMsgHandler(pb.Message_IBTP_SEND, ex.handleSendIBTPMessage); err != nil {
return fmt.Errorf("register ibtp handler: %w", err)
}
if err := ex.peerMgr.RegisterMsgHandler(pb.Message_IBTP_RECEIPT_SEND, ex.handleSendIBTPReceiptMessage); err != nil {
return fmt.Errorf("register ibtp handler: %w", err)
}
if err := ex.peerMgr.RegisterMsgHandler(pb.Message_IBTP_GET, ex.handleGetIBTPMessage); err != nil {
return fmt.Errorf("register ibtp receipt handler: %w", err)
}
if err := ex.peerMgr.Start(); err != nil {
return fmt.Errorf("peerMgr start: %w", err)
}
go ex.analysisDirectTPS()
return nil
}
func (ex *Exchanger) startWithRelayMode() error {
if err := ex.syncer.RegisterRollbackHandler(ex.handleRollback); err != nil {
return fmt.Errorf("register router handler: %w", err)
}
return nil
}
// 中继、直链
func (ex *Exchanger) listenAndSendIBTPFromMnt() {
ch := ex.mnt.ListenIBTP()
for {
select {
case <-ex.ctx.Done():
return
case ibtp, ok := <-ch:
ex.logger.Info("Receive interchain ibtp from monitor")
if !ok {
ex.logger.Warn("Unexpected closed channel while listening on interchain ibtp")
return
}
if err := retry.Retry(func(attempt uint) error {
if err := ex.sendIBTP(ibtp); err != nil {
ex.logger.Errorf("SendByID ibtp: %s", err.Error())
// if err occurs, try to get new ibtp and resend
ibtpID := ibtp.ID()
if err := retry.Retry(func(attempt uint) error {
ibtp, err = ex.mnt.QueryIBTP(ibtpID)
if err != nil {
ex.logger.Errorf("Query ibtp %s from appchain: %s", ibtpID, err.Error())
return err
}
return nil
}, strategy.Backoff(backoff.Fibonacci(500*time.Millisecond))); err != nil {
ex.logger.Panic(err)
}
return fmt.Errorf("retry sending ibtp")
}
return nil
}, strategy.Backoff(backoff.Fibonacci(500*time.Millisecond))); err != nil {
ex.logger.Panic(err)
}
}
}
}
// 中继模式
func (ex *Exchanger) listenAndSendIBTPFromSyncer() {
ch := ex.syncer.ListenIBTP()
for {
select {
case <-ex.ctx.Done():
return
case wIbtp, ok := <-ch:
if !ok {
ex.logger.Warn("Unexpected closed channel while listening on interchain ibtp")
return
}
entry := ex.logger.WithFields(logrus.Fields{"type": wIbtp.Ibtp.Type, "id": wIbtp.Ibtp.ID()})
entry.Debugf("Exchanger receives ibtp from syncer")
switch wIbtp.Ibtp.Type {
case pb.IBTP_INTERCHAIN:
ex.applyInterchain(wIbtp, entry) // 发送到HUB上
case pb.IBTP_RECEIPT_SUCCESS, pb.IBTP_RECEIPT_FAILURE:
ex.applyReceipt(wIbtp, entry)
default:
entry.Errorf("wrong type of ibtp")
}
}
}
}
// 共同
func (ex *Exchanger) sendIBTP(ibtp *pb.IBTP) error {
entry := ex.logger.WithFields(logrus.Fields{"index": ibtp.Index, "type": ibtp.Type, "to": ibtp.To, "id": ibtp.ID()})
//TODO!!!! 根据交易决定
//mode := ibtp.mode
mode := repo.RelayMode
switch mode {
case repo.RelayMode:
err := ex.syncer.SendIBTP(ibtp)
if err != nil {
entry.Errorf("SendByID ibtp to bitxhub: %s", err.Error())
return fmt.Errorf("send ibtp to bitxhub: %s", err.Error())
}
case repo.DirectMode:
// send ibtp to another sidecar
if err := retry.Retry(func(attempt uint) error {
data, err := ibtp.Marshal()
if err != nil {
panic(fmt.Sprintf("marshal ibtp: %s", err.Error()))
}
msg := pb.Msg(pb.Message_IBTP_SEND, true, data)
var dst string
if ibtp.Type == pb.IBTP_INTERCHAIN {
dst = ibtp.To
} else {
dst = ibtp.From
}
if err := ex.peerMgr.AsyncSendByID(dst, msg); err != nil {
ex.logger.Errorf("SendByID ibtp to sidecar %s: %s", ibtp.ID(), err.Error())
return err
}
return nil
}, strategy.Wait(1*time.Second)); err != nil {
ex.logger.Panic(err)
}
}
entry.Info("SendByID ibtp success from monitor")
return nil
}
//主动发起,说明是from自己,
func (ex *Exchanger) queryIBTP(mode string, id, target string) (*pb.IBTP, bool, error) {
verifiedTx := &pb.VerifiedTx{}
v := ex.store.Get(pb.IBTPKey(id))
if v != nil {
if err := verifiedTx.Unmarshal(v); err != nil {
return nil, false, err
}
return verifiedTx.Tx.GetIBTP(), verifiedTx.Valid, nil
}
// query ibtp from counterpart chain
var (
ibtp *pb.IBTP
isValid bool
err error
)
// 交易中,如果是用户,根据用户指定,如果没有按默认配置,依次排序。
switch mode {
case repo.RelayMode: // 中继模式是查询hub上的ibtp交易根据交易ID。
ibtp, isValid, err = ex.syncer.QueryIBTP(id)
if err != nil {
if errors.Is(err, syncer.ErrIBTPNotFound) {
ex.logger.Panicf("query ibtp by id %s from bitxhub: %s", id, err.Error())
}
return nil, false, fmt.Errorf("query ibtp from bitxhub: %s", err.Error())
}
case repo.DirectMode:
// query ibtp from another sidecar
msg := pb.Message(pb.Message_IBTP_GET, true, []byte(id))
result, err := ex.peerMgr.SendByID(target, msg)
if err != nil {
return nil, false, err
}
ibtp = &pb.IBTP{}
if err := ibtp.Unmarshal(result.Payload.Data); err != nil {
return nil, false, err
}
default:
return nil, false, fmt.Errorf("unsupported sidecar mode")
}
return ibtp, isValid, nil
}
func (ex *Exchanger) queryIBTPForRelay(id, target string) (*pb.IBTP, bool, error) {
verifiedTx := &pb.VerifiedTx{}
v := ex.store.Get(pb.IBTPKey(id))
if v != nil {
if err := verifiedTx.Unmarshal(v); err != nil {
return nil, false, err
}
return verifiedTx.Tx.GetIBTP(), verifiedTx.Valid, nil
}
ibtp, isValid, err := ex.syncer.QueryIBTP(id)
if err != nil {
if errors.Is(err, syncer.ErrIBTPNotFound) {
ex.logger.Panicf("query ibtp by id %s from bitxhub: %s", id, err.Error())
}
return nil, false, fmt.Errorf("query ibtp from bitxhub: %s", err.Error())
}
return ibtp, isValid, nil
}
package exchanger
import (
"github.com/link33/sidecar/model/pb"
)
//中继模式
func (ex *Exchanger) feedIBTPReceipt(receipt *pb.IBTPX) {
var pool *Pool
act, loaded := ex.receipts.Load(receipt.Ibtp.To)
if !loaded {
pool = NewPool()
ex.receipts.Store(receipt.Ibtp.To, pool)
} else {
pool = act.(*Pool)
}
pool.feed(receipt)
}
...@@ -94,6 +94,9 @@ func (r *router) Remove(p port.Port) error { ...@@ -94,6 +94,9 @@ func (r *router) Remove(p port.Port) error {
//路由的有那些数据?需要区分!!! //路由的有那些数据?需要区分!!!
func (r *router) Route(msg *pb.Message) error { func (r *router) Route(msg *pb.Message) error {
if msg == nil {
r.logger.Error("msg = nil")
}
ibtpx := &pb.IBTPX{} ibtpx := &pb.IBTPX{}
err := ibtpx.Unmarshal(msg.Payload.Data) err := ibtpx.Unmarshal(msg.Payload.Data)
if err != nil { if err != nil {
...@@ -174,6 +177,7 @@ func (r *router) Route(msg *pb.Message) error { ...@@ -174,6 +177,7 @@ func (r *router) Route(msg *pb.Message) error {
return err return err
} }
case pb.Message_IBTP_GET: case pb.Message_IBTP_GET:
case pb.Message_IBTP_RECEIPT_GET: case pb.Message_IBTP_RECEIPT_GET:
default: default:
return nil return nil
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment