package peermgr import ( "errors" "github.com/libp2p/go-libp2p-core/peer" "github.com/meshplus/bitxhub-kit/crypto" "github.com/sirupsen/logrus" "gitlab.33.cn/link33/sidecar/internal/port" "gitlab.33.cn/link33/sidecar/model/pb" ) var ( Office = "office" Test = "test" ) type localPeer struct { id peer.ID tag string rev chan *pb.Message logger logrus.FieldLogger privKey crypto.PrivateKey } func newLocal(id peer.ID, privKey crypto.PrivateKey) *localPeer { return &localPeer{ id: id, tag: Office, rev: make(chan *pb.Message, port.MaxCapacity), privKey: privKey, } } func (l *localPeer) ID() string { return l.id.String() } func (l *localPeer) Type() string { return port.Local } func (l *localPeer) Name() string { return l.ID() } func (l *localPeer) Tag() string { return l.tag } // 需要同步处理的数据,主要用于处理接收的其它sidecar port 、外部API返回数据。 func (l *localPeer) Send(msg *pb.Message) (*pb.Message, error) { // 同步完成 switch msg.Type { case pb.Message_RULE_DEPLOY: case pb.Message_APPCHAIN_REGISTER: case pb.Message_APPCHAIN_GET: case pb.Message_APPCHAIN_UPDATE: } return nil, nil } // 需要异步处理的数据 func (l *localPeer) AsyncSend(msg *pb.Message) error { // 先获取消息类型,做出判断是否路由,判断是异步还是同步。 // 然后做出IBTPX消息,对from、to做路由判断,以及同步异步完成。 // 转发给路由器的。 // 判断数据类型 if msg.IsIbtpRouter() { ibtpx, err := msg.GetIBTPX() if err != nil { return err } // TODO if ibtpx.Ibtp.GetTo() == l.ID() { return errors.New("message type error") } l.rev <- msg } else { // 处理非IBTPX结构类型消息 // 获取本机信息,以及本机做的工作。 // 注册信息 // 分发节点信息 // 接收与sidecar节点相关的信息流 } return nil } func (l *localPeer) ListenIBTPX() <-chan *pb.Message { return l.rev } // HandleGetPeerInfoMessage sidecar 节点 func (l *localPeer) HandleGetPeerInfoMessage(p port.Port, message *pb.Message) { data := &pb.PeerInfo{ ID: l.ID(), Tag: l.Tag(), } msg, _ := data.Marshal() retMsg := pb.Msg(pb.Message_ACK, true, msg) err := p.AsyncSend(retMsg) if err != nil { l.logger.Error(err) } } func (l *localPeer) handleGetAddressMessage(p port.Port, message *pb.Message) { addr, err := l.privKey.PublicKey().Address() if err != nil { l.logger.Error(err) return } retMsg := pb.Msg(pb.Message_ACK, true, []byte(addr.String())) err = p.AsyncSend(retMsg) if err != nil { l.logger.Error(err) } } // 涉及到同步异步的问题。 // 一、根据目的地址转发(异步完成);(这种情况其实就是返回一个ack给绑定对应的port dev,还是根据消息类型判断)。 // 二、根据消息类型转发(同步情况:需要立马返回结果;异步情况:需要返回一个ack,给绑定对应的port dev,所以找到对应的port dev 很关键!)。