package peermgr import ( "errors" "github.com/libp2p/go-libp2p-core/peer" "github.com/sirupsen/logrus" "github.com/link33/sidecar/internal/port" "github.com/link33/sidecar/model/pb" ) var ( Office = "office" Test = "test" ) type local struct { id peer.ID tag string rev chan *pb.Message logger logrus.FieldLogger } func newLocal(id peer.ID) *local { return &local{ id: id, tag: Office, rev: make(chan *pb.Message, 10), } } func (l *local) ID() string { return l.id.String() } func (l *local) Type() string { return port.Local } func (l *local) Name() string { return l.ID() } func (l *local) Tag() string { return l.tag } // 需要同步处理的数据,主要用于处理接收的其它sidecar port 、外部API返回数据。 func (l *local) Send(msg *pb.Message) (*pb.Message, error) { // 同步完成 switch msg.Type { case pb.Message_RULE_DEPLOY: case pb.Message_APPCHAIN_REGISTER: case pb.Message_APPCHAIN_GET: case pb.Message_APPCHAIN_UPDATE: } return nil, nil } //需要异步处理的数据 func (l *local) AsyncSend(msg *pb.Message) error { // 先获取消息类型,做出判断是否路由,判断是异步还是同步。 // 然后做出IBTPX消息,对from、to做路由判断,以及同步异步完成。 // 转发给路由器的。 // 判断数据类型 if msg.IsIbtpRouter() { ibtpx, err := msg.GetIBTPX() if err != nil { return err } //TODO if ibtpx.Ibtp.GetTo() == l.ID() { return errors.New("message type error") } l.rev <- msg } else { // 处理非IBTPX结构类型消息 // 获取本机信息,以及本机做的工作。 // 注册信息 // 分发节点信息 // 接收与sidecar节点相关的信息流 } return nil } func (l *local) ListenIBTPX() <-chan *pb.Message { return l.rev } // HandleGetPeerInfoMessage sidecar 节点 func (l *local) HandleGetPeerInfoMessage(p port.Port, message *pb.Message) { data := &pb.PeerInfo{ ID: l.ID(), Tag: l.Tag(), } msg, _ := data.Marshal() retMsg := pb.Msg(pb.Message_ACK, true, msg) err := p.AsyncSend(retMsg) if err != nil { l.logger.Error(err) } } //涉及到同步异步的问题。 //一、根据目的地址转发(异步完成);(这种情况其实就是返回一个ack给绑定对应的port dev,还是根据消息类型判断)。 //二、根据消息类型转发(同步情况:需要立马返回结果;异步情况:需要返回一个ack,给绑定对应的port dev,所以找到对应的port dev 很关键!)。