package peermgr

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/Rican7/retry"
	"github.com/Rican7/retry/strategy"
	cid "github.com/ipfs/go-cid"
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/meshplus/bitxhub-kit/crypto"
	network "github.com/meshplus/go-lightp2p"
	"github.com/sirupsen/logrus"
	"gitlab.33.cn/link33/sidecar/internal/port"
	"gitlab.33.cn/link33/sidecar/internal/repo"
	"gitlab.33.cn/link33/sidecar/internal/router"
	"gitlab.33.cn/link33/sidecar/model/pb"
)

const (
	// protocolID is the protocol identifier handed to the p2p network in New.
	protocolID = "/sidecar/1.0.0" // magic protocol
	// defaultProvidersNum is used by New when the caller passes providers == 0.
	defaultProvidersNum = 1
)

var _ PeerManager = (*Swarm)(nil)

// Swarm is the PeerManager implementation built on top of a network.Network.
type Swarm struct {
	p2p    network.Network
	logger logrus.FieldLogger
	// peers holds the remote peers returned by loadPeers, keyed by peer ID string.
	peers  map[string]*peer.AddrInfo
	router router.Router
	// providers is the provider count requested from FindProvidersAsync.
	providers uint64
	// msgHandlers is looked up by pb.Message type; values are MessageHandler.
	msgHandlers sync.Map
	// connectHandlers are invoked with the remote address after a successful
	// connection made by Start; reads are guarded by lock.
	connectHandlers []ConnectHandler
	lock            sync.RWMutex
	// ctx is created in New and cancelled by Stop.
	ctx       context.Context
	cancel    context.CancelFunc
	localPeer *localPeer
}

// New builds a Swarm from the given configuration.
//
// nodePrivKey is converted to a libp2p key and used for the p2p identity;
// privKey is handed to the local peer. When providers is 0 it falls back to
// defaultProvidersNum. The returned Swarm is not started.
//
// An error is returned when the key cannot be converted, the peer ID cannot be
// derived, the peer list cannot be loaded, or the p2p network cannot be created.
func New(config *repo.Config, router router.Router, nodePrivKey crypto.PrivateKey, privKey crypto.PrivateKey, providers uint64, logger logrus.FieldLogger) (*Swarm, error) {
	libp2pPrivKey, err := convertToLibp2pPrivKey(nodePrivKey)
	if err != nil {
		return nil, fmt.Errorf("convert private key: %w", err)
	}

	// Report a bad key to the caller instead of panicking inside a constructor
	// that already returns an error.
	id, err := peer.IDFromPrivateKey(libp2pPrivKey)
	if err != nil {
		return nil, fmt.Errorf("derive peer id from private key: %w", err)
	}
	local := newLocal(id, privKey)

	localAddr, remotes, err := loadPeers(config.Peer.Peers, libp2pPrivKey)
	if err != nil {
		return nil, fmt.Errorf("load peers: %w", err)
	}

	p2p, err := network.New(
		network.WithLocalAddr(localAddr),
		network.WithPrivateKey(libp2pPrivKey),
		network.WithProtocolIDs([]string{protocolID}),
		network.WithLogger(logger),
	)
	if err != nil {
		return nil, fmt.Errorf("create p2p: %w", err)
	}

	if providers == 0 {
		providers = defaultProvidersNum
	}

	ctx, cancel := context.WithCancel(context.Background())

	return &Swarm{
		providers: providers,
		p2p:       p2p,
		logger:    logger,
		peers:     remotes,
		ctx:       ctx,
		cancel:    cancel,
		router:    router,
		localPeer: local,
	}, nil
}

// add registers p with the router; a failure is logged, not returned.
func (swarm *Swarm) add(p port.Port) {
	if err := swarm.router.Add(p); err != nil {
		swarm.logger.Error(err)
	}
}

// Start starts the router and the p2p network, registers the built-in message
// handlers, and then dials every configured peer in its own goroutine,
// retrying once per second until the peer is reachable.
//
// When at least one peer is configured, Start blocks until the first of them
// has been connected. When no peer is configured it logs a warning and returns
// immediately, since nothing could ever satisfy the wait.
func (swarm *Swarm) Start() error {
	if err := swarm.router.Start(); err != nil {
		return err
	}
	swarm.add(swarm.localPeer)

	swarm.p2p.SetMessageHandler(swarm.handleMessage)

	if err := swarm.RegisterMsgHandler(pb.Message_ADDRESS_GET, swarm.localPeer.handleGetAddressMessage); err != nil {
		return fmt.Errorf("register get address msg handler: %w", err)
	}

	if err := swarm.RegisterMsgHandler(pb.Message_PEER_INFO_GET, swarm.localPeer.HandleGetPeerInfoMessage); err != nil {
		return fmt.Errorf("register get peer info msg handler: %w", err)
	}

	if err := swarm.RegisterMultiMsgHandler([]pb.Message_Type{
		pb.Message_IBTP_SEND,
		pb.Message_IBTP_GET,
		pb.Message_IBTP_RECEIPT_SEND,
		pb.Message_IBTP_RECEIPT_GET,
	}, swarm.handleIBTPX); err != nil {
		return fmt.Errorf("register handle IBTPX msg handler: %w", err)
	}

	if err := swarm.p2p.Start(); err != nil {
		return fmt.Errorf("p2p module start: %w", err)
	}

	if len(swarm.peers) == 0 {
		swarm.logger.Warning("No remote peers configured, skip waiting for a connection")
		return nil
	}

	// need to connect one other sidecar at least
	//
	// connected is closed exactly once, by whichever peer connects first.
	// Every peer goroutine reaches the success path eventually, so the signal
	// must be idempotent: a shared WaitGroup counter of 1 would go negative
	// (and panic) on the second successful connection.
	var (
		connected     = make(chan struct{})
		connectedOnce sync.Once
	)

	// tryConnect is a single connection attempt against one peer.
	tryConnect := func(id string, addr *peer.AddrInfo, attempt uint) error {
		if err := swarm.p2p.Connect(*addr); err != nil {
			// Only log every fifth failure to keep the retry loop quiet.
			if attempt != 0 && attempt%5 == 0 {
				swarm.logger.WithFields(logrus.Fields{
					"node":  id,
					"error": err,
				}).Error("Connect failed")
			}
			return err
		}

		address, err := swarm.getRemoteAddress(addr.ID)
		if err != nil {
			swarm.logger.WithFields(logrus.Fields{
				"node":  id,
				"error": err,
			}).Error("Get remote address failed")
			return err
		}

		swarm.logger.WithFields(logrus.Fields{
			"node":    id,
			"address": address,
		}).Info("Connect successfully")

		swarm.addRemotePortByID(id)

		swarm.lock.RLock()
		defer swarm.lock.RUnlock()
		for _, handler := range swarm.connectHandlers {
			// Arguments of a go statement are evaluated immediately, so the
			// loop variable is not captured by the new goroutine.
			go handler(address)
		}

		connectedOnce.Do(func() { close(connected) })
		return nil
	}

	for id, addr := range swarm.peers {
		go func(id string, addr *peer.AddrInfo) {
			err := retry.Retry(
				func(attempt uint) error { return tryConnect(id, addr, attempt) },
				strategy.Wait(1*time.Second),
			)
			if err != nil {
				swarm.logger.Error(err)
			}
		}(id, addr)
	}

	<-connected

	return nil
}

// handleMessage is the callback registered with the p2p network for incoming
// data. It decodes data into a pb.Message, looks up the handler registered for
// the message type, and invokes it with a sidecar port for the sending peer.
// Undecodable messages and messages without a usable handler are logged and
// dropped.
func (swarm *Swarm) handleMessage(s network.Stream, data []byte) {
	m := &pb.Message{}
	if err := m.Unmarshal(data); err != nil {
		swarm.logger.Error(err)
		return
	}

	handler, ok := swarm.msgHandlers.Load(m.Type)
	if !ok {
		swarm.logger.WithFields(logrus.Fields{
			"error": fmt.Errorf("can't handle msg[type: %v]", m.Type),
			"type":  m.Type.String(),
		}).Error("Handle message")
		return
	}

	msgHandler, ok := handler.(MessageHandler)
	if !ok {
		swarm.logger.WithFields(logrus.Fields{
			"error": fmt.Errorf("invalid handler for msg [type: %v]", m.Type),
			"type":  m.Type.String(),
		}).Error("Handle message")
		return
	}

	// NOTE(review): newSidecar sends a synchronous PEER_INFO_GET request to the
	// sender for every incoming message, including PEER_INFO_GET itself —
	// confirm two nodes cannot ping-pong such requests indefinitely.
	msgHandler(swarm.newSidecar(s.RemotePeerID()), m)
}

// newSidecar builds a sidecar port for the remote peer sidecarID, tagged with
// the tag reported by that peer. If the peer info cannot be fetched the error
// is logged and the port is built from whatever info is available.
func (swarm *Swarm) newSidecar(sidecarID string) *sidecar {
	info, err := swarm.getRemotePeerInfo(sidecarID)
	if err != nil {
		swarm.logger.Error(err)
	}
	if info == nil {
		// getRemotePeerInfo returns a nil info when the request itself fails;
		// fall back to an empty one so the tag lookup never touches nil.
		info = &pb.PeerInfo{}
	}

	return newSidecar(sidecarID, info.GetTag(), swarm)
}

// handleIBTPX receives transactions, requests and similar messages sent by
// other sidecar nodes — mainly IBTP-structured data.
//
// If a port with the same ID is already known to the router, the message is
// delivered to that port; otherwise pt is registered as a new remote port and
// receives the message itself. Ports that are not *sidecar cannot receive the
// message; it is logged and dropped.
func (swarm *Swarm) handleIBTPX(pt port.Port, m *pb.Message) {
	if known, ok := swarm.router.Load(pt.ID()); ok {
		registered, isSidecar := known.(*sidecar)
		if !isSidecar {
			swarm.logger.WithFields(logrus.Fields{
				"id":   pt.ID(),
				"type": m.Type.String(),
			}).Warning("Registered port is not a remote sidecar, message dropped")
			return
		}
		registered.Receive(m)
		return
	}

	remote, isSidecar := pt.(*sidecar)
	if !isSidecar {
		// A checked assertion: an unexpected port type must not crash the
		// message loop.
		swarm.logger.WithFields(logrus.Fields{
			"id":   pt.ID(),
			"type": m.Type.String(),
		}).Warning("Sender port is not a remote sidecar, message dropped")
		return
	}
	swarm.addRemotePort(remote)
	remote.Receive(m)
}

// addRemotePort registers a remote sidecar port with the router.
func (swarm *Swarm) addRemotePort(ppt *sidecar) {
	swarm.add(ppt)
}

// Stop stops the p2p network, cancels the swarm context and stops the router.
//
// All three steps always run, so a failing p2p shutdown no longer leaves the
// context uncancelled or the router running. The p2p error takes precedence in
// the return value; a router error hidden by it is logged.
func (swarm *Swarm) Stop() error {
	p2pErr := swarm.p2p.Stop()
	swarm.cancel()
	routerErr := swarm.router.Stop()

	if p2pErr != nil {
		if routerErr != nil {
			swarm.logger.Error(routerErr)
		}
		return p2pErr
	}
	if routerErr != nil {
		return routerErr
	}

	return nil
}

// Connect dials addrInfo, queries the remote node for its address, and
// registers the peer as a remote port. It returns the peer ID string of the
// connected node.
func (swarm *Swarm) Connect(addrInfo *peer.AddrInfo) (string, error) {
	if err := swarm.p2p.Connect(*addrInfo); err != nil {
		return "", err
	}

	address, err := swarm.getRemoteAddress(addrInfo.ID)
	if err != nil {
		return "", err
	}

	swarm.logger.WithFields(logrus.Fields{
		"address":  address,
		"addrInfo": addrInfo,
	}).Info("Connect peer")

	id := addrInfo.ID.String()
	swarm.addRemotePortByID(id)

	return id, nil
}

// addRemotePortByID builds a sidecar port for peer id and registers it.
func (swarm *Swarm) addRemotePortByID(id string) {
	swarm.addRemotePort(swarm.newSidecar(id))
}

// AsyncSendWithPort sends msg through port s without waiting for a response.
func (swarm *Swarm) AsyncSendWithPort(s port.Port, msg *pb.Message) error {
	return s.AsyncSend(msg)
}

// SendWithPort sends msg through port s and returns the response.
func (swarm *Swarm) SendWithPort(s port.Port, msg *pb.Message) (*pb.Message, error) {
	return s.Send(msg)
}

// AsyncSendByID marshals msg and sends it to peer id without waiting for a
// response.
func (swarm *Swarm) AsyncSendByID(id string, msg *pb.Message) error {
	data, err := msg.Marshal()
	if err != nil {
		return fmt.Errorf("marshal message: %w", err)
	}

	return swarm.p2p.AsyncSend(id, data)
}

// SendByID marshals msg, sends it to peer id, and decodes the reply.
func (swarm *Swarm) SendByID(id string, msg *pb.Message) (*pb.Message, error) {
	data, err := msg.Marshal()
	if err != nil {
		return nil, fmt.Errorf("marshal message: %w", err)
	}

	ret, err := swarm.p2p.Send(id, data)
	if err != nil {
		return nil, fmt.Errorf("sync send: %w", err)
	}

	m := &pb.Message{}
	if err := m.Unmarshal(ret); err != nil {
		return nil, fmt.Errorf("unmarshal response: %w", err)
	}

	return m, nil
}

// Peers returns a copy of the configured remote peers keyed by peer ID string.
// The map is a fresh copy; the AddrInfo values are shared with the Swarm.
func (swarm *Swarm) Peers() map[string]*peer.AddrInfo {
	m := make(map[string]*peer.AddrInfo, len(swarm.peers))
	for id, addr := range swarm.peers {
		m[id] = addr
	}

	return m
}

// getRemoteAddress asks peer id for its address via an ADDRESS_GET message and
// returns the response payload as a string.
func (swarm *Swarm) getRemoteAddress(id peer.ID) (string, error) {
	msg := pb.Msg(pb.Message_ADDRESS_GET, true, nil)

	ret, err := swarm.SendByID(id.String(), msg)
	if err != nil {
		return "", err
	}

	return string(ret.Payload.Data), nil
}

// getRemotePeerInfo asks peer id for its peer info via a PEER_INFO_GET message.
// The returned info is nil when the request fails; when only decoding fails,
// the partially decoded info is returned together with the error.
func (swarm *Swarm) getRemotePeerInfo(id string) (*pb.PeerInfo, error) {
	msg := pb.Msg(pb.Message_PEER_INFO_GET, true, nil)

	ret, err := swarm.SendByID(id, msg)
	if err != nil {
		return nil, err
	}

	info := &pb.PeerInfo{}
	err = info.Unmarshal(ret.Payload.Data)

	return info, err
}

// FindProviders looks up providers for id (hashed into a v0 CID) and connects
// to the first one that accepts the connection, returning its peer ID string.
// Providers that cannot be connected are logged and skipped. When no provider
// is usable it returns an empty ID and a nil error.
func (swarm *Swarm) FindProviders(id string) (string, error) {
	format := cid.V0Builder{}
	toCid, err := format.Sum([]byte(id))
	if err != nil {
		return "", err
	}

	providers, err := swarm.p2p.FindProvidersAsync(toCid.String(), int(swarm.providers))
	if err != nil {
		swarm.logger.WithFields(logrus.Fields{
			"id": id,
		}).Error("Not find providers")
		return "", err
	}

	for provider := range providers {
		swarm.logger.WithFields(logrus.Fields{
			"id":          id,
			"cid":         toCid.String(),
			"provider_id": provider.ID.String(),
		}).Info("Find provider")

		sidecarID, err := swarm.Connect(&provider)
		if err != nil {
			// Connect returns an empty ID on failure, so identify the failing
			// provider by its own ID and keep "cid" for the content ID.
			swarm.logger.WithFields(logrus.Fields{
				"id":          id,
				"cid":         toCid.String(),
				"provider_id": provider.ID.String(),
			}).Error("connect error: ", err)
			continue
		}

		return sidecarID, nil
	}

	swarm.logger.WithFields(logrus.Fields{
		"id":  id,
		"cid": toCid.String(),
	}).Warning("No providers found")

	// TODO add error
	return "", nil
}

// Provider forwards key and passed to the underlying p2p network's Provider.
func (swarm *Swarm) Provider(key string, passed bool) error {
	return swarm.p2p.Provider(key, passed)
}