Commit 96ef8012 authored by suyanlong's avatar suyanlong

refactor code

parent 72bfc44d
Pipeline #7936 failed with stages
......@@ -16,8 +16,8 @@ import (
"github.com/link33/sidecar/internal/loggers"
"github.com/link33/sidecar/internal/manger"
"github.com/link33/sidecar/internal/peermgr"
"github.com/link33/sidecar/internal/port"
"github.com/link33/sidecar/internal/repo"
"github.com/link33/sidecar/internal/router"
"github.com/link33/sidecar/internal/txcrypto"
"github.com/link33/sidecar/pkg/plugins"
)
......@@ -49,8 +49,8 @@ func NewSidecar(repoRoot string, config *repo.Config) (internal.Launcher, error)
//cryptor txcrypto.Cryptor
//apiServer *api.Server
)
portMap := port.NewPortMap()
pm, err := peermgr.New(config, portMap, nodePrivKey, privateKey, 1, loggers.Logger(loggers.PeerMgr))
r := router.NewRouter(loggers.Logger(loggers.Router))
pm, err := peermgr.New(config, r, nodePrivKey, privateKey, 1, loggers.Logger(loggers.PeerMgr))
Asset(err)
clients := plugins.CreateClients(config.Appchains, nil)
persister := manger.NewPersister(addr.String(), store, loggers.Logger(loggers.Manger))
......@@ -58,8 +58,8 @@ func NewSidecar(repoRoot string, config *repo.Config) (internal.Launcher, error)
cryptor, err := txcrypto.NewDirectCryptor(appchainMgr, privateKey)
Asset(err)
clientPort := appchain.NewPorts(clients, cryptor, logger)
portMap.Adds(clientPort)
mg, err := manger.NewManager(addr.String(), portMap, pm, appchainMgr, loggers.Logger(loggers.Manger))
r.Adds(clientPort)
mg, err := manger.NewManager(addr.String(), r, pm, appchainMgr, loggers.Logger(loggers.Manger))
Asset(err)
ctx, cancel := context.WithCancel(context.Background())
return &Sidecar{
......
......@@ -22,13 +22,22 @@ func (a *appChain) Name() string {
}
func (a *appChain) Send(msg port.Message) (*pb.Message, error) {
//TODO 调用该执行。
//a.ExecuteIBTP()
//a.Rollback()
panic("implement me")
}
func (a *appChain) AsyncSend(msg port.Message) error {
//TODO 调用该执行。
//a.ExecuteIBTP()
//a.Rollback()
panic("implement me")
}
func (a *appChain) ListenIBTPX() <-chan *pb.IBTPX {
panic("implement me")
}
......@@ -270,32 +270,3 @@ func (ex *Exchanger) handleGetInterchainMessage(p port.Port, msg *pb.Message) {
return
}
}
//直链模式
func (ex *Exchanger) analysisDirectTPS() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
current := time.Now()
counter := ex.sendIBTPCounter.Load()
for {
select {
case <-ticker.C:
tps := ex.sendIBTPCounter.Load() - counter
counter = ex.sendIBTPCounter.Load()
totalTimer := ex.sendIBTPTimer.Load()
if tps != 0 {
ex.logger.WithFields(logrus.Fields{
"tps": tps,
"tps_sum": counter,
"tps_time": totalTimer.Milliseconds() / int64(counter),
"tps_avg": float64(counter) / time.Since(current).Seconds(),
}).Info("analysis")
}
case <-ex.ctx.Done():
return
}
}
}
......@@ -64,27 +64,6 @@ func New(typ, appchainDID string, meta *pb.Interchain, opts ...Option) (*Exchang
}
func (ex *Exchanger) Start() error {
var err error
switch ex.mode {
case repo.DirectMode:
err = ex.startWithDirectMode()
case repo.RelayMode:
err = ex.startWithRelayMode()
}
if err != nil {
return err
}
// 这个同样也是,而不是在这里启动。
if ex.mnt != nil {
go ex.listenAndSendIBTPFromMnt()
}
//核心,就是转发
if ex.syncer != nil { //而是根据是否有配置hub判断,而不是这个判断.
go ex.listenAndSendIBTPFromSyncer()
}
ex.logger.Info("Exchanger started")
return nil
}
......@@ -196,28 +175,6 @@ func (ex *Exchanger) listenAndSendIBTPFromSyncer() {
}
}
func (ex *Exchanger) Stop() error {
ex.cancel()
switch ex.mode {
case repo.DirectMode:
if err := ex.apiServer.Stop(); err != nil {
return fmt.Errorf("gin service stop: %w", err)
}
if err := ex.peerMgr.Stop(); err != nil {
return fmt.Errorf("peerMgr stop: %w", err)
}
case repo.RelayMode:
if err := ex.syncer.Stop(); err != nil {
return fmt.Errorf("syncer stop: %w", err)
}
}
ex.logger.Info("Exchanger stopped")
return nil
}
// 共同
func (ex *Exchanger) sendIBTP(ibtp *pb.IBTP) error {
entry := ex.logger.WithFields(logrus.Fields{"index": ibtp.Index, "type": ibtp.Type, "to": ibtp.To, "id": ibtp.ID()})
......
......@@ -22,7 +22,7 @@ func (pool *Pool) feed(ibtp *pb.IBTPX) {
}
func (pool *Pool) put(ibtp *pb.IBTPX) {
pool.ibtps.Store(ibtp.Ibtp.Index, ibtp)
pool.ibtps.Store(ibtp.Ibtp.Nonce, ibtp)
}
func (pool *Pool) delete(idx uint64) {
......
package exchanger
import (
"time"
"github.com/link33/sidecar/model/pb"
"github.com/sirupsen/logrus"
)
//中继模式 handleIBTP handle ibtps from bitxhub
func (ex *Exchanger) handleIBTP(wIbtp *pb.IBTPX, entry logrus.FieldLogger) {
ibtp := wIbtp.Ibtp
err := ex.checker.Check(ibtp)
if err != nil {
// todo: send receipt back to bitxhub
return
}
entry.Debugf("IBTP pass check")
receipt, err := ex.exec.ExecuteIBTP(wIbtp)
if err != nil {
ex.logger.Errorf("execute ibtp error:%s", err.Error())
}
if receipt == nil {
ex.logger.WithFields(logrus.Fields{"type": ibtp.Type, "id": ibtp.ID()}).Info("Handle ibtp receipt success")
return
}
sendReceiptLoop:
for {
err = ex.syncer.SendIBTP(receipt) // 回执一定是要到中继链上的,作为数据凭证。
if err != nil {
ex.logger.Errorf("send ibtp error: %s", err.Error())
// if sending receipt failed, try to get new receipt from appchain and retry
queryLoop:
for {
// 死循环,直到成功。
receipt, err = ex.exec.QueryIBTPReceipt(ibtp)
if err != nil {
ex.logger.Errorf("Query ibtp receipt for %s error: %s", ibtp.ID(), err.Error())
time.Sleep(1 * time.Second)
continue queryLoop
}
time.Sleep(1 * time.Second)
continue sendReceiptLoop
}
}
break
}
ex.logger.WithFields(logrus.Fields{"type": ibtp.Type, "id": ibtp.ID()}).Info("Handle ibtp success")
}
// 中继模式:处理链间交易回执
func (ex *Exchanger) applyReceipt(wIbtp *pb.IBTPX, entry logrus.FieldLogger) {
ex.handleIBTP(wIbtp, entry)
}
// 中继链架构,处理链间交易,
func (ex *Exchanger) applyInterchain(wIbtp *pb.IBTPX, entry logrus.FieldLogger) {
ex.handleIBTP(wIbtp, entry)
}
//中继模式
func (ex *Exchanger) handleRollback(ibtp *pb.IBTP) {
if ibtp.Category() == pb.IBTP_RESPONSE {
// if this is receipt type of ibtp, no need to rollback
return
}
ex.feedIBTPReceipt(&pb.IBTPX{Ibtp: ibtp, IsValid: false})
}
func (ex *Exchanger) timeCost() func() {
start := time.Now()
return func() {
tc := time.Since(start)
ex.sendIBTPTimer.Add(tc)
}
}
package manger
import (
"context"
"encoding/json"
"errors"
"github.com/link33/sidecar/internal"
"github.com/link33/sidecar/internal/port"
"github.com/link33/sidecar/internal/repo"
"github.com/link33/sidecar/internal/router"
"github.com/link33/sidecar/model/pb"
......@@ -16,10 +12,10 @@ import (
)
type Manager struct {
logger logrus.FieldLogger
Mgr appchainmgr.AppchainMgr
pm peermgr.Handler
mangerPort *MangerPort
logger logrus.FieldLogger
Mgr appchainmgr.AppchainMgr
pm peermgr.PeerManager
router router.Router
}
func (mgr *Manager) Start() error {
......@@ -30,15 +26,12 @@ func (mgr *Manager) Stop() error {
panic("implement me")
}
func NewManager(addr string, portMap *port.PortMap, pm peermgr.PeerManager, Mgr appchainmgr.AppchainMgr, logger logrus.FieldLogger) (*Manager, error) {
mangerPort := NewMangerPort(portMap, logger)
func NewManager(addr string, router router.Router, pm peermgr.PeerManager, Mgr appchainmgr.AppchainMgr, logger logrus.FieldLogger) (*Manager, error) {
am := &Manager{
Mgr: Mgr,
logger: logger,
mangerPort: mangerPort,
pm: pm,
Mgr: Mgr,
logger: logger,
router: router,
pm: pm,
}
err := pm.RegisterMultiMsgHandler([]pb.Message_Type{
......@@ -95,186 +88,3 @@ func (mgr *Manager) handleMessage(s port.Port, msg *pb.Message) {
"consensus_type": appchainRes.ConsensusType,
}).Info("Handle appchain message")
}
type Manger interface {
internal.Launcher
Remove()
Add()
Query()
}
type MangerPort struct {
// peer manger
// sidecar manger
logger logrus.FieldLogger
ctx context.Context
cancel context.CancelFunc
router router.Router
portMap *port.PortMap
methodMap map[string]routeMethod
}
type routeMethod func([]string) []port.Port
func NewMangerPort(portMap *port.PortMap, logger logrus.FieldLogger) *MangerPort {
ctx, cancel := context.WithCancel(context.Background())
return &MangerPort{
logger: logger,
ctx: ctx,
cancel: cancel,
router: nil,
portMap: portMap,
}
}
func (m *MangerPort) Start() error {
if err := m.router.Start(); err != nil {
return err
}
m.methodMap["single"] = m.Single
m.methodMap["multicast"] = m.Multicast
m.methodMap["broadcast"] = m.Broadcast
m.methodMap["official"] = m.Official
return nil
}
func (m *MangerPort) Stop() error {
if err := m.router.Stop(); err != nil {
return err
}
return nil
}
//TODO
func (m *MangerPort) Add(p port.Port) error {
m.portMap.Add(p)
go func() {
c := p.ListenIBTPX()
for {
select {
case ibtpx := <-c:
err := m.Route(ibtpx)
if err != nil {
m.logger.Error(err)
}
}
}
}()
return nil
}
func (m *MangerPort) Remove(p port.Port) error {
m.portMap.Remove(p)
return nil
}
//TODO 本机找到的appchain是自己的appchain
func (m *MangerPort) Route(ibtpx *pb.IBTPX) error {
mode := ibtpx.Mode
//本网关已签名、中继链已背书、to是本网关内部的appchain,即顺利通过并转发,否则打断。
if !((m.isSign(ibtpx) && mode == repo.RelayMode && m.isEndorse(ibtpx)) || !m.isSign(ibtpx)) {
return nil
}
//本网关签名
if !m.isSign(ibtpx) {
m.Sign(ibtpx)
}
ibtp := ibtpx.Ibtp
_, to := ibtp.From, ibtp.To
if pp, is := m.portMap.Port(to); is {
switch {
case pp.Type() == port.Hub || pp.Type() == port.Sidecar:
return pp.AsyncSend(ibtpx)
case pp.Type() == port.Appchain:
switch mode {
case repo.RelayMode:
hub, is := m.getHub()
if is && !m.isEndorse(ibtpx) {
return hub.AsyncSend(ibtpx)
} else {
return pp.AsyncSend(ibtpx)
}
case repo.DirectMode:
return pp.AsyncSend(ibtpx)
default:
//TODO 跳过
return nil
}
default:
//TODO 跳过
return nil
}
}
//规则判断
method := ibtpx.RouteMethod
if md, is := m.methodMap[method]; is {
ports := md(ibtpx.RouteMethodArg)
if len(ports) == 0 {
m.firstRoute(ibtpx)
}
for _, p := range ports {
_ = p.AsyncSend(ibtpx)
}
}
m.firstRoute(ibtpx)
return nil
}
func (m *MangerPort) firstRoute(ibtp *pb.IBTPX) {
panic("implement me")
}
func (m *MangerPort) getHub() (port.Port, bool) {
return nil, false
}
func (m *MangerPort) isSign(ibtpx *pb.IBTPX) bool {
panic("implement me")
}
func (m *MangerPort) Sign(ibtpx *pb.IBTPX) {
panic("implement me")
}
// hub endorse
func (m *MangerPort) isEndorse(ibtpx *pb.IBTPX) bool {
panic("implement me")
}
func (m *MangerPort) HandlerMethod() {}
func (m *MangerPort) Single([]string) []port.Port {
panic("implement me")
}
func (m *MangerPort) Multicast([]string) []port.Port {
panic("implement me")
}
func (m *MangerPort) Broadcast([]string) []port.Port {
panic("implement me")
}
func (m *MangerPort) Official([]string) []port.Port {
panic("implement me")
}
func (m *MangerPort) Send(id string, msg port.Message) (*pb.Message, error) {
if p, is := m.portMap.Port(id); is {
return p.Send(msg)
}
return nil, errors.New("id error!")
}
func (m *MangerPort) AsyncSend(id string, msg port.Message) error {
if p, is := m.portMap.Port(id); is {
return p.AsyncSend(msg)
}
return errors.New("id error!")
}
package peermgr
import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/link33/sidecar/internal/port"
"github.com/link33/sidecar/model/pb"
"github.com/meshplus/bitxhub-kit/crypto"
)
type local struct {
id peer.ID
privKey crypto.PrivateKey
tag string
rev chan *pb.IBTPX
}
func (l *local) ID() string {
return l.id.String()
}
func (l *local) Type() string {
return port.Sidecar
}
func (l *local) Name() string {
return l.ID()
}
func (l *local) Tag() string {
return l.tag
}
func (l *local) Send(msg port.Message) (*pb.Message, error) {
panic("implement me")
}
func (l *local) AsyncSend(msg port.Message) error {
panic("implement me")
}
func (l *local) ListenIBTPX() <-chan *pb.IBTPX {
return l.rev
}
......@@ -95,7 +95,7 @@ func TestSwarm_AsyncSend(t *testing.T) {
addr, err := AddrToPeerInfo(mockMultiAddr)
require.Nil(t, err)
mockSwarm.connectedPeers.Store(mockId, newSidecar(addr, mockSwarm))
mockSwarm.router.Store(mockId, newSidecar(addr, mockSwarm))
err = mockSwarm.AsyncSend(mockId, mockMsg)
require.Nil(t, err)
......@@ -112,7 +112,7 @@ func TestSwarm_Send(t *testing.T) {
addr, err := AddrToPeerInfo(mockMultiAddr)
require.Nil(t, err)
mockSwarm.connectedPeers.Store(mockId, newSidecar(addr, mockSwarm))
mockSwarm.router.Store(mockId, newSidecar(addr, mockSwarm))
_, err = mockSwarm.Send(mockId, mockMsg)
require.Nil(t, err)
......
package peermgr
import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/link33/sidecar/internal/port"
"github.com/link33/sidecar/model/pb"
)
type sidecar struct {
addr *peer.AddrInfo
id string
swarm PeerManager
tag string
rev chan *pb.IBTPX
}
func (s *sidecar) ID() string {
return s.addr.String()
return s.id
}
func (s *sidecar) Type() string {
......@@ -29,7 +28,9 @@ func (s *sidecar) Tag() string {
return s.tag
}
//TODO 如何区别IBTPX与Message
func (s *sidecar) Send(msg port.Message) (*pb.Message, error) {
return s.swarm.Send(s.ID(), msg)
}
......
......@@ -30,6 +30,12 @@ sidecar did标识是否要和自己绑定的链did标识一致、用同一个标
6、其它sidecar节点、pulgin都是client。
7、hub连接点、其它链接点的都是plugin。
8、最终所有的都是port。只是对port进行细分,再细分。
9、协议细分,要分三层协议:IBTP、校验、加密、路由,传输。
10、sidecar与sidecar传输数据使用Message结构,内部pulgin传输使用IBTPX包就行了。
11、appchain 的公共部分需要抽象出来。
12、映射调用接口与方法名字。
13、
## Monitor、Syncer这些接口,可以是每一个client里面应该实现的接口。
......
......@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/link33/sidecar/internal/port"
"github.com/link33/sidecar/internal/router"
"github.com/link33/sidecar/model/pb"
"strings"
"sync"
......@@ -30,13 +31,14 @@ const (
var _ PeerManager = (*Swarm)(nil)
type Swarm struct {
p2p network.Network
logger logrus.FieldLogger
peers map[string]*peer.AddrInfo
connectedPeers *port.PortMap
p2p network.Network
logger logrus.FieldLogger
peers map[string]*peer.AddrInfo
router router.Router
providers uint64
privKey crypto.PrivateKey
id peer.ID
msgHandlers sync.Map
connectHandlers []ConnectHandler
......@@ -45,20 +47,23 @@ type Swarm struct {
cancel context.CancelFunc
}
func New(config *repo.Config, portMap *port.PortMap, nodePrivKey crypto.PrivateKey, privKey crypto.PrivateKey, providers uint64, logger logrus.FieldLogger) (*Swarm, error) {
func New(config *repo.Config, router router.Router, nodePrivKey crypto.PrivateKey, privKey crypto.PrivateKey, providers uint64, logger logrus.FieldLogger) (*Swarm, error) {
libp2pPrivKey, err := convertToLibp2pPrivKey(nodePrivKey)
if err != nil {
return nil, fmt.Errorf("convert private key: %w", err)
}
var local string
var ll string
var remotes map[string]*peer.AddrInfo
local, remotes, err = loadPeers(config.Peer.Peers, libp2pPrivKey)
id, err := peer.IDFromPrivateKey(libp2pPrivKey)
if err != nil {
panic(err)
}
ll, remotes, err = loadPeers(config.Peer.Peers, libp2pPrivKey)
var protocolIDs = []string{protocolID}
p2p, err := network.New(
network.WithLocalAddr(local),
network.WithLocalAddr(ll),
network.WithPrivateKey(libp2pPrivKey),
network.WithProtocolIDs(protocolIDs),
network.WithLogger(logger),
......@@ -81,10 +86,20 @@ func New(config *repo.Config, portMap *port.PortMap, nodePrivKey crypto.PrivateK
privKey: privKey,
ctx: ctx,
cancel: cancel,
router: router,
id: id,
}, nil
}
func (swarm *Swarm) Start() error {
l := &local{
id: swarm.id,
privKey: swarm.privKey,
tag: "",
rev: make(chan *pb.IBTPX),
}
swarm.router.Add(l)
swarm.p2p.SetMessageHandler(swarm.handleMessage)
if err := swarm.RegisterMsgHandler(pb.Message_ADDRESS_GET, swarm.handleGetAddressMessage); err != nil {
......@@ -128,12 +143,13 @@ func (swarm *Swarm) Start() error {
rec := make(chan *pb.IBTPX)
p := &sidecar{
addr: addr,
id: id,
swarm: swarm,
tag: "",
rev: rec,
}
swarm.connectedPeers.Store(id, p)
_ = swarm.router.Add(p)
swarm.lock.RLock()
defer swarm.lock.RUnlock()
for _, handler := range swarm.connectHandlers {
......@@ -167,13 +183,14 @@ func (swarm *Swarm) handleMessage(s network.Stream, data []byte) {
pack := m.Payload.Data
t := m.Type
switch {
case t == pb.Message_IBTP_SEND || t == pb.Message_IBTP_GET || t == pb.Message_IBTP_RECEIPT_SEND:
// 接收其它sidecar节点发过来的交易、请求等
case t == pb.Message_IBTP_SEND || t == pb.Message_IBTP_GET || t == pb.Message_IBTP_RECEIPT_SEND || t == pb.Message_IBTP_RECEIPT_GET:
ibtpx := &pb.IBTPX{}
if err := m.Unmarshal(pack); err != nil {
swarm.logger.Error(err)
return
}
p, is := swarm.connectedPeers.Load(s.RemotePeerID())
p, is := swarm.router.Load(s.RemotePeerID())
if is {
ps, iss := p.(*sidecar)
if iss {
......@@ -184,14 +201,15 @@ func (swarm *Swarm) handleMessage(s network.Stream, data []byte) {
addr, _ := peer.AddrInfoFromP2pAddr(s.RemotePeerAddr())
rec := make(chan *pb.IBTPX)
newPort := &sidecar{
addr: addr,
id: addr.ID.String(),
swarm: swarm,
tag: "",
rev: rec,
}
swarm.connectedPeers.Store(addr.ID.String(), newPort)
swarm.router.Add(newPort)
rec <- ibtpx
default:
//TODO
}
}
......@@ -220,12 +238,12 @@ func (swarm *Swarm) Connect(addrInfo *peer.AddrInfo) (string, error) {
rec := make(chan *pb.IBTPX)
p := &sidecar{
addr: addrInfo,
id: addrInfo.ID.String(),
swarm: swarm,
tag: "",
rev: rec,
}
swarm.connectedPeers.Add(p)
swarm.router.Add(p)
return addrInfo.ID.String(), nil
}
......
......@@ -202,3 +202,8 @@ func (p *PortMap) Store(id string, port Port) {
func (p *PortMap) Load(key string) (value Port, ok bool) {
return p.Port(key)
}
func (p *PortMap) IsExist(id string) bool {
_, is := p.Load(id)
return is
}
......@@ -2,69 +2,199 @@ package router
import (
"context"
"errors"
"github.com/link33/sidecar/internal/checker"
"github.com/link33/sidecar/internal/port"
"github.com/link33/sidecar/internal/repo"
"github.com/link33/sidecar/model/pb"
appchainmgr "github.com/meshplus/bitxhub-core/appchain-mgr"
"github.com/meshplus/bitxhub-kit/storage"
"github.com/sirupsen/logrus"
"sync"
)
//TODO 程序动态注入与随机的删除。sync.Map
var routerMap sync.Map
func Register(port port.Port) {
routerMap.Store(port.ID(), port)
type router struct {
logger logrus.FieldLogger
ctx context.Context
cancel context.CancelFunc
checker checker.Checker
portMap *port.PortMap
methodMap map[string]routeMethod
}
func UnRegister(port port.Port) {
routerMap.Delete(port.ID())
}
type routeMethod func([]string) []port.Port
type router struct {
logger logrus.FieldLogger
ctx context.Context
cancel context.CancelFunc
store storage.Storage
func NewRouter(logger logrus.FieldLogger) Router {
ctx, cancel := context.WithCancel(context.Background())
return &router{
logger: logger,
ctx: ctx,
cancel: cancel,
portMap: port.NewPortMap(),
}
}
func (r *router) Start() error {
panic("implement me")
r.methodMap["single"] = r.Single
r.methodMap["multicast"] = r.Multicast
r.methodMap["broadcast"] = r.Broadcast
r.methodMap["official"] = r.Official
return nil
}
func (r *router) Stop() error {
panic("implement me")
return nil
}
func (r *router) Broadcast(ids []string) error {
panic("implement me")
//TODO
func (r *router) Add(p port.Port) error {
r.portMap.Add(p)
go func() {
c := p.ListenIBTPX()
for {
select {
case ibtpx := <-c:
err := r.Route(ibtpx)
if err != nil {
r.logger.Error(err)
}
}
}
}()
return nil
}
func (r *router) Adds(p []port.Port) error {
for _, pt := range p {
r.Add(pt)
}
return nil
}
// TODO 路由规则、路由优先级
// 一个连接一个goroutine
func (r *router) Route(ibtp *pb.IBTP) error {
func (r *router) Load(key string) (value port.Port, ok bool) {
return r.portMap.Load(key)
}
func (r *router) Remove(p port.Port) error {
r.portMap.Remove(p)
return nil
}
func (r *router) Route(ibtpx *pb.IBTPX) error {
mode := ibtpx.Mode
//本网关已签名、中继链已背书、to是本网关内部的appchain,即顺利通过并转发,否则打断。
if !((r.isSign(ibtpx) && mode == repo.RelayMode && r.isEndorse(ibtpx)) || !r.isSign(ibtpx)) {
return nil
}
//本网关签名
if !r.isSign(ibtpx) {
r.sign(ibtpx)
}
ibtp := ibtpx.Ibtp
err := r.checker.Check(ibtp)
if err != nil {
r.logger.Error("check ibtp: %w", err)
return err
}
_, to := ibtp.From, ibtp.To
if p, is := routerMap.Load(to); is {
return p.(port.Port).AsyncSend(ibtp)
if pp, is := r.portMap.Port(to); is {
switch {
case pp.Type() == port.Sidecar:
return pp.AsyncSend(ibtpx) //转发给sidecar节点,或者本身local节点。
case pp.Type() == port.Hub: //发给hub appchain
return pp.AsyncSend(ibtpx)
case pp.Type() == port.Appchain: //TODO 本机找到的appchain只能是自己的appchain
switch mode {
case repo.RelayMode:
hub, is := r.getHub()
if is && !r.isEndorse(ibtpx) {
return hub.AsyncSend(ibtpx)
} else {
return pp.AsyncSend(ibtpx)
}
case repo.DirectMode:
return pp.AsyncSend(ibtpx)
default:
//TODO 跳过
return nil
}
default:
//TODO 跳过
return nil
}
}
//规则判断 转发给sidecar节点
method := ibtpx.RouteMethod
if md, is := r.methodMap[method]; is {
ports := md(ibtpx.RouteMethodArg)
if len(ports) == 0 {
r.firstRoute(ibtpx)
}
for _, p := range ports {
_ = p.AsyncSend(ibtpx)
}
} else {
r.firstRoute(ibtp)
return nil
r.firstRoute(ibtpx)
}
return nil
}
func (r *router) firstRoute(ibtp *pb.IBTPX) {
panic("implement me")
}
func (r *router) firstRoute(ibtp *pb.IBTP) {
func (r *router) getHub() (port.Port, bool) {
return nil, false
}
func (r *router) isSign(ibtpx *pb.IBTPX) bool {
panic("implement me")
}
func (r *router) ExistAppchain(id string) bool {
func (r *router) sign(ibtpx *pb.IBTPX) {
panic("implement me")
}
func (r *router) AddAppchains(appchains []*appchainmgr.Appchain) error {
// hub endorse
func (r *router) isEndorse(ibtpx *pb.IBTPX) bool {
panic("implement me")
}
func (r *router) HandlerMethod() {}
func (r *router) Single([]string) []port.Port {
panic("implement me")
}
func (r *router) Multicast([]string) []port.Port {
panic("implement me")
}
func (r *router) Broadcast([]string) []port.Port {
panic("implement me")
}
func (r *router) Official([]string) []port.Port {
panic("implement me")
}
func (r *router) Send(id string, msg port.Message) (*pb.Message, error) {
if p, is := r.portMap.Port(id); is {
return p.Send(msg)
}
return nil, errors.New("id error!")
}
func (r *router) AsyncSend(id string, msg port.Message) error {
if p, is := r.portMap.Port(id); is {
return p.AsyncSend(msg)
}
return errors.New("id error!")
}
func (r *router) InPut(ibtp *pb.IBTP) chan *pb.IBTP {
panic("implement me")
}
......@@ -73,7 +203,6 @@ func (r *router) OutPut(ibtp *pb.IBTP) chan *pb.IBTP {
panic("implement me")
}
// 路由规则集
type RouteMethod interface {
Single()
Multicast()
......
package router
import (
"github.com/link33/sidecar/internal/port"
"github.com/link33/sidecar/model/pb"
appchainmgr "github.com/meshplus/bitxhub-core/appchain-mgr"
)
//go:generate mockgen -destination mock_router/mock_router.go -package mock_router -source router.go
......@@ -13,20 +13,17 @@ type Router interface {
// Stop
Stop() error
//Broadcast broadcasts the registered appchain ids to the union network
Broadcast(ids []string) error
Add(p port.Port) error
Adds(p []port.Port) error
//Route sends ibtp to the union pier in target relay chain
Route(ibtp *pb.IBTPX) error
//ExistAppchain returns if appchain id exist in route map
ExistAppchain(id string) bool
//AddAppchains adds appchains to route map and broadcast them to union network
AddAppchains(appchains []*appchainmgr.Appchain) error
Load(key string) (value port.Port, ok bool)
InPut(ibtp *pb.IBTP) chan *pb.IBTP
OutPut(ibtp *pb.IBTP) chan *pb.IBTP
//InPut(ibtp *pb.IBTP) chan *pb.IBTP
//OutPut(ibtp *pb.IBTP) chan *pb.IBTP
//代替上面两个方法
//Router(ibtp *pb.IBTP) chan *pb.IBTP
}
......
......@@ -4130,7 +4130,7 @@ func (m *SignResponse) Unmarshal(dAtA []byte) error {
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Sign", wireType)
return fmt.Errorf("proto: wrong wireType = %d for field sign", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
......
......@@ -25,51 +25,45 @@ const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package
type Message_Type int32
const (
Message_APPCHAIN_REGISTER Message_Type = 0
Message_APPCHAIN_UPDATE Message_Type = 1
Message_APPCHAIN_GET Message_Type = 2
Message_INTERCHAIN_META_GET Message_Type = 3
Message_RULE_DEPLOY Message_Type = 4
Message_IBTP_GET Message_Type = 5
Message_IBTP_SEND Message_Type = 6
Message_IBTP_RECEIPT_SEND Message_Type = 7
Message_ROUTER_IBTP_SEND Message_Type = 8
Message_ROUTER_IBTP_RECEIPT_SEND Message_Type = 9
Message_ADDRESS_GET Message_Type = 10
Message_ROUTER_INTERCHAIN_SEND Message_Type = 11
Message_ACK Message_Type = 12
Message_ACK Message_Type = 0
Message_ADDRESS_GET Message_Type = 1
Message_APPCHAIN_REGISTER Message_Type = 2
Message_APPCHAIN_UPDATE Message_Type = 3
Message_APPCHAIN_GET Message_Type = 4
Message_INTERCHAIN_META_GET Message_Type = 5
Message_RULE_DEPLOY Message_Type = 6
Message_IBTP_GET Message_Type = 7
Message_IBTP_SEND Message_Type = 8
Message_IBTP_RECEIPT_SEND Message_Type = 9
Message_IBTP_RECEIPT_GET Message_Type = 10
)
var Message_Type_name = map[int32]string{
0: "APPCHAIN_REGISTER",
1: "APPCHAIN_UPDATE",
2: "APPCHAIN_GET",
3: "INTERCHAIN_META_GET",
4: "RULE_DEPLOY",
5: "IBTP_GET",
6: "IBTP_SEND",
7: "IBTP_RECEIPT_SEND",
8: "ROUTER_IBTP_SEND",
9: "ROUTER_IBTP_RECEIPT_SEND",
10: "ADDRESS_GET",
11: "ROUTER_INTERCHAIN_SEND",
12: "ACK",
0: "ACK",
1: "ADDRESS_GET",
2: "APPCHAIN_REGISTER",
3: "APPCHAIN_UPDATE",
4: "APPCHAIN_GET",
5: "INTERCHAIN_META_GET",
6: "RULE_DEPLOY",
7: "IBTP_GET",
8: "IBTP_SEND",
9: "IBTP_RECEIPT_SEND",
10: "IBTP_RECEIPT_GET",
}
var Message_Type_value = map[string]int32{
"APPCHAIN_REGISTER": 0,
"APPCHAIN_UPDATE": 1,
"APPCHAIN_GET": 2,
"INTERCHAIN_META_GET": 3,
"RULE_DEPLOY": 4,
"IBTP_GET": 5,
"IBTP_SEND": 6,
"IBTP_RECEIPT_SEND": 7,
"ROUTER_IBTP_SEND": 8,
"ROUTER_IBTP_RECEIPT_SEND": 9,
"ADDRESS_GET": 10,
"ROUTER_INTERCHAIN_SEND": 11,
"ACK": 12,
"ACK": 0,
"ADDRESS_GET": 1,
"APPCHAIN_REGISTER": 2,
"APPCHAIN_UPDATE": 3,
"APPCHAIN_GET": 4,
"INTERCHAIN_META_GET": 5,
"RULE_DEPLOY": 6,
"IBTP_GET": 7,
"IBTP_SEND": 8,
"IBTP_RECEIPT_SEND": 9,
"IBTP_RECEIPT_GET": 10,
}
func (x Message_Type) String() string {
......@@ -123,7 +117,7 @@ func (m *Message) GetType() Message_Type {
if m != nil {
return m.Type
}
return Message_APPCHAIN_REGISTER
return Message_ACK
}
func (m *Message) GetPayload() *Pack {
......@@ -201,30 +195,29 @@ func init() {
func init() { proto.RegisterFile("message.proto", fileDescriptor_33c57e4bae7b9afd) }
var fileDescriptor_33c57e4bae7b9afd = []byte{
// 362 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x54, 0x91, 0xcf, 0x4e, 0xf2, 0x40,
0x14, 0xc5, 0x3b, 0xa5, 0x1f, 0x2d, 0x97, 0x02, 0xf3, 0x5d, 0xbe, 0x3f, 0x8d, 0x31, 0x0d, 0x69,
0x5c, 0x10, 0x17, 0x5d, 0xe0, 0x13, 0x14, 0x3a, 0xc1, 0x46, 0xfe, 0x34, 0xd3, 0x61, 0xe1, 0x8a,
0x14, 0x69, 0x8c, 0x41, 0x6d, 0x03, 0xc4, 0x84, 0x77, 0x70, 0xe1, 0x63, 0xb9, 0x64, 0xe9, 0xd2,
0xc0, 0x6b, 0xb8, 0x30, 0x1d, 0x04, 0x74, 0x37, 0xe7, 0x77, 0xce, 0xcc, 0xbd, 0x39, 0x03, 0x95,
0x87, 0x64, 0xb1, 0x88, 0x6f, 0x13, 0x37, 0x9b, 0xa7, 0xcb, 0x14, 0xd5, 0x6c, 0xe2, 0x7c, 0xa8,
0xa0, 0xf7, 0x77, 0x14, 0xcf, 0x40, 0x5b, 0xae, 0xb2, 0xc4, 0x22, 0x0d, 0xd2, 0xac, 0xb6, 0xa8,
0x9b, 0x4d, 0xdc, 0x2f, 0xcb, 0x15, 0xab, 0x2c, 0xe1, 0xd2, 0x45, 0x07, 0xf4, 0x2c, 0x5e, 0xdd,
0xa7, 0xf1, 0xd4, 0x52, 0x1b, 0xa4, 0x59, 0x6e, 0x19, 0x79, 0x30, 0x8c, 0x6f, 0x66, 0x7c, 0x6f,
0xa0, 0x05, 0xfa, 0x53, 0x32, 0x5f, 0xdc, 0xa5, 0x8f, 0x56, 0xa1, 0x41, 0x9a, 0x25, 0xbe, 0x97,
0xce, 0xb3, 0x0a, 0x5a, 0xfe, 0x18, 0xfe, 0x85, 0xdf, 0x5e, 0x18, 0x76, 0x2e, 0xbd, 0x60, 0x30,
0xe6, 0xac, 0x1b, 0x44, 0x82, 0x71, 0xaa, 0x60, 0x1d, 0x6a, 0x07, 0x3c, 0x0a, 0x7d, 0x4f, 0x30,
0x4a, 0x90, 0x82, 0x79, 0x80, 0x5d, 0x26, 0xa8, 0x8a, 0xff, 0xa1, 0x1e, 0x0c, 0x04, 0xe3, 0x3b,
0xd6, 0x67, 0xc2, 0x93, 0x46, 0x01, 0x6b, 0x50, 0xe6, 0xa3, 0x1e, 0x1b, 0xfb, 0x2c, 0xec, 0x0d,
0xaf, 0xa9, 0x86, 0x26, 0x18, 0x41, 0x5b, 0x84, 0xd2, 0xfe, 0x85, 0x15, 0x28, 0x49, 0x15, 0xb1,
0x81, 0x4f, 0x8b, 0xf9, 0x12, 0x52, 0x72, 0xd6, 0x61, 0x41, 0x28, 0x76, 0x58, 0xc7, 0x3f, 0x40,
0xf9, 0x70, 0x24, 0x18, 0x1f, 0x1f, 0xc3, 0x06, 0x9e, 0x82, 0xf5, 0x9d, 0xfe, 0xb8, 0x53, 0xca,
0x07, 0x7b, 0xbe, 0xcf, 0x59, 0x14, 0xc9, 0x51, 0x80, 0x27, 0xf0, 0x6f, 0x1f, 0x3f, 0x6e, 0x2a,
0xc3, 0x65, 0xd4, 0xa1, 0xe0, 0x75, 0xae, 0xa8, 0xe9, 0x9c, 0x83, 0x96, 0x37, 0x87, 0x55, 0x50,
0xd3, 0x99, 0x2c, 0xde, 0xe0, 0x6a, 0x3a, 0x43, 0x04, 0x6d, 0x1a, 0x2f, 0x63, 0xd9, 0xb0, 0xc9,
0xe5, 0xb9, 0x6d, 0xbd, 0x6e, 0x6c, 0xb2, 0xde, 0xd8, 0xe4, 0x7d, 0x63, 0x93, 0x97, 0xad, 0xad,
0xac, 0xb7, 0xb6, 0xf2, 0xb6, 0xb5, 0x95, 0x49, 0x51, 0xfe, 0xe7, 0xc5, 0x67, 0x00, 0x00, 0x00,
0xff, 0xff, 0x49, 0x5c, 0x17, 0x8d, 0xe0, 0x01, 0x00, 0x00,
// 339 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x54, 0x91, 0xcd, 0x4e, 0xea, 0x40,
0x14, 0xc7, 0x3b, 0xa5, 0x97, 0x96, 0xc3, 0xd7, 0xdc, 0xc3, 0xbd, 0xb1, 0xab, 0x86, 0x34, 0x2e,
0x88, 0x8b, 0x2e, 0xf0, 0x09, 0x0a, 0x9d, 0x60, 0x23, 0xe0, 0x64, 0x3a, 0x2c, 0x5c, 0x91, 0x22,
0x8d, 0x31, 0xa8, 0x6d, 0x80, 0x98, 0xf0, 0x16, 0x3e, 0x93, 0x2b, 0x97, 0xc4, 0x95, 0x4b, 0x03,
0x2f, 0x62, 0x66, 0x10, 0x12, 0x77, 0xf3, 0xff, 0xfd, 0xe6, 0xcc, 0xc7, 0x39, 0x50, 0x7f, 0xca,
0x56, 0xab, 0xf4, 0x3e, 0x0b, 0x8a, 0x65, 0xbe, 0xce, 0xd1, 0x2c, 0x66, 0xfe, 0x9b, 0x09, 0xf6,
0xe8, 0x40, 0xf1, 0x1c, 0xac, 0xf5, 0xa6, 0xc8, 0x5c, 0xd2, 0x26, 0x9d, 0x46, 0x97, 0x06, 0xc5,
0x2c, 0xf8, 0x51, 0x81, 0xdc, 0x14, 0x99, 0xd0, 0x16, 0x7d, 0xb0, 0x8b, 0x74, 0xf3, 0x98, 0xa7,
0x73, 0xd7, 0x6c, 0x93, 0x4e, 0xb5, 0xeb, 0xa8, 0x8d, 0x3c, 0xbd, 0x5b, 0x88, 0xa3, 0x40, 0x17,
0xec, 0x97, 0x6c, 0xb9, 0x7a, 0xc8, 0x9f, 0xdd, 0x52, 0x9b, 0x74, 0x2a, 0xe2, 0x18, 0xfd, 0x0f,
0x02, 0x96, 0x3a, 0x0c, 0x6d, 0x28, 0x85, 0xfd, 0x6b, 0x6a, 0x60, 0x13, 0xaa, 0x61, 0x14, 0x09,
0x96, 0x24, 0xd3, 0x01, 0x93, 0x94, 0xe0, 0x7f, 0xf8, 0x1b, 0x72, 0xde, 0xbf, 0x0a, 0xe3, 0xf1,
0x54, 0xb0, 0x41, 0x9c, 0x48, 0x26, 0xa8, 0x89, 0x2d, 0x68, 0x9e, 0xf0, 0x84, 0x47, 0xa1, 0x64,
0xb4, 0x84, 0x14, 0x6a, 0x27, 0xa8, 0xaa, 0x2d, 0x3c, 0x83, 0x56, 0x3c, 0x96, 0x4c, 0x1c, 0xd8,
0x88, 0xc9, 0x50, 0x8b, 0x3f, 0xea, 0x1e, 0x31, 0x19, 0xb2, 0x69, 0xc4, 0xf8, 0xf0, 0xe6, 0x96,
0x96, 0xb1, 0x06, 0x4e, 0xdc, 0x93, 0x5c, 0x6b, 0x1b, 0xeb, 0x50, 0xd1, 0x29, 0x61, 0xe3, 0x88,
0x3a, 0xea, 0x11, 0x3a, 0x0a, 0xd6, 0x67, 0x31, 0x97, 0x07, 0x5c, 0xc1, 0x7f, 0x40, 0x7f, 0x61,
0x55, 0x0b, 0xfe, 0x05, 0x58, 0xea, 0xff, 0xd8, 0x00, 0x33, 0x5f, 0xe8, 0xf6, 0x39, 0xc2, 0xcc,
0x17, 0x88, 0x60, 0xcd, 0xd3, 0x75, 0xaa, 0xfb, 0x54, 0x13, 0x7a, 0xdd, 0x73, 0xdf, 0x77, 0x1e,
0xd9, 0xee, 0x3c, 0xf2, 0xb5, 0xf3, 0xc8, 0xeb, 0xde, 0x33, 0xb6, 0x7b, 0xcf, 0xf8, 0xdc, 0x7b,
0xc6, 0xac, 0xac, 0xa7, 0x72, 0xf9, 0x1d, 0x00, 0x00, 0xff, 0xff, 0x8b, 0xda, 0x49, 0x68, 0xa6,
0x01, 0x00, 0x00,
}
func (m *Message) Marshal() (dAtA []byte, err error) {
......
......@@ -4,19 +4,18 @@ package pb;
message Message {
enum Type {
APPCHAIN_REGISTER = 0;
APPCHAIN_UPDATE = 1;
APPCHAIN_GET = 2;
INTERCHAIN_META_GET = 3;
RULE_DEPLOY = 4; //部署验证规则
IBTP_GET = 5;
IBTP_SEND = 6;
IBTP_RECEIPT_SEND = 7;
ROUTER_IBTP_SEND = 8;
ROUTER_IBTP_RECEIPT_SEND = 9;
ADDRESS_GET = 10;
ROUTER_INTERCHAIN_SEND = 11;
ACK = 12;
ACK = 0;
ADDRESS_GET = 1;
APPCHAIN_REGISTER = 2;
APPCHAIN_UPDATE = 3;
APPCHAIN_GET = 4;
INTERCHAIN_META_GET = 5;
RULE_DEPLOY = 6; //部署验证规则
//异步完成
IBTP_GET = 7;
IBTP_SEND = 8;
IBTP_RECEIPT_SEND = 9;
IBTP_RECEIPT_GET = 10;
}
Type type = 1;
Pack payload = 2;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment