package router import ( "context" "errors" "strings" blseth "github.com/herumi/bls-eth-go-binary/bls" "github.com/meshplus/bitxhub-kit/crypto" "github.com/sirupsen/logrus" "gitlab.33.cn/link33/sidecar/internal/checker" "gitlab.33.cn/link33/sidecar/internal/port" "gitlab.33.cn/link33/sidecar/internal/repo" "gitlab.33.cn/link33/sidecar/model/pb" "gitlab.33.cn/link33/sidecar/pkg/crypto/bls" ) type router struct { logger logrus.FieldLogger ctx context.Context cancel context.CancelFunc checker checker.Checker portMap *port.PortMap methodMap map[string]routeMethod privateKey crypto.PrivateKey hubPublicKey crypto.PublicKey } type routeMethod func([]string) []port.Port func NewRouter(privateKey crypto.PrivateKey, logger logrus.FieldLogger) Router { ctx, cancel := context.WithCancel(context.Background()) return &router{ logger: logger, ctx: ctx, cancel: cancel, portMap: port.NewPortMap(), methodMap: map[string]routeMethod{}, privateKey: privateKey, } } func (r *router) Start() error { r.methodMap[Single] = r.Single r.methodMap[Multicast] = r.Multicast r.methodMap[Broadcast] = r.Broadcast r.methodMap[Official] = r.Official r.methodMap[Random] = r.Random return nil } func (r *router) Stop() error { r.cancel() return nil } // TODO func (r *router) Add(p port.Port) error { r.portMap.Add(p) go func() { c := p.ListenIBTPX() for { select { case msg := <-c: go func() { if msg.IsIbtpRouter() { err := r.Route(msg) if err != nil { r.logger.Error(err) } } else { if local, is := r.portMap.GetLocal(); is { err := local.AsyncSend(msg) if err != nil { r.logger.Error(err) } } } }() case <-r.ctx.Done(): break } } }() return nil } func (r *router) Adds(p []port.Port) error { for _, pt := range p { r.Add(pt) } return nil } func (r *router) Load(key string) (value port.Port, ok bool) { return r.portMap.Load(key) } func (r *router) Remove(p port.Port) error { r.portMap.Remove(p) return nil } // 路由的有那些数据?需要区分!!! func (r *router) Route(msg *pb.Message) error { if msg == nil { r.logger.Error("msg = nil") } ibtpx := &pb.IBTPX{} err := ibtpx.Unmarshal(msg.Payload.Data) if err != nil { return err } if !r.isRouter(ibtpx) { return nil } // 本网关签名 if !r.isSign(ibtpx) { err := r.sign(ibtpx) if err != nil { return err } } data, err := ibtpx.Marshal() if err != nil { return err } msg.Payload.Data = data ibtp := ibtpx.Ibtp err = r.checker.Check(ibtp) if err != nil { r.logger.Error("check ibtp: %w", err) return err } _, to := ibtp.From, ibtp.To if pp, is := r.portMap.Port(to); is { mode := ibtpx.Mode hub, is := r.getHub() asyncSend := func() error { switch mode { case repo.RelayMode: if is && !r.isEndorse(ibtpx) { return hub.AsyncSend(msg) } else { return pp.AsyncSend(msg) } case repo.DirectMode: return pp.AsyncSend(msg) default: return nil } } send := func() (*pb.Message, error) { switch mode { case repo.RelayMode: if is && !r.isEndorse(ibtpx) { return hub.Send(msg) } else { return pp.Send(msg) } case repo.DirectMode: return pp.Send(msg) default: return nil, errors.New("error mode type") } } switch { case pp.Type() == port.Sidecar: return pp.AsyncSend(msg) // 转发给其它的sidecar节点或者本身local节点。 case pp.Type() == port.Hub: // 发给hub appchain TODO 本机找到的appchain只能是自己的appchain return pp.AsyncSend(msg) case pp.Type() == port.Appchain: // TODO 本机找到的appchain只能是自己的appchain switch msg.Type { case pb.Message_IBTP_SEND: ret, err := send() if err != nil { r.logger.Error(err) return err } return r.Route(ret) // 返回值,重新路由; send返回值需要调换from , to; // d,_ := ret.Marshal() // msg = pb.Msg(pb.Message_IBTP_RECEIPT_SEND,true,d) case pb.Message_IBTP_RECEIPT_SEND: err = asyncSend() if err != nil { r.logger.Error(err) return err } case pb.Message_IBTP_GET: case pb.Message_IBTP_RECEIPT_GET: default: return nil } default: return nil } } // 规则判断 转发给其它的sidecar节点 method := strings.ToLower(ibtpx.RouteMethod) if md, is := r.methodMap[method]; is { ports := md(ibtpx.RouteMethodArg) if len(ports) == 0 { r.firstRoute(msg) } for _, p := range ports { _ = p.AsyncSend(msg) } } else { r.firstRoute(msg) } return nil } func (r *router) isRouter(ibtpx *pb.IBTPX) bool { mode := ibtpx.Mode // 本网关已签名、中继链已背书、to是本网关内部的appchain,即顺利通过并转发,否则打断。 if !((r.isSign(ibtpx) && mode == repo.RelayMode && r.isEndorse(ibtpx)) || !r.isSign(ibtpx)) { return false } return true } // TODO func (r *router) firstRoute(msg *pb.Message) { p := r.portMap.RandRouterPort() _ = p.AsyncSend(msg) return } func (r *router) getHub() (port.Port, bool) { return r.portMap.GetHub() } func (r *router) isSign(ibtpx *pb.IBTPX) bool { return r.verify(ibtpx) } func (r *router) sign(ibtpx *pb.IBTPX) error { p, ok := r.privateKey.(*bls.PrivateKey) if ok { sign := p.SignByte(ibtpx.FrontPart()) if len(ibtpx.RouteSign) > 0 && ibtpx.RouteSign[0] != "" { var aggSign = bls.Sign{} if err := aggSign.DeserializeHexStr(ibtpx.RouteSign[0]); err != nil { return err } aggSign.Add(sign) ibtpx.RouteSign[0] = aggSign.SerializeToHexStr() } else { if ibtpx.RouteSign == nil { ibtpx.RouteSign = make([]string, 1) } ibtpx.RouteSign[0] = sign.SerializeToHexStr() } //TODO append publicKey ibtpx.RouteSign = append(ibtpx.RouteSign, p.GetPublicKey().SerializeToHexStr()) } else { hash := ibtpx.Hash() sign, err := r.privateKey.Sign(hash.Bytes()) if err != nil { return err } ibtpx.RouteSign = append(ibtpx.RouteSign, string(sign)) } return nil } // hub endorse func (r *router) isEndorse(ibtpx *pb.IBTPX) bool { return r.verify(ibtpx) } func (r *router) verify(ibtpx *pb.IBTPX) bool { if p, ok := r.privateKey.(*bls.PrivateKey); ok { pkStr := p.GetPublicKey().SerializeToHexStr() if len(ibtpx.RouteSign) > 0 { for _, val := range ibtpx.RouteSign[1:] { if val == pkStr { return true } } } return false } return ibtpx.Verify(r.hubPublicKey.Verify) } func (r *router) aggVerify(ibtpx *pb.IBTPX) bool { aggSign := &bls.Sign{} if len(ibtpx.RouteSign) > 0 { err := aggSign.DeserializeHexStr(ibtpx.RouteSign[0]) if err != nil { return false } pks := &blseth.PublicKey{} for _, val := range ibtpx.RouteSign[1:] { var pub = &blseth.PublicKey{} _ = pub.DeserializeHexStr(val) pks.Add(pub) } return aggSign.VerifyByte(pks, ibtpx.FrontPart()) } else { return false } } func (r *router) HandlerMethod() {} func (r *router) Single(ids []string) []port.Port { return r.portMap.RouterPortByID(ids) } func (r *router) Multicast(ids []string) []port.Port { return r.portMap.RouterPortByID(ids) } func (r *router) Broadcast([]string) []port.Port { return r.portMap.AllRouterPort() } func (r *router) Random([]string) []port.Port { return []port.Port{r.portMap.RandRouterPort()} } func (r *router) Official(tags []string) []port.Port { var ps []port.Port for _, tag := range tags { p := r.portMap.RouterPortByTag(tag) ps = append(ps, p...) } return ps } func (r *router) Send(id string, msg *pb.Message) (*pb.Message, error) { if p, is := r.portMap.Port(id); is { return p.Send(msg) } return nil, errors.New("id error!") } func (r *router) AsyncSend(id string, msg *pb.Message) error { if p, is := r.portMap.Port(id); is { return p.AsyncSend(msg) } return errors.New("id error!") } func (r *router) InPut(ibtp *pb.IBTP) chan *pb.IBTP { panic("implement me") } func (r *router) OutPut(ibtp *pb.IBTP) chan *pb.IBTP { panic("implement me") } type RouteMethod interface { Single([]string) []port.Port Multicast([]string) []port.Port Broadcast([]string) []port.Port Official([]string) []port.Port Random([]string) []port.Port }