Commit 6c138fef authored by suyanlong's avatar suyanlong

Adjust local struct and Add hepler file for refactor code

parent e177761b
Pipeline #7969 canceled with stages
package peermgr
import (
"fmt"
crypto2 "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/meshplus/bitxhub-kit/crypto"
"github.com/meshplus/bitxhub-kit/crypto/asym/ecdsa"
ma "github.com/multiformats/go-multiaddr"
"strings"
)
func convertToLibp2pPrivKey(privateKey crypto.PrivateKey) (crypto2.PrivKey, error) {
ecdsaPrivKey, ok := privateKey.(*ecdsa.PrivateKey)
if !ok {
return nil, fmt.Errorf("convert to libp2p private key: not ecdsa private key")
}
libp2pPrivKey, _, err := crypto2.ECDSAKeyPairFromKey(ecdsaPrivKey.K)
if err != nil {
return nil, err
}
return libp2pPrivKey, nil
}
func loadPeers(peers []string, privateKey crypto2.PrivKey) (string, map[string]*peer.AddrInfo, error) {
var local string
remotes := make(map[string]*peer.AddrInfo)
id, err := peer.IDFromPrivateKey(privateKey)
if err != nil {
return "", nil, err
}
for _, p := range peers {
if strings.HasSuffix(p, id.String()) {
idx := strings.LastIndex(p, "/p2p/")
if idx == -1 {
return "", nil, fmt.Errorf("pid is not existed in bootstrap")
}
local = p[:idx]
} else {
addr, err := AddrToPeerInfo(p)
if err != nil {
return "", nil, fmt.Errorf("wrong network addr: %w", err)
}
remotes[addr.ID.String()] = addr
}
}
if local == "" {
return "", nil, fmt.Errorf("get local addr: no local addr is configured")
}
return local, remotes, nil
}
// AddrToPeerInfo transfer addr to PeerInfo
// addr example: "/ip4/104.236.76.40/tcp/4001/ipfs/QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64"
func AddrToPeerInfo(multiAddr string) (*peer.AddrInfo, error) {
maddr, err := ma.NewMultiaddr(multiAddr)
if err != nil {
return nil, err
}
return peer.AddrInfoFromP2pAddr(maddr)
}
package peermgr package peermgr
import ( import (
"errors"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"github.com/link33/sidecar/internal/port" "github.com/link33/sidecar/internal/port"
"github.com/link33/sidecar/internal/router" "github.com/link33/sidecar/internal/router"
...@@ -39,31 +40,45 @@ func (l *local) Tag() string { ...@@ -39,31 +40,45 @@ func (l *local) Tag() string {
return l.tag return l.tag
} }
//需要同步处理的数据 // 需要同步处理的数据,主要用于处理接收的其它sidecar port 、外部API返回数据。
func (l *local) Send(msg *pb.Message) (*pb.Message, error) { func (l *local) Send(msg *pb.Message) (*pb.Message, error) {
// 同步完成 // 同步完成
panic("implement me") switch msg.Type {
case pb.Message_RULE_DEPLOY:
case pb.Message_APPCHAIN_REGISTER:
case pb.Message_APPCHAIN_GET:
case pb.Message_APPCHAIN_UPDATE:
}
return nil, nil
} }
//需要异步处理的数据 //需要异步处理的数据
func (l *local) AsyncSend(msg *pb.Message) error { func (l *local) AsyncSend(msg *pb.Message) error {
// 目标to == id 本机,获取信息 // 先获取消息类型,做出判断是否路由,判断是异步还是同步。
// 然后做出IBTPX消息,对from、to做路由判断,以及同步异步完成。
// 转发给路由器的。 // 转发给路由器的。
// 判断数据类型 // 判断数据类型
var to = ""
if to == l.id.String() { if msg.IsIbtpRouter() {
ibtpx, err := msg.GetIBTPX()
if err != nil {
return err
}
//TODO
if ibtpx.Ibtp.GetTo() == l.ID() {
return errors.New("message type error")
}
l.rev <- msg
} else {
// 处理非IBTPX结构类型消息
// 获取本机信息,以及本机做的工作。 // 获取本机信息,以及本机做的工作。
// 注册信息 // 注册信息
// 分发节点信息 // 分发节点信息
// 接收与sidecar节点相关的信息流 // 接收与sidecar节点相关的信息流
} else {
//转发给路由模块
//l.rout.Route()
l.rev <- msg
} }
return nil return nil
//panic("implement me")
} }
func (l *local) ListenIBTPX() <-chan *pb.Message { func (l *local) ListenIBTPX() <-chan *pb.Message {
...@@ -83,3 +98,7 @@ func (l *local) HandleGetPeerInfoMessage(p port.Port, message *pb.Message) { ...@@ -83,3 +98,7 @@ func (l *local) HandleGetPeerInfoMessage(p port.Port, message *pb.Message) {
l.logger.Error(err) l.logger.Error(err)
} }
} }
//涉及到同步异步的问题。
//一、根据目的地址转发(异步完成);(这种情况其实就是返回一个ack给绑定对应的port dev,还是根据消息类型判断)。
//二、根据消息类型转发(同步情况:需要立马返回结果;异步情况:需要返回一个ack,给绑定对应的port dev,所以找到对应的port dev 很关键!)。
...@@ -30,6 +30,8 @@ type PeerManager interface { ...@@ -30,6 +30,8 @@ type PeerManager interface {
SendWithPort(s port.Port, msg *pb.Message) (*pb.Message, error) SendWithPort(s port.Port, msg *pb.Message) (*pb.Message, error)
GetRemotePeerInfo(id string) (*pb.PeerInfo, error)
Handler Handler
} }
......
...@@ -2,8 +2,6 @@ package peermgr ...@@ -2,8 +2,6 @@ package peermgr
import ( import (
"fmt" "fmt"
router2 "github.com/link33/sidecar/internal/router"
"github.com/link33/sidecar/model/pb"
"testing" "testing"
"time" "time"
...@@ -11,6 +9,8 @@ import ( ...@@ -11,6 +9,8 @@ import (
peer2 "github.com/libp2p/go-libp2p-core/peer" peer2 "github.com/libp2p/go-libp2p-core/peer"
"github.com/link33/sidecar/internal/port" "github.com/link33/sidecar/internal/port"
"github.com/link33/sidecar/internal/repo" "github.com/link33/sidecar/internal/repo"
router2 "github.com/link33/sidecar/internal/router"
"github.com/link33/sidecar/model/pb"
"github.com/meshplus/bitxhub-kit/crypto" "github.com/meshplus/bitxhub-kit/crypto"
"github.com/meshplus/bitxhub-kit/crypto/asym" "github.com/meshplus/bitxhub-kit/crypto/asym"
"github.com/meshplus/bitxhub-kit/log" "github.com/meshplus/bitxhub-kit/log"
...@@ -21,16 +21,6 @@ import ( ...@@ -21,16 +21,6 @@ import (
var router router2.Router var router router2.Router
func newSidecar(addr *peer2.AddrInfo, pm PeerManager) port.Port {
rec := make(chan *pb.Message)
return &sidecar{
id: addr.ID.String(),
swarm: pm,
tag: "",
rev: rec,
}
}
func TestNew(t *testing.T) { func TestNew(t *testing.T) {
logger := log.NewWithModule("swarm") logger := log.NewWithModule("swarm")
// test wrong nodePrivKey // test wrong nodePrivKey
......
...@@ -12,6 +12,16 @@ type sidecar struct { ...@@ -12,6 +12,16 @@ type sidecar struct {
rev chan *pb.Message rev chan *pb.Message
} }
func newSidecar(id string, tag string, swarm *Swarm) *sidecar {
rec := make(chan *pb.Message, 10)
return &sidecar{
id: id,
swarm: swarm,
tag: tag,
rev: rec,
}
}
func (s *sidecar) ID() string { func (s *sidecar) ID() string {
return s.id return s.id
} }
...@@ -42,3 +52,7 @@ func (s *sidecar) AsyncSend(msg *pb.Message) error { ...@@ -42,3 +52,7 @@ func (s *sidecar) AsyncSend(msg *pb.Message) error {
func (s *sidecar) ListenIBTPX() <-chan *pb.Message { func (s *sidecar) ListenIBTPX() <-chan *pb.Message {
return s.rev return s.rev
} }
func (s *sidecar) Receive(msg *pb.Message) {
s.rev <- msg
}
...@@ -3,23 +3,19 @@ package peermgr ...@@ -3,23 +3,19 @@ package peermgr
import ( import (
"context" "context"
"fmt" "fmt"
"strings"
"sync" "sync"
"time" "time"
"github.com/Rican7/retry" "github.com/Rican7/retry"
"github.com/Rican7/retry/strategy" "github.com/Rican7/retry/strategy"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
crypto2 "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"github.com/link33/sidecar/internal/port" "github.com/link33/sidecar/internal/port"
"github.com/link33/sidecar/internal/repo" "github.com/link33/sidecar/internal/repo"
"github.com/link33/sidecar/internal/router" "github.com/link33/sidecar/internal/router"
"github.com/link33/sidecar/model/pb" "github.com/link33/sidecar/model/pb"
"github.com/meshplus/bitxhub-kit/crypto" "github.com/meshplus/bitxhub-kit/crypto"
"github.com/meshplus/bitxhub-kit/crypto/asym/ecdsa"
network "github.com/meshplus/go-lightp2p" network "github.com/meshplus/go-lightp2p"
ma "github.com/multiformats/go-multiaddr"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
...@@ -208,15 +204,16 @@ func (swarm *Swarm) handleMessage(s network.Stream, data []byte) { ...@@ -208,15 +204,16 @@ func (swarm *Swarm) handleMessage(s network.Stream, data []byte) {
return return
} }
rec := make(chan *pb.Message) msgHandler(swarm.newSidecar(s.RemotePeerID()), m)
p := &sidecar{ }
id: s.RemotePeerID(),
swarm: swarm,
tag: "",
rev: rec,
}
msgHandler(p, m) func (swarm *Swarm) newSidecar(sidecarID string) *sidecar {
info, err := swarm.GetRemotePeerInfo(sidecarID)
if err != nil {
swarm.logger.Error(err)
}
p := newSidecar(sidecarID, info.GetTag(), swarm)
return p
} }
// 接收其它sidecar节点发过来的交易、请求等。主要是IBTP结构相关数据。 // 接收其它sidecar节点发过来的交易、请求等。主要是IBTP结构相关数据。
...@@ -225,21 +222,17 @@ func (swarm *Swarm) HandleIBTPX(pt port.Port, m *pb.Message) { ...@@ -225,21 +222,17 @@ func (swarm *Swarm) HandleIBTPX(pt port.Port, m *pb.Message) {
if is { if is {
ps, iss := p.(*sidecar) ps, iss := p.(*sidecar)
if iss { if iss {
ps.rev <- m //ps.rev <- m
ps.Receive(m)
} }
} else { } else {
ppt := pt.(*sidecar) ppt := pt.(*sidecar)
swarm.addRemotePort(ppt) swarm.addRemotePort(ppt)
ppt.rev <- m ppt.Receive(m)
} }
} }
func (swarm *Swarm) addRemotePort(ppt *sidecar) { func (swarm *Swarm) addRemotePort(ppt *sidecar) {
info, err := swarm.getRemotePeerInfo(ppt.ID())
if err != nil {
swarm.logger.Error(err)
}
ppt.id = info.GetID()
swarm.add(ppt) swarm.add(ppt)
} }
...@@ -271,14 +264,7 @@ func (swarm *Swarm) Connect(addrInfo *peer.AddrInfo) (string, error) { ...@@ -271,14 +264,7 @@ func (swarm *Swarm) Connect(addrInfo *peer.AddrInfo) (string, error) {
} }
func (swarm *Swarm) addRemotePortByID(id string) { func (swarm *Swarm) addRemotePortByID(id string) {
rec := make(chan *pb.Message) swarm.addRemotePort(swarm.newSidecar(id))
p := &sidecar{
id: id,
swarm: swarm,
tag: "",
rev: rec,
}
swarm.addRemotePort(p)
} }
func (swarm *Swarm) AsyncSendWithPort(s port.Port, msg *pb.Message) error { func (swarm *Swarm) AsyncSendWithPort(s port.Port, msg *pb.Message) error {
...@@ -325,64 +311,6 @@ func (swarm *Swarm) Peers() map[string]*peer.AddrInfo { ...@@ -325,64 +311,6 @@ func (swarm *Swarm) Peers() map[string]*peer.AddrInfo {
return m return m
} }
func convertToLibp2pPrivKey(privateKey crypto.PrivateKey) (crypto2.PrivKey, error) {
ecdsaPrivKey, ok := privateKey.(*ecdsa.PrivateKey)
if !ok {
return nil, fmt.Errorf("convert to libp2p private key: not ecdsa private key")
}
libp2pPrivKey, _, err := crypto2.ECDSAKeyPairFromKey(ecdsaPrivKey.K)
if err != nil {
return nil, err
}
return libp2pPrivKey, nil
}
func loadPeers(peers []string, privateKey crypto2.PrivKey) (string, map[string]*peer.AddrInfo, error) {
var local string
remotes := make(map[string]*peer.AddrInfo)
id, err := peer.IDFromPrivateKey(privateKey)
if err != nil {
return "", nil, err
}
for _, p := range peers {
if strings.HasSuffix(p, id.String()) {
idx := strings.LastIndex(p, "/p2p/")
if idx == -1 {
return "", nil, fmt.Errorf("pid is not existed in bootstrap")
}
local = p[:idx]
} else {
addr, err := AddrToPeerInfo(p)
if err != nil {
return "", nil, fmt.Errorf("wrong network addr: %w", err)
}
remotes[addr.ID.String()] = addr
}
}
if local == "" {
return "", nil, fmt.Errorf("get local addr: no local addr is configured")
}
return local, remotes, nil
}
// AddrToPeerInfo transfer addr to PeerInfo
// addr example: "/ip4/104.236.76.40/tcp/4001/ipfs/QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64"
func AddrToPeerInfo(multiAddr string) (*peer.AddrInfo, error) {
maddr, err := ma.NewMultiaddr(multiAddr)
if err != nil {
return nil, err
}
return peer.AddrInfoFromP2pAddr(maddr)
}
func (swarm *Swarm) getRemoteAddress(id peer.ID) (string, error) { func (swarm *Swarm) getRemoteAddress(id peer.ID) (string, error) {
msg := pb.Msg(pb.Message_ADDRESS_GET, true, nil) msg := pb.Msg(pb.Message_ADDRESS_GET, true, nil)
ret, err := swarm.SendByID(id.String(), msg) ret, err := swarm.SendByID(id.String(), msg)
...@@ -392,7 +320,7 @@ func (swarm *Swarm) getRemoteAddress(id peer.ID) (string, error) { ...@@ -392,7 +320,7 @@ func (swarm *Swarm) getRemoteAddress(id peer.ID) (string, error) {
return string(ret.Payload.Data), nil return string(ret.Payload.Data), nil
} }
func (swarm *Swarm) getRemotePeerInfo(id string) (*pb.PeerInfo, error) { func (swarm *Swarm) GetRemotePeerInfo(id string) (*pb.PeerInfo, error) {
msg := pb.Msg(pb.Message_PEER_INFO_GET, true, nil) msg := pb.Msg(pb.Message_PEER_INFO_GET, true, nil)
ret, err := swarm.SendByID(id, msg) ret, err := swarm.SendByID(id, msg)
if err != nil { if err != nil {
......
...@@ -122,10 +122,10 @@ func (r *router) Route(msg *pb.Message) error { ...@@ -122,10 +122,10 @@ func (r *router) Route(msg *pb.Message) error {
_, to := ibtp.From, ibtp.To _, to := ibtp.From, ibtp.To
if pp, is := r.portMap.Port(to); is { if pp, is := r.portMap.Port(to); is {
mode := ibtpx.Mode mode := ibtpx.Mode
hub, is := r.getHub()
asyncSend := func() error { asyncSend := func() error {
switch mode { switch mode {
case repo.RelayMode: case repo.RelayMode:
hub, is := r.getHub()
if is && !r.isEndorse(ibtpx) { if is && !r.isEndorse(ibtpx) {
return hub.AsyncSend(msg) return hub.AsyncSend(msg)
} else { } else {
...@@ -137,11 +137,9 @@ func (r *router) Route(msg *pb.Message) error { ...@@ -137,11 +137,9 @@ func (r *router) Route(msg *pb.Message) error {
return nil return nil
} }
} }
send := func() (*pb.Message, error) { send := func() (*pb.Message, error) {
switch mode { switch mode {
case repo.RelayMode: case repo.RelayMode:
hub, is := r.getHub()
if is && !r.isEndorse(ibtpx) { if is && !r.isEndorse(ibtpx) {
return hub.Send(msg) return hub.Send(msg)
} else { } else {
......
...@@ -18,3 +18,13 @@ func Msg(typ Message_Type, ok bool, data []byte) *Message { ...@@ -18,3 +18,13 @@ func Msg(typ Message_Type, ok bool, data []byte) *Message {
func (m *Message) IsIbtpRouter() bool { func (m *Message) IsIbtpRouter() bool {
return m.Type == Message_IBTP_SEND || m.Type == Message_IBTP_GET || m.Type == Message_IBTP_RECEIPT_SEND || m.Type == Message_IBTP_RECEIPT_GET return m.Type == Message_IBTP_SEND || m.Type == Message_IBTP_GET || m.Type == Message_IBTP_RECEIPT_SEND || m.Type == Message_IBTP_RECEIPT_GET
} }
func (m *Message) IsLocalMsg() bool {
return m.Type == Message_APPCHAIN_GET || m.Type == Message_RULE_DEPLOY || m.Type == Message_APPCHAIN_REGISTER || m.Type == Message_APPCHAIN_UPDATE
}
func (m *Message) GetIBTPX() (*IBTPX, error) {
ibtpx := &IBTPX{}
err := ibtpx.Unmarshal(m.Payload.Data)
return ibtpx, err
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment