package peermgr

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/Rican7/retry"
	"github.com/Rican7/retry/strategy"
	cid "github.com/ipfs/go-cid"
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/meshplus/bitxhub-kit/crypto"
	network "github.com/meshplus/go-lightp2p"
	"github.com/sirupsen/logrus"

	"gitlab.33.cn/link33/sidecar/internal/port"
	"gitlab.33.cn/link33/sidecar/internal/repo"
	"gitlab.33.cn/link33/sidecar/internal/router"
	"gitlab.33.cn/link33/sidecar/model/pb"
)

const (
	// protocolID is the protocol identifier handed to the p2p network via
	// network.WithProtocolIDs when the Swarm is created in New.
	protocolID          = "/sidecar/1.0.0" // magic protocol
	// defaultProvidersNum is the provider count New falls back to when it is
	// called with providers == 0.
	defaultProvidersNum = 1
)

// Compile-time assertion that *Swarm implements the PeerManager interface.
var _ PeerManager = (*Swarm)(nil)

// Swarm is the PeerManager implementation built on top of a p2p network. It
// dials the configured remote peers, dispatches inbound messages to the
// registered handlers and registers local/remote ports with the router.
type Swarm struct {
	// p2p is the underlying network used to connect to peers and to send data.
	p2p    network.Network
	// logger receives all diagnostics emitted by the Swarm.
	logger logrus.FieldLogger
	// peers holds the remote peers returned by loadPeers in New; Start dials
	// each of them. The key is the string that Start logs as "node" and passes
	// to addRemotePortByID.
	peers  map[string]*peer.AddrInfo
	// router is where the local port and remote sidecar ports are registered.
	router router.Router

	// providers is the count passed to FindProvidersAsync in FindProviders.
	providers       uint64
	// msgHandlers maps a message type to its handler; handleMessage looks up
	// the type of every inbound message here and expects a MessageHandler.
	msgHandlers     sync.Map
	// connectHandlers are invoked, each in its own goroutine, with the remote
	// address once Start has connected to a configured peer.
	connectHandlers []ConnectHandler

	// lock is read-locked in Start while connectHandlers is iterated.
	// NOTE(review): presumably it also guards writes to connectHandlers; the
	// registration code is not visible here — confirm.
	lock   sync.RWMutex
	// ctx and cancel are created in New; cancel is called from Stop.
	ctx    context.Context
	cancel context.CancelFunc

	// localPeer is the local port; Start adds it to the router and uses it to
	// answer address and peer-info requests.
	localPeer *localPeer
}

// New builds a Swarm that is ready to be started with Start.
//
// Parameters:
//   - config: its Peer.Peers list is handed to loadPeers to obtain the local
//     listen address and the set of remote peers.
//   - router: the router on which local and remote ports are registered.
//   - nodePrivKey: converted to a libp2p private key; the peer ID is derived
//     from it and it is passed to the p2p network as its private key.
//   - privKey: passed, together with the derived peer ID, to newLocal.
//   - providers: provider count used by FindProviders; 0 selects
//     defaultProvidersNum.
//   - logger: logger used by the Swarm and by the p2p network.
//
// An error is returned when the key cannot be converted, the peer ID cannot be
// derived, the peer list cannot be loaded or the p2p network cannot be created.
func New(config *repo.Config, router router.Router, nodePrivKey crypto.PrivateKey, privKey crypto.PrivateKey, providers uint64, logger logrus.FieldLogger) (*Swarm, error) {
	libp2pPrivKey, err := convertToLibp2pPrivKey(nodePrivKey)
	if err != nil {
		return nil, fmt.Errorf("convert private key: %w", err)
	}

	var (
		localAddr string
		remotes   map[string]*peer.AddrInfo
	)

	id, err := peer.IDFromPrivateKey(libp2pPrivKey)
	if err != nil {
		// Report the failure to the caller instead of panicking: New already
		// has an error return and library code should not crash the process.
		return nil, fmt.Errorf("derive peer id from private key: %w", err)
	}

	local := newLocal(id, privKey)

	localAddr, remotes, err = loadPeers(config.Peer.Peers, libp2pPrivKey)
	if err != nil {
		return nil, fmt.Errorf("load peers: %w", err)
	}

	protocolIDs := []string{protocolID}
	p2p, err := network.New(
		network.WithLocalAddr(localAddr),
		network.WithPrivateKey(libp2pPrivKey),
		network.WithProtocolIDs(protocolIDs),
		network.WithLogger(logger),
	)
	if err != nil {
		return nil, fmt.Errorf("create p2p: %w", err)
	}

	if providers == 0 {
		providers = defaultProvidersNum
	}

	// Created last so no early return above can leak the cancel function.
	ctx, cancel := context.WithCancel(context.Background())

	return &Swarm{
		providers: providers,
		p2p:       p2p,
		logger:    logger,
		peers:     remotes,
		ctx:       ctx,
		cancel:    cancel,
		router:    router,
		localPeer: local,
	}, nil
}

// add registers p with the router. A registration failure is logged and
// otherwise ignored.
func (swarm *Swarm) add(p port.Port) {
	if addErr := swarm.router.Add(p); addErr != nil {
		swarm.logger.Error(addErr)
	}
}

// Start starts the router, registers the local port and the built-in message
// handlers, starts the p2p network and then dials every configured peer in its
// own goroutine, retrying once per second until the peer is reachable and has
// answered the address request.
//
// Start blocks until the first configured peer has been connected. When no
// peers are configured there is nothing to wait for, so it returns right after
// the p2p network has started.
//
// It returns an error when the router or the p2p network fails to start or when
// a message handler cannot be registered.
func (swarm *Swarm) Start() error {
	if err := swarm.router.Start(); err != nil {
		return err
	}
	swarm.add(swarm.localPeer)
	swarm.p2p.SetMessageHandler(swarm.handleMessage)
	if err := swarm.RegisterMsgHandler(pb.Message_ADDRESS_GET, swarm.localPeer.handleGetAddressMessage); err != nil {
		return fmt.Errorf("register get address msg handler: %w", err)
	}
	if err := swarm.RegisterMsgHandler(pb.Message_PEER_INFO_GET, swarm.localPeer.HandleGetPeerInfoMessage); err != nil {
		return fmt.Errorf("register get peer info msg handler: %w", err)
	}
	if err := swarm.RegisterMultiMsgHandler([]pb.Message_Type{
		pb.Message_IBTP_SEND,
		pb.Message_IBTP_GET,
		pb.Message_IBTP_RECEIPT_SEND,
		pb.Message_IBTP_RECEIPT_GET,
	}, swarm.handleIBTPX); err != nil {
		return fmt.Errorf("register handle IBTPX msg handler: %w", err)
	}

	if err := swarm.p2p.Start(); err != nil {
		return fmt.Errorf("p2p module start: %w", err)
	}

	// Without configured peers no goroutine would ever signal a connection,
	// so waiting below would block forever.
	if len(swarm.peers) == 0 {
		swarm.logger.Warn("No peers configured, skip waiting for peer connection")
		return nil
	}

	// need to connect one other sidecar at least.
	// connected is closed exactly once, by whichever peer connects first. A
	// WaitGroup with a single Add(1) would panic with a negative counter as
	// soon as a second peer connected and called Done.
	connected := make(chan struct{})
	var once sync.Once

	for id, addr := range swarm.peers {
		go func(id string, addr *peer.AddrInfo) {
			if err := retry.Retry(func(attempt uint) error {
				if err := swarm.p2p.Connect(*addr); err != nil {
					// Only log every fifth failed attempt to keep the log quiet.
					if attempt != 0 && attempt%5 == 0 {
						swarm.logger.WithFields(logrus.Fields{
							"node":  id,
							"error": err,
						}).Error("Connect failed")
					}
					return err
				}

				address, err := swarm.getRemoteAddress(addr.ID)
				if err != nil {
					swarm.logger.WithFields(logrus.Fields{
						"node":  id,
						"error": err,
					}).Error("Get remote address failed")
					return err
				}

				swarm.logger.WithFields(logrus.Fields{
					"node":     id,
					"address:": address,
				}).Info("Connect successfully")

				swarm.addRemotePortByID(id)

				swarm.lock.RLock()
				defer swarm.lock.RUnlock()
				for _, handler := range swarm.connectHandlers {
					// Handlers run asynchronously so a slow one cannot stall
					// the connection loop while the read lock is held.
					go handler(address)
				}
				once.Do(func() { close(connected) })
				return nil
			},
				strategy.Wait(1*time.Second),
			); err != nil {
				swarm.logger.Error(err)
			}
		}(id, addr)
	}

	<-connected

	return nil
}

// handleMessage is the callback installed on the p2p network by Start. It
// decodes data into a pb.Message, looks up the handler registered for the
// message type and invokes it with a sidecar port created for the remote peer
// of stream s. Undecodable messages and messages without a usable handler are
// logged and dropped.
func (swarm *Swarm) handleMessage(s network.Stream, data []byte) {
	msg := new(pb.Message)
	if decodeErr := msg.Unmarshal(data); decodeErr != nil {
		swarm.logger.Error(decodeErr)
		return
	}

	registered, found := swarm.msgHandlers.Load(msg.Type)
	if !found {
		swarm.logger.WithFields(logrus.Fields{
			"error": fmt.Errorf("can't handle msg[type: %v]", msg.Type),
			"type":  msg.Type.String(),
		}).Error("Handle message")
		return
	}

	if dispatch, usable := registered.(MessageHandler); usable {
		dispatch(swarm.newSidecar(s.RemotePeerID()), msg)
		return
	}

	// The stored value is not a MessageHandler.
	swarm.logger.WithFields(logrus.Fields{
		"error": fmt.Errorf("invalid handler for msg [type: %v]", msg.Type),
		"type":  msg.Type.String(),
	}).Error("Handle message")
}

// newSidecar requests the peer info of the remote peer sidecarID and builds a
// sidecar port from the ID and the tag reported by that peer. A failed request
// is logged; the port is still created.
func (swarm *Swarm) newSidecar(sidecarID string) *sidecar {
	peerInfo, infoErr := swarm.getRemotePeerInfo(sidecarID)
	if infoErr != nil {
		swarm.logger.Error(infoErr)
	}
	// NOTE(review): peerInfo can be nil when the request failed; this relies on
	// GetTag tolerating a nil receiver — confirm against the pb package.
	tag := peerInfo.GetTag()
	return newSidecar(sidecarID, tag, swarm)
}

// handleIBTPX receives transactions, requests and similar messages sent by
// other sidecar nodes, mainly IBTP-related data. It is registered in Start for
// the IBTP send/get and receipt send/get message types.
//
// When a port with pt's ID is already registered with the router, the message
// is delivered to that port if it is a *sidecar and dropped otherwise. When no
// port is registered yet, pt itself is registered and receives the message.
func (swarm *Swarm) handleIBTPX(pt port.Port, m *pb.Message) {
	if registered, found := swarm.router.Load(pt.ID()); found {
		if ps, ok := registered.(*sidecar); ok {
			ps.Receive(m)
		}
		return
	}

	// Checked assertion: an unchecked one would panic the message handler if
	// a different port.Port implementation were ever passed in.
	ppt, ok := pt.(*sidecar)
	if !ok {
		swarm.logger.WithFields(logrus.Fields{
			"id": pt.ID(),
		}).Error("Handle IBTPX message: port is not a sidecar")
		return
	}
	swarm.addRemotePort(ppt)
	ppt.Receive(m)
}

// addRemotePort registers the remote sidecar port ppt with the router.
func (swarm *Swarm) addRemotePort(ppt *sidecar) {
	var remote port.Port = ppt
	swarm.add(remote)
}

// Stop shuts the Swarm down: it stops the p2p network, cancels the Swarm
// context and stops the router. If the p2p network fails to stop, that error
// is returned immediately and the remaining steps are skipped.
func (swarm *Swarm) Stop() error {
	stopErr := swarm.p2p.Stop()
	if stopErr != nil {
		return stopErr
	}

	swarm.cancel()

	return swarm.router.Stop()
}

// Connect dials the peer described by addrInfo, asks it for its address and
// registers a sidecar port for it. It returns the peer ID in string form, or an
// empty string and the error when dialing or the address request fails.
func (swarm *Swarm) Connect(addrInfo *peer.AddrInfo) (string, error) {
	if dialErr := swarm.p2p.Connect(*addrInfo); dialErr != nil {
		return "", dialErr
	}

	remoteAddr, addrErr := swarm.getRemoteAddress(addrInfo.ID)
	if addrErr != nil {
		return "", addrErr
	}

	logFields := logrus.Fields{
		"address":  remoteAddr,
		"addrInfo": addrInfo,
	}
	swarm.logger.WithFields(logFields).Info("Connect peer")

	peerID := addrInfo.ID.String()
	swarm.addRemotePortByID(peerID)
	return peerID, nil
}

// addRemotePortByID creates a sidecar port for the peer with the given id and
// registers it with the router.
func (swarm *Swarm) addRemotePortByID(id string) {
	remote := swarm.newSidecar(id)
	swarm.addRemotePort(remote)
}

// AsyncSendWithPort forwards msg to s.AsyncSend and returns its result.
func (swarm *Swarm) AsyncSendWithPort(s port.Port, msg *pb.Message) error {
	sendErr := s.AsyncSend(msg)
	return sendErr
}

// SendWithPort forwards msg to s.Send and returns the reply and error it
// produces.
func (swarm *Swarm) SendWithPort(s port.Port, msg *pb.Message) (*pb.Message, error) {
	reply, sendErr := s.Send(msg)
	return reply, sendErr
}

// AsyncSendByID marshals msg and hands it to the p2p network's AsyncSend for
// the peer identified by id. A marshalling failure is returned wrapped; any
// other error comes straight from the p2p network.
func (swarm *Swarm) AsyncSendByID(id string, msg *pb.Message) error {
	encoded, marshalErr := msg.Marshal()
	if marshalErr != nil {
		return fmt.Errorf("marshal message: %w", marshalErr)
	}

	sendErr := swarm.p2p.AsyncSend(id, encoded)
	return sendErr
}

// SendByID marshals msg, sends it through the p2p network's Send to the peer
// identified by id and decodes the returned bytes into a pb.Message. It returns
// nil and an error when marshalling, sending or decoding fails; only the send
// error is wrapped.
func (swarm *Swarm) SendByID(id string, msg *pb.Message) (*pb.Message, error) {
	request, err := msg.Marshal()
	if err != nil {
		return nil, err
	}

	response, err := swarm.p2p.Send(id, request)
	if err != nil {
		return nil, fmt.Errorf("sync send: %w", err)
	}

	reply := new(pb.Message)
	if err = reply.Unmarshal(response); err != nil {
		return nil, err
	}
	return reply, nil
}

// Peers returns a new map holding the same entries as the configured peer set.
// The map itself is a copy; the *peer.AddrInfo values are shared with the
// Swarm.
func (swarm *Swarm) Peers() map[string]*peer.AddrInfo {
	snapshot := make(map[string]*peer.AddrInfo, len(swarm.peers))
	for peerID, info := range swarm.peers {
		snapshot[peerID] = info
	}
	return snapshot
}

// getRemoteAddress sends an ADDRESS_GET request to the peer id and returns the
// payload data of the reply as a string.
func (swarm *Swarm) getRemoteAddress(id peer.ID) (string, error) {
	request := pb.Msg(pb.Message_ADDRESS_GET, true, nil)

	reply, sendErr := swarm.SendByID(id.String(), request)
	if sendErr != nil {
		return "", sendErr
	}

	// NOTE(review): assumes the reply always carries a payload — confirm that
	// Payload cannot be nil here.
	address := string(reply.Payload.Data)
	return address, nil
}

// getRemotePeerInfo sends a PEER_INFO_GET request to the peer id and decodes
// the payload data of the reply into a pb.PeerInfo. When sending fails it
// returns nil and the error; when decoding fails it returns the PeerInfo
// together with the decode error.
func (swarm *Swarm) getRemotePeerInfo(id string) (*pb.PeerInfo, error) {
	request := pb.Msg(pb.Message_PEER_INFO_GET, true, nil)

	reply, sendErr := swarm.SendByID(id, request)
	if sendErr != nil {
		return nil, sendErr
	}

	peerInfo := new(pb.PeerInfo)
	decodeErr := peerInfo.Unmarshal(reply.Payload.Data)
	return peerInfo, decodeErr
}

// FindProviders derives a CID (v0) from id, asks the p2p network for up to
// swarm.providers providers of that CID and connects to them in the order they
// are reported. It returns the ID of the first provider it connects to.
//
// An error is returned when the CID cannot be built or the provider lookup
// fails. When no provider could be connected, a warning is logged and an empty
// ID with a nil error is returned.
func (swarm *Swarm) FindProviders(id string) (string, error) {
	format := cid.V0Builder{}
	toCid, err := format.Sum([]byte(id))
	if err != nil {
		return "", err
	}
	cidStr := toCid.String()

	providers, err := swarm.p2p.FindProvidersAsync(cidStr, int(swarm.providers))
	if err != nil {
		swarm.logger.WithFields(logrus.Fields{
			"id": id,
		}).Error("Not find providers")
		return "", err
	}

	for provider := range providers {
		providerID := provider.ID.String()
		swarm.logger.WithFields(logrus.Fields{
			"id":          id,
			"cid":         cidStr,
			"provider_id": providerID,
		}).Info("Find provider")

		sidecarID, err := swarm.Connect(&provider)
		if err != nil {
			// Connect returns an empty ID on failure, so log the provider's
			// own ID and the CID that was looked up.
			swarm.logger.WithFields(logrus.Fields{
				"peerId": providerID,
				"cid":    cidStr,
			}).Error("connect error: ", err)
			continue
		}
		return sidecarID, nil
	}

	swarm.logger.WithFields(logrus.Fields{
		"id":  id,
		"cid": cidStr,
	}).Warning("No providers found") // TODO add error
	return "", nil
}

// Provider forwards key and passed to the p2p network's Provider call and
// returns its result.
func (swarm *Swarm) Provider(key string, passed bool) error {
	provideErr := swarm.p2p.Provider(key, passed)
	return provideErr
}