package peermgr import ( "fmt" network "github.com/meshplus/go-lightp2p" "gitlab.33.cn/link33/sidecar/internal/port" "gitlab.33.cn/link33/sidecar/model/pb" ) type sidecar struct { id string swarm PeerManager tag string rev chan *pb.Message } func newSidecar(id string, tag string, swarm *Swarm) *sidecar { rec := make(chan *pb.Message, port.Capacity) return &sidecar{ id: id, swarm: swarm, tag: tag, rev: rec, } } func (s *sidecar) ID() string { return s.id } func (s *sidecar) Type() string { return port.Sidecar } func (s *sidecar) Name() string { return s.ID() } func (s *sidecar) Tag() string { return s.tag } // Send 同步发送给绑定的对应的port dev func (s *sidecar) Send(msg *pb.Message) (*pb.Message, error) { return s.swarm.SendByID(s.ID(), msg) } // AsyncSend 异步发送给绑定的对应的port dev func (s *sidecar) AsyncSend(msg *pb.Message) error { return s.swarm.AsyncSendByID(s.ID(), msg) } // ListenIBTPX 从绑定的对应的port dev接收数据 func (s *sidecar) ListenIBTPX() <-chan *pb.Message { return s.rev } func (s *sidecar) Receive(msg *pb.Message) { s.rev <- msg } type tcpTmpStream struct { s network.Stream tag string rev chan *pb.Message } func newStream(s network.Stream, tag string) *tcpTmpStream { rec := make(chan *pb.Message, port.Capacity) return &tcpTmpStream{ s: s, tag: tag, rev: rec, } } func (s *tcpTmpStream) ID() string { return s.s.RemotePeerID() } func (s *tcpTmpStream) Type() string { return port.Sidecar } func (s *tcpTmpStream) Name() string { return s.ID() } func (s *tcpTmpStream) Tag() string { return s.tag } func (s *tcpTmpStream) Send(msg *pb.Message) (*pb.Message, error) { return Send(s.s.Send, msg) } func Send(sender func([]byte) ([]byte, error), msg *pb.Message) (*pb.Message, error) { data, err := msg.Marshal() if err != nil { return nil, err } ret, err := sender(data) if err != nil { return nil, fmt.Errorf("sync send: %w", err) } m := &pb.Message{} if err := m.Unmarshal(ret); err != nil { return nil, err } return m, nil } func (s *tcpTmpStream) AsyncSend(msg *pb.Message) error { data, err := msg.Marshal() if err != nil { return fmt.Errorf("marshal message: %w", err) } return s.s.AsyncSend(data) } func (s *tcpTmpStream) ListenIBTPX() <-chan *pb.Message { panic("implement me") }