Commit 57845ff1 authored by suyanlong's avatar suyanlong

Fixed route function

parent 2b5151c7
Pipeline #7954 failed with stages
...@@ -20,6 +20,7 @@ func (a *appChain) Name() string { ...@@ -20,6 +20,7 @@ func (a *appChain) Name() string {
return a.client.Name() return a.client.Name()
} }
// Send 接收的数据只能是IBTPX相关的结构,以及查询完成,需要同步返回的数据
func (a *appChain) Send(msg *pb.Message) (*pb.Message, error) { func (a *appChain) Send(msg *pb.Message) (*pb.Message, error) {
//TODO 调用该执行。 //TODO 调用该执行。
//a.ExecuteIBTP() //a.ExecuteIBTP()
...@@ -36,7 +37,12 @@ func (a *appChain) AsyncSend(msg *pb.Message) error { ...@@ -36,7 +37,12 @@ func (a *appChain) AsyncSend(msg *pb.Message) error {
panic("implement me") panic("implement me")
} }
// ListenIBTPX 监听IBTPX数据。
func (a *appChain) ListenIBTPX() <-chan *pb.Message { func (a *appChain) ListenIBTPX() <-chan *pb.Message {
panic("implement me") panic("implement me")
} }
// 同一个接口,不同的实现,行为可以不一样。
// 对原始交易背书,回执不需要、查询也不需要、回滚也不需要、回调也不需要。
// 即只对GetIBTP得到的交易背书。如何映射,
...@@ -5,10 +5,8 @@ import ( ...@@ -5,10 +5,8 @@ import (
"fmt" "fmt"
"github.com/Rican7/retry" "github.com/Rican7/retry"
"github.com/Rican7/retry/strategy" "github.com/Rican7/retry/strategy"
"github.com/link33/sidecar/internal/peermgr"
"github.com/link33/sidecar/internal/port" "github.com/link33/sidecar/internal/port"
"github.com/link33/sidecar/model/pb" "github.com/link33/sidecar/model/pb"
"github.com/sirupsen/logrus"
"time" "time"
) )
...@@ -120,7 +118,7 @@ func (ex *Exchanger) feedReceipt(receipt *pb.IBTP) { ...@@ -120,7 +118,7 @@ func (ex *Exchanger) feedReceipt(receipt *pb.IBTP) {
func (ex *Exchanger) postHandleIBTP(from string, receipt *pb.IBTP) { func (ex *Exchanger) postHandleIBTP(from string, receipt *pb.IBTP) {
if receipt == nil { if receipt == nil {
retMsg := peermgr.Message(pb.Message_IBTP_RECEIPT_SEND, true, nil) retMsg := pb.Msg(pb.Message_IBTP_RECEIPT_SEND, true, nil)
err := ex.peerMgr.AsyncSend(from, retMsg) err := ex.peerMgr.AsyncSend(from, retMsg)
if err != nil { if err != nil {
ex.logger.Errorf("Send back empty ibtp receipt: %s", err.Error()) ex.logger.Errorf("Send back empty ibtp receipt: %s", err.Error())
...@@ -129,7 +127,7 @@ func (ex *Exchanger) postHandleIBTP(from string, receipt *pb.IBTP) { ...@@ -129,7 +127,7 @@ func (ex *Exchanger) postHandleIBTP(from string, receipt *pb.IBTP) {
} }
data, _ := receipt.Marshal() data, _ := receipt.Marshal()
retMsg := peermgr.Message(pb.Message_IBTP_RECEIPT_SEND, true, data) retMsg := pb.Msg(pb.Message_IBTP_RECEIPT_SEND, true, data)
if err := ex.peerMgr.AsyncSend(from, retMsg); err != nil { if err := ex.peerMgr.AsyncSend(from, retMsg); err != nil {
ex.logger.Errorf("Send back ibtp receipt: %s", err.Error()) ex.logger.Errorf("Send back ibtp receipt: %s", err.Error())
} }
...@@ -197,7 +195,7 @@ func (ex *Exchanger) handleGetIBTPMessage(p port.Port, msg *pb.Message) { ...@@ -197,7 +195,7 @@ func (ex *Exchanger) handleGetIBTPMessage(p port.Port, msg *pb.Message) {
return return
} }
retMsg := peermgr.Message(pb.Message_ACK, true, data) retMsg := pb.Msg(pb.Message_ACK, true, data)
err = ex.peerMgr.AsyncSendWithPort(p, retMsg) err = ex.peerMgr.AsyncSendWithPort(p, retMsg)
if err != nil { if err != nil {
...@@ -208,7 +206,7 @@ func (ex *Exchanger) handleGetIBTPMessage(p port.Port, msg *pb.Message) { ...@@ -208,7 +206,7 @@ func (ex *Exchanger) handleGetIBTPMessage(p port.Port, msg *pb.Message) {
// 直连 // 直连
func (ex *Exchanger) handleNewConnection(dstSidecarID string) { func (ex *Exchanger) handleNewConnection(dstSidecarID string) {
appchainMethod := []byte(ex.appchainDID) appchainMethod := []byte(ex.appchainDID)
msg := peermgr.Message(pb.Message_INTERCHAIN_META_GET, true, appchainMethod) msg := pb.Msg(pb.Message_INTERCHAIN_META_GET, true, appchainMethod)
indices := &struct { indices := &struct {
InterchainIndex uint64 `json:"interchain_index"` InterchainIndex uint64 `json:"interchain_index"`
...@@ -264,7 +262,7 @@ func (ex *Exchanger) handleGetInterchainMessage(p port.Port, msg *pb.Message) { ...@@ -264,7 +262,7 @@ func (ex *Exchanger) handleGetInterchainMessage(p port.Port, msg *pb.Message) {
panic(err) panic(err)
} }
retMsg := peermgr.Message(pb.Message_ACK, true, data) retMsg := pb.Msg(pb.Message_ACK, true, data)
if err := ex.peerMgr.AsyncSendWithPort(p, retMsg); err != nil { if err := ex.peerMgr.AsyncSendWithPort(p, retMsg); err != nil {
ex.logger.Error(err) ex.logger.Error(err)
return return
......
...@@ -67,7 +67,7 @@ func (mgr *Manager) handleMessage(s port.Port, msg *pb.Message) { ...@@ -67,7 +67,7 @@ func (mgr *Manager) handleMessage(s port.Port, msg *pb.Message) {
mgr.logger.Error(m) mgr.logger.Error(m)
} }
ackMsg := peermgr.Message(msg.Type, ok, res) ackMsg := pb.Msg(msg.Type, ok, res)
err := s.AsyncSend(ackMsg) err := s.AsyncSend(ackMsg)
if err != nil { if err != nil {
mgr.logger.Error(err) mgr.logger.Error(err)
......
package peermgr
import (
"github.com/link33/sidecar/model/pb"
)
const (
V1 = "1.0"
)
func Message(typ pb.Message_Type, ok bool, data []byte) *pb.Message {
return &pb.Message{
Type: typ,
Version: V1,
Payload: &pb.Pack{
Ok: ok,
Data: data,
},
}
}
...@@ -66,7 +66,7 @@ func (l *local) HandleGetPeerInfoMessage(p port.Port, message *pb.Message) { ...@@ -66,7 +66,7 @@ func (l *local) HandleGetPeerInfoMessage(p port.Port, message *pb.Message) {
Tag: l.Tag(), Tag: l.Tag(),
} }
msg, _ := data.Marshal() msg, _ := data.Marshal()
retMsg := Message(pb.Message_ACK, true, msg) retMsg := pb.Msg(pb.Message_ACK, true, msg)
err := p.AsyncSend(retMsg) err := p.AsyncSend(retMsg)
if err != nil { if err != nil {
l.logger.Error(err) l.logger.Error(err)
......
...@@ -493,7 +493,7 @@ func (mn *MockNetwork) Send(id string, data []byte) ([]byte, error) { ...@@ -493,7 +493,7 @@ func (mn *MockNetwork) Send(id string, data []byte) ([]byte, error) {
for msgType := range pb.Message_Type_name { for msgType := range pb.Message_Type_name {
if msgType == int32(msg.GetType()) { if msgType == int32(msg.GetType()) {
retMsg := Message(pb.Message_ACK, true, []byte(id)) retMsg := pb.Msg(pb.Message_ACK, true, []byte(id))
retData, err := retMsg.Marshal() retData, err := retMsg.Marshal()
if err != nil { if err != nil {
return nil, fmt.Errorf("Marshal message: %w", err) return nil, fmt.Errorf("Marshal message: %w", err)
......
...@@ -28,7 +28,6 @@ func (s *sidecar) Tag() string { ...@@ -28,7 +28,6 @@ func (s *sidecar) Tag() string {
return s.tag return s.tag
} }
// Send TODO 如何区别IBTPX与Message
// Send 同步发送给绑定的对应的port dev // Send 同步发送给绑定的对应的port dev
func (s *sidecar) Send(msg *pb.Message) (*pb.Message, error) { func (s *sidecar) Send(msg *pb.Message) (*pb.Message, error) {
return s.swarm.Send(s.ID(), msg) return s.swarm.Send(s.ID(), msg)
......
...@@ -384,7 +384,7 @@ func AddrToPeerInfo(multiAddr string) (*peer.AddrInfo, error) { ...@@ -384,7 +384,7 @@ func AddrToPeerInfo(multiAddr string) (*peer.AddrInfo, error) {
} }
func (swarm *Swarm) getRemoteAddress(id peer.ID) (string, error) { func (swarm *Swarm) getRemoteAddress(id peer.ID) (string, error) {
msg := Message(pb.Message_ADDRESS_GET, true, nil) msg := pb.Msg(pb.Message_ADDRESS_GET, true, nil)
reqData, err := msg.Marshal() reqData, err := msg.Marshal()
if err != nil { if err != nil {
return "", err return "", err
...@@ -402,7 +402,7 @@ func (swarm *Swarm) getRemoteAddress(id peer.ID) (string, error) { ...@@ -402,7 +402,7 @@ func (swarm *Swarm) getRemoteAddress(id peer.ID) (string, error) {
} }
func (swarm *Swarm) getRemotePeerInfo(id string) (*pb.PeerInfo, error) { func (swarm *Swarm) getRemotePeerInfo(id string) (*pb.PeerInfo, error) {
msg := Message(pb.Message_PEER_INFO_GET, true, nil) msg := pb.Msg(pb.Message_PEER_INFO_GET, true, nil)
reqData, err := msg.Marshal() reqData, err := msg.Marshal()
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -467,7 +467,7 @@ func (swarm *Swarm) handleGetAddressMessage(p port.Port, message *pb.Message) { ...@@ -467,7 +467,7 @@ func (swarm *Swarm) handleGetAddressMessage(p port.Port, message *pb.Message) {
swarm.logger.Error(err) swarm.logger.Error(err)
return return
} }
retMsg := Message(pb.Message_ACK, true, []byte(addr.String())) retMsg := pb.Msg(pb.Message_ACK, true, []byte(addr.String()))
err = swarm.AsyncSendWithPort(p, retMsg) err = swarm.AsyncSendWithPort(p, retMsg)
if err != nil { if err != nil {
swarm.logger.Error(err) swarm.logger.Error(err)
......
...@@ -99,7 +99,7 @@ func (p *PortMap) add(pt Port) { ...@@ -99,7 +99,7 @@ func (p *PortMap) add(pt Port) {
} }
} }
func (p *PortMap) getHub() (Port, bool) { func (p *PortMap) GetHub() (Port, bool) {
p.rw.RLocker() p.rw.RLocker()
defer p.rw.RUnlock() defer p.rw.RUnlock()
if p.hubPort == nil { if p.hubPort == nil {
......
...@@ -42,6 +42,7 @@ func (r *router) Start() error { ...@@ -42,6 +42,7 @@ func (r *router) Start() error {
} }
func (r *router) Stop() error { func (r *router) Stop() error {
r.cancel()
return nil return nil
} }
...@@ -53,14 +54,17 @@ func (r *router) Add(p port.Port) error { ...@@ -53,14 +54,17 @@ func (r *router) Add(p port.Port) error {
for { for {
select { select {
case msg := <-c: case msg := <-c:
go func() {
err := r.Route(msg) err := r.Route(msg)
if err != nil { if err != nil {
r.logger.Error(err) r.logger.Error(err)
} }
}()
case <-r.ctx.Done():
break
} }
} }
}() }()
return nil return nil
} }
...@@ -79,15 +83,14 @@ func (r *router) Remove(p port.Port) error { ...@@ -79,15 +83,14 @@ func (r *router) Remove(p port.Port) error {
return nil return nil
} }
//路由的有那些数据?需要区分!!!
func (r *router) Route(msg *pb.Message) error { func (r *router) Route(msg *pb.Message) error {
ibtpx := &pb.IBTPX{} ibtpx := &pb.IBTPX{}
err := ibtpx.Unmarshal(msg.Payload.Data) err := ibtpx.Unmarshal(msg.Payload.Data)
if err != nil { if err != nil {
return err return err
} }
mode := ibtpx.Mode if !r.isRouter(ibtpx) {
//本网关已签名、中继链已背书、to是本网关内部的appchain,即顺利通过并转发,否则打断。
if !((r.isSign(ibtpx) && mode == repo.RelayMode && r.isEndorse(ibtpx)) || !r.isSign(ibtpx)) {
return nil return nil
} }
//本网关签名 //本网关签名
...@@ -109,13 +112,8 @@ func (r *router) Route(msg *pb.Message) error { ...@@ -109,13 +112,8 @@ func (r *router) Route(msg *pb.Message) error {
} }
_, to := ibtp.From, ibtp.To _, to := ibtp.From, ibtp.To
if pp, is := r.portMap.Port(to); is { if pp, is := r.portMap.Port(to); is {
switch { mode := ibtpx.Mode
case pp.Type() == port.Sidecar: asyncSend := func() error {
//
return pp.AsyncSend(msg) //转发给其它的sidecar节点或者本身local节点。
case pp.Type() == port.Hub: //发给hub appchain TODO 本机找到的appchain只能是自己的appchain
return pp.AsyncSend(msg)
case pp.Type() == port.Appchain: //TODO 本机找到的appchain只能是自己的appchain
switch mode { switch mode {
case repo.RelayMode: case repo.RelayMode:
hub, is := r.getHub() hub, is := r.getHub()
...@@ -127,11 +125,54 @@ func (r *router) Route(msg *pb.Message) error { ...@@ -127,11 +125,54 @@ func (r *router) Route(msg *pb.Message) error {
case repo.DirectMode: case repo.DirectMode:
return pp.AsyncSend(msg) return pp.AsyncSend(msg)
default: default:
//TODO 跳过
return nil return nil
} }
}
send := func() (*pb.Message, error) {
switch mode {
case repo.RelayMode:
hub, is := r.getHub()
if is && !r.isEndorse(ibtpx) {
return hub.Send(msg)
} else {
return pp.Send(msg)
}
case repo.DirectMode:
return pp.Send(msg)
default:
return nil, errors.New("error mode type")
}
}
switch {
case pp.Type() == port.Sidecar:
return pp.AsyncSend(msg) //转发给其它的sidecar节点或者本身local节点。
case pp.Type() == port.Hub: //发给hub appchain TODO 本机找到的appchain只能是自己的appchain
return pp.AsyncSend(msg)
case pp.Type() == port.Appchain: //TODO 本机找到的appchain只能是自己的appchain
switch msg.Type {
case pb.Message_IBTP_SEND:
ret, err := send()
if err != nil {
r.logger.Error(err)
return err
}
r.Route(ret) //返回值,重新路由; send返回值需要调换from , to;
//d,_ := ret.Marshal()
//msg = pb.Msg(pb.Message_IBTP_RECEIPT_SEND,true,d)
case pb.Message_IBTP_RECEIPT_SEND:
err = asyncSend()
if err != nil {
r.logger.Error(err)
return err
}
case pb.Message_IBTP_GET:
case pb.Message_IBTP_RECEIPT_GET:
default:
return nil
}
default: default:
//TODO 跳过
return nil return nil
} }
} }
...@@ -151,13 +192,21 @@ func (r *router) Route(msg *pb.Message) error { ...@@ -151,13 +192,21 @@ func (r *router) Route(msg *pb.Message) error {
return nil return nil
} }
func (r *router) isRouter(ibtpx *pb.IBTPX) bool {
mode := ibtpx.Mode
//本网关已签名、中继链已背书、to是本网关内部的appchain,即顺利通过并转发,否则打断。
if !((r.isSign(ibtpx) && mode == repo.RelayMode && r.isEndorse(ibtpx)) || !r.isSign(ibtpx)) {
return false
}
return true
}
func (r *router) firstRoute(msg *pb.Message) { func (r *router) firstRoute(msg *pb.Message) {
panic("implement me") panic("implement me")
} }
func (r *router) getHub() (port.Port, bool) { func (r *router) getHub() (port.Port, bool) {
return r.portMap.GetHub()
return nil, false
} }
func (r *router) isSign(ibtpx *pb.IBTPX) bool { func (r *router) isSign(ibtpx *pb.IBTPX) bool {
......
...@@ -4,8 +4,13 @@ const ( ...@@ -4,8 +4,13 @@ const (
V1 = "1.0" V1 = "1.0"
) )
//type Message interface { func Msg(typ Message_Type, ok bool, data []byte) *Message {
// Marshal() ([]byte, error) return &Message{
// Unmarshal([]byte) error Type: typ,
// IsMsg()bool Version: V1,
//} Payload: &Pack{
Ok: ok,
Data: data,
},
}
}
...@@ -18,6 +18,7 @@ type Client interface { //业务实现委托接口。需要实现的那有些。 ...@@ -18,6 +18,7 @@ type Client interface { //业务实现委托接口。需要实现的那有些。
internal.Launcher internal.Launcher
Kernel Kernel
Bind(kern Kernel) Bind(kern Kernel)
// Initialize initialize plugin client // Initialize initialize plugin client
Initialize(configPath string, ID string, extra []byte) error Initialize(configPath string, ID string, extra []byte) error
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment