swarm.go 8.81 KB
Newer Older
suyanlong's avatar
suyanlong committed
1 2 3 4 5 6 7 8 9 10
package peermgr

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/Rican7/retry"
	"github.com/Rican7/retry/strategy"
11
	cid "github.com/ipfs/go-cid"
suyanlong's avatar
suyanlong committed
12
	"github.com/libp2p/go-libp2p-core/peer"
suyanlong's avatar
suyanlong committed
13 14 15 16
	"github.com/meshplus/bitxhub-kit/crypto"
	network "github.com/meshplus/go-lightp2p"
	"github.com/sirupsen/logrus"

suyanlong's avatar
suyanlong committed
17 18 19 20
	"gitlab.33.cn/link33/sidecar/internal/port"
	"gitlab.33.cn/link33/sidecar/internal/repo"
	"gitlab.33.cn/link33/sidecar/internal/router"
	"gitlab.33.cn/link33/sidecar/model/pb"
suyanlong's avatar
suyanlong committed
21 22 23 24 25 26 27 28 29 30
)

const (
	protocolID          = "/sidecar/1.0.0" // magic protocol
	defaultProvidersNum = 1
)

var _ PeerManager = (*Swarm)(nil)

type Swarm struct {
suyanlong's avatar
suyanlong committed
31 32 33 34
	p2p    network.Network
	logger logrus.FieldLogger
	peers  map[string]*peer.AddrInfo
	router router.Router
suyanlong's avatar
suyanlong committed
35 36 37 38 39 40 41 42

	providers       uint64
	msgHandlers     sync.Map
	connectHandlers []ConnectHandler

	lock   sync.RWMutex
	ctx    context.Context
	cancel context.CancelFunc
suyanlong's avatar
suyanlong committed
43 44

	localPeer *localPeer
suyanlong's avatar
suyanlong committed
45 46
}

suyanlong's avatar
suyanlong committed
47
func New(config *repo.Config, router router.Router, nodePrivKey crypto.PrivateKey, privKey crypto.PrivateKey, providers uint64, logger logrus.FieldLogger) (*Swarm, error) {
suyanlong's avatar
suyanlong committed
48 49 50 51
	libp2pPrivKey, err := convertToLibp2pPrivKey(nodePrivKey)
	if err != nil {
		return nil, fmt.Errorf("convert private key: %w", err)
	}
suyanlong's avatar
suyanlong committed
52
	var ll string
suyanlong's avatar
suyanlong committed
53 54
	var remotes map[string]*peer.AddrInfo

suyanlong's avatar
suyanlong committed
55 56 57 58
	id, err := peer.IDFromPrivateKey(libp2pPrivKey)
	if err != nil {
		panic(err)
	}
suyanlong's avatar
suyanlong committed
59

suyanlong's avatar
suyanlong committed
60 61
	local := newLocal(id, privKey)

suyanlong's avatar
suyanlong committed
62
	ll, remotes, err = loadPeers(config.Peer.Peers, libp2pPrivKey)
63 64 65 66
	if err != nil {
		return nil, fmt.Errorf("load peers: %w", err)
	}

suyanlong's avatar
suyanlong committed
67
	protocolIDs := []string{protocolID}
suyanlong's avatar
suyanlong committed
68
	p2p, err := network.New(
suyanlong's avatar
suyanlong committed
69
		network.WithLocalAddr(ll),
suyanlong's avatar
suyanlong committed
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
		network.WithPrivateKey(libp2pPrivKey),
		network.WithProtocolIDs(protocolIDs),
		network.WithLogger(logger),
	)
	if err != nil {
		return nil, fmt.Errorf("create p2p: %w", err)
	}

	if providers == 0 {
		providers = defaultProvidersNum
	}

	ctx, cancel := context.WithCancel(context.Background())

	return &Swarm{
		providers: providers,
		p2p:       p2p,
		logger:    logger,
		peers:     remotes,
		ctx:       ctx,
		cancel:    cancel,
suyanlong's avatar
suyanlong committed
91
		router:    router,
suyanlong's avatar
suyanlong committed
92
		localPeer: local,
suyanlong's avatar
suyanlong committed
93 94 95
	}, nil
}

suyanlong's avatar
suyanlong committed
96 97 98 99 100 101 102
func (swarm *Swarm) add(p port.Port) {
	err := swarm.router.Add(p)
	if err != nil {
		swarm.logger.Error(err)
	}
}

suyanlong's avatar
suyanlong committed
103
func (swarm *Swarm) Start() error {
suyanlong's avatar
suyanlong committed
104 105 106 107
	err := swarm.router.Start()
	if err != nil {
		return err
	}
suyanlong's avatar
suyanlong committed
108
	swarm.add(swarm.localPeer)
suyanlong's avatar
suyanlong committed
109
	swarm.p2p.SetMessageHandler(swarm.handleMessage)
suyanlong's avatar
suyanlong committed
110
	if err := swarm.RegisterMsgHandler(pb.Message_ADDRESS_GET, swarm.localPeer.handleGetAddressMessage); err != nil {
suyanlong's avatar
suyanlong committed
111 112
		return fmt.Errorf("register get address msg handler: %w", err)
	}
suyanlong's avatar
suyanlong committed
113
	if err := swarm.RegisterMsgHandler(pb.Message_PEER_INFO_GET, swarm.localPeer.HandleGetPeerInfoMessage); err != nil {
114 115 116 117 118 119 120
		return fmt.Errorf("register get peer info msg handler: %w", err)
	}
	if err := swarm.RegisterMultiMsgHandler([]pb.Message_Type{
		pb.Message_IBTP_SEND,
		pb.Message_IBTP_GET,
		pb.Message_IBTP_RECEIPT_SEND,
		pb.Message_IBTP_RECEIPT_GET,
suyanlong's avatar
suyanlong committed
121
	}, swarm.handleIBTPX); err != nil {
122 123 124
		return fmt.Errorf("register handle IBTPX msg handler: %w", err)
	}

suyanlong's avatar
suyanlong committed
125 126 127 128
	if err := swarm.p2p.Start(); err != nil {
		return fmt.Errorf("p2p module start: %w", err)
	}

suyanlong's avatar
suyanlong committed
129
	// need to connect one other sidecar at least
suyanlong's avatar
suyanlong committed
130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159
	wg := &sync.WaitGroup{}
	wg.Add(1)

	for id, addr := range swarm.peers {
		go func(id string, addr *peer.AddrInfo) {
			if err := retry.Retry(func(attempt uint) error {
				if err := swarm.p2p.Connect(*addr); err != nil {
					if attempt != 0 && attempt%5 == 0 {
						swarm.logger.WithFields(logrus.Fields{
							"node":  id,
							"error": err,
						}).Error("Connect failed")
					}
					return err
				}

				address, err := swarm.getRemoteAddress(addr.ID)
				if err != nil {
					swarm.logger.WithFields(logrus.Fields{
						"node":  id,
						"error": err,
					}).Error("Get remote address failed")
					return err
				}

				swarm.logger.WithFields(logrus.Fields{
					"node":     id,
					"address:": address,
				}).Info("Connect successfully")

suyanlong's avatar
suyanlong committed
160
				swarm.addRemotePortByID(id)
suyanlong's avatar
suyanlong committed
161

suyanlong's avatar
suyanlong committed
162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183
				swarm.lock.RLock()
				defer swarm.lock.RUnlock()
				for _, handler := range swarm.connectHandlers {
					go func(connectHandler ConnectHandler, address string) {
						connectHandler(address)
					}(handler, address)
				}
				wg.Done()
				return nil
			},
				strategy.Wait(1*time.Second),
			); err != nil {
				swarm.logger.Error(err)
			}
		}(id, addr)
	}

	wg.Wait()

	return nil
}

184
// 注册异步处理数据的方法
suyanlong's avatar
suyanlong committed
185 186 187 188 189 190 191
func (swarm *Swarm) handleMessage(s network.Stream, data []byte) {
	m := &pb.Message{}
	if err := m.Unmarshal(data); err != nil {
		swarm.logger.Error(err)
		return
	}

192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209
	handler, ok := swarm.msgHandlers.Load(m.Type)
	if !ok {
		swarm.logger.WithFields(logrus.Fields{
			"error": fmt.Errorf("can't handle msg[type: %v]", m.Type),
			"type":  m.Type.String(),
		}).Error("Handle message")
		return
	}

	msgHandler, ok := handler.(MessageHandler)
	if !ok {
		swarm.logger.WithFields(logrus.Fields{
			"error": fmt.Errorf("invalid handler for msg [type: %v]", m.Type),
			"type":  m.Type.String(),
		}).Error("Handle message")
		return
	}

210 211
	msgHandler(swarm.newSidecar(s.RemotePeerID()), m)
}
suyanlong's avatar
suyanlong committed
212

213
func (swarm *Swarm) newSidecar(sidecarID string) *sidecar {
suyanlong's avatar
suyanlong committed
214
	info, err := swarm.getRemotePeerInfo(sidecarID)
215 216 217 218 219
	if err != nil {
		swarm.logger.Error(err)
	}
	p := newSidecar(sidecarID, info.GetTag(), swarm)
	return p
220 221 222
}

// 接收其它sidecar节点发过来的交易、请求等。主要是IBTP结构相关数据。
suyanlong's avatar
suyanlong committed
223
func (swarm *Swarm) handleIBTPX(pt port.Port, m *pb.Message) {
224 225 226 227
	p, is := swarm.router.Load(pt.ID())
	if is {
		ps, iss := p.(*sidecar)
		if iss {
suyanlong's avatar
suyanlong committed
228
			// ps.rev <- m
229
			ps.Receive(m)
suyanlong's avatar
suyanlong committed
230
		}
231 232
	} else {
		ppt := pt.(*sidecar)
suyanlong's avatar
suyanlong committed
233
		swarm.addRemotePort(ppt)
234
		ppt.Receive(m)
suyanlong's avatar
suyanlong committed
235 236 237
	}
}

suyanlong's avatar
suyanlong committed
238 239 240 241
func (swarm *Swarm) addRemotePort(ppt *sidecar) {
	swarm.add(ppt)
}

suyanlong's avatar
suyanlong committed
242 243 244 245 246
func (swarm *Swarm) Stop() error {
	if err := swarm.p2p.Stop(); err != nil {
		return err
	}
	swarm.cancel()
suyanlong's avatar
suyanlong committed
247 248 249 250
	err := swarm.router.Stop()
	if err != nil {
		return err
	}
suyanlong's avatar
suyanlong committed
251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267
	return nil
}

func (swarm *Swarm) Connect(addrInfo *peer.AddrInfo) (string, error) {
	err := swarm.p2p.Connect(*addrInfo)
	if err != nil {
		return "", err
	}
	address, err := swarm.getRemoteAddress(addrInfo.ID)
	if err != nil {
		return "", err
	}
	swarm.logger.WithFields(logrus.Fields{
		"address":  address,
		"addrInfo": addrInfo,
	}).Info("Connect peer")

suyanlong's avatar
suyanlong committed
268 269 270 271 272 273
	id := addrInfo.ID.String()
	swarm.addRemotePortByID(id)
	return id, nil
}

func (swarm *Swarm) addRemotePortByID(id string) {
274
	swarm.addRemotePort(swarm.newSidecar(id))
suyanlong's avatar
suyanlong committed
275 276
}

277
func (swarm *Swarm) AsyncSendWithPort(s port.Port, msg *pb.Message) error {
suyanlong's avatar
suyanlong committed
278 279 280
	return s.AsyncSend(msg)
}

281
func (swarm *Swarm) SendWithPort(s port.Port, msg *pb.Message) (*pb.Message, error) {
suyanlong's avatar
suyanlong committed
282 283 284
	return s.Send(msg)
}

285
func (swarm *Swarm) AsyncSendByID(id string, msg *pb.Message) error {
suyanlong's avatar
suyanlong committed
286 287 288 289 290 291 292 293
	data, err := msg.Marshal()
	if err != nil {
		return fmt.Errorf("marshal message: %w", err)
	}

	return swarm.p2p.AsyncSend(id, data)
}

294
func (swarm *Swarm) SendByID(id string, msg *pb.Message) (*pb.Message, error) {
suyanlong's avatar
suyanlong committed
295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321
	data, err := msg.Marshal()
	if err != nil {
		return nil, err
	}

	ret, err := swarm.p2p.Send(id, data)
	if err != nil {
		return nil, fmt.Errorf("sync send: %w", err)
	}

	m := &pb.Message{}
	if err := m.Unmarshal(ret); err != nil {
		return nil, err
	}

	return m, nil
}

func (swarm *Swarm) Peers() map[string]*peer.AddrInfo {
	m := make(map[string]*peer.AddrInfo)
	for id, addr := range swarm.peers {
		m[id] = addr
	}
	return m
}

func (swarm *Swarm) getRemoteAddress(id peer.ID) (string, error) {
suyanlong's avatar
suyanlong committed
322
	msg := pb.Msg(pb.Message_ADDRESS_GET, true, nil)
323
	ret, err := swarm.SendByID(id.String(), msg)
suyanlong's avatar
suyanlong committed
324 325 326 327 328 329
	if err != nil {
		return "", err
	}
	return string(ret.Payload.Data), nil
}

suyanlong's avatar
suyanlong committed
330
func (swarm *Swarm) getRemotePeerInfo(id string) (*pb.PeerInfo, error) {
suyanlong's avatar
suyanlong committed
331
	msg := pb.Msg(pb.Message_PEER_INFO_GET, true, nil)
332
	ret, err := swarm.SendByID(id, msg)
333 334 335 336 337 338 339 340
	if err != nil {
		return nil, err
	}
	info := &pb.PeerInfo{}
	err = info.Unmarshal(ret.Payload.Data)
	return info, err
}

suyanlong's avatar
suyanlong committed
341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363
func (swarm *Swarm) FindProviders(id string) (string, error) {
	format := cid.V0Builder{}
	toCid, err := format.Sum([]byte(id))
	if err != nil {
		return "", err
	}
	providers, err := swarm.p2p.FindProvidersAsync(toCid.String(), int(swarm.providers))
	if err != nil {
		swarm.logger.WithFields(logrus.Fields{
			"id": id,
		}).Error("Not find providers")
		return "", err
	}

	for provider := range providers {
		swarm.logger.WithFields(logrus.Fields{
			"id":          id,
			"cid":         toCid.String(),
			"provider_id": provider.ID.String(),
		}).Info("Find provider")

		sidecarId, err := swarm.Connect(&provider)
		if err != nil {
suyanlong's avatar
suyanlong committed
364 365 366 367
			swarm.logger.WithFields(logrus.Fields{
				"peerId": sidecarId,
				"cid":    provider.ID.String(),
			}).Error("connect error: ", err)
suyanlong's avatar
suyanlong committed
368 369 370 371 372 373 374 375 376 377 378 379 380 381 382
			continue
		}
		return sidecarId, nil
	}

	swarm.logger.WithFields(logrus.Fields{
		"id":  id,
		"cid": toCid.String(),
	}).Warning("No providers found") // TODO add error
	return "", nil
}

func (swarm *Swarm) Provider(key string, passed bool) error {
	return swarm.p2p.Provider(key, passed)
}