Commit e177761b authored by suyanlong's avatar suyanlong

rename function and Add local function

parent 57845ff1
Pipeline #7962 failed with stages
package appchain
import (
"errors"
"github.com/link33/sidecar/model/pb"
)
......@@ -25,8 +27,26 @@ func (a *appChain) Send(msg *pb.Message) (*pb.Message, error) {
//TODO 调用该执行。
//a.ExecuteIBTP()
//a.Rollback()
panic("implement me")
t := msg.Type
switch {
case t == pb.Message_IBTP_SEND:
ibpx := &pb.IBTPX{}
err := ibpx.Unmarshal(msg.Payload.Data)
if err != nil {
return nil, err
}
ibtp := ibpx.GetIbtp()
ibtp, err = a.ExecuteIBTP(ibtp)
if err != nil {
return nil, err
}
if ibtp == nil { //TODO
return nil, nil
}
return nil, nil
default:
return nil, errors.New("msg error")
}
}
func (a *appChain) AsyncSend(msg *pb.Message) error {
......@@ -39,10 +59,24 @@ func (a *appChain) AsyncSend(msg *pb.Message) error {
// ListenIBTPX 监听IBTPX数据。
func (a *appChain) ListenIBTPX() <-chan *pb.Message {
panic("implement me")
ibtpCh := a.ListenIBTP()
msgCh := make(chan *pb.Message, 10)
func() {
for {
select {
case ibtp := <-ibtpCh:
//TODO ibtp to ibtpx
data, _ := ibtp.Marshal()
msg := pb.Msg(pb.Message_IBTP_SEND, true, data)
msgCh <- msg
case <-a.ctx.Done():
break
}
}
}()
return msgCh
}
// 同一个接口,不同的实现,行为可以不一样。
// 对原始交易背书,回执不需要、查询也不需要、回滚也不需要、回调也不需要。
// 对原始交易背书,交易查询、回执查询、回滚、回调不需要。
// 即只对GetIBTP得到的交易背书。如何映射,
......@@ -119,17 +119,17 @@ func (ex *Exchanger) feedReceipt(receipt *pb.IBTP) {
func (ex *Exchanger) postHandleIBTP(from string, receipt *pb.IBTP) {
if receipt == nil {
retMsg := pb.Msg(pb.Message_IBTP_RECEIPT_SEND, true, nil)
err := ex.peerMgr.AsyncSend(from, retMsg)
err := ex.peerMgr.AsyncSendByID(from, retMsg)
if err != nil {
ex.logger.Errorf("Send back empty ibtp receipt: %s", err.Error())
ex.logger.Errorf("SendByID back empty ibtp receipt: %s", err.Error())
}
return
}
data, _ := receipt.Marshal()
retMsg := pb.Msg(pb.Message_IBTP_RECEIPT_SEND, true, data)
if err := ex.peerMgr.AsyncSend(from, retMsg); err != nil {
ex.logger.Errorf("Send back ibtp receipt: %s", err.Error())
if err := ex.peerMgr.AsyncSendByID(from, retMsg); err != nil {
ex.logger.Errorf("SendByID back ibtp receipt: %s", err.Error())
}
}
......@@ -214,7 +214,7 @@ func (ex *Exchanger) handleNewConnection(dstSidecarID string) {
}{}
loop := func() error {
interchainMeta, err := ex.peerMgr.Send(dstSidecarID, msg)
interchainMeta, err := ex.peerMgr.SendByID(dstSidecarID, msg)
if err != nil {
return err
}
......
......@@ -126,7 +126,7 @@ func (ex *Exchanger) listenAndSendIBTPFromMnt() {
}
if err := retry.Retry(func(attempt uint) error {
if err := ex.sendIBTP(ibtp); err != nil {
ex.logger.Errorf("Send ibtp: %s", err.Error())
ex.logger.Errorf("SendByID ibtp: %s", err.Error())
// if err occurs, try to get new ibtp and resend
ibtpID := ibtp.ID()
if err := retry.Retry(func(attempt uint) error {
......@@ -185,7 +185,7 @@ func (ex *Exchanger) sendIBTP(ibtp *pb.IBTP) error {
case repo.RelayMode:
err := ex.syncer.SendIBTP(ibtp)
if err != nil {
entry.Errorf("Send ibtp to bitxhub: %s", err.Error())
entry.Errorf("SendByID ibtp to bitxhub: %s", err.Error())
return fmt.Errorf("send ibtp to bitxhub: %s", err.Error())
}
case repo.DirectMode:
......@@ -195,7 +195,7 @@ func (ex *Exchanger) sendIBTP(ibtp *pb.IBTP) error {
if err != nil {
panic(fmt.Sprintf("marshal ibtp: %s", err.Error()))
}
msg := pb.Message(pb.Message_IBTP_SEND, true, data)
msg := pb.Msg(pb.Message_IBTP_SEND, true, data)
var dst string
if ibtp.Type == pb.IBTP_INTERCHAIN {
dst = ibtp.To
......@@ -203,8 +203,8 @@ func (ex *Exchanger) sendIBTP(ibtp *pb.IBTP) error {
dst = ibtp.From
}
if err := ex.peerMgr.AsyncSend(dst, msg); err != nil {
ex.logger.Errorf("Send ibtp to sidecar %s: %s", ibtp.ID(), err.Error())
if err := ex.peerMgr.AsyncSendByID(dst, msg); err != nil {
ex.logger.Errorf("SendByID ibtp to sidecar %s: %s", ibtp.ID(), err.Error())
return err
}
......@@ -213,7 +213,7 @@ func (ex *Exchanger) sendIBTP(ibtp *pb.IBTP) error {
ex.logger.Panic(err)
}
}
entry.Info("Send ibtp success from monitor")
entry.Info("SendByID ibtp success from monitor")
return nil
}
......@@ -248,7 +248,7 @@ func (ex *Exchanger) queryIBTP(mode string, id, target string) (*pb.IBTP, bool,
case repo.DirectMode:
// query ibtp from another sidecar
msg := pb.Message(pb.Message_IBTP_GET, true, []byte(id))
result, err := ex.peerMgr.Send(target, msg)
result, err := ex.peerMgr.SendByID(target, msg)
if err != nil {
return nil, false, err
}
......
......@@ -9,6 +9,11 @@ import (
"github.com/sirupsen/logrus"
)
var (
Office = "office"
Test = "test"
)
type local struct {
id peer.ID
privKey crypto.PrivateKey
......@@ -23,7 +28,7 @@ func (l *local) ID() string {
}
func (l *local) Type() string {
return port.Sidecar
return port.Local
}
func (l *local) Name() string {
......@@ -34,17 +39,23 @@ func (l *local) Tag() string {
return l.tag
}
//需要同步处理的数据
func (l *local) Send(msg *pb.Message) (*pb.Message, error) {
// 同步完成
panic("implement me")
}
//需要异步处理的数据
func (l *local) AsyncSend(msg *pb.Message) error {
// 目标to == id 本机,获取信息
// 转发给路由器的。
// 判断数据类型
var to = ""
if to == l.id.String() {
//获取本机信息,以及本机做的工作。
// 获取本机信息,以及本机做的工作。
// 注册信息
// 分发节点信息
// 接收与sidecar节点相关的信息流
} else {
//转发给路由模块
......@@ -59,7 +70,7 @@ func (l *local) ListenIBTPX() <-chan *pb.Message {
return l.rev
}
// sidecar 节点
// HandleGetPeerInfoMessage sidecar 节点
func (l *local) HandleGetPeerInfoMessage(p port.Port, message *pb.Message) {
data := &pb.PeerInfo{
ID: l.ID(),
......
......@@ -37,18 +37,18 @@ func (m *MockPeerManager) EXPECT() *MockPeerManagerMockRecorder {
return m.recorder
}
// AsyncSend mocks base method.
func (m *MockPeerManager) AsyncSend(arg0 string, arg1 *pb.Message) error {
// AsyncSendByID mocks base method.
func (m *MockPeerManager) AsyncSendByID(arg0 string, arg1 *pb.Message) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "AsyncSend", arg0, arg1)
ret := m.ctrl.Call(m, "AsyncSendByID", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// AsyncSend indicates an expected call of AsyncSend.
func (mr *MockPeerManagerMockRecorder) AsyncSend(arg0, arg1 interface{}) *gomock.Call {
// AsyncSendByID indicates an expected call of AsyncSendByID.
func (mr *MockPeerManagerMockRecorder) AsyncSendByID(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AsyncSend", reflect.TypeOf((*MockPeerManager)(nil).AsyncSend), arg0, arg1)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AsyncSendByID", reflect.TypeOf((*MockPeerManager)(nil).AsyncSendByID), arg0, arg1)
}
// AsyncSendWithPort mocks base method.
......@@ -151,19 +151,19 @@ func (mr *MockPeerManagerMockRecorder) RegisterMultiMsgHandler(arg0, arg1 interf
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RegisterMultiMsgHandler", reflect.TypeOf((*MockPeerManager)(nil).RegisterMultiMsgHandler), arg0, arg1)
}
// Send mocks base method.
func (m *MockPeerManager) Send(arg0 string, arg1 *pb.Message) (*pb.Message, error) {
// SendByID mocks base method.
func (m *MockPeerManager) SendByID(arg0 string, arg1 *pb.Message) (*pb.Message, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Send", arg0, arg1)
ret := m.ctrl.Call(m, "SendByID", arg0, arg1)
ret0, _ := ret[0].(*pb.Message)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// Send indicates an expected call of Send.
func (mr *MockPeerManagerMockRecorder) Send(arg0, arg1 interface{}) *gomock.Call {
// SendByID indicates an expected call of SendByID.
func (mr *MockPeerManagerMockRecorder) SendByID(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Send", reflect.TypeOf((*MockPeerManager)(nil).Send), arg0, arg1)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendByID", reflect.TypeOf((*MockPeerManager)(nil).SendByID), arg0, arg1)
}
// SendWithPort mocks base method.
......
......@@ -22,9 +22,9 @@ type PeerManager interface {
Connect(info *peer.AddrInfo) (string, error)
// AsyncSend sends message to peer with peer info.
AsyncSend(string, *pb.Message) error
AsyncSendByID(string, *pb.Message) error
// Send sends message waiting response
Send(string, *pb.Message) (*pb.Message, error)
SendByID(string, *pb.Message) (*pb.Message, error)
AsyncSendWithPort(port.Port, *pb.Message) error
......
......@@ -89,14 +89,14 @@ func TestSwarm_AsyncSend(t *testing.T) {
_, _, mockSwarm, mockMsg, mockMultiAddr, mockId := prepare(t)
// test with wrong id
err := mockSwarm.AsyncSend("123", mockMsg)
err := mockSwarm.AsyncSendByID("123", mockMsg)
require.NotNil(t, err)
// test in right way
_, err = AddrToPeerInfo(mockMultiAddr)
require.Nil(t, err)
err = mockSwarm.AsyncSend(mockId, mockMsg)
err = mockSwarm.AsyncSendByID(mockId, mockMsg)
require.Nil(t, err)
}
......@@ -104,14 +104,14 @@ func TestSwarm_Send(t *testing.T) {
_, _, mockSwarm, mockMsg, mockMultiAddr, mockId := prepare(t)
// test with wrong id
_, err := mockSwarm.Send("123", mockMsg)
_, err := mockSwarm.SendByID("123", mockMsg)
require.NotNil(t, err)
// test in right way
_, err = AddrToPeerInfo(mockMultiAddr)
require.Nil(t, err)
_, err = mockSwarm.Send(mockId, mockMsg)
_, err = mockSwarm.SendByID(mockId, mockMsg)
require.Nil(t, err)
}
......@@ -301,7 +301,7 @@ func (ms *MockStream) AsyncSend(data []byte) error {
return nil
}
}
return fmt.Errorf("AsyncSend: invalid message type")
return fmt.Errorf("AsyncSendByID: invalid message type")
}
func (ms *MockStream) Send(data []byte) ([]byte, error) {
......@@ -318,7 +318,7 @@ func (ms *MockStream) Send(data []byte) ([]byte, error) {
return nil, nil
}
}
return nil, fmt.Errorf("Send: invalid message type")
return nil, fmt.Errorf("SendByID: invalid message type")
}
func (ms *MockStream) Read(time.Duration) ([]byte, error) {
......@@ -475,7 +475,7 @@ func (mn *MockNetwork) SetMessageHandler(network.MessageHandler) {
// AsyncSend sends message to peer with peer id.
func (mn *MockNetwork) AsyncSend(id string, msg []byte) error {
if len(id) != 46 {
return fmt.Errorf("AsyncSend: wrong id %s", id)
return fmt.Errorf("AsyncSendByID: wrong id %s", id)
}
return nil
}
......@@ -483,7 +483,7 @@ func (mn *MockNetwork) AsyncSend(id string, msg []byte) error {
// Send sends message to peer with peer id waiting response
func (mn *MockNetwork) Send(id string, data []byte) ([]byte, error) {
if len(id) != 46 {
return nil, fmt.Errorf("AsyncSend: wrong id %s", id)
return nil, fmt.Errorf("AsyncSendByID: wrong id %s", id)
}
msg := &pb.Message{}
......@@ -501,7 +501,7 @@ func (mn *MockNetwork) Send(id string, data []byte) ([]byte, error) {
return retData, nil
}
}
return nil, fmt.Errorf("Send: invalid message type")
return nil, fmt.Errorf("SendByID: invalid message type")
}
// Broadcast message to all node
......
......@@ -30,12 +30,12 @@ func (s *sidecar) Tag() string {
// Send 同步发送给绑定的对应的port dev
func (s *sidecar) Send(msg *pb.Message) (*pb.Message, error) {
return s.swarm.Send(s.ID(), msg)
return s.swarm.SendByID(s.ID(), msg)
}
// AsyncSend 异步发送给绑定的对应的port dev
func (s *sidecar) AsyncSend(msg *pb.Message) error {
return s.swarm.AsyncSend(s.ID(), msg)
return s.swarm.AsyncSendByID(s.ID(), msg)
}
// ListenIBTPX 从绑定的对应的port dev接收数据
......
......@@ -102,7 +102,7 @@ func (swarm *Swarm) Start() error {
l := &local{
id: swarm.id,
privKey: swarm.privKey,
tag: "",
tag: Office,
rev: make(chan *pb.Message),
rout: swarm.router,
}
......@@ -289,7 +289,7 @@ func (swarm *Swarm) SendWithPort(s port.Port, msg *pb.Message) (*pb.Message, err
return s.Send(msg)
}
func (swarm *Swarm) AsyncSend(id string, msg *pb.Message) error {
func (swarm *Swarm) AsyncSendByID(id string, msg *pb.Message) error {
data, err := msg.Marshal()
if err != nil {
return fmt.Errorf("marshal message: %w", err)
......@@ -298,7 +298,7 @@ func (swarm *Swarm) AsyncSend(id string, msg *pb.Message) error {
return swarm.p2p.AsyncSend(id, data)
}
func (swarm *Swarm) Send(id string, msg *pb.Message) (*pb.Message, error) {
func (swarm *Swarm) SendByID(id string, msg *pb.Message) (*pb.Message, error) {
data, err := msg.Marshal()
if err != nil {
return nil, err
......@@ -385,36 +385,19 @@ func AddrToPeerInfo(multiAddr string) (*peer.AddrInfo, error) {
func (swarm *Swarm) getRemoteAddress(id peer.ID) (string, error) {
msg := pb.Msg(pb.Message_ADDRESS_GET, true, nil)
reqData, err := msg.Marshal()
if err != nil {
return "", err
}
retData, err := swarm.p2p.Send(id.String(), reqData) //同步获取数据
ret, err := swarm.SendByID(id.String(), msg)
if err != nil {
return "", fmt.Errorf("sync send: %w", err)
}
ret := &pb.Message{}
if err := ret.Unmarshal(retData); err != nil {
return "", err
}
return string(ret.Payload.Data), nil
}
func (swarm *Swarm) getRemotePeerInfo(id string) (*pb.PeerInfo, error) {
msg := pb.Msg(pb.Message_PEER_INFO_GET, true, nil)
reqData, err := msg.Marshal()
ret, err := swarm.SendByID(id, msg)
if err != nil {
return nil, err
}
retData, err := swarm.p2p.Send(id, reqData) //同步获取数据
if err != nil {
return nil, fmt.Errorf("sync send: %w", err)
}
ret := &pb.Message{}
if err := ret.Unmarshal(retData); err != nil {
return nil, err
}
info := &pb.PeerInfo{}
err = info.Unmarshal(ret.Payload.Data)
return info, err
......
......@@ -13,6 +13,7 @@ const (
Hub = "hub" //Hub: 同步数据,同步元数据等。
Sidecar = "sidecar" //SideCar节点
Appchain = "appchain" //区块链客户端
Local = "local"
)
// 设计一套port管理机制:包括各种的管理模块。
......@@ -61,6 +62,7 @@ type PortMap struct {
peerPort map[string]Port
appchainPort map[string]Port
hubPort Port
local Port
}
func NewPortMap() *PortMap {
......@@ -92,6 +94,10 @@ func (p *PortMap) add(pt Port) {
if p.hubPort == nil {
p.hubPort = pt
}
case Local:
if p.local == nil {
p.local = pt
}
case Appchain:
p.appchainPort[pt.ID()] = pt
case Sidecar:
......@@ -108,12 +114,26 @@ func (p *PortMap) GetHub() (Port, bool) {
return p.hubPort, true
}
func (p *PortMap) GetLocal() (Port, bool) {
p.rw.RLocker()
defer p.rw.RUnlock()
if p.local == nil {
return nil, false
}
return p.local, true
}
func (p *PortMap) Port(id string) (Port, bool) {
p.rw.RLocker()
defer p.rw.RUnlock()
if p.hubPort.ID() == id {
return p.hubPort, true
}
if p.local.ID() == id {
return p.local, true
}
if pt, is := p.peerPort[id]; is {
return pt, is
}
......@@ -185,6 +205,10 @@ func (p *PortMap) remove(pt Port) {
if p.hubPort.ID() == pt.ID() {
p.hubPort = nil
}
case Local:
if p.local.ID() == pt.ID() {
p.local = nil
}
case Appchain:
delete(p.appchainPort, pt.ID())
case Sidecar:
......
......@@ -55,10 +55,19 @@ func (r *router) Add(p port.Port) error {
select {
case msg := <-c:
go func() {
if msg.IsIbtpRouter() {
err := r.Route(msg)
if err != nil {
r.logger.Error(err)
}
} else {
if local, is := r.portMap.GetLocal(); is {
err := local.AsyncSend(msg)
if err != nil {
r.logger.Error(err)
}
}
}
}()
case <-r.ctx.Done():
break
......@@ -157,7 +166,7 @@ func (r *router) Route(msg *pb.Message) error {
r.logger.Error(err)
return err
}
r.Route(ret) //返回值,重新路由; send返回值需要调换from , to;
return r.Route(ret) //返回值,重新路由; send返回值需要调换from , to;
//d,_ := ret.Marshal()
//msg = pb.Msg(pb.Message_IBTP_RECEIPT_SEND,true,d)
case pb.Message_IBTP_RECEIPT_SEND:
......
......@@ -14,3 +14,7 @@ func Msg(typ Message_Type, ok bool, data []byte) *Message {
},
}
}
func (m *Message) IsIbtpRouter() bool {
return m.Type == Message_IBTP_SEND || m.Type == Message_IBTP_GET || m.Type == Message_IBTP_RECEIPT_SEND || m.Type == Message_IBTP_RECEIPT_GET
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment