Commit a4bfa5bc authored by suyanlong's avatar suyanlong

Add HandleIBTPX and getRemotePeerInfo function

parent a2e0aa28
Pipeline #7952 canceled with stages
......@@ -3,13 +3,13 @@ package appchain
import (
"context"
"fmt"
"github.com/link33/sidecar/internal/port"
"strconv"
"strings"
"time"
"github.com/Rican7/retry"
"github.com/Rican7/retry/strategy"
"github.com/link33/sidecar/internal/port"
"github.com/link33/sidecar/internal/txcrypto"
"github.com/link33/sidecar/model/pb"
"github.com/link33/sidecar/pkg/plugins"
......
......@@ -32,8 +32,6 @@ func (swarm *Swarm) RegisterMsgHandler(messageType pb.Message_Type, handler Mess
func (swarm *Swarm) RegisterConnectHandler(handler ConnectHandler) error {
swarm.lock.Lock()
defer swarm.lock.Unlock()
swarm.connectHandlers = append(swarm.connectHandlers, handler)
return nil
}
......@@ -6,6 +6,7 @@ import (
"github.com/link33/sidecar/internal/router"
"github.com/link33/sidecar/model/pb"
"github.com/meshplus/bitxhub-kit/crypto"
"github.com/sirupsen/logrus"
)
type local struct {
......@@ -14,6 +15,7 @@ type local struct {
tag string
rev chan *pb.Message
rout router.Router
logger logrus.FieldLogger
}
func (l *local) ID() string {
......@@ -58,3 +60,15 @@ func (l *local) ListenIBTPX() <-chan *pb.Message {
}
// sidecar 节点
func (l *local) HandleGetPeerInfoMessage(p port.Port, message *pb.Message) {
data := &pb.PeerInfo{
ID: l.ID(),
Tag: l.Tag(),
}
msg, _ := data.Marshal()
retMsg := Message(pb.Message_ACK, true, msg)
err := p.AsyncSend(retMsg)
if err != nil {
l.logger.Error(err)
}
}
......@@ -107,6 +107,19 @@ func (swarm *Swarm) Start() error {
return fmt.Errorf("register get address msg handler: %w", err)
}
if err := swarm.RegisterMsgHandler(pb.Message_PEER_INFO_GET, l.HandleGetPeerInfoMessage); err != nil {
return fmt.Errorf("register get peer info msg handler: %w", err)
}
if err := swarm.RegisterMultiMsgHandler([]pb.Message_Type{
pb.Message_IBTP_SEND,
pb.Message_IBTP_GET,
pb.Message_IBTP_RECEIPT_SEND,
pb.Message_IBTP_RECEIPT_GET,
}, swarm.HandleIBTPX); err != nil {
return fmt.Errorf("register handle IBTPX msg handler: %w", err)
}
if err := swarm.p2p.Start(); err != nil {
return fmt.Errorf("p2p module start: %w", err)
}
......@@ -181,30 +194,51 @@ func (swarm *Swarm) handleMessage(s network.Stream, data []byte) {
return
}
t := m.Type
switch {
// 接收其它sidecar节点发过来的交易、请求等。主要是IBTP结构相关数据。
case t == pb.Message_IBTP_SEND || t == pb.Message_IBTP_GET || t == pb.Message_IBTP_RECEIPT_SEND || t == pb.Message_IBTP_RECEIPT_GET:
p, is := swarm.router.Load(s.RemotePeerID())
if is {
ps, iss := p.(*sidecar)
if iss {
ps.rev <- m
handler, ok := swarm.msgHandlers.Load(m.Type)
if !ok {
swarm.logger.WithFields(logrus.Fields{
"error": fmt.Errorf("can't handle msg[type: %v]", m.Type),
"type": m.Type.String(),
}).Error("Handle message")
return
}
msgHandler, ok := handler.(MessageHandler)
if !ok {
swarm.logger.WithFields(logrus.Fields{
"error": fmt.Errorf("invalid handler for msg [type: %v]", m.Type),
"type": m.Type.String(),
}).Error("Handle message")
return
}
addr, _ := peer.AddrInfoFromP2pAddr(s.RemotePeerAddr())
rec := make(chan *pb.Message)
newPort := &sidecar{
id: addr.ID.String(),
p := &sidecar{
id: s.RemotePeerID(),
swarm: swarm,
tag: "",
rev: rec,
}
swarm.router.Add(newPort)
rec <- m
default: //非IBTP结构相关数据,转发给local处理
//TODO
msgHandler(p, m)
}
// 接收其它sidecar节点发过来的交易、请求等。主要是IBTP结构相关数据。
func (swarm *Swarm) HandleIBTPX(pt port.Port, m *pb.Message) {
p, is := swarm.router.Load(pt.ID())
if is {
ps, iss := p.(*sidecar)
if iss {
ps.rev <- m
}
} else {
ppt := pt.(*sidecar)
info, err := swarm.getRemotePeerInfo(ppt.ID())
if err != nil {
swarm.logger.Error(err)
}
ppt.id = info.GetID()
_ = swarm.router.Add(ppt)
ppt.rev <- m
}
}
......@@ -361,6 +395,25 @@ func (swarm *Swarm) getRemoteAddress(id peer.ID) (string, error) {
return string(ret.Payload.Data), nil
}
func (swarm *Swarm) getRemotePeerInfo(id string) (*pb.PeerInfo, error) {
msg := Message(pb.Message_PEER_INFO_GET, true, nil)
reqData, err := msg.Marshal()
if err != nil {
return nil, err
}
retData, err := swarm.p2p.Send(id.String(), reqData) //同步获取数据
if err != nil {
return nil, fmt.Errorf("sync send: %w", err)
}
ret := &pb.Message{}
if err := ret.Unmarshal(retData); err != nil {
return nil, err
}
info := &pb.PeerInfo{}
err = info.Unmarshal(ret.Payload.Data)
return info, err
}
func (swarm *Swarm) FindProviders(id string) (string, error) {
format := cid.V0Builder{}
toCid, err := format.Sum([]byte(id))
......
......@@ -151,7 +151,7 @@ func (r *router) Route(msg *pb.Message) error {
return nil
}
func (r *router) firstRoute(ibtp *pb.Message) {
func (r *router) firstRoute(msg *pb.Message) {
panic("implement me")
}
......
This diff is collapsed.
......@@ -16,6 +16,7 @@ message Message {
IBTP_SEND = 8;
IBTP_RECEIPT_SEND = 9;
IBTP_RECEIPT_GET = 10;
PEER_INFO_GET = 11;
}
Type type = 1;
Pack payload = 2;
......@@ -26,3 +27,10 @@ message Pack {
bool ok = 1;
bytes data = 2;
}
message PeerInfo {
string ID = 1;
string Tag = 2;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment