Commit 8c74ffdc authored by suyanlong's avatar suyanlong

Adjust localPeer function

parent 4478741f
Pipeline #8017 failed with stages
......@@ -53,7 +53,7 @@ func loadPeers(peers []string, privateKey crypto2.PrivKey) (string, map[string]*
}
if local == "" {
return "", nil, fmt.Errorf("get local addr: no local addr is configured")
return "", nil, fmt.Errorf("get localPeer addr: no localPeer addr is configured")
}
return local, remotes, nil
......
......@@ -4,6 +4,7 @@ import (
"errors"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/meshplus/bitxhub-kit/crypto"
"github.com/sirupsen/logrus"
"github.com/link33/sidecar/internal/port"
......@@ -15,39 +16,41 @@ var (
Test = "test"
)
type local struct {
id peer.ID
tag string
rev chan *pb.Message
logger logrus.FieldLogger
type localPeer struct {
id peer.ID
tag string
rev chan *pb.Message
logger logrus.FieldLogger
privKey crypto.PrivateKey
}
func newLocal(id peer.ID) *local {
return &local{
id: id,
tag: Office,
rev: make(chan *pb.Message, port.MaxCapacity),
func newLocal(id peer.ID, privKey crypto.PrivateKey) *localPeer {
return &localPeer{
id: id,
tag: Office,
rev: make(chan *pb.Message, port.MaxCapacity),
privKey: privKey,
}
}
func (l *local) ID() string {
func (l *localPeer) ID() string {
return l.id.String()
}
func (l *local) Type() string {
func (l *localPeer) Type() string {
return port.Local
}
func (l *local) Name() string {
func (l *localPeer) Name() string {
return l.ID()
}
func (l *local) Tag() string {
func (l *localPeer) Tag() string {
return l.tag
}
// 需要同步处理的数据,主要用于处理接收的其它sidecar port 、外部API返回数据。
func (l *local) Send(msg *pb.Message) (*pb.Message, error) {
func (l *localPeer) Send(msg *pb.Message) (*pb.Message, error) {
// 同步完成
switch msg.Type {
case pb.Message_RULE_DEPLOY:
......@@ -60,7 +63,7 @@ func (l *local) Send(msg *pb.Message) (*pb.Message, error) {
}
// 需要异步处理的数据
func (l *local) AsyncSend(msg *pb.Message) error {
func (l *localPeer) AsyncSend(msg *pb.Message) error {
// 先获取消息类型,做出判断是否路由,判断是异步还是同步。
// 然后做出IBTPX消息,对from、to做路由判断,以及同步异步完成。
// 转发给路由器的。
......@@ -86,12 +89,12 @@ func (l *local) AsyncSend(msg *pb.Message) error {
return nil
}
func (l *local) ListenIBTPX() <-chan *pb.Message {
func (l *localPeer) ListenIBTPX() <-chan *pb.Message {
return l.rev
}
// HandleGetPeerInfoMessage sidecar 节点
func (l *local) HandleGetPeerInfoMessage(p port.Port, message *pb.Message) {
func (l *localPeer) HandleGetPeerInfoMessage(p port.Port, message *pb.Message) {
data := &pb.PeerInfo{
ID: l.ID(),
Tag: l.Tag(),
......@@ -104,6 +107,19 @@ func (l *local) HandleGetPeerInfoMessage(p port.Port, message *pb.Message) {
}
}
func (l *localPeer) handleGetAddressMessage(p port.Port, message *pb.Message) {
addr, err := l.privKey.PublicKey().Address()
if err != nil {
l.logger.Error(err)
return
}
retMsg := pb.Msg(pb.Message_ACK, true, []byte(addr.String()))
err = p.AsyncSend(retMsg)
if err != nil {
l.logger.Error(err)
}
}
// 涉及到同步异步的问题。
// 一、根据目的地址转发(异步完成);(这种情况其实就是返回一个ack给绑定对应的port dev,还是根据消息类型判断)。
// 二、根据消息类型转发(同步情况:需要立马返回结果;异步情况:需要返回一个ack,给绑定对应的port dev,所以找到对应的port dev 很关键!)。
......@@ -95,21 +95,6 @@ func (mr *MockPeerManagerMockRecorder) FindProviders(id interface{}) *gomock.Cal
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindProviders", reflect.TypeOf((*MockPeerManager)(nil).FindProviders), id)
}
// GetRemotePeerInfo mocks base method.
func (m *MockPeerManager) GetRemotePeerInfo(id string) (*pb.PeerInfo, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetRemotePeerInfo", id)
ret0, _ := ret[0].(*pb.PeerInfo)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GetRemotePeerInfo indicates an expected call of GetRemotePeerInfo.
func (mr *MockPeerManagerMockRecorder) GetRemotePeerInfo(id interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetRemotePeerInfo", reflect.TypeOf((*MockPeerManager)(nil).GetRemotePeerInfo), id)
}
// Provider mocks base method.
func (m *MockPeerManager) Provider(arg0 string, arg1 bool) error {
m.ctrl.T.Helper()
......
......@@ -32,7 +32,7 @@ type PeerManager interface {
SendWithPort(s port.Port, msg *pb.Message) (*pb.Message, error)
GetRemotePeerInfo(id string) (*pb.PeerInfo, error)
// getRemotePeerInfo(id string) (*pb.PeerInfo, error)
}
type Handler interface {
......@@ -51,7 +51,7 @@ type DHTManager interface {
FindProviders(id string) (string, error)
// Provide adds the given cid to the content routing system. If 'true' is
// passed, it also announces it, otherwise it is just kept in the local
// passed, it also announces it, otherwise it is just kept in the localPeer
// accounting of which objects are being provided.
Provider(string, bool) error
}
......@@ -332,7 +332,7 @@ func (msh *MockStreamHandler) ReleaseStream(network.Stream) {
//=======================================================================
type MockPeerHandler struct{}
// get local peer id
// get localPeer peer id
func (mph *MockPeerHandler) PeerID() string {
return ""
}
......@@ -352,7 +352,7 @@ func (mph *MockPeerHandler) GetPeers() []peer2.AddrInfo {
return nil
}
// get local peer addr
// get localPeer peer addr
func (mph *MockPeerHandler) LocalAddr() string {
return ""
}
......@@ -409,7 +409,7 @@ func (mdhth *MockDHTHandler) FindProvidersAsync(id string, count int) (<-chan pe
}
// Provide adds the given cid to the content routing system. If 'true' is
// passed, it also announces it, otherwise it is just kept in the local
// passed, it also announces it, otherwise it is just kept in the localPeer
// accounting of which objects are being provided.
func (mdhth *MockDHTHandler) Provider(string, bool) error {
return nil
......
......@@ -34,14 +34,14 @@ type Swarm struct {
router router.Router
providers uint64
privKey crypto.PrivateKey
id peer.ID
msgHandlers sync.Map
connectHandlers []ConnectHandler
lock sync.RWMutex
ctx context.Context
cancel context.CancelFunc
localPeer *localPeer
}
func New(config *repo.Config, router router.Router, nodePrivKey crypto.PrivateKey, privKey crypto.PrivateKey, providers uint64, logger logrus.FieldLogger) (*Swarm, error) {
......@@ -57,6 +57,8 @@ func New(config *repo.Config, router router.Router, nodePrivKey crypto.PrivateKe
panic(err)
}
local := newLocal(id, privKey)
ll, remotes, err = loadPeers(config.Peer.Peers, libp2pPrivKey)
if err != nil {
return nil, fmt.Errorf("load peers: %w", err)
......@@ -84,11 +86,10 @@ func New(config *repo.Config, router router.Router, nodePrivKey crypto.PrivateKe
p2p: p2p,
logger: logger,
peers: remotes,
privKey: privKey,
ctx: ctx,
cancel: cancel,
router: router,
id: id,
localPeer: local,
}, nil
}
......@@ -104,13 +105,12 @@ func (swarm *Swarm) Start() error {
if err != nil {
return err
}
l := newLocal(swarm.id)
swarm.add(l)
swarm.add(swarm.localPeer)
swarm.p2p.SetMessageHandler(swarm.handleMessage)
if err := swarm.RegisterMsgHandler(pb.Message_ADDRESS_GET, swarm.handleGetAddressMessage); err != nil {
if err := swarm.RegisterMsgHandler(pb.Message_ADDRESS_GET, swarm.localPeer.handleGetAddressMessage); err != nil {
return fmt.Errorf("register get address msg handler: %w", err)
}
if err := swarm.RegisterMsgHandler(pb.Message_PEER_INFO_GET, l.HandleGetPeerInfoMessage); err != nil {
if err := swarm.RegisterMsgHandler(pb.Message_PEER_INFO_GET, swarm.localPeer.HandleGetPeerInfoMessage); err != nil {
return fmt.Errorf("register get peer info msg handler: %w", err)
}
if err := swarm.RegisterMultiMsgHandler([]pb.Message_Type{
......@@ -118,7 +118,7 @@ func (swarm *Swarm) Start() error {
pb.Message_IBTP_GET,
pb.Message_IBTP_RECEIPT_SEND,
pb.Message_IBTP_RECEIPT_GET,
}, swarm.HandleIBTPX); err != nil {
}, swarm.handleIBTPX); err != nil {
return fmt.Errorf("register handle IBTPX msg handler: %w", err)
}
......@@ -211,7 +211,7 @@ func (swarm *Swarm) handleMessage(s network.Stream, data []byte) {
}
func (swarm *Swarm) newSidecar(sidecarID string) *sidecar {
info, err := swarm.GetRemotePeerInfo(sidecarID)
info, err := swarm.getRemotePeerInfo(sidecarID)
if err != nil {
swarm.logger.Error(err)
}
......@@ -220,7 +220,7 @@ func (swarm *Swarm) newSidecar(sidecarID string) *sidecar {
}
// 接收其它sidecar节点发过来的交易、请求等。主要是IBTP结构相关数据。
func (swarm *Swarm) HandleIBTPX(pt port.Port, m *pb.Message) {
func (swarm *Swarm) handleIBTPX(pt port.Port, m *pb.Message) {
p, is := swarm.router.Load(pt.ID())
if is {
ps, iss := p.(*sidecar)
......@@ -327,7 +327,7 @@ func (swarm *Swarm) getRemoteAddress(id peer.ID) (string, error) {
return string(ret.Payload.Data), nil
}
func (swarm *Swarm) GetRemotePeerInfo(id string) (*pb.PeerInfo, error) {
func (swarm *Swarm) getRemotePeerInfo(id string) (*pb.PeerInfo, error) {
msg := pb.Msg(pb.Message_PEER_INFO_GET, true, nil)
ret, err := swarm.SendByID(id, msg)
if err != nil {
......@@ -380,16 +380,3 @@ func (swarm *Swarm) FindProviders(id string) (string, error) {
func (swarm *Swarm) Provider(key string, passed bool) error {
return swarm.p2p.Provider(key, passed)
}
func (swarm *Swarm) handleGetAddressMessage(p port.Port, message *pb.Message) {
addr, err := swarm.privKey.PublicKey().Address()
if err != nil {
swarm.logger.Error(err)
return
}
retMsg := pb.Msg(pb.Message_ACK, true, []byte(addr.String()))
err = swarm.AsyncSendWithPort(p, retMsg)
if err != nil {
swarm.logger.Error(err)
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment