Commit 29d525a1 authored by suyanlong's avatar suyanlong

Adjust route package file

parent 34086771
Pipeline #7976 canceled with stages
......@@ -95,6 +95,21 @@ func (mr *MockPeerManagerMockRecorder) FindProviders(id interface{}) *gomock.Cal
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindProviders", reflect.TypeOf((*MockPeerManager)(nil).FindProviders), id)
}
// GetRemotePeerInfo mocks base method.
func (m *MockPeerManager) GetRemotePeerInfo(id string) (*pb.PeerInfo, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetRemotePeerInfo", id)
ret0, _ := ret[0].(*pb.PeerInfo)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GetRemotePeerInfo indicates an expected call of GetRemotePeerInfo.
func (mr *MockPeerManagerMockRecorder) GetRemotePeerInfo(id interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetRemotePeerInfo", reflect.TypeOf((*MockPeerManager)(nil).GetRemotePeerInfo), id)
}
// Provider mocks base method.
func (m *MockPeerManager) Provider(arg0 string, arg1 bool) error {
m.ctrl.T.Helper()
......
package router
import (
"context"
"errors"
"strings"
"github.com/link33/sidecar/internal/checker"
"github.com/link33/sidecar/internal/port"
"github.com/link33/sidecar/internal/repo"
"github.com/link33/sidecar/model/pb"
"github.com/sirupsen/logrus"
)
type router struct {
logger logrus.FieldLogger
ctx context.Context
cancel context.CancelFunc
checker checker.Checker
portMap *port.PortMap
methodMap map[string]routeMethod
}
type routeMethod func([]string) []port.Port
func NewRouter(logger logrus.FieldLogger) Router {
ctx, cancel := context.WithCancel(context.Background())
return &router{
logger: logger,
ctx: ctx,
cancel: cancel,
portMap: port.NewPortMap(),
}
}
func (r *router) Start() error {
r.methodMap["single"] = r.Single
r.methodMap["multicast"] = r.Multicast
r.methodMap["broadcast"] = r.Broadcast
r.methodMap["official"] = r.Official
return nil
}
func (r *router) Stop() error {
r.cancel()
return nil
}
//TODO
func (r *router) Add(p port.Port) error {
r.portMap.Add(p)
go func() {
c := p.ListenIBTPX()
for {
select {
case msg := <-c:
go func() {
if msg.IsIbtpRouter() {
err := r.Route(msg)
if err != nil {
r.logger.Error(err)
}
} else {
if local, is := r.portMap.GetLocal(); is {
err := local.AsyncSend(msg)
if err != nil {
r.logger.Error(err)
}
}
}
}()
case <-r.ctx.Done():
break
}
}
}()
return nil
}
func (r *router) Adds(p []port.Port) error {
for _, pt := range p {
r.Add(pt)
}
return nil
}
func (r *router) Load(key string) (value port.Port, ok bool) {
return r.portMap.Load(key)
}
func (r *router) Remove(p port.Port) error {
r.portMap.Remove(p)
return nil
}
//路由的有那些数据?需要区分!!!
func (r *router) Route(msg *pb.Message) error {
if msg == nil {
r.logger.Error("msg = nil")
}
ibtpx := &pb.IBTPX{}
err := ibtpx.Unmarshal(msg.Payload.Data)
if err != nil {
return err
}
if !r.isRouter(ibtpx) {
return nil
}
//本网关签名
if !r.isSign(ibtpx) {
r.sign(ibtpx)
}
data, err := ibtpx.Marshal()
if err != nil {
return err
}
msg.Payload.Data = data
ibtp := ibtpx.Ibtp
err = r.checker.Check(ibtp)
if err != nil {
r.logger.Error("check ibtp: %w", err)
return err
}
_, to := ibtp.From, ibtp.To
if pp, is := r.portMap.Port(to); is {
mode := ibtpx.Mode
hub, is := r.getHub()
asyncSend := func() error {
switch mode {
case repo.RelayMode:
if is && !r.isEndorse(ibtpx) {
return hub.AsyncSend(msg)
} else {
return pp.AsyncSend(msg)
}
case repo.DirectMode:
return pp.AsyncSend(msg)
default:
return nil
}
}
send := func() (*pb.Message, error) {
switch mode {
case repo.RelayMode:
if is && !r.isEndorse(ibtpx) {
return hub.Send(msg)
} else {
return pp.Send(msg)
}
case repo.DirectMode:
return pp.Send(msg)
default:
return nil, errors.New("error mode type")
}
}
switch {
case pp.Type() == port.Sidecar:
return pp.AsyncSend(msg) //转发给其它的sidecar节点或者本身local节点。
case pp.Type() == port.Hub: //发给hub appchain TODO 本机找到的appchain只能是自己的appchain
return pp.AsyncSend(msg)
case pp.Type() == port.Appchain: //TODO 本机找到的appchain只能是自己的appchain
switch msg.Type {
case pb.Message_IBTP_SEND:
ret, err := send()
if err != nil {
r.logger.Error(err)
return err
}
return r.Route(ret) //返回值,重新路由; send返回值需要调换from , to;
//d,_ := ret.Marshal()
//msg = pb.Msg(pb.Message_IBTP_RECEIPT_SEND,true,d)
case pb.Message_IBTP_RECEIPT_SEND:
err = asyncSend()
if err != nil {
r.logger.Error(err)
return err
}
case pb.Message_IBTP_GET:
case pb.Message_IBTP_RECEIPT_GET:
default:
return nil
}
default:
return nil
}
}
//规则判断 转发给其它的sidecar节点
method := strings.ToLower(ibtpx.RouteMethod)
if md, is := r.methodMap[method]; is {
ports := md(ibtpx.RouteMethodArg)
if len(ports) == 0 {
r.firstRoute(msg)
}
for _, p := range ports {
_ = p.AsyncSend(msg)
}
} else {
r.firstRoute(msg)
}
return nil
}
func (r *router) isRouter(ibtpx *pb.IBTPX) bool {
mode := ibtpx.Mode
//本网关已签名、中继链已背书、to是本网关内部的appchain,即顺利通过并转发,否则打断。
if !((r.isSign(ibtpx) && mode == repo.RelayMode && r.isEndorse(ibtpx)) || !r.isSign(ibtpx)) {
return false
}
return true
}
func (r *router) firstRoute(msg *pb.Message) {
panic("implement me")
}
func (r *router) getHub() (port.Port, bool) {
return r.portMap.GetHub()
}
func (r *router) isSign(ibtpx *pb.IBTPX) bool {
panic("implement me")
}
func (r *router) sign(ibtpx *pb.IBTPX) {
panic("implement me")
}
// hub endorse
func (r *router) isEndorse(ibtpx *pb.IBTPX) bool {
panic("implement me")
}
func (r *router) HandlerMethod() {}
func (r *router) Single([]string) []port.Port {
panic("implement me")
}
func (r *router) Multicast([]string) []port.Port {
panic("implement me")
}
func (r *router) Broadcast([]string) []port.Port {
panic("implement me")
}
func (r *router) Official([]string) []port.Port {
panic("implement me")
}
func (r *router) Send(id string, msg *pb.Message) (*pb.Message, error) {
if p, is := r.portMap.Port(id); is {
return p.Send(msg)
}
return nil, errors.New("id error!")
}
func (r *router) AsyncSend(id string, msg *pb.Message) error {
if p, is := r.portMap.Port(id); is {
return p.AsyncSend(msg)
}
return errors.New("id error!")
}
func (r *router) InPut(ibtp *pb.IBTP) chan *pb.IBTP {
panic("implement me")
}
func (r *router) OutPut(ibtp *pb.IBTP) chan *pb.IBTP {
panic("implement me")
}
type RouteMethod interface {
Single()
Multicast()
Broadcast()
Official()
}
package router
import (
"github.com/link33/sidecar/internal/port"
"github.com/link33/sidecar/model/pb"
)
//go:generate mockgen -destination mock_router/mock_router.go -package mock_router -source interface.go
type Router interface {
// Start starts the router module
Start() error
// Stop
Stop() error
Add(p port.Port) error
Adds(p []port.Port) error
//Route sends ibtp to the union pier in target relay chain
Route(ibtp *pb.Message) error
Load(key string) (value port.Port, ok bool)
//InPut(ibtp *pb.IBTP) chan *pb.IBTP
//OutPut(ibtp *pb.IBTP) chan *pb.IBTP
//代替上面两个方法
//Router(ibtp *pb.IBTP) chan *pb.IBTP
}
//路由规则,根据路由表,rules接口, 这个放到交易内部。可以让用户决定。并且签名。
// 1、广播
// 2、单一路由,指定路由标识。
// 3、p2p模式的路由,
// 4、中继路由
// 5、定制化:
//验证规则
//1、收集多个节点
//2、收集指定节点
//3、收集共识节点
// 即满足多个条件
//路由规则优先级:
//1、用户交易内部的路由规则最高。
//2、用户在程序设定。
//3、系统默认。
// Code generated by MockGen. DO NOT EDIT.
// Source: router.go
// Source: interface.go
// Package mock_router is a generated GoMock package.
package mock_router
......
package router
import (
"context"
"errors"
"strings"
"github.com/link33/sidecar/internal/checker"
"github.com/link33/sidecar/internal/port"
"github.com/link33/sidecar/internal/repo"
"github.com/link33/sidecar/model/pb"
"github.com/sirupsen/logrus"
)
//go:generate mockgen -destination mock_router/mock_router.go -package mock_router -source router.go
type Router interface {
// Start starts the router module
Start() error
type router struct {
logger logrus.FieldLogger
ctx context.Context
cancel context.CancelFunc
checker checker.Checker
portMap *port.PortMap
methodMap map[string]routeMethod
}
type routeMethod func([]string) []port.Port
func NewRouter(logger logrus.FieldLogger) Router {
ctx, cancel := context.WithCancel(context.Background())
return &router{
logger: logger,
ctx: ctx,
cancel: cancel,
portMap: port.NewPortMap(),
}
}
func (r *router) Start() error {
r.methodMap["single"] = r.Single
r.methodMap["multicast"] = r.Multicast
r.methodMap["broadcast"] = r.Broadcast
r.methodMap["official"] = r.Official
return nil
}
func (r *router) Stop() error {
r.cancel()
return nil
}
//TODO
func (r *router) Add(p port.Port) error {
r.portMap.Add(p)
go func() {
c := p.ListenIBTPX()
for {
select {
case msg := <-c:
go func() {
if msg.IsIbtpRouter() {
err := r.Route(msg)
if err != nil {
r.logger.Error(err)
}
} else {
if local, is := r.portMap.GetLocal(); is {
err := local.AsyncSend(msg)
if err != nil {
r.logger.Error(err)
}
}
}
}()
case <-r.ctx.Done():
break
}
}
}()
return nil
}
func (r *router) Adds(p []port.Port) error {
for _, pt := range p {
r.Add(pt)
}
return nil
}
func (r *router) Load(key string) (value port.Port, ok bool) {
return r.portMap.Load(key)
}
func (r *router) Remove(p port.Port) error {
r.portMap.Remove(p)
return nil
}
//路由的有那些数据?需要区分!!!
func (r *router) Route(msg *pb.Message) error {
if msg == nil {
r.logger.Error("msg = nil")
}
ibtpx := &pb.IBTPX{}
err := ibtpx.Unmarshal(msg.Payload.Data)
if err != nil {
return err
}
if !r.isRouter(ibtpx) {
return nil
}
//本网关签名
if !r.isSign(ibtpx) {
r.sign(ibtpx)
}
data, err := ibtpx.Marshal()
if err != nil {
return err
}
msg.Payload.Data = data
ibtp := ibtpx.Ibtp
err = r.checker.Check(ibtp)
if err != nil {
r.logger.Error("check ibtp: %w", err)
return err
}
_, to := ibtp.From, ibtp.To
if pp, is := r.portMap.Port(to); is {
mode := ibtpx.Mode
hub, is := r.getHub()
asyncSend := func() error {
switch mode {
case repo.RelayMode:
if is && !r.isEndorse(ibtpx) {
return hub.AsyncSend(msg)
} else {
return pp.AsyncSend(msg)
}
case repo.DirectMode:
return pp.AsyncSend(msg)
default:
return nil
}
}
send := func() (*pb.Message, error) {
switch mode {
case repo.RelayMode:
if is && !r.isEndorse(ibtpx) {
return hub.Send(msg)
} else {
return pp.Send(msg)
}
case repo.DirectMode:
return pp.Send(msg)
default:
return nil, errors.New("error mode type")
}
}
switch {
case pp.Type() == port.Sidecar:
return pp.AsyncSend(msg) //转发给其它的sidecar节点或者本身local节点。
case pp.Type() == port.Hub: //发给hub appchain TODO 本机找到的appchain只能是自己的appchain
return pp.AsyncSend(msg)
case pp.Type() == port.Appchain: //TODO 本机找到的appchain只能是自己的appchain
switch msg.Type {
case pb.Message_IBTP_SEND:
ret, err := send()
if err != nil {
r.logger.Error(err)
return err
}
return r.Route(ret) //返回值,重新路由; send返回值需要调换from , to;
//d,_ := ret.Marshal()
//msg = pb.Msg(pb.Message_IBTP_RECEIPT_SEND,true,d)
case pb.Message_IBTP_RECEIPT_SEND:
err = asyncSend()
if err != nil {
r.logger.Error(err)
return err
}
case pb.Message_IBTP_GET:
case pb.Message_IBTP_RECEIPT_GET:
default:
return nil
}
default:
return nil
}
}
//规则判断 转发给其它的sidecar节点
method := strings.ToLower(ibtpx.RouteMethod)
if md, is := r.methodMap[method]; is {
ports := md(ibtpx.RouteMethodArg)
if len(ports) == 0 {
r.firstRoute(msg)
}
for _, p := range ports {
_ = p.AsyncSend(msg)
}
} else {
r.firstRoute(msg)
}
return nil
}
func (r *router) isRouter(ibtpx *pb.IBTPX) bool {
mode := ibtpx.Mode
//本网关已签名、中继链已背书、to是本网关内部的appchain,即顺利通过并转发,否则打断。
if !((r.isSign(ibtpx) && mode == repo.RelayMode && r.isEndorse(ibtpx)) || !r.isSign(ibtpx)) {
return false
}
return true
}
func (r *router) firstRoute(msg *pb.Message) {
panic("implement me")
}
func (r *router) getHub() (port.Port, bool) {
return r.portMap.GetHub()
}
func (r *router) isSign(ibtpx *pb.IBTPX) bool {
panic("implement me")
}
// Stop
Stop() error
func (r *router) sign(ibtpx *pb.IBTPX) {
panic("implement me")
}
Add(p port.Port) error
// hub endorse
func (r *router) isEndorse(ibtpx *pb.IBTPX) bool {
panic("implement me")
}
Adds(p []port.Port) error
func (r *router) HandlerMethod() {}
//Route sends ibtp to the union pier in target relay chain
Route(ibtp *pb.Message) error
func (r *router) Single([]string) []port.Port {
panic("implement me")
}
Load(key string) (value port.Port, ok bool)
func (r *router) Multicast([]string) []port.Port {
//InPut(ibtp *pb.IBTP) chan *pb.IBTP
//OutPut(ibtp *pb.IBTP) chan *pb.IBTP
//代替上面两个方法
//Router(ibtp *pb.IBTP) chan *pb.IBTP
panic("implement me")
}
//路由规则,根据路由表,rules接口, 这个放到交易内部。可以让用户决定。并且签名。
// 1、广播
// 2、单一路由,指定路由标识。
// 3、p2p模式的路由,
// 4、中继路由
// 5、定制化:
func (r *router) Broadcast([]string) []port.Port {
panic("implement me")
}
//验证规则
//1、收集多个节点
//2、收集指定节点
//3、收集共识节点
// 即满足多个条件
func (r *router) Official([]string) []port.Port {
panic("implement me")
}
//路由规则优先级:
//1、用户交易内部的路由规则最高。
//2、用户在程序设定。
//3、系统默认。
func (r *router) Send(id string, msg *pb.Message) (*pb.Message, error) {
if p, is := r.portMap.Port(id); is {
return p.Send(msg)
}
return nil, errors.New("id error!")
}
func (r *router) AsyncSend(id string, msg *pb.Message) error {
if p, is := r.portMap.Port(id); is {
return p.AsyncSend(msg)
}
return errors.New("id error!")
}
func (r *router) InPut(ibtp *pb.IBTP) chan *pb.IBTP {
panic("implement me")
}
func (r *router) OutPut(ibtp *pb.IBTP) chan *pb.IBTP {
panic("implement me")
}
type RouteMethod interface {
Single()
Multicast()
Broadcast()
Official()
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment