Commit 9f3137c3 authored by suyanlong's avatar suyanlong

1. Add stream Port.

2. Update package. 3. Fixed p2p bug.
parent 5fd7fb8b
...@@ -52,7 +52,7 @@ require ( ...@@ -52,7 +52,7 @@ require (
github.com/meshplus/bitxhub-core v1.3.1-0.20210524071255-789fd9ab501c github.com/meshplus/bitxhub-core v1.3.1-0.20210524071255-789fd9ab501c
github.com/meshplus/bitxhub-kit v1.2.1-0.20210524063043-9afae78ac098 github.com/meshplus/bitxhub-kit v1.2.1-0.20210524063043-9afae78ac098
github.com/meshplus/bitxid v0.0.0-20210412025850-e0eaf0f9063a github.com/meshplus/bitxid v0.0.0-20210412025850-e0eaf0f9063a
github.com/meshplus/go-lightp2p v0.0.0-20200817105923-6b3aee40fa54 github.com/meshplus/go-lightp2p v0.0.0-20210617153734-471d08b829f8
github.com/mitchellh/go-homedir v1.1.0 github.com/mitchellh/go-homedir v1.1.0
github.com/mitchellh/mapstructure v1.4.2 github.com/mitchellh/mapstructure v1.4.2
github.com/multiformats/go-multiaddr v0.3.0 github.com/multiformats/go-multiaddr v0.3.0
...@@ -74,6 +74,7 @@ require ( ...@@ -74,6 +74,7 @@ require (
require ( require (
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible // indirect github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible // indirect
github.com/benbjohnson/clock v1.0.1 // indirect
github.com/casbin/casbin/v2 v2.37.0 // indirect github.com/casbin/casbin/v2 v2.37.0 // indirect
github.com/coreos/go-semver v0.3.0 // indirect github.com/coreos/go-semver v0.3.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d // indirect github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d // indirect
...@@ -118,6 +119,7 @@ require ( ...@@ -118,6 +119,7 @@ require (
github.com/libp2p/go-libp2p-autonat v0.2.3 // indirect github.com/libp2p/go-libp2p-autonat v0.2.3 // indirect
github.com/libp2p/go-libp2p-blankhost v0.1.6 // indirect github.com/libp2p/go-libp2p-blankhost v0.1.6 // indirect
github.com/libp2p/go-libp2p-circuit v0.2.2 // indirect github.com/libp2p/go-libp2p-circuit v0.2.2 // indirect
github.com/libp2p/go-libp2p-connmgr v0.2.3 // indirect
github.com/libp2p/go-libp2p-crypto v0.1.0 // indirect github.com/libp2p/go-libp2p-crypto v0.1.0 // indirect
github.com/libp2p/go-libp2p-discovery v0.4.0 // indirect github.com/libp2p/go-libp2p-discovery v0.4.0 // indirect
github.com/libp2p/go-libp2p-kad-dht v0.8.2 // indirect github.com/libp2p/go-libp2p-kad-dht v0.8.2 // indirect
......
...@@ -49,6 +49,8 @@ github.com/aryann/difflib v0.0.0-20170710044230-e206f873d14a/go.mod h1:DAHtR1m6l ...@@ -49,6 +49,8 @@ github.com/aryann/difflib v0.0.0-20170710044230-e206f873d14a/go.mod h1:DAHtR1m6l
github.com/aws/aws-lambda-go v1.13.3/go.mod h1:4UKl9IzQMoD+QF79YdCuzCwp8VbmG4VAQwij/eHl5CU= github.com/aws/aws-lambda-go v1.13.3/go.mod h1:4UKl9IzQMoD+QF79YdCuzCwp8VbmG4VAQwij/eHl5CU=
github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g= github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g=
github.com/benbjohnson/clock v1.0.1 h1:lVM1R/o5khtrr7t3qAr+sS6uagZOP+7iprc7gS3V9CE=
github.com/benbjohnson/clock v1.0.1/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
...@@ -155,6 +157,7 @@ github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239/go.mod h1:Gdwt2ce0 ...@@ -155,6 +157,7 @@ github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239/go.mod h1:Gdwt2ce0
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.12.0 h1:mRhaKNwANqRgUBGKmnI5ZxEk7QXmjQeCcuYFMX2bfcc= github.com/fatih/color v1.12.0 h1:mRhaKNwANqRgUBGKmnI5ZxEk7QXmjQeCcuYFMX2bfcc=
github.com/fatih/color v1.12.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= github.com/fatih/color v1.12.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6/go.mod h1:1i71OnUq3iUe1ma7Lr6yG6/rjvM3emb6yoL7xLFzcVQ=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4= github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4=
github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20= github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20=
...@@ -541,6 +544,7 @@ github.com/libp2p/go-libp2p v0.5.0/go.mod h1:Os7a5Z3B+ErF4v7zgIJ7nBHNu2LYt8ZMLkT ...@@ -541,6 +544,7 @@ github.com/libp2p/go-libp2p v0.5.0/go.mod h1:Os7a5Z3B+ErF4v7zgIJ7nBHNu2LYt8ZMLkT
github.com/libp2p/go-libp2p v0.6.1/go.mod h1:CTFnWXogryAHjXAKEbOf1OWY+VeAP3lDMZkfEI5sT54= github.com/libp2p/go-libp2p v0.6.1/go.mod h1:CTFnWXogryAHjXAKEbOf1OWY+VeAP3lDMZkfEI5sT54=
github.com/libp2p/go-libp2p v0.7.0/go.mod h1:hZJf8txWeCduQRDC/WSqBGMxaTHCOYHt2xSU1ivxn0k= github.com/libp2p/go-libp2p v0.7.0/go.mod h1:hZJf8txWeCduQRDC/WSqBGMxaTHCOYHt2xSU1ivxn0k=
github.com/libp2p/go-libp2p v0.7.4/go.mod h1:oXsBlTLF1q7pxr+9w6lqzS1ILpyHsaBPniVO7zIHGMw= github.com/libp2p/go-libp2p v0.7.4/go.mod h1:oXsBlTLF1q7pxr+9w6lqzS1ILpyHsaBPniVO7zIHGMw=
github.com/libp2p/go-libp2p v0.8.1/go.mod h1:QRNH9pwdbEBpx5DTJYg+qxcVaDMAz3Ee/qDKwXujH5o=
github.com/libp2p/go-libp2p v0.8.2/go.mod h1:NQDA/F/qArMHGe0J7sDScaKjW8Jh4y/ozQqBbYJ+BnA= github.com/libp2p/go-libp2p v0.8.2/go.mod h1:NQDA/F/qArMHGe0J7sDScaKjW8Jh4y/ozQqBbYJ+BnA=
github.com/libp2p/go-libp2p v0.8.3/go.mod h1:EsH1A+8yoWK+L4iKcbPYu6MPluZ+CHWI9El8cTaefiM= github.com/libp2p/go-libp2p v0.8.3/go.mod h1:EsH1A+8yoWK+L4iKcbPYu6MPluZ+CHWI9El8cTaefiM=
github.com/libp2p/go-libp2p v0.9.2 h1:5rViLwtjkaEWcIBbk6oII39cVjPTElo3F78SSLf9yho= github.com/libp2p/go-libp2p v0.9.2 h1:5rViLwtjkaEWcIBbk6oII39cVjPTElo3F78SSLf9yho=
...@@ -559,6 +563,8 @@ github.com/libp2p/go-libp2p-circuit v0.1.4/go.mod h1:CY67BrEjKNDhdTk8UgBX1Y/H5c3 ...@@ -559,6 +563,8 @@ github.com/libp2p/go-libp2p-circuit v0.1.4/go.mod h1:CY67BrEjKNDhdTk8UgBX1Y/H5c3
github.com/libp2p/go-libp2p-circuit v0.2.1/go.mod h1:BXPwYDN5A8z4OEY9sOfr2DUQMLQvKt/6oku45YUmjIo= github.com/libp2p/go-libp2p-circuit v0.2.1/go.mod h1:BXPwYDN5A8z4OEY9sOfr2DUQMLQvKt/6oku45YUmjIo=
github.com/libp2p/go-libp2p-circuit v0.2.2 h1:87RLabJ9lrhoiSDDZyCJ80ZlI5TLJMwfyoGAaWXzWqA= github.com/libp2p/go-libp2p-circuit v0.2.2 h1:87RLabJ9lrhoiSDDZyCJ80ZlI5TLJMwfyoGAaWXzWqA=
github.com/libp2p/go-libp2p-circuit v0.2.2/go.mod h1:nkG3iE01tR3FoQ2nMm06IUrCpCyJp1Eo4A1xYdpjfs4= github.com/libp2p/go-libp2p-circuit v0.2.2/go.mod h1:nkG3iE01tR3FoQ2nMm06IUrCpCyJp1Eo4A1xYdpjfs4=
github.com/libp2p/go-libp2p-connmgr v0.2.3 h1:v7skKI9n+0obPpzMIO6aIlOSdQOmhxTf40cbpzqaGMQ=
github.com/libp2p/go-libp2p-connmgr v0.2.3/go.mod h1:Gqjg29zI8CwXX21zRxy6gOg8VYu3zVerJRt2KyktzH4=
github.com/libp2p/go-libp2p-core v0.5.6 h1:IxFH4PmtLlLdPf4fF/i129SnK/C+/v8WEX644MxhC48= github.com/libp2p/go-libp2p-core v0.5.6 h1:IxFH4PmtLlLdPf4fF/i129SnK/C+/v8WEX644MxhC48=
github.com/libp2p/go-libp2p-core v0.5.6/go.mod h1:txwbVEhHEXikXn9gfC7/UDDw7rkxuX0bJvM49Ykaswo= github.com/libp2p/go-libp2p-core v0.5.6/go.mod h1:txwbVEhHEXikXn9gfC7/UDDw7rkxuX0bJvM49Ykaswo=
github.com/libp2p/go-libp2p-crypto v0.1.0 h1:k9MFy+o2zGDNGsaoZl0MA3iZ75qXxr9OOoAZF+sD5OQ= github.com/libp2p/go-libp2p-crypto v0.1.0 h1:k9MFy+o2zGDNGsaoZl0MA3iZ75qXxr9OOoAZF+sD5OQ=
...@@ -706,8 +712,11 @@ github.com/meshplus/bitxhub-model v1.1.1/go.mod h1:lUl9vPZXM9tP+B0ABRW/2eOW/6KCm ...@@ -706,8 +712,11 @@ github.com/meshplus/bitxhub-model v1.1.1/go.mod h1:lUl9vPZXM9tP+B0ABRW/2eOW/6KCm
github.com/meshplus/bitxhub-model v1.2.1-0.20210524063354-5d48e2fee178/go.mod h1:vwJ+sHPUyA2JELmUUDBol+7zA+7GcqutxzqXjsN0QLA= github.com/meshplus/bitxhub-model v1.2.1-0.20210524063354-5d48e2fee178/go.mod h1:vwJ+sHPUyA2JELmUUDBol+7zA+7GcqutxzqXjsN0QLA=
github.com/meshplus/bitxid v0.0.0-20210412025850-e0eaf0f9063a h1:c4ESPDa60Jd4zfzZIGGTyzhfaVM3vKN+xV2G9BwIDGQ= github.com/meshplus/bitxid v0.0.0-20210412025850-e0eaf0f9063a h1:c4ESPDa60Jd4zfzZIGGTyzhfaVM3vKN+xV2G9BwIDGQ=
github.com/meshplus/bitxid v0.0.0-20210412025850-e0eaf0f9063a/go.mod h1:vAldSRfDe2Qo7exsSTbchVmZWXPY7fhWQrRw18QJHho= github.com/meshplus/bitxid v0.0.0-20210412025850-e0eaf0f9063a/go.mod h1:vAldSRfDe2Qo7exsSTbchVmZWXPY7fhWQrRw18QJHho=
github.com/meshplus/go-libp2p-cert v0.0.0-20210125114242-7d9ed2eaaccd/go.mod h1:rS4AYMqKypLn2IPEnHICP//V2v16SZo4CWUbwMdihl0=
github.com/meshplus/go-lightp2p v0.0.0-20200817105923-6b3aee40fa54 h1:5Ip5AB7SxxQHg5SRtf2cCOI2wy1p75MQB12soPtPyf8= github.com/meshplus/go-lightp2p v0.0.0-20200817105923-6b3aee40fa54 h1:5Ip5AB7SxxQHg5SRtf2cCOI2wy1p75MQB12soPtPyf8=
github.com/meshplus/go-lightp2p v0.0.0-20200817105923-6b3aee40fa54/go.mod h1:G89UJaeqCQFxFdp8wzy1AdKfMtDEhpySau0pjDNeeaw= github.com/meshplus/go-lightp2p v0.0.0-20200817105923-6b3aee40fa54/go.mod h1:G89UJaeqCQFxFdp8wzy1AdKfMtDEhpySau0pjDNeeaw=
github.com/meshplus/go-lightp2p v0.0.0-20210617153734-471d08b829f8 h1:HU/rDVIT84vGM2l2NKIWVGQRzI4jnEr6kzlbHV/VMcY=
github.com/meshplus/go-lightp2p v0.0.0-20210617153734-471d08b829f8/go.mod h1:ARRXc4Oo0iZjsj5CB+LTmcMNxK2c1fug7XsEZ7yPJvg=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/miekg/dns v1.1.12/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= github.com/miekg/dns v1.1.12/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/miekg/dns v1.1.28/go.mod h1:KNUDUusw/aVsxyTYZM1oqvCicbwhgbNgztCETuNZ7xM= github.com/miekg/dns v1.1.28/go.mod h1:KNUDUusw/aVsxyTYZM1oqvCicbwhgbNgztCETuNZ7xM=
......
...@@ -7,8 +7,16 @@ import ( ...@@ -7,8 +7,16 @@ import (
) )
func (swarm *Swarm) RegisterMultiMsgHandler(messageTypes []pb.Message_Type, handler MessageHandler) error { func (swarm *Swarm) RegisterMultiMsgHandler(messageTypes []pb.Message_Type, handler MessageHandler) error {
return swarm.registerMultiMsgHandler(messageTypes, handler)
}
func (swarm *Swarm) RegisterMultiMsgHandlerPort(messageTypes []pb.Message_Type, handler MessageHandlerPort) error {
return swarm.registerMultiMsgHandler(messageTypes, handler)
}
func (swarm *Swarm) registerMultiMsgHandler(messageTypes []pb.Message_Type, handler interface{}) error {
for _, typ := range messageTypes { for _, typ := range messageTypes {
if err := swarm.RegisterMsgHandler(typ, handler); err != nil { if err := swarm.registerMsgHandler(typ, handler); err != nil {
return err return err
} }
} }
...@@ -16,6 +24,10 @@ func (swarm *Swarm) RegisterMultiMsgHandler(messageTypes []pb.Message_Type, hand ...@@ -16,6 +24,10 @@ func (swarm *Swarm) RegisterMultiMsgHandler(messageTypes []pb.Message_Type, hand
} }
func (swarm *Swarm) RegisterMsgHandler(messageType pb.Message_Type, handler MessageHandler) error { func (swarm *Swarm) RegisterMsgHandler(messageType pb.Message_Type, handler MessageHandler) error {
return swarm.registerMsgHandler(messageType, handler)
}
func (swarm *Swarm) registerMsgHandler(messageType pb.Message_Type, handler interface{}) error {
if handler == nil { if handler == nil {
return fmt.Errorf("register msg handler: empty handler") return fmt.Errorf("register msg handler: empty handler")
} }
...@@ -30,8 +42,9 @@ func (swarm *Swarm) RegisterMsgHandler(messageType pb.Message_Type, handler Mess ...@@ -30,8 +42,9 @@ func (swarm *Swarm) RegisterMsgHandler(messageType pb.Message_Type, handler Mess
} }
func (swarm *Swarm) RegisterConnectHandler(handler ConnectHandler) error { func (swarm *Swarm) RegisterConnectHandler(handler ConnectHandler) error {
swarm.lock.Lock() //swarm.lock.Lock()
defer swarm.lock.Unlock() //defer swarm.lock.Unlock()
swarm.connectHandlers = append(swarm.connectHandlers, handler) //swarm.connectHandlers = append(swarm.connectHandlers, handler)
return nil //return nil
panic("Unimplemented!")
} }
...@@ -120,6 +120,10 @@ func (l *localPeer) handleGetAddressMessage(p port.Port, message *pb.Message) { ...@@ -120,6 +120,10 @@ func (l *localPeer) handleGetAddressMessage(p port.Port, message *pb.Message) {
} }
} }
func (l *localPeer) doAck(p port.Port, message *pb.Message) {
l.logger.Info("port ID:", p.ID(), "msg type: ", message.Type)
}
// 涉及到同步异步的问题。 // 涉及到同步异步的问题。
// 一、根据目的地址转发(异步完成);(这种情况其实就是返回一个ack给绑定对应的port dev,还是根据消息类型判断)。 // 一、根据目的地址转发(异步完成);(这种情况其实就是返回一个ack给绑定对应的port dev,还是根据消息类型判断)。
// 二、根据消息类型转发(同步情况:需要立马返回结果;异步情况:需要返回一个ack,给绑定对应的port dev,所以找到对应的port dev 很关键!)。 // 二、根据消息类型转发(同步情况:需要立马返回结果;异步情况:需要返回一个ack,给绑定对应的port dev,所以找到对应的port dev 很关键!)。
...@@ -9,8 +9,9 @@ import ( ...@@ -9,8 +9,9 @@ import (
) )
type ( type (
MessageHandler func(port.Port, *pb.Message) MessageHandler func(port.Port, *pb.Message)
ConnectHandler func(string) MessageHandlerPort func(func() port.Port, *pb.Message)
ConnectHandler func(string)
) )
//go:generate mockgen -destination mock_peermgr/mock_peermgr.go -package mock_peermgr -source peermgr.go //go:generate mockgen -destination mock_peermgr/mock_peermgr.go -package mock_peermgr -source peermgr.go
......
package peermgr package peermgr
import ( import (
"fmt"
network "github.com/meshplus/go-lightp2p"
"gitlab.33.cn/link33/sidecar/internal/port" "gitlab.33.cn/link33/sidecar/internal/port"
"gitlab.33.cn/link33/sidecar/model/pb" "gitlab.33.cn/link33/sidecar/model/pb"
) )
...@@ -56,3 +60,68 @@ func (s *sidecar) ListenIBTPX() <-chan *pb.Message { ...@@ -56,3 +60,68 @@ func (s *sidecar) ListenIBTPX() <-chan *pb.Message {
func (s *sidecar) Receive(msg *pb.Message) { func (s *sidecar) Receive(msg *pb.Message) {
s.rev <- msg s.rev <- msg
} }
type stream struct {
s network.Stream
tag string
rev chan *pb.Message
}
func newStream(s network.Stream, tag string) *stream {
rec := make(chan *pb.Message, port.Capacity)
return &stream{
s: s,
tag: tag,
rev: rec,
}
}
func (s *stream) ID() string {
return s.s.RemotePeerID()
}
func (s *stream) Type() string {
return port.Sidecar
}
func (s *stream) Name() string {
return s.ID()
}
func (s *stream) Tag() string {
return s.tag
}
func (s *stream) Send(msg *pb.Message) (*pb.Message, error) {
return Send(s.s.Send, msg)
}
func Send(sender func([]byte) ([]byte, error), msg *pb.Message) (*pb.Message, error) {
data, err := msg.Marshal()
if err != nil {
return nil, err
}
ret, err := sender(data)
if err != nil {
return nil, fmt.Errorf("sync send: %w", err)
}
m := &pb.Message{}
if err := m.Unmarshal(ret); err != nil {
return nil, err
}
return m, nil
}
func (s *stream) AsyncSend(msg *pb.Message) error {
data, err := msg.Marshal()
if err != nil {
return fmt.Errorf("marshal message: %w", err)
}
return s.s.AsyncSend(data)
}
func (s *stream) ListenIBTPX() <-chan *pb.Message {
panic("implement me")
}
...@@ -110,7 +110,12 @@ func (swarm *Swarm) Start() error { ...@@ -110,7 +110,12 @@ func (swarm *Swarm) Start() error {
if err := swarm.RegisterMsgHandler(pb.Message_PEER_INFO_GET, swarm.localPeer.HandleGetPeerInfoMessage); err != nil { if err := swarm.RegisterMsgHandler(pb.Message_PEER_INFO_GET, swarm.localPeer.HandleGetPeerInfoMessage); err != nil {
return fmt.Errorf("register get peer info msg handler: %w", err) return fmt.Errorf("register get peer info msg handler: %w", err)
} }
if err := swarm.RegisterMultiMsgHandler([]pb.Message_Type{
if err := swarm.RegisterMsgHandler(pb.Message_ACK, swarm.localPeer.doAck); err != nil {
return fmt.Errorf("register get peer info msg handler: %w", err)
}
if err := swarm.RegisterMultiMsgHandlerPort([]pb.Message_Type{
pb.Message_IBTP_SEND, pb.Message_IBTP_SEND,
pb.Message_IBTP_GET, pb.Message_IBTP_GET,
pb.Message_IBTP_RECEIPT_SEND, pb.Message_IBTP_RECEIPT_SEND,
...@@ -128,53 +133,44 @@ func (swarm *Swarm) Start() error { ...@@ -128,53 +133,44 @@ func (swarm *Swarm) Start() error {
wg.Add(1) wg.Add(1)
for id, addr := range swarm.peers { for id, addr := range swarm.peers {
go func(id string, addr *peer.AddrInfo) { go func(remoteID string, addr *peer.AddrInfo) {
if err := retry.Retry(func(attempt uint) error { if err := retry.Retry(func(attempt uint) error {
if err := swarm.p2p.Connect(*addr); err != nil { if err := swarm.p2p.Connect(*addr); err != nil {
if attempt != 0 && attempt%5 == 0 { if attempt != 0 && attempt%5 == 0 {
swarm.logger.WithFields(logrus.Fields{ swarm.logger.WithFields(logrus.Fields{
"node": id, "node": remoteID,
"error": err, "error": err,
}).Error("Connect failed") }).Error("Connect failed")
} }
return err return err
} }
address, err := swarm.getRemoteAddress(addr.ID)
if err != nil {
swarm.logger.WithFields(logrus.Fields{
"node": id,
"error": err,
}).Error("Get remote address failed")
return err
}
swarm.logger.WithFields(logrus.Fields{
"node": id,
"address:": address,
}).Info("Connect successfully")
swarm.addRemotePortByID(id)
swarm.lock.RLock()
defer swarm.lock.RUnlock()
for _, handler := range swarm.connectHandlers {
go func(connectHandler ConnectHandler, address string) {
connectHandler(address)
}(handler, address)
}
wg.Done()
return nil return nil
}, }, strategy.Wait(1*time.Second)); err != nil {
strategy.Wait(1*time.Second),
); err != nil {
swarm.logger.Error(err) swarm.logger.Error(err)
} }
address, err := swarm.getRemoteAddress(addr.ID)
if err != nil {
swarm.logger.WithFields(logrus.Fields{
"node": remoteID,
"error": err,
}).Error("Get remote address failed")
}
swarm.addRemotePortByID(remoteID)
swarm.logger.WithFields(logrus.Fields{
"node": remoteID,
"address:": address,
}).Info("Connect successfully")
//swarm.lock.RLock()
//defer swarm.lock.RUnlock()
//for _, handler := range swarm.connectHandlers {
// go func(connectHandler ConnectHandler, address string) {
// connectHandler(address)
// }(handler, address)
//}
wg.Done()
}(id, addr) }(id, addr)
} }
wg.Wait() wg.Wait()
return nil return nil
} }
...@@ -196,15 +192,20 @@ func (swarm *Swarm) handleMessage(s network.Stream, data []byte) { ...@@ -196,15 +192,20 @@ func (swarm *Swarm) handleMessage(s network.Stream, data []byte) {
} }
msgHandler, ok := handler.(MessageHandler) msgHandler, ok := handler.(MessageHandler)
if !ok { if ok {
swarm.logger.WithFields(logrus.Fields{ msgHandler(newStream(s, "tmp_sidecar"), m)
"error": fmt.Errorf("invalid handler for msg [type: %v]", m.Type),
"type": m.Type.String(),
}).Error("Handle message")
return return
} }
msgHandlerPort, ok := handler.(MessageHandlerPort)
if ok {
msgHandler(swarm.newSidecar(s.RemotePeerID()), m) msgHandlerPort(func() port.Port { return swarm.newSidecar(s.RemotePeerID()) }, m)
return
}
swarm.logger.WithFields(logrus.Fields{
"error": fmt.Errorf("invalid handler for msg [type: %v]", m.Type),
"type": m.Type.String(),
}).Error("Handle message")
} }
func (swarm *Swarm) newSidecar(sidecarID string) *sidecar { func (swarm *Swarm) newSidecar(sidecarID string) *sidecar {
...@@ -217,7 +218,8 @@ func (swarm *Swarm) newSidecar(sidecarID string) *sidecar { ...@@ -217,7 +218,8 @@ func (swarm *Swarm) newSidecar(sidecarID string) *sidecar {
} }
// 接收其它sidecar节点发过来的交易、请求等。主要是IBTP结构相关数据。 // 接收其它sidecar节点发过来的交易、请求等。主要是IBTP结构相关数据。
func (swarm *Swarm) handleIBTPX(pt port.Port, m *pb.Message) { func (swarm *Swarm) handleIBTPX(fn func() port.Port, m *pb.Message) {
pt := fn()
p, is := swarm.router.Load(pt.ID()) p, is := swarm.router.Load(pt.ID())
if is { if is {
ps, iss := p.(*sidecar) ps, iss := p.(*sidecar)
...@@ -226,9 +228,13 @@ func (swarm *Swarm) handleIBTPX(pt port.Port, m *pb.Message) { ...@@ -226,9 +228,13 @@ func (swarm *Swarm) handleIBTPX(pt port.Port, m *pb.Message) {
ps.Receive(m) ps.Receive(m)
} }
} else { } else {
ppt := pt.(*sidecar) //ppt := pt.(*sidecar)
swarm.addRemotePort(ppt) //swarm.addRemotePort(ppt)
ppt.Receive(m) //ppt.Receive(m)
swarm.logger.WithFields(logrus.Fields{
"remote port": pt.ID(),
"func": "Swarm.handleIBTPX",
}).Error("unknown pb msg")
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment