Commit 2b5151c7 authored by suyanlong's avatar suyanlong

Fixed bug

parent a4bfa5bc
Pipeline #7953 failed with stages
...@@ -3,9 +3,6 @@ package peermgr ...@@ -3,9 +3,6 @@ package peermgr
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/link33/sidecar/internal/port"
"github.com/link33/sidecar/internal/router"
"github.com/link33/sidecar/model/pb"
"strings" "strings"
"sync" "sync"
"time" "time"
...@@ -15,7 +12,10 @@ import ( ...@@ -15,7 +12,10 @@ import (
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
crypto2 "github.com/libp2p/go-libp2p-core/crypto" crypto2 "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"github.com/link33/sidecar/internal/port"
"github.com/link33/sidecar/internal/repo" "github.com/link33/sidecar/internal/repo"
"github.com/link33/sidecar/internal/router"
"github.com/link33/sidecar/model/pb"
"github.com/meshplus/bitxhub-kit/crypto" "github.com/meshplus/bitxhub-kit/crypto"
"github.com/meshplus/bitxhub-kit/crypto/asym/ecdsa" "github.com/meshplus/bitxhub-kit/crypto/asym/ecdsa"
network "github.com/meshplus/go-lightp2p" network "github.com/meshplus/go-lightp2p"
...@@ -91,6 +91,13 @@ func New(config *repo.Config, router router.Router, nodePrivKey crypto.PrivateKe ...@@ -91,6 +91,13 @@ func New(config *repo.Config, router router.Router, nodePrivKey crypto.PrivateKe
}, nil }, nil
} }
func (swarm *Swarm) add(p port.Port) {
err := swarm.router.Add(p)
if err != nil {
swarm.logger.Error(err)
}
}
func (swarm *Swarm) Start() error { func (swarm *Swarm) Start() error {
l := &local{ l := &local{
id: swarm.id, id: swarm.id,
...@@ -99,18 +106,14 @@ func (swarm *Swarm) Start() error { ...@@ -99,18 +106,14 @@ func (swarm *Swarm) Start() error {
rev: make(chan *pb.Message), rev: make(chan *pb.Message),
rout: swarm.router, rout: swarm.router,
} }
swarm.router.Add(l) swarm.add(l)
swarm.p2p.SetMessageHandler(swarm.handleMessage) swarm.p2p.SetMessageHandler(swarm.handleMessage)
if err := swarm.RegisterMsgHandler(pb.Message_ADDRESS_GET, swarm.handleGetAddressMessage); err != nil { if err := swarm.RegisterMsgHandler(pb.Message_ADDRESS_GET, swarm.handleGetAddressMessage); err != nil {
return fmt.Errorf("register get address msg handler: %w", err) return fmt.Errorf("register get address msg handler: %w", err)
} }
if err := swarm.RegisterMsgHandler(pb.Message_PEER_INFO_GET, l.HandleGetPeerInfoMessage); err != nil { if err := swarm.RegisterMsgHandler(pb.Message_PEER_INFO_GET, l.HandleGetPeerInfoMessage); err != nil {
return fmt.Errorf("register get peer info msg handler: %w", err) return fmt.Errorf("register get peer info msg handler: %w", err)
} }
if err := swarm.RegisterMultiMsgHandler([]pb.Message_Type{ if err := swarm.RegisterMultiMsgHandler([]pb.Message_Type{
pb.Message_IBTP_SEND, pb.Message_IBTP_SEND,
pb.Message_IBTP_GET, pb.Message_IBTP_GET,
...@@ -155,14 +158,7 @@ func (swarm *Swarm) Start() error { ...@@ -155,14 +158,7 @@ func (swarm *Swarm) Start() error {
"address:": address, "address:": address,
}).Info("Connect successfully") }).Info("Connect successfully")
rec := make(chan *pb.Message) swarm.addRemotePortByID(id)
p := &sidecar{
id: id,
swarm: swarm,
tag: "",
rev: rec,
}
_ = swarm.router.Add(p)
swarm.lock.RLock() swarm.lock.RLock()
defer swarm.lock.RUnlock() defer swarm.lock.RUnlock()
...@@ -219,6 +215,7 @@ func (swarm *Swarm) handleMessage(s network.Stream, data []byte) { ...@@ -219,6 +215,7 @@ func (swarm *Swarm) handleMessage(s network.Stream, data []byte) {
tag: "", tag: "",
rev: rec, rev: rec,
} }
msgHandler(p, m) msgHandler(p, m)
} }
...@@ -232,14 +229,18 @@ func (swarm *Swarm) HandleIBTPX(pt port.Port, m *pb.Message) { ...@@ -232,14 +229,18 @@ func (swarm *Swarm) HandleIBTPX(pt port.Port, m *pb.Message) {
} }
} else { } else {
ppt := pt.(*sidecar) ppt := pt.(*sidecar)
swarm.addRemotePort(ppt)
ppt.rev <- m
}
}
func (swarm *Swarm) addRemotePort(ppt *sidecar) {
info, err := swarm.getRemotePeerInfo(ppt.ID()) info, err := swarm.getRemotePeerInfo(ppt.ID())
if err != nil { if err != nil {
swarm.logger.Error(err) swarm.logger.Error(err)
} }
ppt.id = info.GetID() ppt.id = info.GetID()
_ = swarm.router.Add(ppt) swarm.add(ppt)
ppt.rev <- m
}
} }
func (swarm *Swarm) Stop() error { func (swarm *Swarm) Stop() error {
...@@ -264,15 +265,20 @@ func (swarm *Swarm) Connect(addrInfo *peer.AddrInfo) (string, error) { ...@@ -264,15 +265,20 @@ func (swarm *Swarm) Connect(addrInfo *peer.AddrInfo) (string, error) {
"addrInfo": addrInfo, "addrInfo": addrInfo,
}).Info("Connect peer") }).Info("Connect peer")
id := addrInfo.ID.String()
swarm.addRemotePortByID(id)
return id, nil
}
func (swarm *Swarm) addRemotePortByID(id string) {
rec := make(chan *pb.Message) rec := make(chan *pb.Message)
p := &sidecar{ p := &sidecar{
id: addrInfo.ID.String(), id: id,
swarm: swarm, swarm: swarm,
tag: "", tag: "",
rev: rec, rev: rec,
} }
swarm.router.Add(p) swarm.addRemotePort(p)
return addrInfo.ID.String(), nil
} }
func (swarm *Swarm) AsyncSendWithPort(s port.Port, msg *pb.Message) error { func (swarm *Swarm) AsyncSendWithPort(s port.Port, msg *pb.Message) error {
...@@ -401,7 +407,7 @@ func (swarm *Swarm) getRemotePeerInfo(id string) (*pb.PeerInfo, error) { ...@@ -401,7 +407,7 @@ func (swarm *Swarm) getRemotePeerInfo(id string) (*pb.PeerInfo, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
retData, err := swarm.p2p.Send(id.String(), reqData) //同步获取数据 retData, err := swarm.p2p.Send(id, reqData) //同步获取数据
if err != nil { if err != nil {
return nil, fmt.Errorf("sync send: %w", err) return nil, fmt.Errorf("sync send: %w", err)
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment