Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
P
plugin
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
JIRA
JIRA
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
link33
plugin
Commits
8b6584b9
Unverified
Commit
8b6584b9
authored
Aug 20, 2021
by
vipwzw
Committed by
GitHub
Aug 20, 2021
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #1020 from libangzhu/fix-gossip
Fix gossip
parents
15c55973
8a273471
Expand all
Hide whitespace changes
Inline
Side-by-side
Showing
10 changed files
with
141 additions
and
139 deletions
+141
-139
go.sum
go.sum
+0
-0
common.go
plugin/p2p/gossip/common.go
+27
-0
listener.go
plugin/p2p/gossip/listener.go
+1
-0
monitor.go
plugin/p2p/gossip/monitor.go
+62
-84
netaddress.go
plugin/p2p/gossip/netaddress.go
+1
-0
node.go
plugin/p2p/gossip/node.go
+32
-24
p2p.go
plugin/p2p/gossip/p2p.go
+0
-1
p2p_test.go
plugin/p2p/gossip/p2p_test.go
+5
-4
p2pcli.go
plugin/p2p/gossip/p2pcli.go
+7
-5
peer.go
plugin/p2p/gossip/peer.go
+6
-21
No files found.
go.sum
View file @
8b6584b9
This diff is collapsed.
Click to expand it.
plugin/p2p/gossip/common.go
View file @
8b6584b9
...
@@ -6,8 +6,11 @@ package gossip
...
@@ -6,8 +6,11 @@ package gossip
import
(
import
(
"bytes"
"bytes"
"context"
"encoding/binary"
"encoding/binary"
"encoding/hex"
"encoding/hex"
"errors"
"fmt"
"math/rand"
"math/rand"
"net"
"net"
"strings"
"strings"
...
@@ -95,7 +98,31 @@ func (c Comm) dialPeerWithAddress(addr *NetAddress, persistent bool, node *Node)
...
@@ -95,7 +98,31 @@ func (c Comm) dialPeerWithAddress(addr *NetAddress, persistent bool, node *Node)
if
persistent
{
if
persistent
{
peer
.
MakePersistent
()
peer
.
MakePersistent
()
}
}
//Set peer Name 在启动peer对象之前,获取节点对象的peerName,即pid
resp
,
err
:=
peer
.
mconn
.
gcli
.
Version2
(
context
.
Background
(),
&
types
.
P2PVersion
{
Nonce
:
time
.
Now
()
.
Unix
(),
Version
:
node
.
nodeInfo
.
channelVersion
,
AddrFrom
:
node
.
nodeInfo
.
GetExternalAddr
()
.
String
(),
AddrRecv
:
addr
.
String
(),
})
if
err
!=
nil
{
peer
.
Close
()
return
nil
,
err
}
//check remote peer is self or duplicate peer
_
,
pub
:=
node
.
nodeInfo
.
addrBook
.
GetPrivPubKey
()
if
node
.
Has
(
resp
.
UserAgent
)
||
resp
.
UserAgent
==
pub
{
//发现同一个peerID 下有两个不同的ip,则把新连接的ip加入黑名单5分钟
prepeer
:=
node
.
GetRegisterPeer
(
resp
.
UserAgent
)
log
.
Info
(
"dialPeerWithAddress"
,
"duplicate connect:"
,
prepeer
.
Addr
(),
addr
.
String
(),
resp
.
GetUserAgent
())
peer
.
Close
()
return
nil
,
errors
.
New
(
fmt
.
Sprintf
(
"duplicate connect %v"
,
resp
.
UserAgent
))
}
node
.
peerStore
.
Store
(
addr
.
String
(),
resp
.
UserAgent
)
peer
.
SetPeerName
(
resp
.
UserAgent
)
return
peer
,
nil
return
peer
,
nil
}
}
...
...
plugin/p2p/gossip/listener.go
View file @
8b6584b9
...
@@ -148,6 +148,7 @@ Retry:
...
@@ -148,6 +148,7 @@ Retry:
opts
=
append
(
opts
,
grpc
.
Creds
(
node
.
nodeInfo
.
servCreds
))
opts
=
append
(
opts
,
grpc
.
Creds
(
node
.
nodeInfo
.
servCreds
))
}
}
dl
.
server
=
grpc
.
NewServer
(
opts
...
)
dl
.
server
=
grpc
.
NewServer
(
opts
...
)
dl
.
p2pserver
=
pServer
dl
.
p2pserver
=
pServer
pb
.
RegisterP2PgserviceServer
(
dl
.
server
,
pServer
)
pb
.
RegisterP2PgserviceServer
(
dl
.
server
,
pServer
)
...
...
plugin/p2p/gossip/monitor.go
View file @
8b6584b9
...
@@ -25,7 +25,7 @@ func (n *Node) destroyPeer(peer *Peer) {
...
@@ -25,7 +25,7 @@ func (n *Node) destroyPeer(peer *Peer) {
"version support"
,
peer
.
version
.
IsSupport
())
"version support"
,
peer
.
version
.
IsSupport
())
n
.
nodeInfo
.
addrBook
.
RemoveAddr
(
peer
.
Addr
())
n
.
nodeInfo
.
addrBook
.
RemoveAddr
(
peer
.
Addr
())
n
.
remove
(
peer
.
Addr
())
n
.
remove
(
peer
.
GetPeerName
())
}
}
...
@@ -44,10 +44,7 @@ func (n *Node) monitorErrPeer() {
...
@@ -44,10 +44,7 @@ func (n *Node) monitorErrPeer() {
n
.
nodeInfo
.
blacklist
.
Add
(
peer
.
Addr
(),
int64
(
3600
*
12
))
n
.
nodeInfo
.
blacklist
.
Add
(
peer
.
Addr
(),
int64
(
3600
*
12
))
continue
continue
}
}
if
peer
.
IsMaxInbouds
{
if
peer
.
IsMaxInbouds
||
!
peer
.
GetRunning
()
{
n
.
destroyPeer
(
peer
)
}
if
!
peer
.
GetRunning
()
{
n
.
destroyPeer
(
peer
)
n
.
destroyPeer
(
peer
)
continue
continue
}
}
...
@@ -151,79 +148,61 @@ func (n *Node) getAddrFromOnline() {
...
@@ -151,79 +148,61 @@ func (n *Node) getAddrFromOnline() {
peers
,
_
:=
n
.
GetActivePeers
()
peers
,
_
:=
n
.
GetActivePeers
()
for
_
,
peer
:=
range
peers
{
//向其他节点发起请求,获取地址列表
for
_
,
peer
:=
range
peers
{
//向其他节点发起请求,获取地址列表
peerInfoList
,
err
:=
pcli
.
GetAddrList
(
peer
)
var
addrlist
[]
string
var
addrlistMap
map
[
string
]
int64
var
err
error
addrlistMap
,
err
=
pcli
.
GetAddrList
(
peer
)
P2pComm
.
CollectPeerStat
(
err
,
peer
)
P2pComm
.
CollectPeerStat
(
err
,
peer
)
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Error
(
"getAddrFromOnline"
,
"ERROR"
,
err
.
Error
())
log
.
Error
(
"getAddrFromOnline"
,
"ERROR"
,
err
.
Error
())
continue
continue
}
}
for
addr
:=
range
addrlistMap
{
for
_
,
info
:=
range
peerInfoList
{
addrlist
=
append
(
addrlist
,
addr
)
}
for
_
,
addr
:=
range
addrlist
{
if
!
n
.
needMore
()
{
if
!
n
.
needMore
()
{
//如果已经达到25个节点,则优先删除种子节点
//如果已经达到25个节点,则优先删除种子节点
localBlockHeight
,
err
:=
pcli
.
GetBlockHeight
(
n
.
nodeInfo
)
localBlockHeight
,
err
:=
pcli
.
GetBlockHeight
(
n
.
nodeInfo
)
if
err
!=
nil
{
if
err
!=
nil
{
continue
continue
}
}
//查询对方的高度,如果不小于自己的高度,或高度差在一定范围内,则剔除一个种子
//查询对方的高度,如果不小于自己的高度,或高度差在一定范围内,则剔除一个种子
if
peerHeight
,
ok
:=
addrlistMap
[
addr
];
ok
{
if
localBlockHeight
-
info
.
GetHeader
()
.
GetHeight
()
<
1024
{
//随机删除连接的一个种子
if
localBlockHeight
-
peerHeight
<
1024
{
n
.
innerSeeds
.
Range
(
func
(
k
,
v
interface
{})
bool
{
if
_
,
ok
:=
seedsMap
[
addr
]
;
ok
{
if
_
,
ok
:=
n
.
cfgSeeds
.
Load
(
k
.
(
string
))
;
ok
{
contin
ue
return
tr
ue
}
}
//remove inner seed
//随机删除连接的一个种子
for
_
,
peerInfo
:=
range
n
.
nodeInfo
.
peerInfos
.
GetPeerInfos
()
{
if
peerInfo
.
Addr
==
k
.
(
string
)
{
n
.
innerSeeds
.
Range
(
func
(
k
,
v
interface
{})
bool
{
n
.
remove
(
peerInfo
.
GetName
())
if
n
.
Has
(
k
.
(
string
))
{
//不能包含在cfgseed中
if
_
,
ok
:=
n
.
cfgSeeds
.
Load
(
k
.
(
string
));
ok
{
return
true
}
n
.
remove
(
k
.
(
string
))
n
.
nodeInfo
.
addrBook
.
RemoveAddr
(
k
.
(
string
))
n
.
nodeInfo
.
addrBook
.
RemoveAddr
(
k
.
(
string
))
return
false
return
false
}
}
return
true
}
})
return
false
}
})
}
}
time
.
Sleep
(
MonitorPeerInfoInterval
)
continue
continue
}
}
if
!
n
.
nodeInfo
.
blacklist
.
Has
(
addr
)
||
!
peerAddrFilter
.
Contains
(
addr
)
{
}
if
ticktimes
<
10
{
//如果连接了其他节点,优先不连接种子节点
if
_
,
ok
:=
n
.
innerSeeds
.
Load
(
addr
);
!
ok
{
//先把seed 排除在外
n
.
pubsub
.
FIFOPub
(
addr
,
"addr"
)
}
if
!
n
.
nodeInfo
.
blacklist
.
Has
(
peer
.
Addr
())
||
!
peerAddrFilter
.
Contains
(
peer
.
Addr
())
||
!
n
.
Has
(
peer
.
GetPeerName
())
{
}
else
{
if
ticktimes
<
10
{
n
.
pubsub
.
FIFOPub
(
addr
,
"addr"
)
//如果连接了其他节点,优先不连接种子节点
if
_
,
ok
:=
n
.
innerSeeds
.
Load
(
peer
.
Addr
());
!
ok
{
//先把seed 排除在外
n
.
pubsub
.
FIFOPub
(
peer
.
Addr
(),
"addr"
)
}
}
}
else
{
n
.
pubsub
.
FIFOPub
(
peer
.
Addr
(),
"addr"
)
}
}
}
}
}
}
}
}
}
}
func
(
n
*
Node
)
getAddrFromAddrBook
()
{
func
(
n
*
Node
)
getAddrFromAddrBook
()
{
...
@@ -249,7 +228,7 @@ func (n *Node) getAddrFromAddrBook() {
...
@@ -249,7 +228,7 @@ func (n *Node) getAddrFromAddrBook() {
addrNetArr
:=
n
.
nodeInfo
.
addrBook
.
GetPeers
()
addrNetArr
:=
n
.
nodeInfo
.
addrBook
.
GetPeers
()
for
_
,
addr
:=
range
addrNetArr
{
for
_
,
addr
:=
range
addrNetArr
{
if
!
n
.
Has
(
addr
.
String
())
&&
!
n
.
nodeInfo
.
blacklist
.
Has
(
addr
.
String
())
{
if
!
n
.
nodeInfo
.
blacklist
.
Has
(
addr
.
String
())
{
log
.
Debug
(
"GetAddrFromOffline"
,
"Add addr"
,
addr
.
String
())
log
.
Debug
(
"GetAddrFromOffline"
,
"Add addr"
,
addr
.
String
())
if
n
.
needMore
()
||
n
.
CacheBoundsSize
()
<
maxOutBoundNum
{
if
n
.
needMore
()
||
n
.
CacheBoundsSize
()
<
maxOutBoundNum
{
...
@@ -303,7 +282,7 @@ func (n *Node) nodeReBalance() {
...
@@ -303,7 +282,7 @@ func (n *Node) nodeReBalance() {
for
_
,
peer
:=
range
cachePeers
{
for
_
,
peer
:=
range
cachePeers
{
inbounds
,
err
:=
p2pcli
.
GetInPeersNum
(
peer
)
inbounds
,
err
:=
p2pcli
.
GetInPeersNum
(
peer
)
if
err
!=
nil
{
if
err
!=
nil
{
n
.
RemoveCachePeer
(
peer
.
Addr
())
n
.
RemoveCachePeer
(
peer
.
GetPeerName
())
peer
.
Close
()
peer
.
Close
()
continue
continue
}
}
...
@@ -326,14 +305,14 @@ func (n *Node) nodeReBalance() {
...
@@ -326,14 +305,14 @@ func (n *Node) nodeReBalance() {
//如果连接的节点最大负载量小于当前缓存节点的最大负载量
//如果连接的节点最大负载量小于当前缓存节点的最大负载量
if
MaxInBounds
<
MaxCacheInBounds
{
if
MaxInBounds
<
MaxCacheInBounds
{
n
.
RemoveCachePeer
(
MaxCacheInBoundPeer
.
Addr
())
n
.
RemoveCachePeer
(
MaxCacheInBoundPeer
.
GetPeerName
())
MaxCacheInBoundPeer
.
Close
()
MaxCacheInBoundPeer
.
Close
()
}
}
//如果最大的负载量比缓存中负载最小的小,则删除缓存中所有的节点
//如果最大的负载量比缓存中负载最小的小,则删除缓存中所有的节点
if
MaxInBounds
<
MinCacheInBounds
{
if
MaxInBounds
<
MinCacheInBounds
{
cachePeers
:=
n
.
GetCacheBounds
()
cachePeers
:=
n
.
GetCacheBounds
()
for
_
,
peer
:=
range
cachePeers
{
for
_
,
peer
:=
range
cachePeers
{
n
.
RemoveCachePeer
(
peer
.
Addr
())
n
.
RemoveCachePeer
(
peer
.
GetPeerName
())
peer
.
Close
()
peer
.
Close
()
}
}
...
@@ -347,7 +326,7 @@ func (n *Node) nodeReBalance() {
...
@@ -347,7 +326,7 @@ func (n *Node) nodeReBalance() {
if
MinCacheInBoundPeer
!=
nil
{
if
MinCacheInBoundPeer
!=
nil
{
info
,
err
:=
MinCacheInBoundPeer
.
GetPeerInfo
()
info
,
err
:=
MinCacheInBoundPeer
.
GetPeerInfo
()
if
err
!=
nil
{
if
err
!=
nil
{
n
.
RemoveCachePeer
(
MinCacheInBoundPeer
.
Addr
())
n
.
RemoveCachePeer
(
MinCacheInBoundPeer
.
GetPeerName
())
MinCacheInBoundPeer
.
Close
()
MinCacheInBoundPeer
.
Close
()
continue
continue
}
}
...
@@ -361,8 +340,8 @@ func (n *Node) nodeReBalance() {
...
@@ -361,8 +340,8 @@ func (n *Node) nodeReBalance() {
n
.
addPeer
(
MinCacheInBoundPeer
)
n
.
addPeer
(
MinCacheInBoundPeer
)
n
.
nodeInfo
.
addrBook
.
AddAddress
(
MinCacheInBoundPeer
.
peerAddr
,
nil
)
n
.
nodeInfo
.
addrBook
.
AddAddress
(
MinCacheInBoundPeer
.
peerAddr
,
nil
)
n
.
remove
(
MaxInBoundPeer
.
Addr
())
n
.
remove
(
MaxInBoundPeer
.
GetPeerName
())
n
.
RemoveCachePeer
(
MinCacheInBoundPeer
.
Addr
())
n
.
RemoveCachePeer
(
MinCacheInBoundPeer
.
GetPeerName
())
}
}
}
}
}
}
...
@@ -392,18 +371,18 @@ func (n *Node) monitorPeers() {
...
@@ -392,18 +371,18 @@ func (n *Node) monitorPeers() {
paddr
:=
pinfo
.
GetAddr
()
paddr
:=
pinfo
.
GetAddr
()
if
name
==
selfName
&&
!
pinfo
.
GetSelf
()
{
//发现连接到自己,立即删除
if
name
==
selfName
&&
!
pinfo
.
GetSelf
()
{
//发现连接到自己,立即删除
//删除节点数过低的节点
//删除节点数过低的节点
n
.
remove
(
pinfo
.
Get
Addr
())
n
.
remove
(
pinfo
.
Get
Name
())
n
.
nodeInfo
.
addrBook
.
RemoveAddr
(
paddr
)
n
.
nodeInfo
.
addrBook
.
RemoveAddr
(
paddr
)
n
.
nodeInfo
.
blacklist
.
Add
(
paddr
,
0
)
n
.
nodeInfo
.
blacklist
.
Add
(
paddr
,
0
)
}
}
if
localBlockHeight
-
peerheight
>
2048
{
if
localBlockHeight
-
peerheight
>
2048
{
//删除比自己较低的节点
//删除比自己较低的节点
if
addrMap
,
err
:=
p2pcli
.
GetAddrList
(
peers
[
paddr
]);
err
==
nil
{
if
peerList
,
err
:=
p2pcli
.
GetAddrList
(
peers
[
paddr
]);
err
==
nil
{
for
addr
:=
range
addrMap
{
for
peerName
,
info
:=
range
peerList
{
if
!
n
.
Has
(
addr
)
&&
!
n
.
nodeInfo
.
blacklist
.
Has
(
a
ddr
)
{
if
!
n
.
Has
(
peerName
)
&&
!
n
.
nodeInfo
.
blacklist
.
Has
(
info
.
A
ddr
)
{
n
.
pubsub
.
FIFOPub
(
a
ddr
,
"addr"
)
n
.
pubsub
.
FIFOPub
(
info
.
A
ddr
,
"addr"
)
}
}
}
}
...
@@ -417,7 +396,7 @@ func (n *Node) monitorPeers() {
...
@@ -417,7 +396,7 @@ func (n *Node) monitorPeers() {
continue
continue
}
}
//删除节点数过低的节点
//删除节点数过低的节点
n
.
remove
(
p
addr
)
n
.
remove
(
p
info
.
GetName
()
)
n
.
nodeInfo
.
addrBook
.
RemoveAddr
(
paddr
)
n
.
nodeInfo
.
addrBook
.
RemoveAddr
(
paddr
)
}
}
...
@@ -459,7 +438,10 @@ func (n *Node) monitorDialPeers() {
...
@@ -459,7 +438,10 @@ func (n *Node) monitorDialPeers() {
//先查询有没有注册进去,避免同时重复连接相同的地址
//先查询有没有注册进去,避免同时重复连接相同的地址
continue
continue
}
}
if
_
,
ok
:=
n
.
peerStore
.
Load
(
addr
.
(
string
));
ok
{
//不对已经创建peer的ip发起重复连接
continue
}
netAddr
,
err
:=
NewNetAddressString
(
addr
.
(
string
))
netAddr
,
err
:=
NewNetAddressString
(
addr
.
(
string
))
if
err
!=
nil
{
if
err
!=
nil
{
continue
continue
...
@@ -470,7 +452,7 @@ func (n *Node) monitorDialPeers() {
...
@@ -470,7 +452,7 @@ func (n *Node) monitorDialPeers() {
}
}
//不对已经连接上的地址或者黑名单地址发起连接 TODO:连接足够时,对于连入的地址也不再去重复连接(客户端服务端只维护一条连接, 后续优化)
//不对已经连接上的地址或者黑名单地址发起连接 TODO:连接足够时,对于连入的地址也不再去重复连接(客户端服务端只维护一条连接, 后续优化)
if
n
.
Has
(
netAddr
.
String
())
||
n
.
nodeInfo
.
blacklist
.
Has
(
netAddr
.
String
())
||
n
.
HasCacheBound
(
netAddr
.
String
())
{
if
n
.
nodeInfo
.
blacklist
.
Has
(
netAddr
.
String
())
{
log
.
Debug
(
"DialPeers"
,
"find hash"
,
netAddr
.
String
())
log
.
Debug
(
"DialPeers"
,
"find hash"
,
netAddr
.
String
())
continue
continue
}
}
...
@@ -593,28 +575,24 @@ func (n *Node) monitorCfgSeeds() {
...
@@ -593,28 +575,24 @@ func (n *Node) monitorCfgSeeds() {
<-
ticker
.
C
<-
ticker
.
C
n
.
cfgSeeds
.
Range
(
func
(
k
,
v
interface
{})
bool
{
n
.
cfgSeeds
.
Range
(
func
(
k
,
v
interface
{})
bool
{
if
!
n
.
Has
(
k
.
(
string
))
{
//尝试连接此节点
//尝试连接此节点
if
n
.
needMore
()
{
//如果需要更多的节点
if
n
.
needMore
()
{
//如果需要更多的节点
n
.
pubsub
.
FIFOPub
(
k
.
(
string
),
"addr"
)
n
.
pubsub
.
FIFOPub
(
k
.
(
string
),
"addr"
)
}
else
{
}
else
{
peers
,
_
:=
n
.
GetActivePeers
()
//腾笼换鸟
//选出当前连接的节点中,负载最大的节点
peers
,
_
:=
n
.
GetActivePeers
()
var
maxInBounds
int32
//选出当前连接的节点中,负载最大的节点
maxInBoundPeer
:=
&
Peer
{}
var
MaxInBounds
int32
for
_
,
peer
:=
range
peers
{
MaxInBoundPeer
:=
&
Peer
{}
if
peer
.
GetInBouns
()
>
maxInBounds
{
for
_
,
peer
:=
range
peers
{
maxInBounds
=
peer
.
GetInBouns
()
if
peer
.
GetInBouns
()
>
MaxInBounds
{
maxInBoundPeer
=
peer
MaxInBounds
=
peer
.
GetInBouns
()
MaxInBoundPeer
=
peer
}
}
}
n
.
remove
(
MaxInBoundPeer
.
Addr
())
n
.
pubsub
.
FIFOPub
(
k
.
(
string
),
"addr"
)
}
}
n
.
remove
(
maxInBoundPeer
.
GetPeerName
())
n
.
pubsub
.
FIFOPub
(
k
.
(
string
),
"addr"
)
}
}
return
true
return
true
})
})
...
...
plugin/p2p/gossip/netaddress.go
View file @
8b6584b9
...
@@ -162,6 +162,7 @@ func (na *NetAddress) DialTimeout(version int32, creds credentials.TransportCred
...
@@ -162,6 +162,7 @@ func (na *NetAddress) DialTimeout(version int32, creds credentials.TransportCred
}
else
{
}
else
{
secOpt
=
grpc
.
WithTransportCredentials
(
creds
)
secOpt
=
grpc
.
WithTransportCredentials
(
creds
)
}
}
//grpc.WithPerRPCCredentials
conn
,
err
:=
grpc
.
Dial
(
na
.
String
(),
conn
,
err
:=
grpc
.
Dial
(
na
.
String
(),
grpc
.
WithDefaultCallOptions
(
grpc
.
UseCompressor
(
"gzip"
)),
grpc
.
WithDefaultCallOptions
(
grpc
.
UseCompressor
(
"gzip"
)),
grpc
.
WithDefaultCallOptions
(
grpc
.
MaxCallRecvMsgSize
(
maxMsgSize
)),
grpc
.
WithDefaultCallOptions
(
grpc
.
MaxCallRecvMsgSize
(
maxMsgSize
)),
...
...
plugin/p2p/gossip/node.go
View file @
8b6584b9
...
@@ -74,12 +74,13 @@ type Node struct {
...
@@ -74,12 +74,13 @@ type Node struct {
omtx
sync
.
Mutex
omtx
sync
.
Mutex
nodeInfo
*
NodeInfo
nodeInfo
*
NodeInfo
cmtx
sync
.
Mutex
cmtx
sync
.
Mutex
cacheBound
map
[
string
]
*
Peer
cacheBound
map
[
string
]
*
Peer
//peerId-->peer
outBound
map
[
string
]
*
Peer
outBound
map
[
string
]
*
Peer
//peerId-->peer
server
*
listener
server
*
listener
listenPort
int
listenPort
int
innerSeeds
sync
.
Map
innerSeeds
sync
.
Map
cfgSeeds
sync
.
Map
cfgSeeds
sync
.
Map
peerStore
sync
.
Map
//peerIp-->PeerName
closed
int32
closed
int32
pubsub
*
pubsub
.
PubSub
pubsub
*
pubsub
.
PubSub
chainCfg
*
types
.
Chain33Config
chainCfg
*
types
.
Chain33Config
...
@@ -221,15 +222,16 @@ func (n *Node) doNat() {
...
@@ -221,15 +222,16 @@ func (n *Node) doNat() {
func
(
n
*
Node
)
addPeer
(
pr
*
Peer
)
{
func
(
n
*
Node
)
addPeer
(
pr
*
Peer
)
{
n
.
omtx
.
Lock
()
n
.
omtx
.
Lock
()
defer
n
.
omtx
.
Unlock
()
defer
n
.
omtx
.
Unlock
()
if
peer
,
ok
:=
n
.
outBound
[
pr
.
Addr
()];
ok
{
if
peer
,
ok
:=
n
.
outBound
[
pr
.
GetPeerName
()];
ok
{
log
.
Info
(
"AddPeer"
,
"delete peer"
,
pr
.
Addr
())
log
.
Info
(
"AddPeer"
,
"delete peer"
,
pr
.
Addr
())
n
.
nodeInfo
.
addrBook
.
RemoveAddr
(
peer
.
Addr
())
n
.
nodeInfo
.
addrBook
.
RemoveAddr
(
peer
.
Addr
())
delete
(
n
.
outBound
,
pr
.
Addr
())
delete
(
n
.
outBound
,
pr
.
GetPeerName
())
peer
.
Close
()
peer
.
Close
()
}
}
log
.
Debug
(
"AddPeer"
,
"peer"
,
pr
.
Addr
())
n
.
outBound
[
pr
.
Addr
()]
=
pr
log
.
Debug
(
"AddPeer"
,
"peer"
,
pr
.
Addr
(),
"pid:"
,
pr
.
GetPeerName
())
n
.
outBound
[
pr
.
GetPeerName
()]
=
pr
pr
.
Start
()
pr
.
Start
()
}
}
...
@@ -237,21 +239,26 @@ func (n *Node) addPeer(pr *Peer) {
...
@@ -237,21 +239,26 @@ func (n *Node) addPeer(pr *Peer) {
func
(
n
*
Node
)
AddCachePeer
(
pr
*
Peer
)
{
func
(
n
*
Node
)
AddCachePeer
(
pr
*
Peer
)
{
n
.
cmtx
.
Lock
()
n
.
cmtx
.
Lock
()
defer
n
.
cmtx
.
Unlock
()
defer
n
.
cmtx
.
Unlock
()
n
.
cacheBound
[
pr
.
Addr
()]
=
pr
n
.
cacheBound
[
pr
.
GetPeerName
()]
=
pr
}
}
// RemoveCachePeer remove cacheBound by addr
// RemoveCachePeer remove cacheBound by addr
func
(
n
*
Node
)
RemoveCachePeer
(
addr
string
)
{
func
(
n
*
Node
)
RemoveCachePeer
(
peerName
string
)
{
n
.
cmtx
.
Lock
()
n
.
cmtx
.
Lock
()
defer
n
.
cmtx
.
Unlock
()
defer
n
.
cmtx
.
Unlock
()
delete
(
n
.
cacheBound
,
addr
)
peer
,
ok
:=
n
.
cacheBound
[
peerName
]
if
ok
{
peer
.
Close
()
}
delete
(
n
.
cacheBound
,
peerName
)
}
}
// HasCacheBound peer whether exists according to address
// HasCacheBound peer whether exists according to address
func
(
n
*
Node
)
HasCacheBound
(
addr
string
)
bool
{
func
(
n
*
Node
)
HasCacheBound
(
peerName
string
)
bool
{
n
.
cmtx
.
Lock
()
n
.
cmtx
.
Lock
()
defer
n
.
cmtx
.
Unlock
()
defer
n
.
cmtx
.
Unlock
()
_
,
ok
:=
n
.
cacheBound
[
addr
]
_
,
ok
:=
n
.
cacheBound
[
peerName
]
return
ok
return
ok
}
}
...
@@ -285,18 +292,18 @@ func (n *Node) Size() int {
...
@@ -285,18 +292,18 @@ func (n *Node) Size() int {
}
}
// Has peer whether exists according to address
// Has peer whether exists according to address
func
(
n
*
Node
)
Has
(
p
addr
string
)
bool
{
func
(
n
*
Node
)
Has
(
p
eerName
string
)
bool
{
n
.
omtx
.
Lock
()
n
.
omtx
.
Lock
()
defer
n
.
omtx
.
Unlock
()
defer
n
.
omtx
.
Unlock
()
_
,
ok
:=
n
.
outBound
[
p
addr
]
_
,
ok
:=
n
.
outBound
[
p
eerName
]
return
ok
return
ok
}
}
// GetRegisterPeer return one peer according to paddr
// GetRegisterPeer return one peer according to paddr
func
(
n
*
Node
)
GetRegisterPeer
(
p
addr
string
)
*
Peer
{
func
(
n
*
Node
)
GetRegisterPeer
(
p
eerName
string
)
*
Peer
{
n
.
omtx
.
Lock
()
n
.
omtx
.
Lock
()
defer
n
.
omtx
.
Unlock
()
defer
n
.
omtx
.
Unlock
()
if
peer
,
ok
:=
n
.
outBound
[
p
addr
];
ok
{
if
peer
,
ok
:=
n
.
outBound
[
p
eerName
];
ok
{
return
peer
return
peer
}
}
return
nil
return
nil
...
@@ -324,21 +331,22 @@ func (n *Node) GetActivePeers() (map[string]*Peer, map[string]*types.Peer) {
...
@@ -324,21 +331,22 @@ func (n *Node) GetActivePeers() (map[string]*Peer, map[string]*types.Peer) {
var
peers
=
make
(
map
[
string
]
*
Peer
)
var
peers
=
make
(
map
[
string
]
*
Peer
)
for
_
,
peer
:=
range
regPeers
{
for
_
,
peer
:=
range
regPeers
{
n
ame
:=
peer
.
GetPeerName
()
peerN
ame
:=
peer
.
GetPeerName
()
if
_
,
ok
:=
infos
[
n
ame
];
ok
{
if
_
,
ok
:=
infos
[
peerN
ame
];
ok
{
peers
[
n
ame
]
=
peer
peers
[
peerN
ame
]
=
peer
}
}
}
}
return
peers
,
infos
return
peers
,
infos
}
}
func
(
n
*
Node
)
remove
(
peer
Addr
string
)
{
func
(
n
*
Node
)
remove
(
peer
Name
string
)
{
n
.
omtx
.
Lock
()
n
.
omtx
.
Lock
()
defer
n
.
omtx
.
Unlock
()
defer
n
.
omtx
.
Unlock
()
peer
,
ok
:=
n
.
outBound
[
peerAddr
]
peer
,
ok
:=
n
.
outBound
[
peerName
]
if
ok
{
if
ok
{
delete
(
n
.
outBound
,
peer
Addr
)
delete
(
n
.
outBound
,
peer
Name
)
peer
.
Close
()
peer
.
Close
()
}
}
}
}
...
@@ -346,8 +354,8 @@ func (n *Node) remove(peerAddr string) {
...
@@ -346,8 +354,8 @@ func (n *Node) remove(peerAddr string) {
func
(
n
*
Node
)
removeAll
()
{
func
(
n
*
Node
)
removeAll
()
{
n
.
omtx
.
Lock
()
n
.
omtx
.
Lock
()
defer
n
.
omtx
.
Unlock
()
defer
n
.
omtx
.
Unlock
()
for
addr
,
peer
:=
range
n
.
outBound
{
for
peerName
,
peer
:=
range
n
.
outBound
{
delete
(
n
.
outBound
,
addr
)
delete
(
n
.
outBound
,
peerName
)
peer
.
Close
()
peer
.
Close
()
}
}
}
}
...
@@ -395,7 +403,7 @@ func (n *Node) detectNodeAddr() {
...
@@ -395,7 +403,7 @@ func (n *Node) detectNodeAddr() {
}
}
//如果nat,getSelfExternalAddr 无法发现自己的外网地址,则把localaddr 赋值给外网地址
//如果nat,getSelfExternalAddr 无法发现自己的外网地址,则把localaddr 赋值给外网地址
if
len
(
externalIP
)
==
0
{
if
externalIP
==
""
{
externalIP
=
laddr
externalIP
=
laddr
}
}
...
...
plugin/p2p/gossip/p2p.go
View file @
8b6584b9
...
@@ -404,7 +404,6 @@ func (network *P2p) subP2pMsg() {
...
@@ -404,7 +404,6 @@ func (network *P2p) subP2pMsg() {
}
}
func
(
network
*
P2p
)
processEvent
(
msg
*
queue
.
Message
,
taskIdx
int64
,
eventFunc
p2pEventFunc
)
{
func
(
network
*
P2p
)
processEvent
(
msg
*
queue
.
Message
,
taskIdx
int64
,
eventFunc
p2pEventFunc
)
{
network
.
lock
.
Lock
()
network
.
lock
.
Lock
()
defer
network
.
lock
.
Unlock
()
defer
network
.
lock
.
Unlock
()
if
network
.
isClose
()
{
if
network
.
isClose
()
{
...
...
plugin/p2p/gossip/p2p_test.go
View file @
8b6584b9
...
@@ -199,15 +199,16 @@ func testPeer(t *testing.T, p2p *P2p, q queue.Queue) {
...
@@ -199,15 +199,16 @@ func testPeer(t *testing.T, p2p *P2p, q queue.Queue) {
t
.
Log
(
localP2P
.
node
.
CacheBoundsSize
())
t
.
Log
(
localP2P
.
node
.
CacheBoundsSize
())
t
.
Log
(
localP2P
.
node
.
GetCacheBounds
())
t
.
Log
(
localP2P
.
node
.
GetCacheBounds
())
_
,
localPeerName
:=
localP2P
.
node
.
nodeInfo
.
addrBook
.
GetPrivPubKey
()
localP2P
.
node
.
RemoveCachePeer
(
"localhost:12345"
)
localP2P
.
node
.
RemoveCachePeer
(
"localhost:12345"
)
assert
.
False
(
t
,
localP2P
.
node
.
HasCacheBound
(
"localhost:12345"
))
assert
.
False
(
t
,
localP2P
.
node
.
HasCacheBound
(
"localhost:12345"
))
peer
,
err
:=
P2pComm
.
dialPeer
(
remote
,
localP2P
.
node
)
peer
,
err
:=
P2pComm
.
dialPeer
(
remote
,
localP2P
.
node
)
t
.
Log
(
"peerName"
,
peer
.
GetPeerName
(),
"self peerName"
,
localPeerName
)
assert
.
Nil
(
t
,
err
)
assert
.
Nil
(
t
,
err
)
defer
peer
.
Close
()
defer
peer
.
Close
()
peer
.
MakePersistent
()
peer
.
MakePersistent
()
localP2P
.
node
.
addPeer
(
peer
)
localP2P
.
node
.
addPeer
(
peer
)
_
,
localPeerName
:=
localP2P
.
node
.
nodeInfo
.
addrBook
.
GetPrivPubKey
()
var
info
*
innerpeer
var
info
*
innerpeer
t
.
Log
(
"WaitRegisterPeerStart..."
)
t
.
Log
(
"WaitRegisterPeerStart..."
)
trytime
:=
0
trytime
:=
0
...
@@ -247,7 +248,7 @@ func testPeer(t *testing.T, p2p *P2p, q queue.Queue) {
...
@@ -247,7 +248,7 @@ func testPeer(t *testing.T, p2p *P2p, q queue.Queue) {
localP2P
.
node
.
nodeInfo
.
peerInfos
.
SetPeerInfo
(
nil
)
localP2P
.
node
.
nodeInfo
.
peerInfos
.
SetPeerInfo
(
nil
)
localP2P
.
node
.
nodeInfo
.
peerInfos
.
GetPeerInfo
(
"1222"
)
localP2P
.
node
.
nodeInfo
.
peerInfos
.
GetPeerInfo
(
"1222"
)
t
.
Log
(
p2p
.
node
.
GetRegisterPeer
(
"localhost:43802"
))
t
.
Log
(
p2p
.
node
.
GetRegisterPeer
(
localPeerName
))
//测试发送Ping消息
//测试发送Ping消息
err
=
p2pcli
.
SendPing
(
peer
,
localP2P
.
node
.
nodeInfo
)
err
=
p2pcli
.
SendPing
(
peer
,
localP2P
.
node
.
nodeInfo
)
assert
.
Nil
(
t
,
err
)
assert
.
Nil
(
t
,
err
)
...
@@ -302,7 +303,7 @@ func testPeer(t *testing.T, p2p *P2p, q queue.Queue) {
...
@@ -302,7 +303,7 @@ func testPeer(t *testing.T, p2p *P2p, q queue.Queue) {
localP2P
.
node
.
addPeer
(
peer
)
localP2P
.
node
.
addPeer
(
peer
)
assert
.
True
(
t
,
localP2P
.
node
.
needMore
())
assert
.
True
(
t
,
localP2P
.
node
.
needMore
())
peer
.
Close
()
peer
.
Close
()
localP2P
.
node
.
remove
(
peer
.
peerAddr
.
String
())
localP2P
.
node
.
remove
(
peer
.
GetPeerName
())
}
}
//测试grpc 多连接
//测试grpc 多连接
...
...
plugin/p2p/gossip/p2pcli.go
View file @
8b6584b9
...
@@ -42,7 +42,7 @@ type NormalInterface interface {
...
@@ -42,7 +42,7 @@ type NormalInterface interface {
SendPing
(
peer
*
Peer
,
nodeinfo
*
NodeInfo
)
error
SendPing
(
peer
*
Peer
,
nodeinfo
*
NodeInfo
)
error
GetBlockHeight
(
nodeinfo
*
NodeInfo
)
(
int64
,
error
)
GetBlockHeight
(
nodeinfo
*
NodeInfo
)
(
int64
,
error
)
CheckPeerNatOk
(
addr
string
,
nodeInfo
*
NodeInfo
)
bool
CheckPeerNatOk
(
addr
string
,
nodeInfo
*
NodeInfo
)
bool
GetAddrList
(
peer
*
Peer
)
(
map
[
string
]
int64
,
error
)
GetAddrList
(
peer
*
Peer
)
(
map
[
string
]
*
pb
.
P2PPeerInfo
,
error
)
GetInPeersNum
(
peer
*
Peer
)
(
int
,
error
)
GetInPeersNum
(
peer
*
Peer
)
(
int
,
error
)
CheckSelf
(
addr
string
,
nodeinfo
*
NodeInfo
)
bool
CheckSelf
(
addr
string
,
nodeinfo
*
NodeInfo
)
bool
}
}
...
@@ -199,9 +199,9 @@ func (m *Cli) GetInPeersNum(peer *Peer) (int, error) {
...
@@ -199,9 +199,9 @@ func (m *Cli) GetInPeersNum(peer *Peer) (int, error) {
}
}
// GetAddrList return a map for address-prot
// GetAddrList return a map for address-prot
func
(
m
*
Cli
)
GetAddrList
(
peer
*
Peer
)
(
map
[
string
]
int64
,
error
)
{
func
(
m
*
Cli
)
GetAddrList
(
peer
*
Peer
)
(
map
[
string
]
*
pb
.
P2PPeerInfo
,
error
)
{
var
addrlist
=
make
(
map
[
string
]
int64
)
var
addrlist
=
make
(
map
[
string
]
*
pb
.
P2PPeerInfo
)
if
peer
==
nil
{
if
peer
==
nil
{
return
addrlist
,
fmt
.
Errorf
(
"pointer is nil"
)
return
addrlist
,
fmt
.
Errorf
(
"pointer is nil"
)
}
}
...
@@ -229,9 +229,11 @@ func (m *Cli) GetAddrList(peer *Peer) (map[string]int64, error) {
...
@@ -229,9 +229,11 @@ func (m *Cli) GetAddrList(peer *Peer) (map[string]int64, error) {
peerinfos
:=
resp
.
GetPeerinfo
()
peerinfos
:=
resp
.
GetPeerinfo
()
for
_
,
peerinfo
:=
range
peerinfos
{
for
_
,
peerinfo
:=
range
peerinfos
{
if
peerinfo
==
nil
{
continue
}
if
localBlockHeight
-
peerinfo
.
GetHeader
()
.
GetHeight
()
<
2048
{
if
localBlockHeight
-
peerinfo
.
GetHeader
()
.
GetHeight
()
<
2048
{
addrlist
[
peerinfo
.
GetName
()]
=
peerinfo
addrlist
[
fmt
.
Sprintf
(
"%v:%v"
,
peerinfo
.
GetAddr
(),
peerinfo
.
GetPort
())]
=
peerinfo
.
GetHeader
()
.
GetHeight
()
}
}
}
}
return
addrlist
,
nil
return
addrlist
,
nil
...
...
plugin/p2p/gossip/peer.go
View file @
8b6584b9
...
@@ -35,6 +35,7 @@ func (p *Peer) Close() {
...
@@ -35,6 +35,7 @@ func (p *Peer) Close() {
//unsub all topics
//unsub all topics
p
.
node
.
pubsub
.
Unsub
(
p
.
taskChan
)
p
.
node
.
pubsub
.
Unsub
(
p
.
taskChan
)
}
}
p
.
node
.
peerStore
.
Delete
(
p
.
Addr
())
log
.
Info
(
"Peer"
,
"closed"
,
p
.
Addr
())
log
.
Info
(
"Peer"
,
"closed"
,
p
.
Addr
())
}
}
...
@@ -138,18 +139,7 @@ func (p *Peer) heartBeat() {
...
@@ -138,18 +139,7 @@ func (p *Peer) heartBeat() {
if
!
p
.
GetRunning
()
{
if
!
p
.
GetRunning
()
{
return
return
}
}
peername
,
err
:=
pcli
.
SendVersion
(
p
,
p
.
node
.
nodeInfo
)
p
.
taskChan
=
p
.
node
.
pubsub
.
Sub
(
"block"
,
"tx"
,
p
.
name
)
P2pComm
.
CollectPeerStat
(
err
,
p
)
if
err
!=
nil
||
peername
==
""
{
//版本不对,直接关掉
log
.
Error
(
"PeerHeartBeatSendVersion"
,
"peerName"
,
peername
,
"err"
,
err
)
p
.
Close
()
return
}
log
.
Debug
(
"sendVersion"
,
"peer name"
,
peername
)
p
.
SetPeerName
(
peername
)
//设置连接的远程节点的节点名称
p
.
taskChan
=
p
.
node
.
pubsub
.
Sub
(
"block"
,
"tx"
,
peername
)
go
p
.
sendStream
()
go
p
.
sendStream
()
go
p
.
readStream
()
go
p
.
readStream
()
break
break
...
@@ -309,7 +299,7 @@ func (p *Peer) readStream() {
...
@@ -309,7 +299,7 @@ func (p *Peer) readStream() {
log
.
Error
(
"readStream"
,
"err:"
,
err
.
Error
(),
"peerIp"
,
p
.
Addr
())
log
.
Error
(
"readStream"
,
"err:"
,
err
.
Error
(),
"peerIp"
,
p
.
Addr
())
continue
continue
}
}
resp
,
err
:=
p
.
mconn
.
gcli
.
ServerStreamSend
(
context
.
Background
(),
ping
)
resp
,
err
:=
p
.
mconn
.
gcli
.
ServerStreamSend
(
context
.
Background
(),
ping
,
grpc
.
WaitForReady
(
true
)
)
P2pComm
.
CollectPeerStat
(
err
,
p
)
P2pComm
.
CollectPeerStat
(
err
,
p
)
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Error
(
"readStream"
,
"serverstreamsend,err:"
,
err
,
"peer"
,
p
.
Addr
())
log
.
Error
(
"readStream"
,
"serverstreamsend,err:"
,
err
,
"peer"
,
p
.
Addr
())
...
@@ -330,18 +320,12 @@ func (p *Peer) readStream() {
...
@@ -330,18 +320,12 @@ func (p *Peer) readStream() {
data
,
err
:=
resp
.
Recv
()
data
,
err
:=
resp
.
Recv
()
if
err
!=
nil
{
if
err
!=
nil
{
P2pComm
.
CollectPeerStat
(
err
,
p
)
P2pComm
.
CollectPeerStat
(
err
,
p
)
log
.
Error
(
"readStream"
,
"recv
,err:"
,
err
.
Error
(),
"peerAddr"
,
p
.
Addr
()
)
log
.
Error
(
"readStream"
,
"recv
err"
,
err
.
Error
(),
"peerAddr"
,
p
.
Addr
(),
"data:"
,
data
)
errs
:=
resp
.
CloseSend
()
errs
:=
resp
.
CloseSend
()
if
errs
!=
nil
{
if
errs
!=
nil
{
log
.
Error
(
"CloseSend"
,
"err"
,
errs
)
log
.
Error
(
"CloseSend"
,
"err"
,
errs
)
}
}
if
status
.
Code
(
err
)
==
codes
.
Unavailable
{
break
//重新创建新的流
}
log
.
Error
(
"readStream"
,
"recv,err:"
,
err
.
Error
(),
"peerIp"
,
p
.
Addr
())
if
status
.
Code
(
err
)
==
codes
.
Unimplemented
{
//maybe order peers delete peer to BlackList
if
status
.
Code
(
err
)
==
codes
.
Unimplemented
{
//maybe order peers delete peer to BlackList
p
.
node
.
nodeInfo
.
blacklist
.
Add
(
p
.
Addr
(),
3600
)
p
.
node
.
nodeInfo
.
blacklist
.
Add
(
p
.
Addr
(),
3600
)
return
return
...
@@ -353,8 +337,9 @@ func (p *Peer) readStream() {
...
@@ -353,8 +337,9 @@ func (p *Peer) readStream() {
P2pComm
.
CollectPeerStat
(
err
,
p
)
P2pComm
.
CollectPeerStat
(
err
,
p
)
return
return
}
}
time
.
Sleep
(
time
.
Second
)
//have a rest
//其他stream 错误全部break ,重新创建新的stream
break
}
}
p
.
node
.
processRecvP2P
(
data
,
p
.
GetPeerName
(),
p
.
node
.
pubToPeer
,
p
.
Addr
())
p
.
node
.
processRecvP2P
(
data
,
p
.
GetPeerName
(),
p
.
node
.
pubToPeer
,
p
.
Addr
())
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment