Commit 7eff445d authored by suyanlong's avatar suyanlong

replace port.Message to pb.Message

parent 41c3ed99
Pipeline #7945 canceled with stages
...@@ -49,18 +49,18 @@ func (mr *MockMonitorMockRecorder) ListenIBTP() *gomock.Call { ...@@ -49,18 +49,18 @@ func (mr *MockMonitorMockRecorder) ListenIBTP() *gomock.Call {
} }
// QueryIBTP mocks base method. // QueryIBTP mocks base method.
func (m *MockMonitor) QueryIBTP(id string) (*pb.IBTP, error) { func (m *MockMonitor) QueryIBTP(to, id string) (*pb.IBTP, error) {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "QueryIBTP", id) ret := m.ctrl.Call(m, "QueryIBTP", to, id)
ret0, _ := ret[0].(*pb.IBTP) ret0, _ := ret[0].(*pb.IBTP)
ret1, _ := ret[1].(error) ret1, _ := ret[1].(error)
return ret0, ret1 return ret0, ret1
} }
// QueryIBTP indicates an expected call of QueryIBTP. // QueryIBTP indicates an expected call of QueryIBTP.
func (mr *MockMonitorMockRecorder) QueryIBTP(id interface{}) *gomock.Call { func (mr *MockMonitorMockRecorder) QueryIBTP(to, id interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueryIBTP", reflect.TypeOf((*MockMonitor)(nil).QueryIBTP), id) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueryIBTP", reflect.TypeOf((*MockMonitor)(nil).QueryIBTP), to, id)
} }
// QueryOuterMeta mocks base method. // QueryOuterMeta mocks base method.
......
package appchain package appchain
import ( import (
"github.com/link33/sidecar/internal/port"
"github.com/link33/sidecar/model/pb" "github.com/link33/sidecar/model/pb"
) )
...@@ -21,7 +20,7 @@ func (a *appChain) Name() string { ...@@ -21,7 +20,7 @@ func (a *appChain) Name() string {
return a.client.Name() return a.client.Name()
} }
func (a *appChain) Send(msg port.Message) (*pb.Message, error) { func (a *appChain) Send(msg *pb.Message) (*pb.Message, error) {
//TODO 调用该执行。 //TODO 调用该执行。
//a.ExecuteIBTP() //a.ExecuteIBTP()
//a.Rollback() //a.Rollback()
...@@ -29,7 +28,7 @@ func (a *appChain) Send(msg port.Message) (*pb.Message, error) { ...@@ -29,7 +28,7 @@ func (a *appChain) Send(msg port.Message) (*pb.Message, error) {
panic("implement me") panic("implement me")
} }
func (a *appChain) AsyncSend(msg port.Message) error { func (a *appChain) AsyncSend(msg *pb.Message) error {
//TODO 调用该执行。 //TODO 调用该执行。
//a.ExecuteIBTP() //a.ExecuteIBTP()
//a.Rollback() //a.Rollback()
...@@ -37,7 +36,7 @@ func (a *appChain) AsyncSend(msg port.Message) error { ...@@ -37,7 +36,7 @@ func (a *appChain) AsyncSend(msg port.Message) error {
panic("implement me") panic("implement me")
} }
func (a *appChain) ListenIBTPX() <-chan *pb.IBTPX { func (a *appChain) ListenIBTPX() <-chan *pb.Message {
panic("implement me") panic("implement me")
} }
...@@ -71,7 +71,7 @@ func getIBTP(t *testing.T, index uint64, typ pb.IBTP_Type, fid, tid, proofPath s ...@@ -71,7 +71,7 @@ func getIBTP(t *testing.T, index uint64, typ pb.IBTP_Type, fid, tid, proofPath s
From: fid, From: fid,
To: tid, To: tid,
Payload: ibtppd, Payload: ibtppd,
Index: index, Nonce: index,
Type: typ, Type: typ,
Timestamp: time.Now().UnixNano(), Timestamp: time.Now().UnixNano(),
Proof: proof, Proof: proof,
......
...@@ -12,7 +12,7 @@ type local struct { ...@@ -12,7 +12,7 @@ type local struct {
id peer.ID id peer.ID
privKey crypto.PrivateKey privKey crypto.PrivateKey
tag string tag string
rev chan *pb.IBTPX rev chan *pb.Message
rout router.Router rout router.Router
} }
...@@ -32,7 +32,11 @@ func (l *local) Tag() string { ...@@ -32,7 +32,11 @@ func (l *local) Tag() string {
return l.tag return l.tag
} }
func (l *local) Send(msg port.Message) (*pb.Message, error) { func (l *local) Send(msg *pb.Message) (*pb.Message, error) {
panic("implement me")
}
func (l *local) AsyncSend(msg *pb.Message) error {
// 目标to == id 本机,获取信息 // 目标to == id 本机,获取信息
// 转发给路由器的。 // 转发给路由器的。
// 判断数据类型 // 判断数据类型
...@@ -43,15 +47,15 @@ func (l *local) Send(msg port.Message) (*pb.Message, error) { ...@@ -43,15 +47,15 @@ func (l *local) Send(msg port.Message) (*pb.Message, error) {
} else { } else {
//转发给路由模块 //转发给路由模块
//l.rout.Route() //l.rout.Route()
var ibtp = &pb.Message{}
l.rev <- ibtp
} }
return nil
panic("implement me") //panic("implement me")
} }
func (l *local) AsyncSend(msg port.Message) error { func (l *local) ListenIBTPX() <-chan *pb.Message {
panic("implement me")
}
func (l *local) ListenIBTPX() <-chan *pb.IBTPX {
return l.rev return l.rev
} }
// sidecar 节点
...@@ -38,7 +38,7 @@ func (m *MockPeerManager) EXPECT() *MockPeerManagerMockRecorder { ...@@ -38,7 +38,7 @@ func (m *MockPeerManager) EXPECT() *MockPeerManagerMockRecorder {
} }
// AsyncSend mocks base method. // AsyncSend mocks base method.
func (m *MockPeerManager) AsyncSend(arg0 string, arg1 port.Message) error { func (m *MockPeerManager) AsyncSend(arg0 string, arg1 *pb.Message) error {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "AsyncSend", arg0, arg1) ret := m.ctrl.Call(m, "AsyncSend", arg0, arg1)
ret0, _ := ret[0].(error) ret0, _ := ret[0].(error)
...@@ -52,7 +52,7 @@ func (mr *MockPeerManagerMockRecorder) AsyncSend(arg0, arg1 interface{}) *gomock ...@@ -52,7 +52,7 @@ func (mr *MockPeerManagerMockRecorder) AsyncSend(arg0, arg1 interface{}) *gomock
} }
// AsyncSendWithPort mocks base method. // AsyncSendWithPort mocks base method.
func (m *MockPeerManager) AsyncSendWithPort(arg0 port.Port, arg1 port.Message) error { func (m *MockPeerManager) AsyncSendWithPort(arg0 port.Port, arg1 *pb.Message) error {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "AsyncSendWithPort", arg0, arg1) ret := m.ctrl.Call(m, "AsyncSendWithPort", arg0, arg1)
ret0, _ := ret[0].(error) ret0, _ := ret[0].(error)
...@@ -152,7 +152,7 @@ func (mr *MockPeerManagerMockRecorder) RegisterMultiMsgHandler(arg0, arg1 interf ...@@ -152,7 +152,7 @@ func (mr *MockPeerManagerMockRecorder) RegisterMultiMsgHandler(arg0, arg1 interf
} }
// Send mocks base method. // Send mocks base method.
func (m *MockPeerManager) Send(arg0 string, arg1 port.Message) (*pb.Message, error) { func (m *MockPeerManager) Send(arg0 string, arg1 *pb.Message) (*pb.Message, error) {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Send", arg0, arg1) ret := m.ctrl.Call(m, "Send", arg0, arg1)
ret0, _ := ret[0].(*pb.Message) ret0, _ := ret[0].(*pb.Message)
...@@ -167,7 +167,7 @@ func (mr *MockPeerManagerMockRecorder) Send(arg0, arg1 interface{}) *gomock.Call ...@@ -167,7 +167,7 @@ func (mr *MockPeerManagerMockRecorder) Send(arg0, arg1 interface{}) *gomock.Call
} }
// SendWithPort mocks base method. // SendWithPort mocks base method.
func (m *MockPeerManager) SendWithPort(s port.Port, msg port.Message) (*pb.Message, error) { func (m *MockPeerManager) SendWithPort(s port.Port, msg *pb.Message) (*pb.Message, error) {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SendWithPort", s, msg) ret := m.ctrl.Call(m, "SendWithPort", s, msg)
ret0, _ := ret[0].(*pb.Message) ret0, _ := ret[0].(*pb.Message)
......
...@@ -22,13 +22,13 @@ type PeerManager interface { ...@@ -22,13 +22,13 @@ type PeerManager interface {
Connect(info *peer.AddrInfo) (string, error) Connect(info *peer.AddrInfo) (string, error)
// AsyncSend sends message to peer with peer info. // AsyncSend sends message to peer with peer info.
AsyncSend(string, port.Message) error AsyncSend(string, *pb.Message) error
// Send sends message waiting response // Send sends message waiting response
Send(string, port.Message) (*pb.Message, error) Send(string, *pb.Message) (*pb.Message, error)
AsyncSendWithPort(port.Port, port.Message) error AsyncSendWithPort(port.Port, *pb.Message) error
SendWithPort(s port.Port, msg port.Message) (*pb.Message, error) SendWithPort(s port.Port, msg *pb.Message) (*pb.Message, error)
Handler Handler
} }
......
...@@ -2,6 +2,7 @@ package peermgr ...@@ -2,6 +2,7 @@ package peermgr
import ( import (
"fmt" "fmt"
router2 "github.com/link33/sidecar/internal/router"
"github.com/link33/sidecar/model/pb" "github.com/link33/sidecar/model/pb"
"testing" "testing"
"time" "time"
...@@ -18,12 +19,12 @@ import ( ...@@ -18,12 +19,12 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
var portMap = &port.PortMap{} var router router2.Router
func newSidecar(addr *peer2.AddrInfo, pm PeerManager) port.Port { func newSidecar(addr *peer2.AddrInfo, pm PeerManager) port.Port {
rec := make(chan *pb.IBTPX) rec := make(chan *pb.Message)
return &sidecar{ return &sidecar{
addr: addr, id: addr.ID.String(),
swarm: pm, swarm: pm,
tag: "", tag: "",
rev: rec, rev: rec,
...@@ -35,22 +36,22 @@ func TestNew(t *testing.T) { ...@@ -35,22 +36,22 @@ func TestNew(t *testing.T) {
// test wrong nodePrivKey // test wrong nodePrivKey
nodeKeys, privKeys, config, _ := genKeysAndConfig(t, 2, repo.DirectMode) nodeKeys, privKeys, config, _ := genKeysAndConfig(t, 2, repo.DirectMode)
_, err := New(config, portMap, nil, privKeys[0], 0, logger) _, err := New(config, router, nil, privKeys[0], 0, logger)
require.NotNil(t, err) require.NotNil(t, err)
// test new swarm in direct mode // test new swarm in direct mode
nodeKeys, privKeys, config, _ = genKeysAndConfig(t, 2, repo.DirectMode) nodeKeys, privKeys, config, _ = genKeysAndConfig(t, 2, repo.DirectMode)
_, err = New(config, portMap, nodeKeys[0], privKeys[0], 0, logger) _, err = New(config, router, nodeKeys[0], privKeys[0], 0, logger)
require.Nil(t, err) require.Nil(t, err)
_, err = New(config, portMap, nodeKeys[0], privKeys[0], 0, logger) _, err = New(config, router, nodeKeys[0], privKeys[0], 0, logger)
require.Nil(t, err) require.Nil(t, err)
// test new swarm in unsupport mode // test new swarm in unsupport mode
nodeKeys, privKeys, config, _ = genKeysAndConfig(t, 2, "") nodeKeys, privKeys, config, _ = genKeysAndConfig(t, 2, "")
_, err = New(config, portMap, nodeKeys[0], privKeys[0], 0, logger) _, err = New(config, router, nodeKeys[0], privKeys[0], 0, logger)
require.NotNil(t, err) require.NotNil(t, err)
} }
...@@ -58,12 +59,12 @@ func TestSwarm_Start(t *testing.T) { ...@@ -58,12 +59,12 @@ func TestSwarm_Start(t *testing.T) {
logger := log.NewWithModule("swarm") logger := log.NewWithModule("swarm")
nodeKeys, privKeys, config, _ := genKeysAndConfig(t, 2, repo.DirectMode) nodeKeys, privKeys, config, _ := genKeysAndConfig(t, 2, repo.DirectMode)
swarm1, err := New(config, portMap, nodeKeys[0], privKeys[0], 0, logger) swarm1, err := New(config, router, nodeKeys[0], privKeys[0], 0, logger)
require.Nil(t, err) require.Nil(t, err)
go swarm1.Start() go swarm1.Start()
swarm2, err := New(config, portMap, nodeKeys[1], privKeys[1], 0, logger) swarm2, err := New(config, router, nodeKeys[1], privKeys[1], 0, logger)
require.Nil(t, err) require.Nil(t, err)
go swarm2.Start() go swarm2.Start()
...@@ -92,11 +93,9 @@ func TestSwarm_AsyncSend(t *testing.T) { ...@@ -92,11 +93,9 @@ func TestSwarm_AsyncSend(t *testing.T) {
require.NotNil(t, err) require.NotNil(t, err)
// test in right way // test in right way
addr, err := AddrToPeerInfo(mockMultiAddr) _, err = AddrToPeerInfo(mockMultiAddr)
require.Nil(t, err) require.Nil(t, err)
mockSwarm.router.Store(mockId, newSidecar(addr, mockSwarm))
err = mockSwarm.AsyncSend(mockId, mockMsg) err = mockSwarm.AsyncSend(mockId, mockMsg)
require.Nil(t, err) require.Nil(t, err)
} }
...@@ -109,11 +108,9 @@ func TestSwarm_Send(t *testing.T) { ...@@ -109,11 +108,9 @@ func TestSwarm_Send(t *testing.T) {
require.NotNil(t, err) require.NotNil(t, err)
// test in right way // test in right way
addr, err := AddrToPeerInfo(mockMultiAddr) _, err = AddrToPeerInfo(mockMultiAddr)
require.Nil(t, err) require.Nil(t, err)
mockSwarm.router.Store(mockId, newSidecar(addr, mockSwarm))
_, err = mockSwarm.Send(mockId, mockMsg) _, err = mockSwarm.Send(mockId, mockMsg)
require.Nil(t, err) require.Nil(t, err)
} }
...@@ -223,10 +220,10 @@ func TestSwarm_Provider(t *testing.T) { ...@@ -223,10 +220,10 @@ func TestSwarm_Provider(t *testing.T) {
require.Nil(t, err) require.Nil(t, err)
} }
func prepare(t *testing.T) (*Swarm, []string, *Swarm, port.Message, string, string) { func prepare(t *testing.T) (*Swarm, []string, *Swarm, *pb.Message, string, string) {
nodeKeys, privKeys, config, ids := genKeysAndConfig(t, 2, repo.DirectMode) nodeKeys, privKeys, config, ids := genKeysAndConfig(t, 2, repo.DirectMode)
swarm, err := New(config, portMap, nodeKeys[0], privKeys[0], 0, log.NewWithModule("swarm")) swarm, err := New(config, router, nodeKeys[0], privKeys[0], 0, log.NewWithModule("swarm"))
require.Nil(t, err) require.Nil(t, err)
mockMsg := &pb.Message{Type: pb.Message_APPCHAIN_REGISTER} mockMsg := &pb.Message{Type: pb.Message_APPCHAIN_REGISTER}
......
...@@ -9,7 +9,7 @@ type sidecar struct { ...@@ -9,7 +9,7 @@ type sidecar struct {
id string id string
swarm PeerManager swarm PeerManager
tag string tag string
rev chan *pb.IBTPX rev chan *pb.Message
} }
func (s *sidecar) ID() string { func (s *sidecar) ID() string {
...@@ -30,17 +30,16 @@ func (s *sidecar) Tag() string { ...@@ -30,17 +30,16 @@ func (s *sidecar) Tag() string {
// Send TODO 如何区别IBTPX与Message // Send TODO 如何区别IBTPX与Message
// Send 同步发送给绑定的对应的port dev // Send 同步发送给绑定的对应的port dev
func (s *sidecar) Send(msg port.Message) (*pb.Message, error) { func (s *sidecar) Send(msg *pb.Message) (*pb.Message, error) {
return s.swarm.Send(s.ID(), msg) return s.swarm.Send(s.ID(), msg)
} }
// AsyncSend 异步发送给绑定的对应的port dev // AsyncSend 异步发送给绑定的对应的port dev
func (s *sidecar) AsyncSend(msg port.Message) error { func (s *sidecar) AsyncSend(msg *pb.Message) error {
return s.swarm.AsyncSend(s.ID(), msg) return s.swarm.AsyncSend(s.ID(), msg)
} }
// ListenIBTPX 从绑定的对应的port dev接收数据 // ListenIBTPX 从绑定的对应的port dev接收数据
func (s *sidecar) ListenIBTPX() <-chan *pb.IBTPX { func (s *sidecar) ListenIBTPX() <-chan *pb.Message {
return s.rev return s.rev
} }
...@@ -96,7 +96,7 @@ func (swarm *Swarm) Start() error { ...@@ -96,7 +96,7 @@ func (swarm *Swarm) Start() error {
id: swarm.id, id: swarm.id,
privKey: swarm.privKey, privKey: swarm.privKey,
tag: "", tag: "",
rev: make(chan *pb.IBTPX), rev: make(chan *pb.Message),
rout: swarm.router, rout: swarm.router,
} }
swarm.router.Add(l) swarm.router.Add(l)
...@@ -142,7 +142,7 @@ func (swarm *Swarm) Start() error { ...@@ -142,7 +142,7 @@ func (swarm *Swarm) Start() error {
"address:": address, "address:": address,
}).Info("Connect successfully") }).Info("Connect successfully")
rec := make(chan *pb.IBTPX) rec := make(chan *pb.Message)
p := &sidecar{ p := &sidecar{
id: id, id: id,
swarm: swarm, swarm: swarm,
...@@ -173,7 +173,7 @@ func (swarm *Swarm) Start() error { ...@@ -173,7 +173,7 @@ func (swarm *Swarm) Start() error {
return nil return nil
} }
//注册异步处理数据的方法 // 注册异步处理数据的方法
func (swarm *Swarm) handleMessage(s network.Stream, data []byte) { func (swarm *Swarm) handleMessage(s network.Stream, data []byte) {
m := &pb.Message{} m := &pb.Message{}
if err := m.Unmarshal(data); err != nil { if err := m.Unmarshal(data); err != nil {
...@@ -181,26 +181,20 @@ func (swarm *Swarm) handleMessage(s network.Stream, data []byte) { ...@@ -181,26 +181,20 @@ func (swarm *Swarm) handleMessage(s network.Stream, data []byte) {
return return
} }
pack := m.Payload.Data
t := m.Type t := m.Type
switch { switch {
// 接收其它sidecar节点发过来的交易、请求等。主要是IBTP结构相关数据。 // 接收其它sidecar节点发过来的交易、请求等。主要是IBTP结构相关数据。
case t == pb.Message_IBTP_SEND || t == pb.Message_IBTP_GET || t == pb.Message_IBTP_RECEIPT_SEND || t == pb.Message_IBTP_RECEIPT_GET: case t == pb.Message_IBTP_SEND || t == pb.Message_IBTP_GET || t == pb.Message_IBTP_RECEIPT_SEND || t == pb.Message_IBTP_RECEIPT_GET:
ibtpx := &pb.IBTPX{}
if err := m.Unmarshal(pack); err != nil {
swarm.logger.Error(err)
return
}
p, is := swarm.router.Load(s.RemotePeerID()) p, is := swarm.router.Load(s.RemotePeerID())
if is { if is {
ps, iss := p.(*sidecar) ps, iss := p.(*sidecar)
if iss { if iss {
ps.rev <- ibtpx ps.rev <- m
return return
} }
} }
addr, _ := peer.AddrInfoFromP2pAddr(s.RemotePeerAddr()) addr, _ := peer.AddrInfoFromP2pAddr(s.RemotePeerAddr())
rec := make(chan *pb.IBTPX) rec := make(chan *pb.Message)
newPort := &sidecar{ newPort := &sidecar{
id: addr.ID.String(), id: addr.ID.String(),
swarm: swarm, swarm: swarm,
...@@ -208,10 +202,9 @@ func (swarm *Swarm) handleMessage(s network.Stream, data []byte) { ...@@ -208,10 +202,9 @@ func (swarm *Swarm) handleMessage(s network.Stream, data []byte) {
rev: rec, rev: rec,
} }
swarm.router.Add(newPort) swarm.router.Add(newPort)
rec <- ibtpx rec <- m
default: //非IBTP结构相关数据 default: //非IBTP结构相关数据,转发给local处理
//TODO //TODO
} }
} }
...@@ -237,7 +230,7 @@ func (swarm *Swarm) Connect(addrInfo *peer.AddrInfo) (string, error) { ...@@ -237,7 +230,7 @@ func (swarm *Swarm) Connect(addrInfo *peer.AddrInfo) (string, error) {
"addrInfo": addrInfo, "addrInfo": addrInfo,
}).Info("Connect peer") }).Info("Connect peer")
rec := make(chan *pb.IBTPX) rec := make(chan *pb.Message)
p := &sidecar{ p := &sidecar{
id: addrInfo.ID.String(), id: addrInfo.ID.String(),
swarm: swarm, swarm: swarm,
...@@ -248,15 +241,15 @@ func (swarm *Swarm) Connect(addrInfo *peer.AddrInfo) (string, error) { ...@@ -248,15 +241,15 @@ func (swarm *Swarm) Connect(addrInfo *peer.AddrInfo) (string, error) {
return addrInfo.ID.String(), nil return addrInfo.ID.String(), nil
} }
func (swarm *Swarm) AsyncSendWithPort(s port.Port, msg port.Message) error { func (swarm *Swarm) AsyncSendWithPort(s port.Port, msg *pb.Message) error {
return s.AsyncSend(msg) return s.AsyncSend(msg)
} }
func (swarm *Swarm) SendWithPort(s port.Port, msg port.Message) (*pb.Message, error) { func (swarm *Swarm) SendWithPort(s port.Port, msg *pb.Message) (*pb.Message, error) {
return s.Send(msg) return s.Send(msg)
} }
func (swarm *Swarm) AsyncSend(id string, msg port.Message) error { func (swarm *Swarm) AsyncSend(id string, msg *pb.Message) error {
data, err := msg.Marshal() data, err := msg.Marshal()
if err != nil { if err != nil {
return fmt.Errorf("marshal message: %w", err) return fmt.Errorf("marshal message: %w", err)
...@@ -265,7 +258,7 @@ func (swarm *Swarm) AsyncSend(id string, msg port.Message) error { ...@@ -265,7 +258,7 @@ func (swarm *Swarm) AsyncSend(id string, msg port.Message) error {
return swarm.p2p.AsyncSend(id, data) return swarm.p2p.AsyncSend(id, data)
} }
func (swarm *Swarm) Send(id string, msg port.Message) (*pb.Message, error) { func (swarm *Swarm) Send(id string, msg *pb.Message) (*pb.Message, error) {
data, err := msg.Marshal() data, err := msg.Marshal()
if err != nil { if err != nil {
return nil, err return nil, err
......
...@@ -15,7 +15,7 @@ const ( ...@@ -15,7 +15,7 @@ const (
Appchain = "appchain" //区块链客户端 Appchain = "appchain" //区块链客户端
) )
// 设计一套port管理机制:包括各种的管理模块。以组合的行驶。 // 设计一套port管理机制:包括各种的管理模块。
// 设计一套,管理机制。 // 设计一套,管理机制。
// 与中继交互的是单独完整的机制。并且注册到路由表中。或者更加类型,这样就限制一个sidecar最多只能连接一个hub。避免网络风暴。或者只是一个转发功能。转发到指定节点。 // 与中继交互的是单独完整的机制。并且注册到路由表中。或者更加类型,这样就限制一个sidecar最多只能连接一个hub。避免网络风暴。或者只是一个转发功能。转发到指定节点。
// 先是从转发开始完成。 // 先是从转发开始完成。
...@@ -39,13 +39,13 @@ type Port interface { ...@@ -39,13 +39,13 @@ type Port interface {
Tag() string Tag() string
// Send 同步发送给绑定的对应的port dev // Send 同步发送给绑定的对应的port dev
Send(msg Message) (*pb.Message, error) //TODO 如何区别IBTPX与Message Send(msg *pb.Message) (*pb.Message, error) //TODO 如何区别IBTPX与Message
// AsyncSend 异步发送给绑定的对应的port dev // AsyncSend 异步发送给绑定的对应的port dev
AsyncSend(msg Message) error AsyncSend(msg *pb.Message) error
// ListenIBTPX 从绑定的对应的port dev接收数据 // ListenIBTPX 从绑定的对应的port dev接收数据
ListenIBTPX() <-chan *pb.IBTPX ListenIBTPX() <-chan *pb.Message
} }
type Message interface { type Message interface {
......
...@@ -3,6 +3,7 @@ package router ...@@ -3,6 +3,7 @@ package router
import ( import (
"context" "context"
"errors" "errors"
"strings"
"github.com/link33/sidecar/internal/checker" "github.com/link33/sidecar/internal/checker"
"github.com/link33/sidecar/internal/port" "github.com/link33/sidecar/internal/port"
...@@ -51,8 +52,8 @@ func (r *router) Add(p port.Port) error { ...@@ -51,8 +52,8 @@ func (r *router) Add(p port.Port) error {
c := p.ListenIBTPX() c := p.ListenIBTPX()
for { for {
select { select {
case ibtpx := <-c: case msg := <-c:
err := r.Route(ibtpx) err := r.Route(msg)
if err != nil { if err != nil {
r.logger.Error(err) r.logger.Error(err)
} }
...@@ -78,7 +79,12 @@ func (r *router) Remove(p port.Port) error { ...@@ -78,7 +79,12 @@ func (r *router) Remove(p port.Port) error {
return nil return nil
} }
func (r *router) Route(ibtpx *pb.IBTPX) error { func (r *router) Route(msg *pb.Message) error {
ibtpx := &pb.IBTPX{}
err := ibtpx.Unmarshal(msg.Payload.Data)
if err != nil {
return err
}
mode := ibtpx.Mode mode := ibtpx.Mode
//本网关已签名、中继链已背书、to是本网关内部的appchain,即顺利通过并转发,否则打断。 //本网关已签名、中继链已背书、to是本网关内部的appchain,即顺利通过并转发,否则打断。
if !((r.isSign(ibtpx) && mode == repo.RelayMode && r.isEndorse(ibtpx)) || !r.isSign(ibtpx)) { if !((r.isSign(ibtpx) && mode == repo.RelayMode && r.isEndorse(ibtpx)) || !r.isSign(ibtpx)) {
...@@ -89,8 +95,14 @@ func (r *router) Route(ibtpx *pb.IBTPX) error { ...@@ -89,8 +95,14 @@ func (r *router) Route(ibtpx *pb.IBTPX) error {
r.sign(ibtpx) r.sign(ibtpx)
} }
data, err := ibtpx.Marshal()
if err != nil {
return err
}
msg.Payload.Data = data
ibtp := ibtpx.Ibtp ibtp := ibtpx.Ibtp
err := r.checker.Check(ibtp) err = r.checker.Check(ibtp)
if err != nil { if err != nil {
r.logger.Error("check ibtp: %w", err) r.logger.Error("check ibtp: %w", err)
return err return err
...@@ -99,20 +111,21 @@ func (r *router) Route(ibtpx *pb.IBTPX) error { ...@@ -99,20 +111,21 @@ func (r *router) Route(ibtpx *pb.IBTPX) error {
if pp, is := r.portMap.Port(to); is { if pp, is := r.portMap.Port(to); is {
switch { switch {
case pp.Type() == port.Sidecar: case pp.Type() == port.Sidecar:
return pp.AsyncSend(ibtpx) //转发给sidecar节点,或者本身local节点。 //
case pp.Type() == port.Hub: //发给hub appchain return pp.AsyncSend(msg) //转发给其它的sidecar节点或者本身local节点。
return pp.AsyncSend(ibtpx) case pp.Type() == port.Hub: //发给hub appchain TODO 本机找到的appchain只能是自己的appchain
return pp.AsyncSend(msg)
case pp.Type() == port.Appchain: //TODO 本机找到的appchain只能是自己的appchain case pp.Type() == port.Appchain: //TODO 本机找到的appchain只能是自己的appchain
switch mode { switch mode {
case repo.RelayMode: case repo.RelayMode:
hub, is := r.getHub() hub, is := r.getHub()
if is && !r.isEndorse(ibtpx) { if is && !r.isEndorse(ibtpx) {
return hub.AsyncSend(ibtpx) return hub.AsyncSend(msg)
} else { } else {
return pp.AsyncSend(ibtpx) return pp.AsyncSend(msg)
} }
case repo.DirectMode: case repo.DirectMode:
return pp.AsyncSend(ibtpx) return pp.AsyncSend(msg)
default: default:
//TODO 跳过 //TODO 跳过
return nil return nil
...@@ -122,23 +135,23 @@ func (r *router) Route(ibtpx *pb.IBTPX) error { ...@@ -122,23 +135,23 @@ func (r *router) Route(ibtpx *pb.IBTPX) error {
return nil return nil
} }
} }
//规则判断 转发给sidecar节点 //规则判断 转发给其它的sidecar节点
method := ibtpx.RouteMethod method := strings.ToLower(ibtpx.RouteMethod)
if md, is := r.methodMap[method]; is { if md, is := r.methodMap[method]; is {
ports := md(ibtpx.RouteMethodArg) ports := md(ibtpx.RouteMethodArg)
if len(ports) == 0 { if len(ports) == 0 {
r.firstRoute(ibtpx) r.firstRoute(msg)
} }
for _, p := range ports { for _, p := range ports {
_ = p.AsyncSend(ibtpx) _ = p.AsyncSend(msg)
} }
} else { } else {
r.firstRoute(ibtpx) r.firstRoute(msg)
} }
return nil return nil
} }
func (r *router) firstRoute(ibtp *pb.IBTPX) { func (r *router) firstRoute(ibtp *pb.Message) {
panic("implement me") panic("implement me")
} }
...@@ -179,7 +192,7 @@ func (r *router) Official([]string) []port.Port { ...@@ -179,7 +192,7 @@ func (r *router) Official([]string) []port.Port {
panic("implement me") panic("implement me")
} }
func (r *router) Send(id string, msg port.Message) (*pb.Message, error) { func (r *router) Send(id string, msg *pb.Message) (*pb.Message, error) {
if p, is := r.portMap.Port(id); is { if p, is := r.portMap.Port(id); is {
return p.Send(msg) return p.Send(msg)
} }
...@@ -187,7 +200,7 @@ func (r *router) Send(id string, msg port.Message) (*pb.Message, error) { ...@@ -187,7 +200,7 @@ func (r *router) Send(id string, msg port.Message) (*pb.Message, error) {
return nil, errors.New("id error!") return nil, errors.New("id error!")
} }
func (r *router) AsyncSend(id string, msg port.Message) error { func (r *router) AsyncSend(id string, msg *pb.Message) error {
if p, is := r.portMap.Port(id); is { if p, is := r.portMap.Port(id); is {
return p.AsyncSend(msg) return p.AsyncSend(msg)
} }
......
...@@ -8,8 +8,8 @@ import ( ...@@ -8,8 +8,8 @@ import (
reflect "reflect" reflect "reflect"
gomock "github.com/golang/mock/gomock" gomock "github.com/golang/mock/gomock"
port "github.com/link33/sidecar/internal/port"
pb "github.com/link33/sidecar/model/pb" pb "github.com/link33/sidecar/model/pb"
appchain_mgr "github.com/meshplus/bitxhub-core/appchain-mgr"
) )
// MockRouter is a mock of Router interface. // MockRouter is a mock of Router interface.
...@@ -35,78 +35,51 @@ func (m *MockRouter) EXPECT() *MockRouterMockRecorder { ...@@ -35,78 +35,51 @@ func (m *MockRouter) EXPECT() *MockRouterMockRecorder {
return m.recorder return m.recorder
} }
// AddAppchains mocks base method. // Add mocks base method.
func (m *MockRouter) AddAppchains(appchains []*appchain_mgr.Appchain) error { func (m *MockRouter) Add(p port.Port) error {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "AddAppchains", appchains) ret := m.ctrl.Call(m, "Add", p)
ret0, _ := ret[0].(error) ret0, _ := ret[0].(error)
return ret0 return ret0
} }
// AddAppchains indicates an expected call of AddAppchains. // Add indicates an expected call of Add.
func (mr *MockRouterMockRecorder) AddAppchains(appchains interface{}) *gomock.Call { func (mr *MockRouterMockRecorder) Add(p interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddAppchains", reflect.TypeOf((*MockRouter)(nil).AddAppchains), appchains) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Add", reflect.TypeOf((*MockRouter)(nil).Add), p)
} }
// Broadcast mocks base method. // Adds mocks base method.
func (m *MockRouter) Broadcast(ids []string) error { func (m *MockRouter) Adds(p []port.Port) error {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Broadcast", ids) ret := m.ctrl.Call(m, "Adds", p)
ret0, _ := ret[0].(error) ret0, _ := ret[0].(error)
return ret0 return ret0
} }
// Broadcast indicates an expected call of Broadcast. // Adds indicates an expected call of Adds.
func (mr *MockRouterMockRecorder) Broadcast(ids interface{}) *gomock.Call { func (mr *MockRouterMockRecorder) Adds(p interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Broadcast", reflect.TypeOf((*MockRouter)(nil).Broadcast), ids) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Adds", reflect.TypeOf((*MockRouter)(nil).Adds), p)
} }
// ExistAppchain mocks base method. // Load mocks base method.
func (m *MockRouter) ExistAppchain(id string) bool { func (m *MockRouter) Load(key string) (port.Port, bool) {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ExistAppchain", id) ret := m.ctrl.Call(m, "Load", key)
ret0, _ := ret[0].(bool) ret0, _ := ret[0].(port.Port)
return ret0 ret1, _ := ret[1].(bool)
} return ret0, ret1
// ExistAppchain indicates an expected call of ExistAppchain.
func (mr *MockRouterMockRecorder) ExistAppchain(id interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ExistAppchain", reflect.TypeOf((*MockRouter)(nil).ExistAppchain), id)
}
// InPut mocks base method.
func (m *MockRouter) InPut(ibtp *pb.IBTP) chan *pb.IBTP {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "InPut", ibtp)
ret0, _ := ret[0].(chan *pb.IBTP)
return ret0
}
// InPut indicates an expected call of InPut.
func (mr *MockRouterMockRecorder) InPut(ibtp interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InPut", reflect.TypeOf((*MockRouter)(nil).InPut), ibtp)
}
// OutPut mocks base method.
func (m *MockRouter) OutPut(ibtp *pb.IBTP) chan *pb.IBTP {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "OutPut", ibtp)
ret0, _ := ret[0].(chan *pb.IBTP)
return ret0
} }
// OutPut indicates an expected call of OutPut. // Load indicates an expected call of Load.
func (mr *MockRouterMockRecorder) OutPut(ibtp interface{}) *gomock.Call { func (mr *MockRouterMockRecorder) Load(key interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OutPut", reflect.TypeOf((*MockRouter)(nil).OutPut), ibtp) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Load", reflect.TypeOf((*MockRouter)(nil).Load), key)
} }
// Route mocks base method. // Route mocks base method.
func (m *MockRouter) Route(ibtp *pb.IBTPX) error { func (m *MockRouter) Route(ibtp *pb.Message) error {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Route", ibtp) ret := m.ctrl.Call(m, "Route", ibtp)
ret0, _ := ret[0].(error) ret0, _ := ret[0].(error)
......
...@@ -18,7 +18,7 @@ type Router interface { ...@@ -18,7 +18,7 @@ type Router interface {
Adds(p []port.Port) error Adds(p []port.Port) error
//Route sends ibtp to the union pier in target relay chain //Route sends ibtp to the union pier in target relay chain
Route(ibtp *pb.IBTPX) error Route(ibtp *pb.Message) error
Load(key string) (value port.Port, ok bool) Load(key string) (value port.Port, ok bool)
......
...@@ -9,3 +9,7 @@ const ( ...@@ -9,3 +9,7 @@ const (
// Unmarshal([]byte) error // Unmarshal([]byte) error
// IsMsg()bool // IsMsg()bool
//} //}
func (m *Message) IsIBTPX() bool {
return false
}
...@@ -32,6 +32,7 @@ const ( ...@@ -32,6 +32,7 @@ const (
Message_APPCHAIN_GET Message_Type = 4 Message_APPCHAIN_GET Message_Type = 4
Message_INTERCHAIN_META_GET Message_Type = 5 Message_INTERCHAIN_META_GET Message_Type = 5
Message_RULE_DEPLOY Message_Type = 6 Message_RULE_DEPLOY Message_Type = 6
//异步完成
Message_IBTP_GET Message_Type = 7 Message_IBTP_GET Message_Type = 7
Message_IBTP_SEND Message_Type = 8 Message_IBTP_SEND Message_Type = 8
Message_IBTP_RECEIPT_SEND Message_Type = 9 Message_IBTP_RECEIPT_SEND Message_Type = 9
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment