Commit fae196d4 authored by libangzhu

调整gossip peer管理,fix issues #1018

parent a74228e2
......@@ -6,8 +6,11 @@ package gossip
import (
"bytes"
"context"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"math/rand"
"net"
"strings"
......@@ -95,7 +98,31 @@ func (c Comm) dialPeerWithAddress(addr *NetAddress, persistent bool, node *Node)
if persistent {
peer.MakePersistent()
}
//Set peer Name 在启动peer对象之前,获取节点对象的peerName,即pid
resp, err := peer.mconn.gcli.Version2(context.Background(), &types.P2PVersion{
Nonce: time.Now().Unix(),
Version: node.nodeInfo.channelVersion,
AddrFrom: node.nodeInfo.GetExternalAddr().String(),
AddrRecv: addr.String(),
})
if err != nil {
peer.Close()
return nil, err
}
//check remote peer is self or duplicate peer
_, pub := node.nodeInfo.addrBook.GetPrivPubKey()
if node.Has(resp.UserAgent) ||resp.UserAgent == pub{
//发现同一个peerID 下有两个不同的ip,则把新连接的ip加入黑名单5分钟
prepeer:= node.GetRegisterPeer(resp.UserAgent)
log.Info("dialPeerWithAddress","duplicate connect:",prepeer.Addr(),addr.String(),resp.GetUserAgent())
peer.Close()
return nil, errors.New(fmt.Sprintf("duplicate connect %v",resp.UserAgent))
}
node.peerStore.Store(addr.String(),resp.UserAgent)
peer.SetPeerName(resp.UserAgent)
return peer, nil
}
......
......@@ -148,6 +148,7 @@ Retry:
opts = append(opts, grpc.Creds(node.nodeInfo.servCreds))
}
dl.server = grpc.NewServer(opts...)
dl.p2pserver = pServer
pb.RegisterP2PgserviceServer(dl.server, pServer)
......
......@@ -25,7 +25,7 @@ func (n *Node) destroyPeer(peer *Peer) {
"version support", peer.version.IsSupport())
n.nodeInfo.addrBook.RemoveAddr(peer.Addr())
n.remove(peer.Addr())
n.remove(peer.GetPeerName())
}
......@@ -44,10 +44,7 @@ func (n *Node) monitorErrPeer() {
n.nodeInfo.blacklist.Add(peer.Addr(), int64(3600*12))
continue
}
if peer.IsMaxInbouds {
n.destroyPeer(peer)
}
if !peer.GetRunning() {
if peer.IsMaxInbouds || !peer.GetRunning() {
n.destroyPeer(peer)
continue
}
......@@ -151,79 +148,61 @@ func (n *Node) getAddrFromOnline() {
peers, _ := n.GetActivePeers()
for _, peer := range peers { //向其他节点发起请求,获取地址列表
var addrlist []string
var addrlistMap map[string]int64
var err error
addrlistMap, err = pcli.GetAddrList(peer)
peerInfoList, err := pcli.GetAddrList(peer)
P2pComm.CollectPeerStat(err, peer)
if err != nil {
log.Error("getAddrFromOnline", "ERROR", err.Error())
continue
}
for addr := range addrlistMap {
addrlist = append(addrlist, addr)
}
for _, addr := range addrlist {
for _, info := range peerInfoList {
if !n.needMore() {
//如果已经达到25个节点,则优先删除种子节点
localBlockHeight, err := pcli.GetBlockHeight(n.nodeInfo)
if err != nil {
continue
}
//查询对方的高度,如果不小于自己的高度,或高度差在一定范围内,则剔除一个种子
if peerHeight, ok := addrlistMap[addr]; ok {
if localBlockHeight-peerHeight < 1024 {
if _, ok := seedsMap[addr]; ok {
continue
if localBlockHeight-info.GetHeader().GetHeight() < 1024 {
//随机删除连接的一个种子
n.innerSeeds.Range(func(k, v interface{}) bool {
if _, ok := n.cfgSeeds.Load(k.(string)); ok {
return true
}
//随机删除连接的一个种子
n.innerSeeds.Range(func(k, v interface{}) bool {
if n.Has(k.(string)) {
//不能包含在cfgseed中
if _, ok := n.cfgSeeds.Load(k.(string)); ok {
return true
}
n.remove(k.(string))
//remove inner seed
for _, peerInfo := range n.nodeInfo.peerInfos.GetPeerInfos() {
if peerInfo.Addr == k.(string) {
n.remove(peerInfo.GetName())
n.nodeInfo.addrBook.RemoveAddr(k.(string))
return false
}
return true
})
}
}
return false
})
}
time.Sleep(MonitorPeerInfoInterval)
continue
}
if !n.nodeInfo.blacklist.Has(addr) || !peerAddrFilter.Contains(addr) {
if ticktimes < 10 {
//如果连接了其他节点,优先不连接种子节点
if _, ok := n.innerSeeds.Load(addr); !ok {
//先把seed 排除在外
n.pubsub.FIFOPub(addr, "addr")
}
}
} else {
n.pubsub.FIFOPub(addr, "addr")
if !n.nodeInfo.blacklist.Has(peer.Addr()) || !peerAddrFilter.Contains(peer.Addr()) || !n.Has(peer.GetPeerName()) {
if ticktimes < 10 {
//如果连接了其他节点,优先不连接种子节点
if _, ok := n.innerSeeds.Load(peer.Addr()); !ok {
//先把seed 排除在外
n.pubsub.FIFOPub(peer.Addr(), "addr")
}
} else {
n.pubsub.FIFOPub(peer.Addr(), "addr")
}
}
}
}
}
}
func (n *Node) getAddrFromAddrBook() {
......@@ -249,7 +228,7 @@ func (n *Node) getAddrFromAddrBook() {
addrNetArr := n.nodeInfo.addrBook.GetPeers()
for _, addr := range addrNetArr {
if !n.Has(addr.String()) && !n.nodeInfo.blacklist.Has(addr.String()) {
if !n.nodeInfo.blacklist.Has(addr.String()) {
log.Debug("GetAddrFromOffline", "Add addr", addr.String())
if n.needMore() || n.CacheBoundsSize() < maxOutBoundNum {
......@@ -303,7 +282,7 @@ func (n *Node) nodeReBalance() {
for _, peer := range cachePeers {
inbounds, err := p2pcli.GetInPeersNum(peer)
if err != nil {
n.RemoveCachePeer(peer.Addr())
n.RemoveCachePeer(peer.GetPeerName())
peer.Close()
continue
}
......@@ -326,14 +305,14 @@ func (n *Node) nodeReBalance() {
//如果连接的节点最大负载量小于当前缓存节点的最大负载量
if MaxInBounds < MaxCacheInBounds {
n.RemoveCachePeer(MaxCacheInBoundPeer.Addr())
n.RemoveCachePeer(MaxCacheInBoundPeer.GetPeerName())
MaxCacheInBoundPeer.Close()
}
//如果最大的负载量比缓存中负载最小的小,则删除缓存中所有的节点
if MaxInBounds < MinCacheInBounds {
cachePeers := n.GetCacheBounds()
for _, peer := range cachePeers {
n.RemoveCachePeer(peer.Addr())
n.RemoveCachePeer(peer.GetPeerName())
peer.Close()
}
......@@ -347,7 +326,7 @@ func (n *Node) nodeReBalance() {
if MinCacheInBoundPeer != nil {
info, err := MinCacheInBoundPeer.GetPeerInfo()
if err != nil {
n.RemoveCachePeer(MinCacheInBoundPeer.Addr())
n.RemoveCachePeer(MinCacheInBoundPeer.GetPeerName())
MinCacheInBoundPeer.Close()
continue
}
......@@ -361,8 +340,8 @@ func (n *Node) nodeReBalance() {
n.addPeer(MinCacheInBoundPeer)
n.nodeInfo.addrBook.AddAddress(MinCacheInBoundPeer.peerAddr, nil)
n.remove(MaxInBoundPeer.Addr())
n.RemoveCachePeer(MinCacheInBoundPeer.Addr())
n.remove(MaxInBoundPeer.GetPeerName())
n.RemoveCachePeer(MinCacheInBoundPeer.GetPeerName())
}
}
}
......@@ -392,18 +371,18 @@ func (n *Node) monitorPeers() {
paddr := pinfo.GetAddr()
if name == selfName && !pinfo.GetSelf() { //发现连接到自己,立即删除
//删除节点数过低的节点
n.remove(pinfo.GetAddr())
n.remove(pinfo.GetName())
n.nodeInfo.addrBook.RemoveAddr(paddr)
n.nodeInfo.blacklist.Add(paddr, 0)
}
if localBlockHeight-peerheight > 2048 {
//删除比自己较低的节点
if addrMap, err := p2pcli.GetAddrList(peers[paddr]); err == nil {
if peerList, err := p2pcli.GetAddrList(peers[paddr]); err == nil {
for addr := range addrMap {
if !n.Has(addr) && !n.nodeInfo.blacklist.Has(addr) {
n.pubsub.FIFOPub(addr, "addr")
for peerName, info := range peerList {
if !n.Has(peerName) && !n.nodeInfo.blacklist.Has(info.Addr) {
n.pubsub.FIFOPub(info.Addr, "addr")
}
}
......@@ -417,7 +396,7 @@ func (n *Node) monitorPeers() {
continue
}
//删除节点数过低的节点
n.remove(paddr)
n.remove(pinfo.GetName())
n.nodeInfo.addrBook.RemoveAddr(paddr)
}
......@@ -459,7 +438,10 @@ func (n *Node) monitorDialPeers() {
//先查询有没有注册进去,避免同时重复连接相同的地址
continue
}
if _,ok:= n.peerStore.Load(addr.(string));ok{
//不对已经创建peer的ip发起重复连接
continue
}
netAddr, err := NewNetAddressString(addr.(string))
if err != nil {
continue
......@@ -470,7 +452,7 @@ func (n *Node) monitorDialPeers() {
}
//不对已经连接上的地址或者黑名单地址发起连接 TODO:连接足够时,对于连入的地址也不再去重复连接(客户端服务端只维护一条连接, 后续优化)
if n.Has(netAddr.String()) || n.nodeInfo.blacklist.Has(netAddr.String()) || n.HasCacheBound(netAddr.String()) {
if n.nodeInfo.blacklist.Has(netAddr.String()) {
log.Debug("DialPeers", "find hash", netAddr.String())
continue
}
......@@ -593,28 +575,24 @@ func (n *Node) monitorCfgSeeds() {
<-ticker.C
n.cfgSeeds.Range(func(k, v interface{}) bool {
if !n.Has(k.(string)) {
//尝试连接此节点
if n.needMore() { //如果需要更多的节点
n.pubsub.FIFOPub(k.(string), "addr")
} else {
//腾笼换鸟
peers, _ := n.GetActivePeers()
//选出当前连接的节点中,负载最大的节点
var MaxInBounds int32
MaxInBoundPeer := &Peer{}
for _, peer := range peers {
if peer.GetInBouns() > MaxInBounds {
MaxInBounds = peer.GetInBouns()
MaxInBoundPeer = peer
}
//尝试连接此节点
if n.needMore() { //如果需要更多的节点
n.pubsub.FIFOPub(k.(string), "addr")
} else {
peers, _ := n.GetActivePeers()
//选出当前连接的节点中,负载最大的节点
var maxInBounds int32
maxInBoundPeer := &Peer{}
for _, peer := range peers {
if peer.GetInBouns() > maxInBounds {
maxInBounds = peer.GetInBouns()
maxInBoundPeer = peer
}
n.remove(MaxInBoundPeer.Addr())
n.pubsub.FIFOPub(k.(string), "addr")
}
n.remove(maxInBoundPeer.GetPeerName())
n.pubsub.FIFOPub(k.(string), "addr")
}
return true
})
......
......@@ -162,6 +162,7 @@ func (na *NetAddress) DialTimeout(version int32, creds credentials.TransportCred
} else {
secOpt = grpc.WithTransportCredentials(creds)
}
//grpc.WithPerRPCCredentials
conn, err := grpc.Dial(na.String(),
grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip")),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)),
......
......@@ -74,12 +74,13 @@ type Node struct {
omtx sync.Mutex
nodeInfo *NodeInfo
cmtx sync.Mutex
cacheBound map[string]*Peer
outBound map[string]*Peer
cacheBound map[string]*Peer //peerId-->peer
outBound map[string]*Peer //peerId-->peer
server *listener
listenPort int
innerSeeds sync.Map
cfgSeeds sync.Map
peerStore sync.Map//peerIp-->PeerName
closed int32
pubsub *pubsub.PubSub
chainCfg *types.Chain33Config
......@@ -221,15 +222,16 @@ func (n *Node) doNat() {
func (n *Node) addPeer(pr *Peer) {
n.omtx.Lock()
defer n.omtx.Unlock()
if peer, ok := n.outBound[pr.Addr()]; ok {
if peer, ok := n.outBound[pr.GetPeerName()]; ok {
log.Info("AddPeer", "delete peer", pr.Addr())
n.nodeInfo.addrBook.RemoveAddr(peer.Addr())
delete(n.outBound, pr.Addr())
delete(n.outBound, pr.GetPeerName())
peer.Close()
}
log.Debug("AddPeer", "peer", pr.Addr())
n.outBound[pr.Addr()] = pr
log.Debug("AddPeer", "peer", pr.Addr(), "pid:", pr.GetPeerName())
n.outBound[pr.GetPeerName()] = pr
pr.Start()
}
......@@ -237,21 +239,26 @@ func (n *Node) addPeer(pr *Peer) {
// AddCachePeer stores pr in the cache-bound peer table, keyed by its peer
// name. An existing entry with the same name is overwritten.
func (n *Node) AddCachePeer(pr *Peer) {
	n.cmtx.Lock()
	defer n.cmtx.Unlock()
	n.cacheBound[pr.GetPeerName()] = pr
}
// RemoveCachePeer remove cacheBound by addr
func (n *Node) RemoveCachePeer(addr string) {
func (n *Node) RemoveCachePeer(peerName string) {
n.cmtx.Lock()
defer n.cmtx.Unlock()
delete(n.cacheBound, addr)
peer,ok:= n.cacheBound[peerName]
if ok{
peer.Close()
}
delete(n.cacheBound, peerName)
}
// HasCacheBound peer whether exists according to address
func (n *Node) HasCacheBound(addr string) bool {
func (n *Node) HasCacheBound(peerName string) bool {
n.cmtx.Lock()
defer n.cmtx.Unlock()
_, ok := n.cacheBound[addr]
_, ok := n.cacheBound[peerName]
return ok
}
......@@ -285,18 +292,18 @@ func (n *Node) Size() int {
}
// Has peer whether exists according to address
func (n *Node) Has(paddr string) bool {
func (n *Node) Has(peerName string) bool {
n.omtx.Lock()
defer n.omtx.Unlock()
_, ok := n.outBound[paddr]
_, ok := n.outBound[peerName]
return ok
}
// GetRegisterPeer return one peer according to paddr
func (n *Node) GetRegisterPeer(paddr string) *Peer {
func (n *Node) GetRegisterPeer(peerName string) *Peer {
n.omtx.Lock()
defer n.omtx.Unlock()
if peer, ok := n.outBound[paddr]; ok {
if peer, ok := n.outBound[peerName]; ok {
return peer
}
return nil
......@@ -324,21 +331,22 @@ func (n *Node) GetActivePeers() (map[string]*Peer, map[string]*types.Peer) {
var peers = make(map[string]*Peer)
for _, peer := range regPeers {
name := peer.GetPeerName()
if _, ok := infos[name]; ok {
peerName := peer.GetPeerName()
if _, ok := infos[peerName]; ok {
peers[name] = peer
peers[peerName] = peer
}
}
return peers, infos
}
func (n *Node) remove(peerAddr string) {
func (n *Node) remove(peerName string) {
n.omtx.Lock()
defer n.omtx.Unlock()
peer, ok := n.outBound[peerAddr]
peer, ok := n.outBound[peerName]
if ok {
delete(n.outBound, peerAddr)
delete(n.outBound, peerName)
peer.Close()
}
}
......@@ -346,8 +354,8 @@ func (n *Node) remove(peerAddr string) {
func (n *Node) removeAll() {
n.omtx.Lock()
defer n.omtx.Unlock()
for addr, peer := range n.outBound {
delete(n.outBound, addr)
for peerName, peer := range n.outBound {
delete(n.outBound, peerName)
peer.Close()
}
}
......@@ -395,7 +403,7 @@ func (n *Node) detectNodeAddr() {
}
//如果nat,getSelfExternalAddr 无法发现自己的外网地址,则把localaddr 赋值给外网地址
if len(externalIP) == 0 {
if externalIP == "" {
externalIP = laddr
}
......
......@@ -81,7 +81,6 @@ type P2p struct {
subCfg *subConfig
mgr *p2p.Manager
subChan chan interface{}
lock sync.Mutex
}
// New produce a p2p object
......@@ -148,8 +147,6 @@ func (network *P2p) isRestart() bool {
//CloseP2P Close network client
func (network *P2p) CloseP2P() {
network.lock.Lock()
defer network.lock.Unlock()
log.Info("p2p network start shutdown")
atomic.StoreInt32(&network.closed, 1)
//等待业务协程停止
......@@ -324,8 +321,6 @@ func (network *P2p) genAirDropKeyFromWallet() error {
//ReStart p2p
func (network *P2p) ReStart() {
network.lock.Lock()
defer network.lock.Unlock()
//避免重复
if !atomic.CompareAndSwapInt32(&network.restart, 0, 1) {
return
......@@ -405,11 +400,6 @@ func (network *P2p) subP2pMsg() {
func (network *P2p) processEvent(msg *queue.Message, taskIdx int64, eventFunc p2pEventFunc) {
network.lock.Lock()
defer network.lock.Unlock()
if network.isClose() {
return
}
//检测重启标志,停止分发事件,需要等待重启
if network.isRestart() {
log.Info("wait for p2p restart....")
......
......@@ -199,15 +199,16 @@ func testPeer(t *testing.T, p2p *P2p, q queue.Queue) {
t.Log(localP2P.node.CacheBoundsSize())
t.Log(localP2P.node.GetCacheBounds())
_, localPeerName := localP2P.node.nodeInfo.addrBook.GetPrivPubKey()
localP2P.node.RemoveCachePeer("localhost:12345")
assert.False(t, localP2P.node.HasCacheBound("localhost:12345"))
peer, err := P2pComm.dialPeer(remote, localP2P.node)
t.Log("peerName", peer.GetPeerName(), "self peerName", localPeerName)
assert.Nil(t, err)
defer peer.Close()
peer.MakePersistent()
localP2P.node.addPeer(peer)
_, localPeerName := localP2P.node.nodeInfo.addrBook.GetPrivPubKey()
var info *innerpeer
t.Log("WaitRegisterPeerStart...")
trytime := 0
......@@ -247,7 +248,7 @@ func testPeer(t *testing.T, p2p *P2p, q queue.Queue) {
localP2P.node.nodeInfo.peerInfos.SetPeerInfo(nil)
localP2P.node.nodeInfo.peerInfos.GetPeerInfo("1222")
t.Log(p2p.node.GetRegisterPeer("localhost:43802"))
t.Log(p2p.node.GetRegisterPeer(localPeerName))
//测试发送Ping消息
err = p2pcli.SendPing(peer, localP2P.node.nodeInfo)
assert.Nil(t, err)
......@@ -302,7 +303,7 @@ func testPeer(t *testing.T, p2p *P2p, q queue.Queue) {
localP2P.node.addPeer(peer)
assert.True(t, localP2P.node.needMore())
peer.Close()
localP2P.node.remove(peer.peerAddr.String())
localP2P.node.remove(peer.GetPeerName())
}
//测试grpc 多连接
......
......@@ -42,7 +42,7 @@ type NormalInterface interface {
SendPing(peer *Peer, nodeinfo *NodeInfo) error
GetBlockHeight(nodeinfo *NodeInfo) (int64, error)
CheckPeerNatOk(addr string, nodeInfo *NodeInfo) bool
GetAddrList(peer *Peer) (map[string]int64, error)
GetAddrList(peer *Peer) (map[string]*pb.P2PPeerInfo, error)
GetInPeersNum(peer *Peer) (int, error)
CheckSelf(addr string, nodeinfo *NodeInfo) bool
}
......@@ -199,9 +199,9 @@ func (m *Cli) GetInPeersNum(peer *Peer) (int, error) {
}
// GetAddrList returns the peers advertised by the given peer, keyed by peer name
func (m *Cli) GetAddrList(peer *Peer) (map[string]int64, error) {
func (m *Cli) GetAddrList(peer *Peer) (map[string]*pb.P2PPeerInfo, error) {
var addrlist = make(map[string]int64)
var addrlist = make(map[string]*pb.P2PPeerInfo)
if peer == nil {
return addrlist, fmt.Errorf("pointer is nil")
}
......@@ -229,9 +229,11 @@ func (m *Cli) GetAddrList(peer *Peer) (map[string]int64, error) {
peerinfos := resp.GetPeerinfo()
for _, peerinfo := range peerinfos {
if peerinfo == nil {
continue
}
if localBlockHeight-peerinfo.GetHeader().GetHeight() < 2048 {
addrlist[fmt.Sprintf("%v:%v", peerinfo.GetAddr(), peerinfo.GetPort())] = peerinfo.GetHeader().GetHeight()
addrlist[peerinfo.GetName()] = peerinfo
}
}
return addrlist, nil
......
......@@ -35,6 +35,7 @@ func (p *Peer) Close() {
//unsub all topics
p.node.pubsub.Unsub(p.taskChan)
}
p.node.peerStore.Delete(p.Addr())
log.Info("Peer", "closed", p.Addr())
}
......@@ -138,18 +139,7 @@ func (p *Peer) heartBeat() {
if !p.GetRunning() {
return
}
peername, err := pcli.SendVersion(p, p.node.nodeInfo)
P2pComm.CollectPeerStat(err, p)
if err != nil || peername == "" {
//版本不对,直接关掉
log.Error("PeerHeartBeatSendVersion", "peerName", peername, "err", err)
p.Close()
return
}
log.Debug("sendVersion", "peer name", peername)
p.SetPeerName(peername) //设置连接的远程节点的节点名称
p.taskChan = p.node.pubsub.Sub("block", "tx", peername)
p.taskChan = p.node.pubsub.Sub("block", "tx", p.name)
go p.sendStream()
go p.readStream()
break
......@@ -309,7 +299,7 @@ func (p *Peer) readStream() {
log.Error("readStream", "err:", err.Error(), "peerIp", p.Addr())
continue
}
resp, err := p.mconn.gcli.ServerStreamSend(context.Background(), ping)
resp, err := p.mconn.gcli.ServerStreamSend(context.Background(), ping, grpc.WaitForReady(true))
P2pComm.CollectPeerStat(err, p)
if err != nil {
log.Error("readStream", "serverstreamsend,err:", err, "peer", p.Addr())
......@@ -330,18 +320,12 @@ func (p *Peer) readStream() {
data, err := resp.Recv()
if err != nil {
P2pComm.CollectPeerStat(err, p)
log.Error("readStream", "recv,err:", err.Error(), "peerAddr", p.Addr())
log.Error("readStream", "recv err", err.Error(), "peerAddr", p.Addr(), "data:", data)
errs := resp.CloseSend()
if errs != nil {
log.Error("CloseSend", "err", errs)
}
if status.Code(err) == codes.Unavailable {
break //重新创建新的流
}
log.Error("readStream", "recv,err:", err.Error(), "peerIp", p.Addr())
if status.Code(err) == codes.Unimplemented { //maybe order peers delete peer to BlackList
p.node.nodeInfo.blacklist.Add(p.Addr(), 3600)
return
......@@ -353,8 +337,9 @@ func (p *Peer) readStream() {
P2pComm.CollectPeerStat(err, p)
return
}
time.Sleep(time.Second) //have a rest
//其他stream 错误全部break ,重新创建新的stream
break
}
p.node.processRecvP2P(data, p.GetPeerName(), p.node.pubToPeer, p.Addr())
......
......@@ -65,7 +65,11 @@ func (n *Node) processSendP2P(rawData interface{}, peerVersion int32, pid, peerA
} else if ping, ok := rawData.(*types.P2PPing); ok {
doSend = true
sendData.Value = &types.BroadCastData_Ping{Ping: ping}
} else {
//没有合适的类型
log.Error("processSendP2P", "rawdata:", rawData)
}
log.Debug("ProcessSendP2PEnd", "peerAddr", peerAddr, "doSend", doSend)
return
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment