Commit 4e1878e1 authored by jiangpeng's avatar jiangpeng Committed by vipwzw

plugin:import p2p plugin gossip

parent 298964c4
......@@ -46,14 +46,7 @@ enableReExecLocal=true
enableReduceLocaldb=false
[p2p]
seeds=[]
enable=false
isSeed=false
serverStart=true
innerSeedEnable=true
useGithub=true
innerBounds=300
msgCacheSize=10240
driver="leveldb"
dbPath="paradatadir/addrbook"
dbCache=4
......
......@@ -44,18 +44,37 @@ enableReExecLocal=true
enableReduceLocaldb=true
[p2p]
seeds=[]
# p2p类型
types=["gossip", "dht"]
# 是否启动P2P服务
enable=true
isSeed=false
serverStart=true
innerSeedEnable=true
useGithub=true
innerBounds=300
msgCacheSize=10240
# 使用的数据库类型
driver="leveldb"
# 使用的数据库类型
dbPath="datadir/addrbook"
# 数据库缓存大小
dbCache=4
# GRPC请求日志文件
grpcLogFile="grpc33.log"
#waitPid 等待seed导入
waitPid=false
[p2p.sub.gossip]
# 种子节点,格式为ip:port,多个节点以逗号分隔,如seeds=["10.0.0.1:13802","10.0.0.2:13802","10.0.0.3:13802"]
seeds=[]
# 是否为种子节点
isSeed=false
# 是否作为服务端,对外提供服务
serverStart=true
# 是否使用Github获取种子节点
useGithub=true
# 最多的接入节点个数
innerBounds=300
[p2p.sub.dht]
seeds=[""]
[rpc]
jrpcBindAddr="localhost:8801"
......@@ -260,4 +279,4 @@ url="http://influxdb:8086"
database="chain33metrics"
username=""
password=""
namespace=""
\ No newline at end of file
namespace=""
......@@ -2,31 +2,30 @@ module github.com/33cn/plugin
go 1.12
replace github.com/33cn/chain33 => ../chain33
require (
github.com/33cn/chain33 v0.0.0-20200311054608-67f804d0ad2c
github.com/BurntSushi/toml v0.3.1
github.com/NebulousLabs/Sia v1.3.7
github.com/beorn7/perks v1.0.1 // indirect
github.com/btcsuite/btcd v0.0.0-20181013004428-67e573d211ac
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f // indirect
github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d // indirect
github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd // indirect
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792 // indirect
github.com/btcsuite/btcd v0.0.0-20190824003749-130ea5bddde3
github.com/coreos/etcd v3.3.15+incompatible
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
github.com/davecgh/go-spew v1.1.1
github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3
github.com/golang/protobuf v1.3.4
github.com/hashicorp/golang-lru v0.5.0
github.com/pkg/errors v0.8.0
github.com/hashicorp/golang-lru v0.5.3
github.com/huin/goupnp v1.0.0
github.com/jackpal/go-nat-pmp v1.0.1
github.com/pkg/errors v0.8.1
github.com/prometheus/client_golang v0.9.2 // indirect
github.com/prometheus/common v0.4.1 // indirect
github.com/prometheus/procfs v0.0.3 // indirect
github.com/robertkrimen/otto v0.0.0-20180617131154-15f95af6e78d
github.com/rs/cors v1.6.0
github.com/spf13/cobra v0.0.3
github.com/spf13/cobra v0.0.5
github.com/stretchr/testify v1.3.0
github.com/tjfoc/gmsm v0.0.0-20171124023159-98aa888b79d8
github.com/valyala/fasthttp v1.5.0
......@@ -34,7 +33,7 @@ require (
go.uber.org/atomic v1.4.0 // indirect
go.uber.org/multierr v1.2.0 // indirect
go.uber.org/zap v1.10.0 // indirect
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4
golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392
golang.org/x/net v0.0.0-20200301022130-244492dfa37a
golang.org/x/time v0.0.0-20190921001708-c4c64cad1fd0 // indirect
google.golang.org/grpc v1.28.0
......
This diff is collapsed.
......@@ -5,5 +5,6 @@ import (
_ "github.com/33cn/plugin/plugin/crypto/init" //crypto init
_ "github.com/33cn/plugin/plugin/dapp/init" //dapp init
_ "github.com/33cn/plugin/plugin/mempool/init" //mempool init
_ "github.com/33cn/plugin/plugin/p2p/init" //p2p init
_ "github.com/33cn/plugin/plugin/store/init" //store init
)
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package gossip
import (
"encoding/hex"
"encoding/json"
"fmt"
"strings"
"sync"
"time"
"github.com/33cn/chain33/common/db"
"github.com/33cn/chain33/types"
)
// Start addrbook start
func (a *AddrBook) Start() error {
log.Debug("addrbook start")
a.loadDb()
go a.saveRoutine()
return nil
}
// Close addrbook close
func (a *AddrBook) Close() {
a.Quit <- struct{}{}
a.bookDb.Close()
}
// AddrBook peer address manager
type AddrBook struct {
mtx sync.Mutex
ourAddrs map[string]*NetAddress
addrPeer map[string]*KnownAddress
p2pCfg *types.P2P
cfg *subConfig
keymtx sync.Mutex
privkey string
pubkey string
bookDb db.DB
Quit chan struct{}
}
// KnownAddress defines known address type
type KnownAddress struct {
kmtx sync.Mutex
Addr *NetAddress `json:"addr"`
Attempts uint `json:"attempts"`
LastAttempt time.Time `json:"lastattempt"`
LastSuccess time.Time `json:"lastsuccess"`
}
// GetPeerStat get peer stat
func (a *AddrBook) GetPeerStat(addr string) *KnownAddress {
a.mtx.Lock()
defer a.mtx.Unlock()
if peer, ok := a.addrPeer[addr]; ok {
return peer
}
return nil
}
func (a *AddrBook) setAddrStat(addr string, run bool) (*KnownAddress, bool) {
a.mtx.Lock()
defer a.mtx.Unlock()
if peer, ok := a.addrPeer[addr]; ok {
if run {
peer.markGood()
} else {
peer.markAttempt()
}
return peer, true
}
return nil, false
}
// NewAddrBook create a addrbook
func NewAddrBook(cfg *types.P2P, subCfg *subConfig) *AddrBook {
a := &AddrBook{
ourAddrs: make(map[string]*NetAddress),
addrPeer: make(map[string]*KnownAddress),
p2pCfg: cfg,
Quit: make(chan struct{}, 1),
cfg: subCfg,
}
err := a.Start()
if err != nil {
return nil
}
return a
}
func newKnownAddress(addr *NetAddress) *KnownAddress {
return &KnownAddress{
kmtx: sync.Mutex{},
Addr: addr,
Attempts: 0,
LastAttempt: types.Now(),
}
}
func (ka *KnownAddress) markGood() {
ka.kmtx.Lock()
defer ka.kmtx.Unlock()
now := types.Now()
ka.LastAttempt = now
ka.Attempts = 0
ka.LastSuccess = now
}
// Copy a KnownAddress
func (ka *KnownAddress) Copy() *KnownAddress {
ka.kmtx.Lock()
ret := KnownAddress{
Addr: ka.Addr.Copy(),
Attempts: ka.Attempts,
LastAttempt: ka.LastAttempt,
LastSuccess: ka.LastSuccess,
}
ka.kmtx.Unlock()
return &ret
}
func (ka *KnownAddress) markAttempt() {
ka.kmtx.Lock()
defer ka.kmtx.Unlock()
now := types.Now()
ka.LastAttempt = now
ka.Attempts++
}
// GetAttempts return attempts
func (ka *KnownAddress) GetAttempts() uint {
ka.kmtx.Lock()
defer ka.kmtx.Unlock()
return ka.Attempts
}
// ISOurAddress determine if the address is ours
func (a *AddrBook) ISOurAddress(addr *NetAddress) bool {
a.mtx.Lock()
defer a.mtx.Unlock()
if _, ok := a.ourAddrs[addr.String()]; ok {
return true
}
return false
}
// IsOurStringAddress determine if the address is ours
func (a *AddrBook) IsOurStringAddress(addr string) bool {
a.mtx.Lock()
defer a.mtx.Unlock()
if _, ok := a.ourAddrs[addr]; ok {
return true
}
return false
}
// AddOurAddress add a address for ours
func (a *AddrBook) AddOurAddress(addr *NetAddress) {
a.mtx.Lock()
defer a.mtx.Unlock()
log.Debug("Add our address to book", "addr", addr)
a.ourAddrs[addr.String()] = addr
}
// Size return addrpeer size
func (a *AddrBook) Size() int {
a.mtx.Lock()
defer a.mtx.Unlock()
return len(a.addrPeer)
}
type addrBookJSON struct {
Addrs []*KnownAddress `json:"addrs"`
}
func (a *AddrBook) saveToDb() {
a.mtx.Lock()
defer a.mtx.Unlock()
addrs := []*KnownAddress{}
seeds := a.cfg.Seeds
seedsMap := make(map[string]int)
for index, seed := range seeds {
seedsMap[seed] = index
}
for _, ka := range a.addrPeer {
if _, ok := seedsMap[ka.Addr.String()]; !ok {
addrs = append(addrs, ka.Copy())
}
}
if len(addrs) == 0 {
return
}
aJSON := &addrBookJSON{
Addrs: addrs,
}
jsonBytes, err := json.Marshal(aJSON)
if err != nil {
log.Error("Failed to save AddrBook to file", "err", err)
return
}
log.Debug("saveToDb", "addrs", string(jsonBytes))
err = a.bookDb.Set([]byte(addrkeyTag), jsonBytes)
if err != nil {
panic(err)
}
}
func (a *AddrBook) genPubkey(privkey string) string {
maxRetry := 10
for i := 0; i < maxRetry; i++ {
pubkey, err := P2pComm.Pubkey(privkey)
if err == nil {
return pubkey
}
}
panic(fmt.Sprintf("get pubkey from privkey:%s failed", privkey))
}
// Returns false if file does not exist.
// cmn.Panics if file is corrupt.
func (a *AddrBook) loadDb() bool {
dbPath := a.p2pCfg.DbPath + "/" + P2PTypeName
a.bookDb = db.NewDB("addrbook", a.p2pCfg.Driver, dbPath, a.p2pCfg.DbCache)
privkey, err := a.bookDb.Get([]byte(privKeyTag))
if len(privkey) != 0 && err == nil {
a.setKey(string(privkey), a.genPubkey(string(privkey)))
}
iteror := a.bookDb.Iterator(nil, nil, false)
for iteror.Next() {
if string(iteror.Key()) == addrkeyTag {
//读取存入的其他节点地址信息
aJSON := &addrBookJSON{}
dec := json.NewDecoder(strings.NewReader(string(iteror.Value())))
err := dec.Decode(aJSON)
if err != nil {
log.Crit("Error reading file %s: %v", a.p2pCfg.DbPath, err)
}
for _, ka := range aJSON.Addrs {
log.Debug("AddrBookloadDb", "peer", ka.Addr.String())
netaddr, err := NewNetAddressString(ka.Addr.String())
if err != nil {
continue
}
a.AddAddress(netaddr, ka)
}
}
}
return true
}
// Save saves the book.
func (a *AddrBook) Save() {
a.saveToDb()
}
func (a *AddrBook) saveRoutine() {
dumpAddressTicker := time.NewTicker(120 * time.Second)
defer dumpAddressTicker.Stop()
out:
for {
select {
case <-dumpAddressTicker.C:
a.Save()
case <-a.Quit:
break out
}
}
log.Info("Address handler done")
}
// AddAddress add a address for ours
// NOTE: addr must not be nil
func (a *AddrBook) AddAddress(addr *NetAddress, ka *KnownAddress) {
a.mtx.Lock()
defer a.mtx.Unlock()
log.Debug("Add address to book", "addr", addr)
if addr == nil {
return
}
if _, ok := a.ourAddrs[addr.String()]; ok {
// Ignore our own listener address.
return
}
//已经添加的不重复添加
if _, ok := a.addrPeer[addr.String()]; ok {
return
}
if nil == ka {
ka = newKnownAddress(addr)
}
a.addrPeer[ka.Addr.String()] = ka
}
// RemoveAddr remove address
func (a *AddrBook) RemoveAddr(peeraddr string) {
a.mtx.Lock()
defer a.mtx.Unlock()
if _, ok := a.addrPeer[peeraddr]; ok {
delete(a.addrPeer, peeraddr)
}
}
// GetPeers return peerlist
func (a *AddrBook) GetPeers() []*NetAddress {
a.mtx.Lock()
defer a.mtx.Unlock()
var peerlist []*NetAddress
for _, peer := range a.addrPeer {
peerlist = append(peerlist, peer.Addr)
}
return peerlist
}
// GetAddrs return addrlist
func (a *AddrBook) GetAddrs() []string {
a.mtx.Lock()
defer a.mtx.Unlock()
var addrlist []string
for _, peer := range a.addrPeer {
if peer.GetAttempts() == 0 {
addrlist = append(addrlist, peer.Addr.String())
}
}
return addrlist
}
func (a *AddrBook) initKey() {
maxRetry := 10
for i := 0; i < maxRetry; i++ {
priv, pub, err := P2pComm.GenPrivPubkey()
if err == nil {
a.setKey(hex.EncodeToString(priv), hex.EncodeToString(pub))
return
}
}
panic(fmt.Sprintf("p2p initPrivPubkey failed"))
}
func (a *AddrBook) setKey(privkey, pubkey string) {
a.keymtx.Lock()
defer a.keymtx.Unlock()
a.privkey = privkey
a.pubkey = pubkey
}
//ResetPeerkey reset priv,pub key
func (a *AddrBook) ResetPeerkey(privkey, pubkey string) {
if privkey == "" || pubkey == "" {
a.initKey()
privkey, pubkey = a.GetPrivPubKey()
}
a.setKey(privkey, pubkey)
err := a.bookDb.Set([]byte(privKeyTag), []byte(privkey))
if err != nil {
panic(err)
}
}
// GetPrivPubKey return privkey and pubkey
func (a *AddrBook) GetPrivPubKey() (string, string) {
a.keymtx.Lock()
defer a.keymtx.Unlock()
return a.privkey, a.pubkey
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package gossip
import (
"bytes"
"encoding/binary"
"encoding/hex"
"math/rand"
"net"
"strings"
"time"
"github.com/33cn/chain33/common/crypto"
"github.com/33cn/chain33/types"
"google.golang.org/grpc"
)
// P2pComm p2p communication
var P2pComm Comm
// Comm information
type Comm struct{}
// AddrRouteble address router ,return enbale address
func (Comm) AddrRouteble(addrs []string, version int32) []string {
var enableAddrs []string
for _, addr := range addrs {
netaddr, err := NewNetAddressString(addr)
if err != nil {
log.Error("AddrRouteble", "NewNetAddressString", err.Error())
continue
}
conn, err := netaddr.DialTimeout(version)
if err != nil {
//log.Error("AddrRouteble", "DialTimeout", err.Error())
continue
}
err = conn.Close()
if err != nil {
log.Error("AddrRouteble", "conn.Close err", err.Error())
}
enableAddrs = append(enableAddrs, addr)
}
return enableAddrs
}
// RandStr return a rand string
func (c Comm) RandStr(n int) string {
var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
r := rand.New(rand.NewSource(types.Now().Unix()))
b := make([]rune, n)
for i := range b {
b[i] = letters[r.Intn(len(letters))]
}
return string(b)
}
// GetLocalAddr get local address ,return address
func (c Comm) GetLocalAddr() string {
conn, err := net.Dial("udp", "114.114.114.114:80")
if err != nil {
log.Error(err.Error())
return ""
}
defer conn.Close()
log.Debug(strings.Split(conn.LocalAddr().String(), ":")[0])
return strings.Split(conn.LocalAddr().String(), ":")[0]
}
func (c Comm) dialPeerWithAddress(addr *NetAddress, persistent bool, node *Node) (*Peer, error) {
log.Debug("dialPeerWithAddress")
conn, err := addr.DialTimeout(node.nodeInfo.channelVersion)
if err != nil {
return nil, err
}
peer, err := c.newPeerFromConn(conn, addr, node)
if err != nil {
err = conn.Close()
return nil, err
}
peer.SetAddr(addr)
log.Debug("dialPeerWithAddress", "peer", peer.Addr(), "persistent:", persistent)
if persistent {
peer.MakePersistent()
}
return peer, nil
}
func (c Comm) newPeerFromConn(rawConn *grpc.ClientConn, remote *NetAddress, node *Node) (*Peer, error) {
// Key and NodeInfo are set after Handshake
p := NewPeer(rawConn, node, remote)
return p, nil
}
func (c Comm) dialPeer(addr *NetAddress, node *Node) (*Peer, error) {
log.Debug("dialPeer", "will connect", addr.String())
var persistent bool
if _, ok := node.cfgSeeds.Load(addr.String()); ok {
persistent = true
}
peer, err := c.dialPeerWithAddress(addr, persistent, node)
if err != nil {
log.Error("dialPeer", "nodeListenAddr", node.nodeInfo.listenAddr.str, "peerAddr", addr.str, "err", err)
return nil, err
}
//获取远程节点的信息 peer
log.Debug("dialPeer", "peer", peer)
return peer, nil
}
// GenPrivPubkey return key and pubkey in bytes
func (c Comm) GenPrivPubkey() ([]byte, []byte, error) {
cr, err := crypto.New(types.GetSignName("", types.SECP256K1))
if err != nil {
log.Error("CryPto Error", "Error", err.Error())
return nil, nil, err
}
key, err := cr.GenKey()
if err != nil {
log.Error("GenKey", "Error", err)
return nil, nil, err
}
return key.Bytes(), key.PubKey().Bytes(), nil
}
// Pubkey get pubkey by priv key
func (c Comm) Pubkey(key string) (string, error) {
cr, err := crypto.New(types.GetSignName("", types.SECP256K1))
if err != nil {
log.Error("CryPto Error", "Error", err.Error())
return "", err
}
pribyts, err := hex.DecodeString(key)
if err != nil {
log.Error("DecodeString Error", "Error", err.Error())
return "", err
}
priv, err := cr.PrivKeyFromBytes(pribyts)
if err != nil {
log.Error("Load PrivKey", "Error", err.Error())
return "", err
}
return hex.EncodeToString(priv.PubKey().Bytes()), nil
}
// NewPingData get ping node ,return p2pping
func (c Comm) NewPingData(nodeInfo *NodeInfo) (*types.P2PPing, error) {
randNonce := rand.New(rand.NewSource(time.Now().UnixNano())).Int63()
ping := &types.P2PPing{Nonce: randNonce, Addr: nodeInfo.GetExternalAddr().IP.String(), Port: int32(nodeInfo.GetExternalAddr().Port)}
var err error
p2pPrivKey, _ := nodeInfo.addrBook.GetPrivPubKey()
ping, err = c.Signature(p2pPrivKey, ping)
if err != nil {
log.Error("Signature", "Error", err.Error())
return nil, err
}
return ping, nil
}
// Signature nodedata by key
func (c Comm) Signature(key string, in *types.P2PPing) (*types.P2PPing, error) {
data := types.Encode(in)
cr, err := crypto.New(types.GetSignName("", types.SECP256K1))
if err != nil {
log.Error("CryPto Error", "Error", err.Error())
return nil, err
}
pribyts, err := hex.DecodeString(key)
if err != nil {
log.Error("DecodeString Error", "Error", err.Error())
return nil, err
}
priv, err := cr.PrivKeyFromBytes(pribyts)
if err != nil {
log.Error("Load PrivKey", "Error", err.Error())
return nil, err
}
in.Sign = new(types.Signature)
in.Sign.Signature = priv.Sign(data).Bytes()
in.Sign.Ty = types.SECP256K1
in.Sign.Pubkey = priv.PubKey().Bytes()
return in, nil
}
// CheckSign check signature data
func (c Comm) CheckSign(in *types.P2PPing) bool {
sign := in.GetSign()
if sign == nil {
log.Error("CheckSign Get sign err")
return false
}
cr, err := crypto.New(types.GetSignName("", int(sign.Ty)))
if err != nil {
log.Error("CheckSign", "crypto.New err", err.Error())
return false
}
pub, err := cr.PubKeyFromBytes(sign.Pubkey)
if err != nil {
log.Error("CheckSign", "PubKeyFromBytes err", err.Error())
return false
}
signbytes, err := cr.SignatureFromBytes(sign.Signature)
if err != nil {
log.Error("CheckSign", "SignatureFromBytes err", err.Error())
return false
}
in.Sign = nil
data := types.Encode(in)
if pub.VerifyBytes(data, signbytes) {
in.Sign = sign
return true
}
return false
}
// CollectPeerStat collect peer stat and report
func (c Comm) CollectPeerStat(err error, peer *Peer) {
if err != nil {
if err == types.ErrVersion || err == types.ErrP2PChannel {
peer.version.SetSupport(false)
}
peer.peerStat.NotOk()
} else {
peer.peerStat.Ok()
}
c.reportPeerStat(peer)
}
func (c Comm) reportPeerStat(peer *Peer) {
timeout := time.NewTimer(time.Second)
select {
case peer.node.nodeInfo.monitorChan <- peer:
case <-timeout.C:
timeout.Stop()
return
}
if !timeout.Stop() {
<-timeout.C
}
}
// BytesToInt32 bytes to int32 type
func (c Comm) BytesToInt32(b []byte) int32 {
bytesBuffer := bytes.NewBuffer(b)
var tmp int32
err := binary.Read(bytesBuffer, binary.LittleEndian, &tmp)
if err != nil {
log.Error("BytesToInt32", "binary.Read err", err.Error())
return tmp
}
return tmp
}
// Int32ToBytes int32 to bytes type
func (c Comm) Int32ToBytes(n int32) []byte {
tmp := n
bytesBuffer := bytes.NewBuffer([]byte{})
err := binary.Write(bytesBuffer, binary.LittleEndian, tmp)
if err != nil {
return nil
}
return bytesBuffer.Bytes()
}
// GrpcConfig grpc config
func (c Comm) GrpcConfig() grpc.ServiceConfig {
var defaulttimeout = 20 * time.Second
var MethodConf = map[string]grpc.MethodConfig{
"/types.p2pgservice/Ping": {Timeout: &defaulttimeout},
"/types.p2pgservice/Version2": {Timeout: &defaulttimeout},
"/types.p2pgservice/BroadCastTx": {Timeout: &defaulttimeout},
"/types.p2pgservice/GetMemPool": {Timeout: &defaulttimeout},
"/types.p2pgservice/GetBlocks": {Timeout: &defaulttimeout},
"/types.p2pgservice/GetPeerInfo": {Timeout: &defaulttimeout},
"/types.p2pgservice/BroadCastBlock": {Timeout: &defaulttimeout},
"/types.p2pgservice/GetAddr": {Timeout: &defaulttimeout},
"/types.p2pgservice/GetHeaders": {Timeout: &defaulttimeout},
"/types.p2pgservice/RemotePeerAddr": {Timeout: &defaulttimeout},
"/types.p2pgservice/RemotePeerNatOk": {Timeout: &defaulttimeout},
}
return grpc.ServiceConfig{Methods: MethodConf}
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package gossip
import (
pb "github.com/33cn/chain33/types"
"google.golang.org/grpc"
)
// MConnection contains node, grpc client, p2pgserviceClient, netaddress, peer
type MConnection struct {
node *Node
gconn *grpc.ClientConn
gcli pb.P2PgserviceClient // source connection
remoteAddress *NetAddress //peer 的地址
peer *Peer
}
// NewMConnection wraps net.Conn and creates multiplex connection
func NewMConnection(conn *grpc.ClientConn, remote *NetAddress, peer *Peer) *MConnection {
log.Info("NewMConnection p2p client", "addr", remote)
mconn := &MConnection{
gconn: conn,
gcli: pb.NewP2PgserviceClient(conn),
peer: peer,
}
mconn.node = peer.node
mconn.remoteAddress = remote
return mconn
}
// Close mconnection
func (c *MConnection) Close() {
err := c.gconn.Close()
if err != nil {
log.Error("Mconnection", "Close err", err)
}
log.Debug("Mconnection", "Close", "^_^!")
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package gossip
import (
"time"
)
// time limit for timeout
var (
UpdateState = 2 * time.Second
PingTimeout = 14 * time.Second
DefaultSendTimeout = 10 * time.Second
DialTimeout = 5 * time.Second
mapUpdateInterval = 45 * time.Hour
StreamPingTimeout = 20 * time.Second
MonitorPeerInfoInterval = 10 * time.Second
MonitorPeerNumInterval = 30 * time.Second
MonitorReBalanceInterval = 15 * time.Minute
GetAddrFromAddrBookInterval = 5 * time.Second
GetAddrFromOnlineInterval = 5 * time.Second
GetAddrFromGitHubInterval = 5 * time.Minute
CheckActivePeersInterVal = 5 * time.Second
CheckBlackListInterVal = 30 * time.Second
CheckCfgSeedsInterVal = 1 * time.Minute
)
const (
msgTx = 1
msgBlock = 2
tryMapPortTimes = 20
maxSamIPNum = 20
)
const (
//defalutNatPort = 23802
maxOutBoundNum = 25
stableBoundNum = 15
maxAttemps = 5
protocol = "tcp"
externalPortTag = "externalport"
)
const (
nodeNetwork = 1
nodeGetUTXO = 2
nodeBloom = 4
)
const (
// Service service number
Service int32 = nodeBloom + nodeNetwork + nodeGetUTXO
)
// leveldb 中p2p privkey,addrkey
const (
addrkeyTag = "addrs"
privKeyTag = "privkey"
)
//TTL
const (
DefaultLtTxBroadCastTTL = 3
DefaultMaxTxBroadCastTTL = 25
DefaultMinLtBlockTxNum = 5
)
// P2pCacheTxSize p2pcache size of transaction
const (
PeerAddrCacheNum = 1000
//接收的交易哈希过滤缓存设为mempool最大接收交易量
TxRecvFilterCacheNum = 10240
BlockFilterCacheNum = 50
//发送过滤主要用于发送时冗余检测, 发送完即可以被删除, 维护较小缓存数
TxSendFilterCacheNum = 500
BlockCacheNum = 10
MaxBlockCacheByteSize = 100 * 1024 * 1024
)
// TestNetSeeds test seeds of net
var TestNetSeeds = []string{
"47.97.223.101:13802",
}
// MainNetSeeds built-in list of seed
var MainNetSeeds = []string{
"116.62.14.25:13802",
"114.55.95.234:13802",
"115.28.184.14:13802",
"39.106.166.159:13802",
"39.106.193.172:13802",
"47.106.114.93:13802",
"120.76.100.165:13802",
"120.24.85.66:13802",
"120.24.92.123:13802",
"161.117.7.127:13802",
"161.117.9.54:13802",
"161.117.5.95:13802",
"161.117.7.28:13802",
"161.117.8.242:13802",
"161.117.6.193:13802",
"161.117.8.158:13802",
"47.88.157.209:13802",
"47.74.215.41:13802",
"47.74.128.69:13802",
"47.74.178.226:13802",
"47.88.154.76:13802",
"47.74.151.226:13802",
"47.245.31.41:13802",
"47.245.57.239:13802",
"47.245.54.118:13802",
"47.245.54.121:13802",
"47.245.56.140:13802",
"47.245.52.211:13802",
"47.91.88.195:13802",
"47.91.72.71:13802",
"47.91.91.38:13802",
"47.91.94.224:13802",
"47.91.75.191:13802",
"47.254.152.172:13802",
"47.252.0.181:13802",
"47.90.246.246:13802",
"47.90.208.100:13802",
"47.89.182.70:13802",
"47.90.207.173:13802",
"47.89.188.54:13802",
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package gossip
//p2p 网络模块:
//主要的功能是实现区块链数据的广播,交易的广播的功能。
//p2p 模块的接口:
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package gossip
import (
"fmt"
"io"
"sort"
"sync"
"sync/atomic"
"time"
pb "github.com/33cn/chain33/types"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
// Invs datastruct
type Invs []*pb.Inventory
//Len size of the Invs data
func (i Invs) Len() int {
return len(i)
}
//Less Sort from low to high
func (i Invs) Less(a, b int) bool {
return i[a].GetHeight() < i[b].GetHeight()
}
//Swap the param
func (i Invs) Swap(a, b int) {
i[a], i[b] = i[b], i[a]
}
// DownloadJob defines download job type
type DownloadJob struct {
wg sync.WaitGroup
p2pcli *Cli
canceljob int32
mtx sync.Mutex
busyPeer map[string]*peerJob
downloadPeers []*Peer
MaxJob int32
retryItems Invs
}
type peerJob struct {
limit int32
}
// NewDownloadJob create a downloadjob object
func NewDownloadJob(p2pcli *Cli, peers []*Peer) *DownloadJob {
job := new(DownloadJob)
job.p2pcli = p2pcli
job.busyPeer = make(map[string]*peerJob)
job.downloadPeers = peers
job.retryItems = make([]*pb.Inventory, 0)
job.MaxJob = 5
if len(peers) < 5 {
job.MaxJob = 10
}
//job.okChan = make(chan *pb.Inventory, 512)
return job
}
func (d *DownloadJob) getDownloadPeers() []*Peer {
d.mtx.Lock()
defer d.mtx.Unlock()
peers := make([]*Peer, len(d.downloadPeers))
copy(peers, d.downloadPeers)
return peers
}
func (d *DownloadJob) isBusyPeer(pid string) bool {
d.mtx.Lock()
defer d.mtx.Unlock()
if pjob, ok := d.busyPeer[pid]; ok {
return atomic.LoadInt32(&pjob.limit) >= d.MaxJob //每个节点最多同时接受10个下载任务
}
return false
}
func (d *DownloadJob) getJobNum(pid string) int32 {
d.mtx.Lock()
defer d.mtx.Unlock()
if pjob, ok := d.busyPeer[pid]; ok {
return atomic.LoadInt32(&pjob.limit)
}
return 0
}
func (d *DownloadJob) setBusyPeer(pid string) {
d.mtx.Lock()
defer d.mtx.Unlock()
if pjob, ok := d.busyPeer[pid]; ok {
atomic.AddInt32(&pjob.limit, 1)
d.busyPeer[pid] = pjob
return
}
d.busyPeer[pid] = &peerJob{1}
}
func (d *DownloadJob) removePeer(pid string) {
d.mtx.Lock()
defer d.mtx.Unlock()
for i, pr := range d.downloadPeers {
if pr.GetPeerName() == pid {
if i != len(d.downloadPeers)-1 {
d.downloadPeers = append(d.downloadPeers[:i], d.downloadPeers[i+1:]...)
return
}
d.downloadPeers = d.downloadPeers[:i]
return
}
}
}
// ResetDownloadPeers reset download peers
func (d *DownloadJob) ResetDownloadPeers(peers []*Peer) {
d.mtx.Lock()
defer d.mtx.Unlock()
copy(d.downloadPeers, peers)
}
func (d *DownloadJob) avalidPeersNum() int {
d.mtx.Lock()
defer d.mtx.Unlock()
return len(d.downloadPeers)
}
func (d *DownloadJob) setFreePeer(pid string) {
d.mtx.Lock()
defer d.mtx.Unlock()
if pjob, ok := d.busyPeer[pid]; ok {
if atomic.AddInt32(&pjob.limit, -1) <= 0 {
delete(d.busyPeer, pid)
return
}
d.busyPeer[pid] = pjob
}
}
//加入到重试数组
func (d *DownloadJob) appendRetryItem(item *pb.Inventory) {
d.mtx.Lock()
defer d.mtx.Unlock()
d.retryItems = append(d.retryItems, item)
}
// GetFreePeer get free peer ,return peer
func (d *DownloadJob) GetFreePeer(blockHeight int64) *Peer {
infos := d.p2pcli.network.node.nodeInfo.peerInfos.GetPeerInfos()
var minJobNum int32 = 10
var bestPeer *Peer
//对download peer读取需要增加保护
for _, peer := range d.getDownloadPeers() {
peerName := peer.GetPeerName()
if d.isBusyPeer(peerName) {
continue
}
if jobNum := d.getJobNum(peerName); jobNum < minJobNum &&
infos[peerName].GetHeader().GetHeight() >= blockHeight {
minJobNum = jobNum
bestPeer = peer
}
}
return bestPeer
}
// CancelJob cancel the downloadjob object
func (d *DownloadJob) CancelJob() {
atomic.StoreInt32(&d.canceljob, 1)
}
func (d *DownloadJob) isCancel() bool {
return atomic.LoadInt32(&d.canceljob) == 1
}
// DownloadBlock download the block
func (d *DownloadJob) DownloadBlock(invs []*pb.Inventory,
bchan chan *pb.BlockPid) []*pb.Inventory {
if d.isCancel() {
return nil
}
for _, inv := range invs { //让一个节点一次下载一个区块,下载失败区块,交给下一轮下载
//获取当前任务数最少的节点,相当于 下载速度最快的节点
freePeer := d.GetFreePeer(inv.GetHeight())
for freePeer == nil {
log.Debug("no free peer")
time.Sleep(time.Millisecond * 100)
freePeer = d.GetFreePeer(inv.GetHeight())
}
d.setBusyPeer(freePeer.GetPeerName())
d.wg.Add(1)
go func(peer *Peer, inv *pb.Inventory) {
defer d.wg.Done()
err := d.syncDownloadBlock(peer, inv, bchan)
if err != nil {
d.removePeer(peer.GetPeerName())
log.Error("DownloadBlock:syncDownloadBlock", "height", inv.GetHeight(), "peer", peer.GetPeerName(), "err", err)
d.appendRetryItem(inv) //失败的下载,放在下一轮ReDownload进行下载
} else {
d.setFreePeer(peer.GetPeerName())
}
}(freePeer, inv)
}
//等待下载任务
d.wg.Wait()
retryInvs := d.retryItems
//存在重试项
if retryInvs.Len() > 0 {
d.retryItems = make([]*pb.Inventory, 0)
sort.Sort(retryInvs)
}
return retryInvs
}
func (d *DownloadJob) syncDownloadBlock(peer *Peer, inv *pb.Inventory, bchan chan *pb.BlockPid) error {
//每次下载一个高度的数据,通过bchan返回上层
if peer == nil {
return fmt.Errorf("peer is not exist")
}
if !peer.GetRunning() {
return fmt.Errorf("peer not running")
}
var p2pdata pb.P2PGetData
p2pdata.Version = d.p2pcli.network.node.nodeInfo.channelVersion
p2pdata.Invs = []*pb.Inventory{inv}
ctx, cancel := context.WithCancel(context.Background())
//主动取消grpc流, 即时释放资源
defer cancel()
beg := pb.Now()
resp, err := peer.mconn.gcli.GetData(ctx, &p2pdata, grpc.FailFast(true))
P2pComm.CollectPeerStat(err, peer)
if err != nil {
log.Error("syncDownloadBlock", "GetData err", err.Error())
return err
}
defer func() {
log.Debug("download", "frompeer", peer.Addr(), "blockheight", inv.GetHeight(), "downloadcost", pb.Since(beg))
}()
invData, err := resp.Recv()
if err != nil && err != io.EOF {
log.Error("syncDownloadBlock", "RecvData err", err.Error())
return err
}
//返回单个数据条目
if invData == nil || len(invData.Items) != 1 {
return fmt.Errorf("InvalidRecvData")
}
block := invData.Items[0].GetBlock()
log.Debug("download", "frompeer", peer.Addr(), "blockheight", inv.GetHeight(), "blockSize", block.Size())
bchan <- &pb.BlockPid{Pid: peer.GetPeerName(), Block: block} //加入到输出通道
return nil
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package gossip
import (
"fmt"
"math/rand"
"net"
"sync"
"time"
pb "github.com/33cn/chain33/types"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
pr "google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
)
// Listener the actions
type Listener interface {
Close1()
Start()
}
// Start server start
func (l *listener) Start() {
l.p2pserver.Start()
go l.server.Serve(l.netlistener)
}
// Close server close
func (l *listener) Close() {
err := l.netlistener.Close()
if err != nil {
log.Error("Close", "netlistener.Close() err", err)
}
go l.server.Stop()
l.p2pserver.Close()
log.Info("stop", "server", "close")
}
type listener struct {
server *grpc.Server
nodeInfo *NodeInfo
p2pserver *P2pserver
node *Node
netlistener net.Listener
}
// newListener produce a server object
func newListener(protocol string, node *Node) *listener {
Retry:
log.Info("newListener", "localPort", node.listenPort)
l, err := net.Listen(protocol, fmt.Sprintf(":%v", node.listenPort))
if err != nil {
log.Error("Failed to listen", "Error", err.Error())
for {
randPort := rand.New(rand.NewSource(time.Now().UnixNano())).Int31n(65535)
if int(randPort) == node.listenPort || randPort < 2048 {
continue
}
node.listenPort = int(randPort)
break
}
log.Info("Flush Listen Port", "RandPort", node.listenPort)
goto Retry
}
dl := &listener{
nodeInfo: node.nodeInfo,
node: node,
netlistener: l,
}
pServer := NewP2pServer()
pServer.node = dl.node
//一元拦截器 接口调用之前进行校验拦截
interceptor := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
//checkAuth
getctx, ok := pr.FromContext(ctx)
if !ok {
return nil, fmt.Errorf("")
}
ip, _, err := net.SplitHostPort(getctx.Addr.String())
if err != nil {
return nil, err
}
if pServer.node.nodeInfo.blacklist.Has(ip) {
return nil, fmt.Errorf("blacklist %v no authorized", ip)
}
if !auth(ip) {
log.Error("interceptor", "auth faild", ip)
//把相应的IP地址加入黑名单中
pServer.node.nodeInfo.blacklist.Add(ip, int64(3600))
return nil, fmt.Errorf("auth faild %v no authorized", ip)
}
// Continue processing the request
return handler(ctx, req)
}
//流拦截器
interceptorStream := func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
getctx, ok := pr.FromContext(ss.Context())
if !ok {
log.Error("interceptorStream", "FromContext error", "")
return fmt.Errorf("stream Context err")
}
ip, _, err := net.SplitHostPort(getctx.Addr.String())
if err != nil {
return err
}
if pServer.node.nodeInfo.blacklist.Has(ip) {
return fmt.Errorf("blacklist %v no authorized", ip)
}
if !auth(ip) {
log.Error("interceptorStream", "auth faild", ip)
//把相应的IP地址加入黑名单中
pServer.node.nodeInfo.blacklist.Add(ip, int64(3600))
return fmt.Errorf("auth faild %v no authorized", ip)
}
return handler(srv, ss)
}
var opts []grpc.ServerOption
opts = append(opts, grpc.UnaryInterceptor(interceptor), grpc.StreamInterceptor(interceptorStream))
maxMsgSize := pb.MaxBlockSize + 1024*1024 //最大传输数据 最大区块大小
msgRecvOp := grpc.MaxRecvMsgSize(maxMsgSize) //设置最大接收数据
msgSendOp := grpc.MaxSendMsgSize(maxMsgSize) //设置最大发送数据
kaep := keepalive.EnforcementPolicy{
MinTime: 10 * time.Second, //只允许不低于10s频率的ping周期
PermitWithoutStream: true,
}
var keepparm keepalive.ServerParameters
keepparm.Time = 5 * time.Minute
keepparm.Timeout = 50 * time.Second
maxStreams := grpc.MaxConcurrentStreams(1000)
keepOp := grpc.KeepaliveParams(keepparm)
StatsOp := grpc.StatsHandler(&statshandler{})
opts = append(opts, msgRecvOp, msgSendOp, grpc.KeepaliveEnforcementPolicy(kaep), keepOp, maxStreams, StatsOp)
dl.server = grpc.NewServer(opts...)
dl.p2pserver = pServer
pb.RegisterP2PgserviceServer(dl.server, pServer)
return dl
}
type statshandler struct{}
func (h *statshandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
return context.WithValue(ctx, connCtxKey{}, info)
}
func (h *statshandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
return ctx
}
func (h *statshandler) HandleConn(ctx context.Context, s stats.ConnStats) {
if ctx == nil {
return
}
tag, ok := getConnTagFromContext(ctx)
if !ok {
log.Error("can not get conn tag")
return
}
ip, _, err := net.SplitHostPort(tag.RemoteAddr.String())
if err != nil {
return
}
connsMutex.Lock()
defer connsMutex.Unlock()
if _, ok := conns[ip]; !ok {
conns[ip] = 0
}
switch s.(type) {
case *stats.ConnBegin:
conns[ip] = conns[ip] + 1
case *stats.ConnEnd:
conns[ip] = conns[ip] - 1
if conns[ip] <= 0 {
delete(conns, ip)
}
log.Debug("ip connend", "ip", ip, "n", conns[ip])
default:
log.Error("illegal ConnStats type")
}
}
// HandleRPC 为空.
func (h *statshandler) HandleRPC(ctx context.Context, s stats.RPCStats) {}
type connCtxKey struct{}
var connsMutex sync.Mutex
var conns = make(map[string]uint)
func getConnTagFromContext(ctx context.Context) (*stats.ConnTagInfo, bool) {
tag, ok := ctx.Value(connCtxKey{}).(*stats.ConnTagInfo)
return tag, ok
}
func auth(checkIP string) bool {
connsMutex.Lock()
defer connsMutex.Unlock()
count, ok := conns[checkIP]
if ok && count > maxSamIPNum {
log.Error("AuthCheck", "sameIP num:", count, "checkIP:", checkIP, "diffIP num:", len(conns))
return false
}
return true
}
This diff is collapsed.
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Package nat provides access to common network port mapping protocols.
package nat
import (
"errors"
"fmt"
"net"
"strings"
"sync"
"time"
"github.com/33cn/chain33/common/log/log15"
natpmp "github.com/jackpal/go-nat-pmp"
)
var (
log = log15.New("module", "p2p.nat")
)
// Interface An implementation of nat.Interface can map local ports to ports
// accessible from the Internet.
type Interface interface {
// These methods manage a mapping between a port on the local
// machine to a port that can be connected to from the internet.
//
// protocol is "UDP" or "TCP". Some implementations allow setting
// a display name for the mapping. The mapping may be removed by
// the gateway when its lifetime ends.
AddMapping(protocol string, extport, intport int, name string, lifetime time.Duration) error
DeleteMapping(protocol string, extport, intport int) error
// This method should return the external (Internet-facing)
// address of the gateway device.
ExternalIP() (net.IP, error)
// Should return name of the method. This is used for logging.
String() string
}
// Parse parses a NAT interface description.
// The following formats are currently accepted.
// Note that mechanism names are not case-sensitive.
//
// "" or "none" return nil
// "extip:77.12.33.4" will assume the local machine is reachable on the given IP
// "any" uses the first auto-detected mechanism
// "upnp" uses the Universal Plug and Play protocol
// "pmp" uses NAT-PMP with an auto-detected gateway address
// "pmp:192.168.0.1" uses NAT-PMP with the given gateway address
func Parse(spec string) (Interface, error) {
var (
parts = strings.SplitN(spec, ":", 2)
mech = strings.ToLower(parts[0])
ip net.IP
)
if len(parts) > 1 {
ip = net.ParseIP(parts[1])
if ip == nil {
return nil, errors.New("invalid IP address")
}
}
switch mech {
case "", "none", "off":
return nil, nil
case "any", "auto", "on":
return Any(), nil
case "extip", "ip":
if ip == nil {
return nil, errors.New("missing IP address")
}
return ExtIP(ip), nil
case "upnp":
return UPnP(), nil
case "pmp", "natpmp", "nat-pmp":
return PMP(ip), nil
default:
return nil, fmt.Errorf("unknown mechanism %q", parts[0])
}
}
const (
mapTimeout = 20 * time.Minute
mapUpdateInterval = 15 * time.Minute
)
// Map adds a port mapping on m and keeps it alive until c is closed.
// This function is typically invoked in its own goroutine.
// Map adds a port mapping on m and keeps it alive until c is closed.
// This function is typically invoked in its own goroutine.
func Map(m Interface, c chan struct{}, protocol string, extport, intport int, name string) {
refresh := time.NewTimer(mapUpdateInterval)
defer func() {
refresh.Stop()
log.Info("Deleting port mapping")
err := m.DeleteMapping(protocol, extport, intport)
if err != nil {
return
}
}()
if err := m.AddMapping(protocol, extport, intport, name, mapTimeout); err != nil {
log.Error("Couldn't add port mapping", "err", err)
} else {
log.Info("Mapped network port")
}
for {
select {
case _, ok := <-c:
if !ok {
return
}
case <-refresh.C:
log.Info("Refreshing port mapping")
if err := m.AddMapping(protocol, extport, intport, name, mapTimeout); err != nil {
log.Error("Couldn't add port mapping", "err", err)
}
refresh.Reset(mapUpdateInterval)
}
}
}
// ExtIP assumes that the local machine is reachable on the given
// external IP address, and that any required ports were mapped manually.
// Mapping operations will not return an error but won't actually do anything.
func ExtIP(ip net.IP) Interface {
if ip == nil {
panic("IP must not be nil")
}
return extIP(ip)
}
type extIP net.IP
func (n extIP) ExternalIP() (net.IP, error) { return net.IP(n), nil }
func (n extIP) String() string { return fmt.Sprintf("ExtIP(%v)", net.IP(n)) }
// These do nothing.
func (extIP) AddMapping(string, int, int, string, time.Duration) error { return nil }
func (extIP) DeleteMapping(string, int, int) error { return nil }
// Any returns a port mapper that tries to discover any supported
// mechanism on the local network.
func Any() Interface {
// TODO: attempt to discover whether the local machine has an
// Internet-class address. Return ExtIP in this case.
return startautodisc("UPnP or NAT-PMP", func() Interface {
found := make(chan Interface, 2)
go func() { found <- discoverUPnP() }()
go func() { found <- discoverPMP() }()
for i := 0; i < cap(found); i++ {
if c := <-found; c != nil {
return c
}
}
return nil
})
}
// UPnP returns a port mapper that uses UPnP. It will attempt to
// discover the address of your router using UDP broadcasts.
func UPnP() Interface {
return startautodisc("UPnP", discoverUPnP)
}
// PMP returns a port mapper that uses NAT-PMP. The provided gateway
// address should be the IP of your router. If the given gateway
// address is nil, PMP will attempt to auto-discover the router.
func PMP(gateway net.IP) Interface {
if gateway != nil {
return &pmp{gw: gateway, c: natpmp.NewClient(gateway)}
}
return startautodisc("NAT-PMP", discoverPMP)
}
// autodisc represents a port mapping mechanism that is still being
// auto-discovered. Calls to the Interface methods on this type will
// wait until the discovery is done and then call the method on the
// discovered mechanism.
//
// This type is useful because discovery can take a while but we
// want return an Interface value from UPnP, PMP and Auto immediately.
type autodisc struct {
what string // type of interface being autodiscovered
once sync.Once
doit func() Interface
mu sync.Mutex
found Interface
}
func startautodisc(what string, doit func() Interface) Interface {
// TODO: monitor network configuration and rerun doit when it changes.
return &autodisc{what: what, doit: doit}
}
func (n *autodisc) AddMapping(protocol string, extport, intport int, name string, lifetime time.Duration) error {
if err := n.wait(); err != nil {
return err
}
return n.found.AddMapping(protocol, extport, intport, name, lifetime)
}
func (n *autodisc) DeleteMapping(protocol string, extport, intport int) error {
if err := n.wait(); err != nil {
return err
}
return n.found.DeleteMapping(protocol, extport, intport)
}
func (n *autodisc) ExternalIP() (net.IP, error) {
if err := n.wait(); err != nil {
return nil, err
}
return n.found.ExternalIP()
}
func (n *autodisc) String() string {
n.mu.Lock()
defer n.mu.Unlock()
if n.found == nil {
return n.what
}
return n.found.String()
}
// wait blocks until auto-discovery has been performed.
func (n *autodisc) wait() error {
n.once.Do(func() {
n.mu.Lock()
n.found = n.doit()
n.mu.Unlock()
})
if n.found == nil {
return fmt.Errorf("no %s router discovered", n.what)
}
return nil
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package nat
import (
"net"
"testing"
"time"
)
// This test checks that autodisc doesn't hang and returns
// consistent results when multiple goroutines call its methods
// concurrently.
func TestAutoDiscRace(t *testing.T) {
ad := startautodisc("thing", func() Interface {
time.Sleep(500 * time.Millisecond)
return extIP{33, 44, 55, 66}
})
// Spawn a few concurrent calls to ad.ExternalIP.
type rval struct {
ip net.IP
err error
}
results := make(chan rval, 50)
for i := 0; i < cap(results); i++ {
go func() {
ip, err := ad.ExternalIP()
results <- rval{ip, err}
}()
}
// Check that they all return the correct result within the deadline.
deadline := time.After(2 * time.Second)
for i := 0; i < cap(results); i++ {
select {
case <-deadline:
t.Fatal("deadline exceeded")
case rval := <-results:
if rval.err != nil {
t.Errorf("result %d: unexpected error: %v", i, rval.err)
}
wantIP := net.IP{33, 44, 55, 66}
if !rval.ip.Equal(wantIP) {
t.Errorf("result %d: got IP %v, want %v", i, rval.ip, wantIP)
}
}
}
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package nat
import (
"fmt"
"net"
"strings"
"time"
natpmp "github.com/jackpal/go-nat-pmp"
)
// natPMPClient adapts the NAT-PMP protocol implementation so it conforms to
// the common interface.
type pmp struct {
gw net.IP
c *natpmp.Client
}
func (n *pmp) String() string {
return fmt.Sprintf("NAT-PMP(%v)", n.gw)
}
func (n *pmp) ExternalIP() (net.IP, error) {
response, err := n.c.GetExternalAddress()
if err != nil {
return nil, err
}
return response.ExternalIPAddress[:], nil
}
func (n *pmp) AddMapping(protocol string, extport, intport int, name string, lifetime time.Duration) error {
if lifetime <= 0 {
return fmt.Errorf("lifetime must not be <= 0")
}
// Note order of port arguments is switched between our
// AddMapping and the client's AddPortMapping.
_, err := n.c.AddPortMapping(strings.ToLower(protocol), intport, extport, int(lifetime/time.Second))
return err
}
func (n *pmp) DeleteMapping(protocol string, extport, intport int) (err error) {
// To destroy a mapping, send an add-port with an internalPort of
// the internal port to destroy, an external port of zero and a
// time of zero.
_, err = n.c.AddPortMapping(strings.ToLower(protocol), intport, 0, 0)
return err
}
func discoverPMP() Interface {
// run external address lookups on all potential gateways
gws := potentialGateways()
found := make(chan *pmp, len(gws))
for i := range gws {
gw := gws[i]
go func() {
c := natpmp.NewClient(gw)
if _, err := c.GetExternalAddress(); err != nil {
found <- nil
} else {
found <- &pmp{gw, c}
}
}()
}
// return the one that responds first.
// discovery needs to be quick, so we stop caring about
// any responses after a very short timeout.
timeout := time.NewTimer(1 * time.Second)
defer timeout.Stop()
for range gws {
select {
case c := <-found:
if c != nil {
return c
}
case <-timeout.C:
return nil
}
}
return nil
}
var (
// LAN IP ranges
_, lan10, _ = net.ParseCIDR("10.0.0.0/8")
_, lan176, _ = net.ParseCIDR("172.16.0.0/12")
_, lan192, _ = net.ParseCIDR("192.168.0.0/16")
)
// TODO: improve this. We currently assume that (on most networks)
// the router is X.X.X.1 in a local LAN range.
func potentialGateways() (gws []net.IP) {
ifaces, err := net.Interfaces()
if err != nil {
return nil
}
for _, iface := range ifaces {
ifaddrs, err := iface.Addrs()
if err != nil {
return gws
}
for _, addr := range ifaddrs {
switch x := addr.(type) {
case *net.IPNet:
if lan10.Contains(x.IP) || lan176.Contains(x.IP) || lan192.Contains(x.IP) {
ip := x.IP.Mask(x.Mask).To4()
if ip != nil {
ip[3] = ip[3] | 0x01
gws = append(gws, ip)
}
}
}
}
}
return gws
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package nat
import (
"errors"
"fmt"
"net"
"strings"
"time"
"github.com/huin/goupnp"
"github.com/huin/goupnp/dcps/internetgateway1"
"github.com/huin/goupnp/dcps/internetgateway2"
)
const soapRequestTimeout = 3 * time.Second
type upnp struct {
dev *goupnp.RootDevice
service string
client upnpClient
}
type upnpClient interface {
GetExternalIPAddress() (string, error)
AddPortMapping(string, uint16, string, uint16, string, bool, string, uint32) error
DeletePortMapping(string, uint16, string) error
GetNATRSIPStatus() (sip bool, nat bool, err error)
}
func (n *upnp) ExternalIP() (addr net.IP, err error) {
ipString, err := n.client.GetExternalIPAddress()
if err != nil {
return nil, err
}
ip := net.ParseIP(ipString)
if ip == nil {
return nil, errors.New("bad IP in response")
}
return ip, nil
}
func (n *upnp) AddMapping(protocol string, extport, intport int, desc string, lifetime time.Duration) error {
ip, err := n.internalAddress()
if err != nil {
return nil
}
protocol = strings.ToUpper(protocol)
lifetimeS := uint32(lifetime / time.Second)
err = n.DeleteMapping(protocol, extport, intport)
if err != nil {
return err
}
return n.client.AddPortMapping("", uint16(extport), protocol, uint16(intport), ip.String(), true, desc, lifetimeS)
}
func (n *upnp) internalAddress() (net.IP, error) {
devaddr, err := net.ResolveUDPAddr("udp4", n.dev.URLBase.Host)
if err != nil {
return nil, err
}
ifaces, err := net.Interfaces()
if err != nil {
return nil, err
}
for _, iface := range ifaces {
addrs, err := iface.Addrs()
if err != nil {
return nil, err
}
for _, addr := range addrs {
switch x := addr.(type) {
case *net.IPNet:
if x.Contains(devaddr.IP) {
return x.IP, nil
}
}
}
}
return nil, fmt.Errorf("could not find local address in same net as %v", devaddr)
}
func (n *upnp) DeleteMapping(protocol string, extport, intport int) error {
return n.client.DeletePortMapping("", uint16(extport), strings.ToUpper(protocol))
}
func (n *upnp) String() string {
return "UPNP " + n.service
}
// discoverUPnP searches for Internet Gateway Devices
// and returns the first one it can find on the local network.
func discoverUPnP() Interface {
found := make(chan *upnp, 2)
// IGDv1
go discover(found, internetgateway1.URN_WANConnectionDevice_1, func(dev *goupnp.RootDevice, sc goupnp.ServiceClient) *upnp {
switch sc.Service.ServiceType {
case internetgateway1.URN_WANIPConnection_1:
return &upnp{dev, "IGDv1-IP1", &internetgateway1.WANIPConnection1{ServiceClient: sc}}
case internetgateway1.URN_WANPPPConnection_1:
return &upnp{dev, "IGDv1-PPP1", &internetgateway1.WANPPPConnection1{ServiceClient: sc}}
}
return nil
})
// IGDv2
go discover(found, internetgateway2.URN_WANConnectionDevice_2, func(dev *goupnp.RootDevice, sc goupnp.ServiceClient) *upnp {
switch sc.Service.ServiceType {
case internetgateway2.URN_WANIPConnection_1:
return &upnp{dev, "IGDv2-IP1", &internetgateway2.WANIPConnection1{ServiceClient: sc}}
case internetgateway2.URN_WANIPConnection_2:
return &upnp{dev, "IGDv2-IP2", &internetgateway2.WANIPConnection2{ServiceClient: sc}}
case internetgateway2.URN_WANPPPConnection_1:
return &upnp{dev, "IGDv2-PPP1", &internetgateway2.WANPPPConnection1{ServiceClient: sc}}
}
return nil
})
for i := 0; i < cap(found); i++ {
if c := <-found; c != nil {
return c
}
}
return nil
}
// finds devices matching the given target and calls matcher for all
// advertised services of each device. The first non-nil service found
// is sent into out. If no service matched, nil is sent.
func discover(out chan<- *upnp, target string, matcher func(*goupnp.RootDevice, goupnp.ServiceClient) *upnp) {
devs, err := goupnp.DiscoverDevices(target)
if err != nil {
out <- nil
return
}
found := false
for i := 0; i < len(devs) && !found; i++ {
if devs[i].Root == nil {
continue
}
devs[i].Root.Device.VisitServices(func(service *goupnp.Service) {
if found {
return
}
// check for a matching IGD service
sc := goupnp.ServiceClient{
SOAPClient: service.NewSOAPClient(),
RootDevice: devs[i].Root,
Location: devs[i].Location,
Service: service,
}
sc.SOAPClient.HTTPClient.Timeout = soapRequestTimeout
upnp := matcher(devs[i].Root, sc)
if upnp == nil {
return
}
// check whether port mapping is enabled
if _, nat, err := upnp.client.GetNATRSIPStatus(); err != nil || !nat {
return
}
out <- upnp
found = true
})
}
if !found {
out <- nil
}
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package nat
import (
"fmt"
"io"
"net"
"net/http"
"strings"
"testing"
"github.com/huin/goupnp/httpu"
)
// fakeIGD presents itself as a discoverable UPnP device which sends
// canned responses to HTTPU and HTTP requests.
type fakeIGD struct {
t *testing.T // for logging
listener net.Listener
mcastListener *net.UDPConn
// This should be a complete HTTP response (including headers).
// It is sent as the response to any sspd packet. Any occurrence
// of "{{listenAddr}}" is replaced with the actual TCP listen
// address of the HTTP server.
ssdpResp string
// This one should contain XML payloads for all requests
// performed. The keys contain method and path, e.g. "GET /foo/bar".
// As with ssdpResp, "{{listenAddr}}" is replaced with the TCP
// listen address.
httpResps map[string]string
}
// httpu.Handler
func (dev *fakeIGD) ServeMessage(r *http.Request) {
dev.t.Logf(`HTTPU request %s %s`, r.Method, r.RequestURI)
conn, err := net.Dial("udp4", r.RemoteAddr)
if err != nil {
fmt.Printf("reply Dial error: %v", err)
return
}
defer conn.Close()
io.WriteString(conn, dev.replaceListenAddr(dev.ssdpResp))
}
// http.Handler
func (dev *fakeIGD) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if resp, ok := dev.httpResps[r.Method+" "+r.RequestURI]; ok {
dev.t.Logf(`HTTP request "%s %s" --> %d`, r.Method, r.RequestURI, 200)
io.WriteString(w, dev.replaceListenAddr(resp))
} else {
dev.t.Logf(`HTTP request "%s %s" --> %d`, r.Method, r.RequestURI, 404)
w.WriteHeader(http.StatusNotFound)
}
}
func (dev *fakeIGD) replaceListenAddr(resp string) string {
return strings.Replace(resp, "{{listenAddr}}", dev.listener.Addr().String(), -1)
}
func (dev *fakeIGD) listen() (err error) {
if dev.listener, err = net.Listen("tcp", "127.0.0.1:0"); err != nil {
return err
}
laddr := &net.UDPAddr{IP: net.ParseIP("239.255.255.250"), Port: 1900}
if dev.mcastListener, err = net.ListenMulticastUDP("udp", nil, laddr); err != nil {
dev.listener.Close()
return err
}
return nil
}
func (dev *fakeIGD) serve() {
go httpu.Serve(dev.mcastListener, dev)
go http.Serve(dev.listener, dev)
}
func (dev *fakeIGD) close() {
dev.mcastListener.Close()
dev.listener.Close()
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package gossip
import (
"net"
"testing"
"github.com/33cn/chain33/p2p/utils"
"github.com/stretchr/testify/assert"
)
func TestNetAddress(t *testing.T) {
tcpAddr := new(net.TCPAddr)
tcpAddr.IP = net.ParseIP("localhost")
tcpAddr.Port = 2223
nad := NewNetAddress(tcpAddr)
nad1 := nad.Copy()
nad.Equals(nad1)
nad2s, err := NewNetAddressStrings([]string{"localhost:3306"})
if err != nil {
return
}
nad.Less(nad2s[0])
}
func TestAddrRouteble(t *testing.T) {
resp := P2pComm.AddrRouteble([]string{"114.55.101.159:13802"}, utils.CalcChannelVersion(119, VERSION))
t.Log(resp)
}
func TestGetLocalAddr(t *testing.T) {
t.Log(P2pComm.GetLocalAddr())
}
func TestP2pListen(t *testing.T) {
var node Node
node.listenPort = 3333
listen1 := newListener("tcp", &node)
assert.Equal(t, true, listen1 != nil)
listen2 := newListener("tcp", &node)
assert.Equal(t, true, listen2 != nil)
listen1.Close()
listen2.Close()
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package gossip
import (
"context"
"fmt"
"net"
"strconv"
"time"
pb "github.com/33cn/chain33/types"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/keepalive"
)
// NetAddress defines information about a peer on the network
// including its IP address, and port.
type NetAddress struct {
IP net.IP
Port uint16
str string
}
// NewNetAddress returns a new NetAddress using the provided TCP
// address.
func NewNetAddress(addr net.Addr) *NetAddress {
tcpAddr, ok := addr.(*net.TCPAddr)
if !ok {
return nil
}
ip := tcpAddr.IP
port := uint16(tcpAddr.Port)
return NewNetAddressIPPort(ip, port)
}
// NewNetAddressString returns a new NetAddress using the provided
// address in the form of "IP:Port". Also resolves the host if host
// is not an IP.
func NewNetAddressString(addr string) (*NetAddress, error) {
host, portStr, err := net.SplitHostPort(addr)
if err != nil {
return nil, err
}
ip := net.ParseIP(host)
if ip == nil {
if len(host) > 0 {
ips, err := net.LookupIP(host)
if err != nil {
return nil, err
}
ip = ips[0]
}
}
port, err := strconv.ParseUint(portStr, 10, 16)
if err != nil {
return nil, err
}
na := NewNetAddressIPPort(ip, uint16(port))
return na, nil
}
// NewNetAddressStrings returns an array of NetAddress'es build using
// the provided strings.
func NewNetAddressStrings(addrs []string) ([]*NetAddress, error) {
netAddrs := make([]*NetAddress, len(addrs))
for i, addr := range addrs {
netAddr, err := NewNetAddressString(addr)
if err != nil {
return nil, fmt.Errorf("error in address %s: %v", addr, err)
}
netAddrs[i] = netAddr
}
return netAddrs, nil
}
// NewNetAddressIPPort returns a new NetAddress using the provided IP
// and port number.
func NewNetAddressIPPort(ip net.IP, port uint16) *NetAddress {
na := &NetAddress{
IP: ip,
Port: port,
str: net.JoinHostPort(
ip.String(),
strconv.FormatUint(uint64(port), 10),
),
}
return na
}
// Equals reports whether na and other are the same addresses.
func (na *NetAddress) Equals(other interface{}) bool {
if o, ok := other.(*NetAddress); ok {
return na.String() == o.String()
}
return false
}
// Less reports whether na and other are the less addresses
func (na *NetAddress) Less(other interface{}) bool {
if o, ok := other.(*NetAddress); ok {
return na.String() < o.String()
}
log.Error("Cannot compare unequal types")
return false
}
// String representation.
func (na *NetAddress) String() string {
if na.str == "" {
na.str = net.JoinHostPort(
na.IP.String(),
strconv.FormatUint(uint64(na.Port), 10),
)
}
return na.str
}
// Copy na address
func (na *NetAddress) Copy() *NetAddress {
copytmp := *na
return &copytmp
}
// DialTimeout calls net.DialTimeout on the address.
func isCompressSupport(err error) bool {
var errstr = `grpc: Decompressor is not installed for grpc-encoding "gzip"`
if grpc.Code(err) == codes.Unimplemented && grpc.ErrorDesc(err) == errstr {
return false
}
return true
}
// DialTimeout dial timeout
func (na *NetAddress) DialTimeout(version int32) (*grpc.ClientConn, error) {
ch := make(chan grpc.ServiceConfig, 1)
ch <- P2pComm.GrpcConfig()
var cliparm keepalive.ClientParameters
cliparm.Time = 15 * time.Second //keepalive ping 周期
cliparm.Timeout = 10 * time.Second //ping后的获取ack消息超时时间
cliparm.PermitWithoutStream = true //启动keepalive 进行检查
keepaliveOp := grpc.WithKeepaliveParams(cliparm)
timeoutOp := grpc.WithTimeout(time.Second * 3)
log.Debug("NetAddress", "Dial", na.String())
conn, err := grpc.Dial(na.String(), grpc.WithInsecure(),
grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip")), grpc.WithServiceConfig(ch), keepaliveOp, timeoutOp)
if err != nil {
log.Debug("grpc DialCon", "did not connect", err, "addr", na.String())
return nil, err
}
//p2p version check 通过版本协议,获取通信session
//判断是否对方是否支持压缩
cli := pb.NewP2PgserviceClient(conn)
_, err = cli.GetHeaders(context.Background(), &pb.P2PGetHeaders{StartHeight: 0, EndHeight: 0, Version: version}, grpc.FailFast(true))
if err != nil && !isCompressSupport(err) {
//compress not support
log.Error("compress not supprot , rollback to uncompress version", "addr", na.String())
err = conn.Close()
if err != nil {
log.Error("conn", "close err", err)
}
ch2 := make(chan grpc.ServiceConfig, 1)
ch2 <- P2pComm.GrpcConfig()
log.Debug("NetAddress", "Dial with unCompressor", na.String())
conn, err = grpc.Dial(na.String(), grpc.WithInsecure(), grpc.WithServiceConfig(ch2), keepaliveOp, timeoutOp)
}
if err != nil {
log.Debug("grpc DialCon Uncompressor", "did not connect", err)
if conn != nil {
errs := conn.Close()
if errs != nil {
log.Error("conn", "close err", errs)
}
}
return nil, err
}
return conn, nil
}
This diff is collapsed.
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package gossip
import (
"sync"
"sync/atomic"
"github.com/33cn/chain33/p2p/utils"
"github.com/33cn/chain33/queue"
"github.com/33cn/chain33/types"
)
// NodeInfo is interface object of the node
type NodeInfo struct {
mtx sync.Mutex
externalAddr *NetAddress
listenAddr *NetAddress
monitorChan chan *Peer
natNoticeChain chan struct{}
natResultChain chan bool
p2pCfg *types.P2P
cfg *subConfig
client queue.Client
blacklist *BlackList
peerInfos *PeerInfos
addrBook *AddrBook // known peers
natDone int32
outSide int32
ServiceType int32
channelVersion int32
}
// NewNodeInfo new a node object
func NewNodeInfo(p2pCfg *types.P2P, subCfg *subConfig) *NodeInfo {
nodeInfo := new(NodeInfo)
nodeInfo.monitorChan = make(chan *Peer, 1024)
nodeInfo.natNoticeChain = make(chan struct{}, 1)
nodeInfo.natResultChain = make(chan bool, 1)
nodeInfo.blacklist = &BlackList{badPeers: make(map[string]int64)}
nodeInfo.p2pCfg = p2pCfg
nodeInfo.cfg = subCfg
nodeInfo.peerInfos = new(PeerInfos)
nodeInfo.peerInfos.infos = make(map[string]*types.Peer)
nodeInfo.externalAddr = new(NetAddress)
nodeInfo.listenAddr = new(NetAddress)
nodeInfo.addrBook = NewAddrBook(p2pCfg, subCfg)
nodeInfo.channelVersion = utils.CalcChannelVersion(subCfg.Channel, VERSION)
return nodeInfo
}
// PeerInfos encapsulation peer information
type PeerInfos struct {
mtx sync.Mutex
//key:peerName
infos map[string]*types.Peer
}
// PeerSize return a size of peer information
func (p *PeerInfos) PeerSize() int {
p.mtx.Lock()
defer p.mtx.Unlock()
return len(p.infos)
}
// FlushPeerInfos flush peer information
func (p *PeerInfos) FlushPeerInfos(in []*types.Peer) {
p.mtx.Lock()
defer p.mtx.Unlock()
for k := range p.infos {
delete(p.infos, k)
}
for _, peer := range in {
p.infos[peer.GetName()] = peer
}
}
// GetPeerInfos return a map for peerinfos
func (p *PeerInfos) GetPeerInfos() map[string]*types.Peer {
p.mtx.Lock()
defer p.mtx.Unlock()
var pinfos = make(map[string]*types.Peer)
for k, v := range p.infos {
pinfos[k] = v
}
return pinfos
}
// SetPeerInfo modify peer infos
func (p *PeerInfos) SetPeerInfo(peer *types.Peer) {
p.mtx.Lock()
defer p.mtx.Unlock()
if peer.GetName() == "" {
return
}
p.infos[peer.GetName()] = peer
}
// GetPeerInfo return a infos by key
func (p *PeerInfos) GetPeerInfo(peerName string) *types.Peer {
p.mtx.Lock()
defer p.mtx.Unlock()
if peer, ok := p.infos[peerName]; ok {
return peer
}
return nil
}
// BlackList badpeers list
type BlackList struct {
mtx sync.Mutex
badPeers map[string]int64
}
// FetchPeerInfo get peerinfo by node
func (nf *NodeInfo) FetchPeerInfo(n *Node) {
var peerlist []*types.Peer
peerInfos := nf.latestPeerInfo(n)
for _, peerinfo := range peerInfos {
peerlist = append(peerlist, peerinfo)
}
nf.flushPeerInfos(peerlist)
}
func (nf *NodeInfo) flushPeerInfos(in []*types.Peer) {
nf.peerInfos.FlushPeerInfos(in)
}
func (nf *NodeInfo) latestPeerInfo(n *Node) map[string]*types.Peer {
var peerlist = make(map[string]*types.Peer)
peers := n.GetRegisterPeers()
log.Debug("latestPeerInfo", "register peer num", len(peers))
for _, peer := range peers {
if !peer.GetRunning() || peer.Addr() == n.nodeInfo.GetExternalAddr().String() {
n.remove(peer.Addr())
continue
}
peerinfo, err := peer.GetPeerInfo()
if err != nil || peerinfo.GetName() == "" {
P2pComm.CollectPeerStat(err, peer)
log.Error("latestPeerInfo", "Err", err, "peer", peer.Addr())
continue
}
var pr types.Peer
pr.Addr = peerinfo.GetAddr()
pr.Port = peerinfo.GetPort()
pr.Name = peerinfo.GetName()
pr.MempoolSize = peerinfo.GetMempoolSize()
pr.Header = peerinfo.GetHeader()
peerlist[pr.Name] = &pr
}
return peerlist
}
// Set modidy nodeinfo by nodeinfo
func (nf *NodeInfo) Set(n *NodeInfo) {
nf.mtx.Lock()
defer nf.mtx.Unlock()
nf = n
}
// Get return nodeinfo
func (nf *NodeInfo) Get() *NodeInfo {
nf.mtx.Lock()
defer nf.mtx.Unlock()
return nf
}
// SetExternalAddr modidy address of the nodeinfo
func (nf *NodeInfo) SetExternalAddr(addr *NetAddress) {
nf.mtx.Lock()
defer nf.mtx.Unlock()
nf.externalAddr = addr
}
// GetExternalAddr return external address
func (nf *NodeInfo) GetExternalAddr() *NetAddress {
nf.mtx.Lock()
defer nf.mtx.Unlock()
return nf.externalAddr
}
// SetListenAddr modify listen address
func (nf *NodeInfo) SetListenAddr(addr *NetAddress) {
nf.mtx.Lock()
defer nf.mtx.Unlock()
nf.listenAddr = addr
}
// GetListenAddr return listen address
func (nf *NodeInfo) GetListenAddr() *NetAddress {
nf.mtx.Lock()
defer nf.mtx.Unlock()
return nf.listenAddr
}
// SetNatDone modify natdone
func (nf *NodeInfo) SetNatDone() {
atomic.StoreInt32(&nf.natDone, 1)
}
// IsNatDone return ture and false
func (nf *NodeInfo) IsNatDone() bool {
return atomic.LoadInt32(&nf.natDone) == 1
}
// IsOutService return true and false for out service
func (nf *NodeInfo) IsOutService() bool {
if !nf.cfg.ServerStart {
return false
}
if nf.OutSide() || nf.ServiceTy() == Service {
return true
}
return false
}
// SetServiceTy set service type
func (nf *NodeInfo) SetServiceTy(ty int32) {
atomic.StoreInt32(&nf.ServiceType, ty)
}
// ServiceTy return serveice type
func (nf *NodeInfo) ServiceTy() int32 {
return atomic.LoadInt32(&nf.ServiceType)
}
// SetNetSide set net side
func (nf *NodeInfo) SetNetSide(ok bool) {
var isoutside int32
if ok {
isoutside = 1
}
atomic.StoreInt32(&nf.outSide, isoutside)
}
// OutSide return true and false for outside
func (nf *NodeInfo) OutSide() bool {
return atomic.LoadInt32(&nf.outSide) == 1
}
// Add add badpeer
func (bl *BlackList) Add(addr string, deadline int64) {
bl.mtx.Lock()
defer bl.mtx.Unlock()
bl.badPeers[addr] = types.Now().Unix() + deadline
}
// Delete delete badpeer
func (bl *BlackList) Delete(addr string) {
bl.mtx.Lock()
defer bl.mtx.Unlock()
delete(bl.badPeers, addr)
}
// Has the badpeer true and false
func (bl *BlackList) Has(addr string) bool {
bl.mtx.Lock()
defer bl.mtx.Unlock()
if _, ok := bl.badPeers[addr]; ok {
return true
}
return false
}
// GetBadPeers reurn black list peers
func (bl *BlackList) GetBadPeers() map[string]int64 {
bl.mtx.Lock()
defer bl.mtx.Unlock()
var copyData = make(map[string]int64)
for k, v := range bl.badPeers {
copyData[k] = v
}
return copyData
}
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package gossip
import (
"strings"
"sync"
"sync/atomic"
"time"
v "github.com/33cn/chain33/common/version"
pb "github.com/33cn/chain33/types"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)
// Start peer start
func (p *Peer) Start() {
log.Debug("Peer", "Start", p.Addr())
go p.heartBeat()
}
// Close peer close
func (p *Peer) Close() {
//避免重复关闭
if !atomic.CompareAndSwapInt32(&p.isclose, 0, 1) {
return
}
p.mconn.Close()
if p.taskChan != nil {
//unsub all topics
p.node.pubsub.Unsub(p.taskChan)
}
log.Info("Peer", "closed", p.Addr())
}
// Peer object information
type Peer struct {
mutex sync.RWMutex
node *Node
conn *grpc.ClientConn // source connection
persistent bool
isclose int32
version *Version
name string //远程节点的name
mconn *MConnection
peerAddr *NetAddress
peerStat *Stat
taskChan chan interface{} //tx block
inBounds int32 //连接此节点的客户端节点数量
IsMaxInbouds bool
}
// NewPeer produce a peer object
func NewPeer(conn *grpc.ClientConn, node *Node, remote *NetAddress) *Peer {
p := &Peer{
conn: conn,
node: node,
}
p.peerStat = new(Stat)
p.version = new(Version)
p.version.SetSupport(true)
p.mconn = NewMConnection(conn, remote, p)
return p
}
// Version version object information
type Version struct {
mtx sync.Mutex
version int32
versionSupport bool
}
// Stat object information
type Stat struct {
mtx sync.Mutex
ok bool
}
// Ok start is ok
func (st *Stat) Ok() {
st.mtx.Lock()
defer st.mtx.Unlock()
st.ok = true
}
// NotOk start is not ok
func (st *Stat) NotOk() {
st.mtx.Lock()
defer st.mtx.Unlock()
st.ok = false
}
// IsOk start is ok or not
func (st *Stat) IsOk() bool {
st.mtx.Lock()
defer st.mtx.Unlock()
return st.ok
}
// SetSupport set support of version
func (v *Version) SetSupport(ok bool) {
v.mtx.Lock()
defer v.mtx.Unlock()
v.versionSupport = ok
}
// IsSupport is support version
func (v *Version) IsSupport() bool {
v.mtx.Lock()
defer v.mtx.Unlock()
return v.versionSupport
}
// SetVersion set version number
func (v *Version) SetVersion(ver int32) {
v.mtx.Lock()
defer v.mtx.Unlock()
v.version = ver
}
// GetVersion get version number
func (v *Version) GetVersion() int32 {
v.mtx.Lock()
defer v.mtx.Unlock()
return v.version
}
func (p *Peer) heartBeat() {
pcli := NewNormalP2PCli()
for {
if !p.GetRunning() {
return
}
peername, err := pcli.SendVersion(p, p.node.nodeInfo)
P2pComm.CollectPeerStat(err, p)
if err != nil || peername == "" {
//版本不对,直接关掉
log.Error("PeerHeartBeatSendVersion", "peerName", peername, "err", err)
p.Close()
return
}
log.Debug("sendVersion", "peer name", peername)
p.SetPeerName(peername) //设置连接的远程节点的节点名称
p.taskChan = p.node.pubsub.Sub("block", "tx", peername)
go p.sendStream()
go p.readStream()
break
}
ticker := time.NewTicker(PingTimeout)
defer ticker.Stop()
for {
if !p.GetRunning() {
return
}
peerNum, err := pcli.GetInPeersNum(p)
if err == nil {
atomic.StoreInt32(&p.inBounds, int32(peerNum))
}
err = pcli.SendPing(p, p.node.nodeInfo)
if err != nil {
log.Error("SendPeerPing", "peer", p.Addr(), "err", err)
}
<-ticker.C
}
}
// GetInBouns get inbounds of peer
func (p *Peer) GetInBouns() int32 {
return atomic.LoadInt32(&p.inBounds)
}
// GetPeerInfo get peer information of peer
func (p *Peer) GetPeerInfo() (*pb.P2PPeerInfo, error) {
return p.mconn.gcli.GetPeerInfo(context.Background(), &pb.P2PGetPeerInfo{Version: p.node.nodeInfo.channelVersion}, grpc.FailFast(true))
}
func (p *Peer) sendStream() {
//Stream Send data
for {
if !p.GetRunning() {
log.Debug("sendStream peer connect closed", "peerid", p.GetPeerName())
return
}
ctx, cancel := context.WithCancel(context.Background())
resp, err := p.mconn.gcli.ServerStreamRead(ctx)
P2pComm.CollectPeerStat(err, p)
if err != nil {
cancel()
log.Error("sendStream", "ServerStreamRead", err)
time.Sleep(time.Second * 5)
continue
}
//send ping package
ping, err := P2pComm.NewPingData(p.node.nodeInfo)
if err != nil {
errs := resp.CloseSend()
if errs != nil {
log.Error("CloseSend", "err", errs)
}
cancel()
time.Sleep(time.Second)
continue
}
p2pdata := new(pb.BroadCastData)
p2pdata.Value = &pb.BroadCastData_Ping{Ping: ping}
if err := resp.Send(p2pdata); err != nil {
P2pComm.CollectPeerStat(err, p)
errs := resp.CloseSend()
if errs != nil {
log.Error("CloseSend", "err", errs)
}
cancel()
log.Error("sendStream", "sendping", err)
time.Sleep(time.Second)
continue
}
//send softversion&p2pversion
_, peerName := p.node.nodeInfo.addrBook.GetPrivPubKey()
p2pdata.Value = &pb.BroadCastData_Version{Version: &pb.Versions{P2Pversion: p.node.nodeInfo.channelVersion,
Softversion: v.GetVersion(), Peername: peerName}}
if err := resp.Send(p2pdata); err != nil {
P2pComm.CollectPeerStat(err, p)
errs := resp.CloseSend()
if errs != nil {
log.Error("CloseSend", "err", errs)
}
cancel()
log.Error("sendStream", "sendversion", err)
continue
}
timeout := time.NewTimer(time.Second * 2)
defer timeout.Stop()
SEND_LOOP:
for {
if !p.GetRunning() {
return
}
select {
case task := <-p.taskChan:
if !p.GetRunning() {
errs := resp.CloseSend()
if errs != nil {
log.Error("CloseSend", "err", errs)
}
cancel()
log.Error("sendStream peer connect closed", "peerName", p.GetPeerName())
return
}
sendData, doSend := p.node.processSendP2P(task, p.version.GetVersion(), p.GetPeerName(), p.Addr())
if !doSend {
continue
}
err := resp.Send(sendData)
P2pComm.CollectPeerStat(err, p)
if err != nil {
log.Error("sendStream", "send", err)
if grpc.Code(err) == codes.Unimplemented { //maybe order peers delete peer to BlackList
p.node.nodeInfo.blacklist.Add(p.Addr(), 3600)
}
time.Sleep(time.Second) //have a rest
errs := resp.CloseSend()
if errs != nil {
log.Error("CloseSend", "err", errs)
}
cancel()
break SEND_LOOP //下一次外循环重新获取stream
}
log.Debug("sendStream", "send data", "ok")
case <-timeout.C:
if !p.GetRunning() {
log.Error("sendStream timeout")
errs := resp.CloseSend()
if errs != nil {
log.Error("CloseSend", "err", errs)
}
cancel()
return
}
timeout.Reset(time.Second * 2)
}
}
}
}
func (p *Peer) readStream() {
for {
if !p.GetRunning() {
log.Debug("readstream", "loop", "done")
return
}
ping, err := P2pComm.NewPingData(p.node.nodeInfo)
if err != nil {
log.Error("readStream", "err:", err.Error(), "peerIp", p.Addr())
continue
}
resp, err := p.mconn.gcli.ServerStreamSend(context.Background(), ping)
P2pComm.CollectPeerStat(err, p)
if err != nil {
log.Error("readStream", "serverstreamsend,err:", err, "peer", p.Addr())
time.Sleep(time.Second)
continue
}
log.Debug("SubStreamBlock", "Start", p.Addr())
for {
if !p.GetRunning() {
errs := resp.CloseSend()
if errs != nil {
log.Error("CloseSend", "err", errs)
}
return
}
data, err := resp.Recv()
if err != nil {
P2pComm.CollectPeerStat(err, p)
log.Error("readStream", "recv,err:", err.Error(), "peerAddr", p.Addr())
errs := resp.CloseSend()
if errs != nil {
log.Error("CloseSend", "err", errs)
}
log.Error("readStream", "recv,err:", err.Error(), "peerIp", p.Addr())
if grpc.Code(err) == codes.Unimplemented { //maybe order peers delete peer to BlackList
p.node.nodeInfo.blacklist.Add(p.Addr(), 3600)
return
}
//beyound max inbound num
if strings.Contains(err.Error(), "beyound max inbound num") {
log.Debug("readStream", "peer inbounds num", p.GetInBouns())
p.IsMaxInbouds = true
P2pComm.CollectPeerStat(err, p)
return
}
time.Sleep(time.Second) //have a rest
}
p.node.processRecvP2P(data, p.GetPeerName(), p.node.pubToPeer, p.Addr())
}
}
}
// GetRunning get running ok or not
func (p *Peer) GetRunning() bool {
if p.node.isClose() {
return false
}
return atomic.LoadInt32(&p.isclose) != 1
}
// MakePersistent marks the peer as persistent.
func (p *Peer) MakePersistent() {
p.persistent = true
}
// SetAddr set address of peer
func (p *Peer) SetAddr(addr *NetAddress) {
p.peerAddr = addr
}
// Addr returns peer's remote network address.
func (p *Peer) Addr() string {
return p.peerAddr.String()
}
// IsPersistent returns true if the peer is persitent, false otherwise.
func (p *Peer) IsPersistent() bool {
return p.persistent
}
// SetPeerName set name of peer
func (p *Peer) SetPeerName(name string) {
p.mutex.Lock()
defer p.mutex.Unlock()
if name == "" {
return
}
p.name = name
}
// GetPeerName get name of peer
func (p *Peer) GetPeerName() string {
p.mutex.RLock()
defer p.mutex.RUnlock()
return p.name
}
This diff is collapsed.
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package gossip
import (
"bytes"
"encoding/hex"
"testing"
"time"
"github.com/33cn/chain33/common/merkle"
"github.com/33cn/chain33/queue"
"github.com/33cn/chain33/types"
"github.com/stretchr/testify/assert"
)
type versionData struct {
peerName string
rawData interface{}
version int32
}
func Test_processP2P(t *testing.T) {
cfg := types.NewChain33Config(types.ReadFile("../../../chain33.toml"))
q := queue.New("channel")
q.SetConfig(cfg)
go q.Start()
p2p := newP2p(cfg, 12345, "testProcessP2p", q)
defer freeP2p(p2p)
defer q.Close()
node := p2p.node
client := p2p.client
pid := "testPid"
sendChan := make(chan interface{}, 1)
recvChan := make(chan *types.BroadCastData, 1)
testDone := make(chan struct{})
payload := []byte("testpayload")
minerTx := &types.Transaction{Execer: []byte("coins"), Payload: payload, Fee: 14600, Expire: 200}
tx := &types.Transaction{Execer: []byte("coins"), Payload: payload, Fee: 4600, Expire: 2}
tx1 := &types.Transaction{Execer: []byte("coins"), Payload: payload, Fee: 460000000, Expire: 0}
tx2 := &types.Transaction{Execer: []byte("coins"), Payload: payload, Fee: 100, Expire: 1}
txGroup, _ := types.CreateTxGroup([]*types.Transaction{tx1, tx2}, cfg.GetMinTxFeeRate())
gtx := txGroup.Tx()
txList := append([]*types.Transaction{}, minerTx, tx, tx1, tx2)
memTxList := append([]*types.Transaction{}, tx, gtx)
block := &types.Block{
TxHash: []byte("123"),
Height: 10,
Txs: txList,
}
txHash := hex.EncodeToString(tx.Hash())
blockHash := hex.EncodeToString(block.Hash(cfg))
rootHash := merkle.CalcMerkleRoot(cfg, block.Height, txList)
//mempool handler
go func() {
client := q.Client()
client.Sub("mempool")
for msg := range client.Recv() {
switch msg.Ty {
case types.EventTxListByHash:
query := msg.Data.(*types.ReqTxHashList)
var txs []*types.Transaction
if !query.IsShortHash {
txs = memTxList[:1]
} else {
txs = memTxList
}
msg.Reply(client.NewMessage("p2p", types.EventTxListByHash, &types.ReplyTxList{Txs: txs}))
}
}
}()
//测试发送
go func() {
for data := range sendChan {
verData, ok := data.(*versionData)
assert.True(t, ok)
sendData, doSend := node.processSendP2P(verData.rawData, verData.version, verData.peerName, "testIP:port")
txHashFilter.Remove(txHash)
blockHashFilter.Remove(blockHash)
assert.True(t, doSend, "sendData:", verData.rawData)
recvChan <- sendData
}
}()
//测试接收
go func() {
for data := range recvChan {
txHashFilter.Remove(txHash)
blockHashFilter.Remove(blockHash)
handled := node.processRecvP2P(data, pid, node.pubToPeer, "testIP:port")
assert.True(t, handled)
}
}()
go func() {
p2pChan := node.pubsub.Sub("tx")
for data := range p2pChan {
if p2pTx, ok := data.(*types.P2PTx); ok {
sendChan <- &versionData{rawData: p2pTx, version: lightBroadCastVersion}
}
}
}()
//data test
go func() {
subChan := node.pubsub.Sub(pid)
//normal
sendChan <- &versionData{peerName: pid + "1", rawData: &types.P2PTx{Tx: tx, Route: &types.P2PRoute{}}, version: lightBroadCastVersion - 1}
p2p.mgr.PubSub.Pub(client.NewMessage("p2p", types.EventTxBroadcast, tx), P2PTypeName)
sendChan <- &versionData{peerName: pid + "1", rawData: &types.P2PBlock{Block: block}, version: lightBroadCastVersion - 1}
//light broadcast
txHashFilter.Add(hex.EncodeToString(tx1.Hash()), &types.P2PRoute{TTL: DefaultLtTxBroadCastTTL})
p2p.mgr.PubSub.Pub(client.NewMessage("p2p", types.EventTxBroadcast, tx1), P2PTypeName)
sendChan <- &versionData{peerName: pid + "2", rawData: &types.P2PTx{Tx: tx, Route: &types.P2PRoute{TTL: DefaultLtTxBroadCastTTL}}, version: lightBroadCastVersion}
<-subChan //query tx
sendChan <- &versionData{peerName: pid + "2", rawData: &types.P2PBlock{Block: block}, version: lightBroadCastVersion}
<-subChan //query block
for !ltBlockCache.Contains(blockHash) {
}
cpBlock := *ltBlockCache.Get(blockHash).(*types.Block)
assert.True(t, bytes.Equal(rootHash, merkle.CalcMerkleRoot(cfg, cpBlock.Height, cpBlock.Txs)))
//query tx
sendChan <- &versionData{rawData: &types.P2PQueryData{Value: &types.P2PQueryData_TxReq{TxReq: &types.P2PTxReq{TxHash: tx.Hash()}}}}
_, ok := (<-subChan).(*types.P2PTx)
assert.True(t, ok)
sendChan <- &versionData{rawData: &types.P2PQueryData{Value: &types.P2PQueryData_BlockTxReq{BlockTxReq: &types.P2PBlockTxReq{
BlockHash: blockHash,
TxIndices: []int32{1, 2},
}}}}
rep, ok := (<-subChan).(*types.P2PBlockTxReply)
assert.True(t, ok)
assert.Equal(t, 2, int(rep.TxIndices[1]))
sendChan <- &versionData{rawData: &types.P2PQueryData{Value: &types.P2PQueryData_BlockTxReq{BlockTxReq: &types.P2PBlockTxReq{
BlockHash: blockHash,
TxIndices: nil,
}}}}
rep, ok = (<-subChan).(*types.P2PBlockTxReply)
assert.True(t, ok)
assert.Nil(t, rep.TxIndices)
//query reply
sendChan <- &versionData{rawData: &types.P2PBlockTxReply{
BlockHash: blockHash,
TxIndices: []int32{1},
Txs: txList[1:2],
}}
rep1, ok := (<-subChan).(*types.P2PQueryData)
assert.True(t, ok)
assert.Nil(t, rep1.GetBlockTxReq().GetTxIndices())
sendChan <- &versionData{rawData: &types.P2PBlockTxReply{
BlockHash: blockHash,
Txs: txList[0:],
}}
for ltBlockCache.Contains(blockHash) {
}
//max ttl
_, doSend := node.processSendP2P(&types.P2PTx{Tx: tx, Route: &types.P2PRoute{TTL: node.nodeInfo.cfg.MaxTTL + 1}}, lightBroadCastVersion, pid+"5", "testIP:port")
assert.False(t, doSend)
close(testDone)
}()
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
select {
case <-testDone:
return
case <-ticker.C:
t.Error("TestP2PProcessTimeout")
return
}
}
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package gossip
//更新内容:
// 1.p2p 修改为在nat结束后,在启动peer的stream,ping,version 等功能
//2018-3-26 更新内容
// 1. p2p 过滤重复数据,改用blockhash 提换block height
// 2. 增加p2p私钥自动导入到钱包功能
//p2p版本区间 10020, 11000
//历史版本
const (
//p2p广播交易哈希而非完整区块数据
lightBroadCastVersion = 10030
)
// VERSION number
const VERSION = lightBroadCastVersion
// MainNet Channel = 0x0000
const (
defaultTestNetChannel = 256
)
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package init init p2p
package init
import (
_ "github.com/33cn/plugin/plugin/p2p/gossip" //原生gossip p2p插件
)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment