Unverified commit c9abccee authored by 33cn, committed by GitHub

Merge pull request #375 from vipwzw/update_0325

update chain33 03/25
parents 1ae720fe f4bce91c
@@ -22,7 +22,7 @@ pipeline {
                 dir("${PROJ_DIR}"){
                     gitlabCommitStatus(name: 'deploy'){
                         sh 'make build_ci'
-                        sh "cd build && mkdir ${env.BUILD_NUMBER} && cp ci/* ${env.BUILD_NUMBER} -r && cp chain33* Dockerfile* docker* *.sh ${env.BUILD_NUMBER}/ && cd ${env.BUILD_NUMBER}/ && ./docker-compose-pre.sh run ${env.BUILD_NUMBER} all "
+                        sh "cd build && mkdir ${env.BUILD_NUMBER} && cp ci/* ${env.BUILD_NUMBER} -r && ./docker-compose-pre.sh modify && cp chain33* Dockerfile* docker* *.sh ${env.BUILD_NUMBER}/ && cd ${env.BUILD_NUMBER}/ && ./docker-compose-pre.sh run ${env.BUILD_NUMBER} all "
                     }
                 }
             }
......
@@ -122,7 +122,7 @@ docker-compose: ## build docker-compose for chain33 run
 	@cd build && if ! [ -d ci ]; then \
 	make -C ../ ; \
 	fi; \
-	cp chain33* Dockerfile docker-compose* ci/ && cd ci/ && ./docker-compose-pre.sh run $(proj) $(dapp) && cd ../..
+	./docker-compose-pre.sh modify && cp chain33* Dockerfile docker-compose* ci/ && cd ci/ && ./docker-compose-pre.sh run $(proj) $(dapp) && cd ../..
 docker-compose-down: ## build docker-compose for chain33 run
 	@cd build && if [ -d ci ]; then \
@@ -131,7 +131,7 @@ docker-compose-down: ## build docker-compose for chain33 run
 	cd ..
 fork-test: ## build fork-test for chain33 run
-	@cd build && cp chain33* Dockerfile system-fork-test.sh docker-compose* ci/ && cd ci/ && ./docker-compose-pre.sh forktest $(proj) $(dapp) && cd ../..
+	@cd build && ./docker-compose-pre.sh modify && cp chain33* Dockerfile system-fork-test.sh docker-compose* ci/ && cd ci/ && ./docker-compose-pre.sh forktest $(proj) $(dapp) && cd ../..
 clean: ## Remove previous build
......
@@ -126,6 +126,8 @@ function main() {
     else
         ./system-fork-test.sh "${PROJ}"
     fi
+    elif [ "${OP}" == "modify" ]; then
+        sed -i $sedfix '/^useGithub=.*/a version=1' chain33.toml
     fi
 }
......
@@ -18,7 +18,7 @@ PKG_LIST := `go list ./... | grep -v "vendor" | grep -v "mocks"`
 PKG_LIST_VET := `go list ./... | grep -v "vendor" | grep -v "common/crypto/sha3" | grep -v "common/log/log15"`
 PKG_LIST_INEFFASSIGN= `go list -f {{.Dir}} ./... | grep -v "vendor" | grep -v "common/crypto/sha3" | grep -v "common/log/log15" | grep -v "common/ed25519"`
 PKG_LIST_Q := `go list ./... | grep -v "vendor" | grep -v "mocks"`
-PKG_LIST_GOSEC := `go list ./... | grep -v "vendor" | grep -v "mocks" | grep -v "cmd" | grep -v "types" | grep -v "commands" | grep -v "log15" | grep -v "ed25519" | grep -v "crypto"`
+PKG_LIST_GOSEC := `go list -f "${GOPATH}/src/{{.ImportPath}}" ./... | grep -v "vendor" | grep -v "mocks" | grep -v "cmd" | grep -v "types" | grep -v "commands" | grep -v "log15" | grep -v "ed25519" | grep -v "crypto"`
 BUILD_FLAGS = -ldflags "-X github.com/33cn/chain33/common/version.GitCommit=`git rev-parse --short=8 HEAD`"
 MKPATH=$(abspath $(lastword $(MAKEFILE_LIST)))
 MKDIR=$(dir $(MKPATH))
......
@@ -108,9 +108,10 @@ func (chain *BlockChain) SynRoutine() {
 	//check for the best chain every 2 minutes, to make sure this node stays on it
 	checkBestChainTicker := time.NewTicker(120 * time.Second)
-	//on startup, first try to enable fast block-download mode
-	chain.tickerwg.Add(1)
-	go chain.FastDownLoadBlocks()
+	//on startup, first try to enable fast block-download mode (currently on by default)
+	if chain.GetDownloadSyncStatus() {
+		go chain.FastDownLoadBlocks()
+	}
 	for {
 		select {
 		case <-chain.quit:
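
Note: GetDownloadSyncStatus and UpdateDownloadSyncStatus are not shown in this diff; presumably they are a mutex-guarded flag on BlockChain. A minimal sketch of that assumption (the field and lock names are illustrative, not the actual chain33 code):

```go
// Sketch only: assumed shape of the accessors referenced above.
func (chain *BlockChain) GetDownloadSyncStatus() bool {
	chain.downLoadlock.Lock()
	defer chain.downLoadlock.Unlock()
	return chain.isFastDownloadSync
}

func (chain *BlockChain) UpdateDownloadSyncStatus(Sync bool) {
	chain.downLoadlock.Lock()
	defer chain.downLoadlock.Unlock()
	chain.isFastDownloadSync = Sync
}
```
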
......
...@@ -507,7 +507,7 @@ func testGetBlocksMsg(t *testing.T, blockchain *blockchain.BlockChain) { ...@@ -507,7 +507,7 @@ func testGetBlocksMsg(t *testing.T, blockchain *blockchain.BlockChain) {
blocks, err := blockchain.ProcGetBlockDetailsMsg(&reqBlock) blocks, err := blockchain.ProcGetBlockDetailsMsg(&reqBlock)
if err == nil && blocks != nil { if err == nil && blocks != nil {
for _, block := range blocks.Items { for _, block := range blocks.Items {
if checkheight != block.Block.Height || block.Receipts == nil { if checkheight != block.Block.Height {
t.Error("TestGetBlocksMsg", "checkheight", checkheight, "block", block) t.Error("TestGetBlocksMsg", "checkheight", checkheight, "block", block)
} }
checkheight++ checkheight++
......
@@ -68,7 +68,6 @@ func (chain *BlockChain) UpdateDownloadSyncStatus(Sync bool) {
 //FastDownLoadBlocks enables the fast block-download mode
 func (chain *BlockChain) FastDownLoadBlocks() {
-	defer chain.tickerwg.Done()
 	curHeight := chain.GetBlockHeight()
 	lastTempHight := chain.GetLastTempBlockHeight()
@@ -219,7 +218,7 @@ func (chain *BlockChain) WriteBlockToDbTemp(block *types.Block) error {
 	defer func() {
 		chainlog.Debug("WriteBlockToDbTemp", "height", block.Height, "sync", sync, "cost", types.Since(beg))
 	}()
-	newbatch := chain.blockStore.NewBatch(false)
+	newbatch := chain.blockStore.NewBatch(sync)
 	blockByte, err := proto.Marshal(block)
 	if err != nil {
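
The batch previously hard-coded NewBatch(false); it now forwards the caller's sync flag, so a temp-block write can be made durable on commit. A sketch of the assumed semantics (not the actual chain33 db layer):

```go
// Sketch only: a sync-aware batch. When sync is true, the commit is flushed
// to disk (fsync) before Write returns, trading throughput for crash safety.
type Batch struct {
	sync bool
	ops  map[string][]byte
}

func NewBatch(sync bool) *Batch {
	return &Batch{sync: sync, ops: make(map[string][]byte)}
}

func (b *Batch) Write() error {
	// persist b.ops; if b.sync, flush to disk before returning
	return nil
}
```
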
......
@@ -107,10 +107,9 @@ func (c Comm) newPeerFromConn(rawConn *grpc.ClientConn, remote *NetAddress, node
 func (c Comm) dialPeer(addr *NetAddress, node *Node) (*Peer, error) {
 	log.Debug("dialPeer", "will connect", addr.String())
 	var persistent bool
-	for _, seed := range node.nodeInfo.cfg.Seeds {
-		if seed == addr.String() {
-			persistent = true //seed nodes must stay connected
-		}
+	//TODO: to be optimized
+	if _, ok := node.cfgSeeds.Load(addr.String()); ok {
+		persistent = true
 	}
 	peer, err := c.dialPeerWithAddress(addr, persistent, node)
 	if err != nil {
@@ -138,7 +137,7 @@ func (c Comm) GenPrivPubkey() ([]byte, []byte, error) {
 	return key.Bytes(), key.PubKey().Bytes(), nil
 }
-// Pubkey get pubkey by key
+// Pubkey get pubkey by priv key
 func (c Comm) Pubkey(key string) (string, error) {
 	cr, err := crypto.New(types.GetSignName("", types.SECP256K1))
@@ -163,8 +162,8 @@ func (c Comm) Pubkey(key string) (string, error) {
 // NewPingData get ping node ,return p2pping
 func (c Comm) NewPingData(nodeInfo *NodeInfo) (*types.P2PPing, error) {
-	randNonce := rand.Int31n(102040)
-	ping := &types.P2PPing{Nonce: int64(randNonce), Addr: nodeInfo.GetExternalAddr().IP.String(), Port: int32(nodeInfo.GetExternalAddr().Port)}
+	randNonce := rand.New(rand.NewSource(time.Now().UnixNano())).Int63()
+	ping := &types.P2PPing{Nonce: randNonce, Addr: nodeInfo.GetExternalAddr().IP.String(), Port: int32(nodeInfo.GetExternalAddr().Port)}
 	var err error
 	p2pPrivKey, _ := nodeInfo.addrBook.GetPrivPubKey()
 	ping, err = c.Signature(p2pPrivKey, ping)
@@ -240,6 +239,9 @@ func (c Comm) CheckSign(in *types.P2PPing) bool {
 // CollectPeerStat collect peer stat and report
 func (c Comm) CollectPeerStat(err error, peer *Peer) {
 	if err != nil {
+		if err == types.ErrVersion {
+			peer.version.SetSupport(false)
+		}
 		peer.peerStat.NotOk()
 	} else {
 		peer.peerStat.Ok()
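
The old nonce came from the unseeded global source, so it repeated the same sequence on every restart and only spanned [0, 102040); the new one is a clock-seeded 63-bit value. A standalone illustration (standard library only):

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	// old: unseeded global source, tiny range, same sequence every restart
	old := rand.Int31n(102040)

	// new: clock-seeded source, full non-negative int64 range
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	nonce := r.Int63()

	fmt.Println(old, nonce)
}
```
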
......
@@ -18,24 +18,26 @@ var (
 	StreamPingTimeout           = 20 * time.Second
 	MonitorPeerInfoInterval     = 10 * time.Second
 	MonitorPeerNumInterval      = 30 * time.Second
-	MonitorReBalanceInterval    = 2 * time.Minute
+	MonitorReBalanceInterval    = 15 * time.Minute
 	GetAddrFromAddrBookInterval = 5 * time.Second
 	GetAddrFromOnlineInterval   = 5 * time.Second
 	GetAddrFromGitHubInterval   = 5 * time.Minute
 	CheckActivePeersInterVal    = 5 * time.Second
 	CheckBlackListInterVal      = 30 * time.Second
+	CheckCfgSeedsInterVal       = 1 * time.Minute
 )
 const (
 	msgTx           = 1
 	msgBlock        = 2
 	tryMapPortTimes = 20
+	maxSamIPNum     = 20
 )
 var (
 	// LocalAddr local address
 	LocalAddr string
-	defaultPort = 13802
+	//defaultPort = 13802
 )
 const (
@@ -77,8 +79,8 @@ var TestNetSeeds = []string{
 	"47.104.125.177:13802",
 }
-// InnerSeeds built-in list of seed
-var InnerSeeds = []string{
+// MainNetSeeds built-in list of seed
+var MainNetSeeds = []string{
 	"39.107.234.240:13802",
 	"39.105.88.66:13802",
 	"39.105.87.114:13802",
......
@@ -8,14 +8,35 @@ import (
 	"container/list"
 	"fmt"
 	"io"
+	"sort"
 	"sync"
 	"sync/atomic"
-	//"time"
+	"time"
 	pb "github.com/33cn/chain33/types"
 	"golang.org/x/net/context"
 	"google.golang.org/grpc"
 )
+// Invs datastruct
+type Invs []*pb.Inventory
+
+//Len size of the Invs data
+func (i Invs) Len() int {
+	return len(i)
+}
+
+//Less Sort from low to high
+func (i Invs) Less(a, b int) bool {
+	return i[a].GetHeight() < i[b].GetHeight()
+}
+
+//Swap the param
+func (i Invs) Swap(a, b int) {
+	i[a], i[b] = i[b], i[a]
+}
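
Invs satisfies sort.Interface, so a batch of inventories can be reordered by height before being re-downloaded; for example (using the types above):

```go
invs := Invs{
	&pb.Inventory{Ty: msgBlock, Height: 9},
	&pb.Inventory{Ty: msgBlock, Height: 3},
	&pb.Inventory{Ty: msgBlock, Height: 6},
}
sort.Sort(invs) // heights are now 3, 6, 9: the lowest blocks are fetched first
```
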
 // DownloadJob defines download job type
 type DownloadJob struct {
 	wg sync.WaitGroup
@@ -25,11 +46,11 @@ type DownloadJob struct {
 	mtx           sync.Mutex
 	busyPeer      map[string]*peerJob
 	downloadPeers []*Peer
+	MaxJob        int32
 }
 type peerJob struct {
-	pbPeer *pb.Peer
-	limit  int32
+	limit int32
 }
 // NewDownloadJob create a downloadjob object
@@ -39,6 +60,12 @@ func NewDownloadJob(p2pcli *Cli, peers []*Peer) *DownloadJob {
 	job.p2pcli = p2pcli
 	job.busyPeer = make(map[string]*peerJob)
 	job.downloadPeers = peers
+	job.MaxJob = 2
+	if len(peers) < 5 {
+		job.MaxJob = 10
+	}
+	//job.okChan = make(chan *pb.Inventory, 512)
 	return job
 }
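
The MaxJob heuristic is deliberately inverted: with few peers, each one is allowed more concurrent requests, so aggregate parallelism stays roughly level. Restated for clarity:

```go
// 4 peers  -> MaxJob 10 -> up to 40 requests in flight
// 10 peers -> MaxJob 2  -> up to 20 requests in flight
func maxJobFor(peerCount int) int32 {
	if peerCount < 5 {
		return 10
	}
	return 2
}
```
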
@@ -46,23 +73,60 @@ func (d *DownloadJob) isBusyPeer(pid string) bool {
 	d.mtx.Lock()
 	defer d.mtx.Unlock()
 	if pjob, ok := d.busyPeer[pid]; ok {
-		return atomic.LoadInt32(&pjob.limit) > 10 //each peer accepts at most 10 concurrent download tasks
+		return atomic.LoadInt32(&pjob.limit) >= d.MaxJob //each peer accepts at most MaxJob concurrent download tasks
 	}
 	return false
 }
-func (d *DownloadJob) setBusyPeer(peer *pb.Peer) {
+func (d *DownloadJob) getJobNum(pid string) int32 {
 	d.mtx.Lock()
 	defer d.mtx.Unlock()
-	if pjob, ok := d.busyPeer[peer.GetName()]; ok {
+	if pjob, ok := d.busyPeer[pid]; ok {
+		return atomic.LoadInt32(&pjob.limit)
+	}
+	return 0
+}
+
+func (d *DownloadJob) setBusyPeer(pid string) {
+	d.mtx.Lock()
+	defer d.mtx.Unlock()
+	if pjob, ok := d.busyPeer[pid]; ok {
 		atomic.AddInt32(&pjob.limit, 1)
-		d.busyPeer[peer.GetName()] = pjob
+		d.busyPeer[pid] = pjob
 		return
 	}
-	d.busyPeer[peer.GetName()] = &peerJob{peer, 1}
+	d.busyPeer[pid] = &peerJob{1}
 }
+
+func (d *DownloadJob) removePeer(pid string) {
+	d.mtx.Lock()
+	defer d.mtx.Unlock()
+	for i, pr := range d.downloadPeers {
+		if pr.GetPeerName() == pid {
+			if i != len(d.downloadPeers)-1 {
+				d.downloadPeers = append(d.downloadPeers[:i], d.downloadPeers[i+1:]...)
+				return
+			}
+			d.downloadPeers = d.downloadPeers[:i]
+			return
+		}
+	}
+}
+
+// ResetDownloadPeers reset download peers
+func (d *DownloadJob) ResetDownloadPeers(peers []*Peer) {
+	d.mtx.Lock()
+	defer d.mtx.Unlock()
+	copy(d.downloadPeers, peers)
+}
+
+func (d *DownloadJob) avalidPeersNum() int {
+	d.mtx.Lock()
+	defer d.mtx.Unlock()
+	return len(d.downloadPeers)
+}
 func (d *DownloadJob) setFreePeer(pid string) {
 	d.mtx.Lock()
 	defer d.mtx.Unlock()
@@ -77,8 +141,10 @@ func (d *DownloadJob) setFreePeer(pid string) {
 }
 // GetFreePeer get free peer ,return peer
-func (d *DownloadJob) GetFreePeer(joblimit int64) *Peer {
+func (d *DownloadJob) GetFreePeer(blockHeight int64) *Peer {
 	_, infos := d.p2pcli.network.node.GetActivePeers()
+	var jobNum int32 = 10
+	var bestPeer *Peer
 	for _, peer := range d.downloadPeers {
 		pbpeer, ok := infos[peer.Addr()]
 		if ok {
@@ -86,17 +152,24 @@ func (d *DownloadJob) GetFreePeer(joblimit int64) *Peer {
 				peer.SetPeerName(pbpeer.GetName())
 			}
-			if pbpeer.GetHeader().GetHeight() >= joblimit {
+			if pbpeer.GetHeader().GetHeight() >= blockHeight {
 				if d.isBusyPeer(pbpeer.GetName()) {
 					continue
 				}
-				d.setBusyPeer(pbpeer)
-				return peer
+				peerJopNum := d.getJobNum(pbpeer.GetName())
+				if jobNum > peerJopNum {
+					jobNum = peerJopNum
+					bestPeer = peer
+				}
 			}
 		}
 	}
-	return nil
+	if bestPeer != nil {
+		d.setBusyPeer(bestPeer.GetPeerName())
+	}
+	return bestPeer
 }
 // CancelJob cancel the downloadjob object
@@ -117,11 +190,11 @@ func (d *DownloadJob) DownloadBlock(invs []*pb.Inventory,
 	}
 	for _, inv := range invs { //each peer downloads one block at a time; failed blocks go to the next round
-		freePeer := d.GetFreePeer(inv.GetHeight())
+	REGET:
+		freePeer := d.GetFreePeer(inv.GetHeight()) //pick the peer with the fewest active jobs, i.e. roughly the fastest downloader
 		if freePeer == nil {
-			//log.Error("DownloadBlock", "freepeer is null", inv.GetHeight())
-			d.retryList.PushBack(inv)
-			continue
+			time.Sleep(time.Millisecond * 100)
+			goto REGET
 		}
 		d.wg.Add(1)
@@ -129,7 +202,10 @@ func (d *DownloadJob) DownloadBlock(invs []*pb.Inventory,
 			defer d.wg.Done()
 			err := d.syncDownloadBlock(peer, inv, bchan)
 			if err != nil {
-				d.retryList.PushBack(inv) //failed downloads are retried in the next ReDownload round
+				d.removePeer(peer.GetPeerName())
+				log.Error("DownloadBlock:syncDownloadBlock", "height", inv.GetHeight(), "peer", peer.GetPeerName(), "err", err)
+				d.retryList.PushFront(inv) //failed downloads are retried in the next ReDownload round
 			} else {
 				d.setFreePeer(peer.GetPeerName())
 			}
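
One hazard with the REGET loop: if repeated failures make removePeer drain downloadPeers entirely, GetFreePeer never returns a peer and the goto spins forever. A bounded variant (suggestion only, not part of this commit):

```go
freePeer := d.GetFreePeer(inv.GetHeight())
for retry := 0; freePeer == nil && retry < 300; retry++ { // give up after ~30s
	time.Sleep(time.Millisecond * 100)
	freePeer = d.GetFreePeer(inv.GetHeight())
}
if freePeer == nil {
	d.retryList.PushFront(inv) // leave the block for the next ReDownload round
	continue
}
```
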
@@ -152,19 +228,21 @@ func (d *DownloadJob) restOfInvs(bchan chan *pb.BlockPid) []*pb.Inventory {
 		return errinvs
 	}
-	var invs []*pb.Inventory
+	var invsArr Invs
 	for e := d.retryList.Front(); e != nil; {
 		if e.Value == nil {
 			continue
 		}
-		log.Debug("restofInvs", "inv", e.Value.(*pb.Inventory).GetHeight())
-		invs = append(invs, e.Value.(*pb.Inventory)) //regroup the blocks that were missed for another download pass
+		log.Debug("resetofInvs", "inv", e.Value.(*pb.Inventory).GetHeight())
+		invsArr = append(invsArr, e.Value.(*pb.Inventory)) //regroup the blocks that were missed for another download pass
 		next := e.Next()
 		d.retryList.Remove(e)
 		e = next
 	}
-
-	return invs
+	//Sort
+	sort.Sort(invsArr)
+	//log.Info("resetOfInvs", "sorted:", invs)
+	return invsArr
 }
 func (d *DownloadJob) syncDownloadBlock(peer *Peer, inv *pb.Inventory, bchan chan *pb.BlockPid) error {
@@ -179,26 +257,33 @@ func (d *DownloadJob) syncDownloadBlock(peer *Peer, inv *pb.Inventory, bchan cha
 	var p2pdata pb.P2PGetData
 	p2pdata.Version = d.p2pcli.network.node.nodeInfo.cfg.Version
 	p2pdata.Invs = []*pb.Inventory{inv}
+	beg := pb.Now()
 	resp, err := peer.mconn.gcli.GetData(context.Background(), &p2pdata, grpc.FailFast(true))
 	P2pComm.CollectPeerStat(err, peer)
 	if err != nil {
 		log.Error("syncDownloadBlock", "GetData err", err.Error())
 		return err
 	}
+	defer func() {
+		log.Debug("download", "frompeer", peer.Addr(), "blockheight", inv.GetHeight(), "downloadcost", pb.Since(beg))
+	}()
 	defer resp.CloseSend()
 	for {
 		invdatas, err := resp.Recv()
 		if err != nil {
 			if err == io.EOF {
-				log.Debug("download", "from", peer.Addr(), "block", inv.GetHeight())
-				return nil
+				if invdatas == nil {
+					return nil
+				}
+				goto RECV
 			}
 			log.Error("download", "resp,Recv err", err.Error(), "download from", peer.Addr())
 			return err
 		}
+	RECV:
 		for _, item := range invdatas.Items {
 			bchan <- &pb.BlockPid{Pid: peer.GetPeerName(), Block: item.GetBlock()} //push into bchan once downloaded
+			log.Debug("download", "frompeer", peer.Addr(), "blockheight", inv.GetHeight(), "Blocksize", item.GetBlock().XXX_Size())
 		}
 	}
 }
@@ -7,11 +7,15 @@ package p2p
 import (
 	"fmt"
 	"net"
+	"sync"
 	"time"
 	pb "github.com/33cn/chain33/types"
+	"golang.org/x/net/context"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/keepalive"
+	pr "google.golang.org/grpc/peer"
+	"google.golang.org/grpc/stats"
 )
 // Listener the actions
@@ -49,8 +53,8 @@ type listener struct {
 // NewListener produce a listener object
 func NewListener(protocol string, node *Node) Listener {
-	log.Debug("NewListener", "localPort", defaultPort)
-	l, err := net.Listen(protocol, fmt.Sprintf(":%v", defaultPort))
+	log.Debug("NewListener", "localPort", node.listenPort)
+	l, err := net.Listen(protocol, fmt.Sprintf(":%v", node.listenPort))
 	if err != nil {
 		log.Crit("Failed to listen", "Error", err.Error())
 		return nil
@@ -65,6 +69,56 @@ func NewListener(protocol string, node *Node) Listener {
 	pServer := NewP2pServer()
 	pServer.node = dl.node
+	//unary interceptor: validates and filters calls before the handler runs
+	interceptor := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
+		//checkAuth
+		getctx, ok := pr.FromContext(ctx)
+		if !ok {
+			return nil, fmt.Errorf("")
+		}
+		ip, _, err := net.SplitHostPort(getctx.Addr.String())
+		if err != nil {
+			return nil, err
+		}
+		if pServer.node.nodeInfo.blacklist.Has(ip) {
+			return nil, fmt.Errorf("blacklist %v no authorized", ip)
+		}
+		if !auth(ip) {
+			log.Error("interceptor", "auth faild", ip)
+			//add the offending IP to the blacklist
+			pServer.node.nodeInfo.blacklist.Add(ip, int64(3600))
+			return nil, fmt.Errorf("auth faild %v no authorized", ip)
+		}
+		// Continue processing the request
+		return handler(ctx, req)
+	}
+
+	//stream interceptor
+	interceptorStream := func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+		getctx, ok := pr.FromContext(ss.Context())
+		if !ok {
+			log.Error("interceptorStream", "FromContext error", "")
+			return fmt.Errorf("stream Context err")
+		}
+		ip, _, err := net.SplitHostPort(getctx.Addr.String())
+		if err != nil {
+			return err
+		}
+		if pServer.node.nodeInfo.blacklist.Has(ip) {
+			return fmt.Errorf("blacklist %v no authorized", ip)
+		}
+		if !auth(ip) {
+			log.Error("interceptorStream", "auth faild", ip)
+			//add the offending IP to the blacklist
+			pServer.node.nodeInfo.blacklist.Add(ip, int64(3600))
+			return fmt.Errorf("auth faild %v no authorized", ip)
+		}
+		return handler(srv, ss)
+	}
+
+	var opts []grpc.ServerOption
+	opts = append(opts, grpc.UnaryInterceptor(interceptor), grpc.StreamInterceptor(interceptorStream))
 	//blocks are at most 10M
 	msgRecvOp := grpc.MaxMsgSize(11 * 1024 * 1024)     //set the max receive size to 11M
 	msgSendOp := grpc.MaxSendMsgSize(11 * 1024 * 1024) //set the max send size to 11M
@@ -74,9 +128,79 @@ func NewListener(protocol string, node *Node) Listener {
 	keepparm.MaxConnectionIdle = 1 * time.Minute
 	maxStreams := grpc.MaxConcurrentStreams(1000)
 	keepOp := grpc.KeepaliveParams(keepparm)
-	dl.server = grpc.NewServer(msgRecvOp, msgSendOp, keepOp, maxStreams)
+	StatsOp := grpc.StatsHandler(&statshandler{})
+	opts = append(opts, msgRecvOp, msgSendOp, keepOp, maxStreams, StatsOp)
+	dl.server = grpc.NewServer(opts...)
 	dl.p2pserver = pServer
 	pb.RegisterP2PgserviceServer(dl.server, pServer)
 	return dl
 }
+type statshandler struct{}
+
+func (h *statshandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
+	return context.WithValue(ctx, connCtxKey{}, info)
+}
+
+func (h *statshandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
+	return ctx
+}
+
+func (h *statshandler) HandleConn(ctx context.Context, s stats.ConnStats) {
+	if ctx == nil {
+		return
+	}
+	tag, ok := getConnTagFromContext(ctx)
+	if !ok {
+		fmt.Println("can not get conn tag")
+		return
+	}
+	ip, _, err := net.SplitHostPort(tag.RemoteAddr.String())
+	if err != nil {
+		return
+	}
+	connsMutex.Lock()
+	defer connsMutex.Unlock()
+	if _, ok := conns[ip]; !ok {
+		conns[ip] = 0
+	}
+	switch s.(type) {
+	case *stats.ConnBegin:
+		conns[ip] = conns[ip] + 1
+	case *stats.ConnEnd:
+		conns[ip] = conns[ip] - 1
+		if conns[ip] <= 0 {
+			delete(conns, ip)
+		}
+		log.Debug("ip connend", "ip", ip, "n", conns[ip])
+	default:
+		log.Error("illegal ConnStats type\n")
+	}
+}
+
+// HandleRPC is a no-op.
+func (h *statshandler) HandleRPC(ctx context.Context, s stats.RPCStats) {}
+
+type connCtxKey struct{}
+
+var connsMutex sync.Mutex
+var conns = make(map[string]uint)
+
+func getConnTagFromContext(ctx context.Context) (*stats.ConnTagInfo, bool) {
+	tag, ok := ctx.Value(connCtxKey{}).(*stats.ConnTagInfo)
+	return tag, ok
+}
+
+func auth(checkIP string) bool {
+	connsMutex.Lock()
+	defer connsMutex.Unlock()
+	count, ok := conns[checkIP]
+	if ok && count > maxSamIPNum {
+		log.Error("AuthCheck", "sameIP num:", count, "checkIP:", checkIP, "diffIP num:", len(conns))
+		return false
+	}
+	return true
+}
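
auth relies purely on the conns table that HandleConn maintains: once more than maxSamIPNum (20) live connections share one source IP, both interceptors reject further calls and blacklist the IP for an hour. An illustration against the package state above (the IP is an example value):

```go
connsMutex.Lock()
conns["203.0.113.7"] = maxSamIPNum + 1 // pretend 21 live conns from one IP
connsMutex.Unlock()

ok := auth("203.0.113.7") // false; the next interceptor call blacklists it for 3600s
```
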
@@ -28,7 +28,7 @@ func (n *Node) monitorErrPeer() {
 		peer := <-n.nodeInfo.monitorChan
 		if !peer.version.IsSupport() {
 			//if the peer's version is unsupported, remove it immediately
-			log.Debug("VersoinMonitor", "NotSupport,addr", peer.Addr())
+			log.Info("VersoinMonitor", "NotSupport,addr", peer.Addr())
 			n.destroyPeer(peer)
 			//blacklist it for 12 hours
 			n.nodeInfo.blacklist.Add(peer.Addr(), int64(3600*12))
@@ -123,6 +123,19 @@ func (n *Node) getAddrFromOnline() {
 				}
+				if rangeCount < maxOutBoundNum {
+					//pull connections from innerSeeds
+					n.innerSeeds.Range(func(k, v interface{}) bool {
+						rangeCount++
+						if rangeCount < maxOutBoundNum {
+							n.pubsub.FIFOPub(k.(string), "addr")
+							return true
+						}
+						return false
+					})
+				}
 				continue
 			}
@@ -162,15 +175,21 @@ func (n *Node) getAddrFromOnline() {
 					if _, ok := seedsMap[addr]; ok {
 						continue
 					}
 					//randomly drop one connected seed
-					for _, seed := range seedArr {
-						if n.Has(seed) {
-							n.remove(seed)
-							n.nodeInfo.addrBook.RemoveAddr(seed)
-							break
-						}
-					}
+					n.innerSeeds.Range(func(k, v interface{}) bool {
+						if n.Has(k.(string)) {
+							//it must not be one of the configured seeds
+							if _, ok := n.cfgSeeds.Load(k.(string)); ok {
+								return true
+							}
+							n.remove(k.(string))
+							n.nodeInfo.addrBook.RemoveAddr(k.(string))
+							return false
+						}
+						return true
+					})
 				}
 			}
 			time.Sleep(MonitorPeerInfoInterval)
@@ -180,7 +199,7 @@ func (n *Node) getAddrFromOnline() {
 			if !n.nodeInfo.blacklist.Has(addr) || !Filter.QueryRecvData(addr) {
 				if ticktimes < 10 {
 					//while other peers are connected, prefer not to dial seed nodes
-					if _, ok := seedsMap[addr]; !ok {
+					if _, ok := n.innerSeeds.Load(addr); !ok {
 						//exclude the seeds first
 						n.pubsub.FIFOPub(addr, "addr")
@@ -263,8 +282,8 @@ func (n *Node) nodeReBalance() {
 		//pick the most and least loaded peers among the cached candidates
 		cachePeers := n.GetCacheBounds()
-		var MixCacheInBounds int32 = 1000
-		var MixCacheInBoundPeer *Peer
+		var MinCacheInBounds int32 = 1000
+		var MinCacheInBoundPeer *Peer
 		var MaxCacheInBounds int32
 		var MaxCacheInBoundPeer *Peer
 		for _, peer := range cachePeers {
@@ -275,9 +294,9 @@ func (n *Node) nodeReBalance() {
 				continue
 			}
 			//pick the minimum load
-			if int32(inbounds) < MixCacheInBounds {
-				MixCacheInBounds = int32(inbounds)
-				MixCacheInBoundPeer = peer
+			if int32(inbounds) < MinCacheInBounds {
+				MinCacheInBounds = int32(inbounds)
+				MinCacheInBoundPeer = peer
 			}
 			//pick the maximum load
@@ -287,7 +306,7 @@ func (n *Node) nodeReBalance() {
 			}
 		}
-		if MixCacheInBoundPeer == nil || MaxCacheInBoundPeer == nil {
+		if MinCacheInBoundPeer == nil || MaxCacheInBoundPeer == nil {
 			continue
 		}
@@ -297,7 +316,7 @@ func (n *Node) nodeReBalance() {
 			MaxCacheInBoundPeer.Close()
 		}
 		//if the highest live load is below the lowest cached load, drop every cached peer
-		if MaxInBounds < MixCacheInBounds {
+		if MaxInBounds < MinCacheInBounds {
 			cachePeers := n.GetCacheBounds()
 			for _, peer := range cachePeers {
 				n.RemoveCachePeer(peer.Addr())
@@ -306,16 +325,16 @@ func (n *Node) nodeReBalance() {
 			continue
 		}
-		log.Info("nodeReBalance", "MaxInBounds", MaxInBounds, "MixCacheInBounds", MixCacheInBounds)
-		if MaxInBounds-MixCacheInBounds < 50 {
+		log.Info("nodeReBalance", "MaxInBounds", MaxInBounds, "MixCacheInBounds", MinCacheInBounds)
+		if MaxInBounds-MinCacheInBounds < 50 {
 			continue
 		}
-		if MixCacheInBoundPeer != nil {
-			info, err := MixCacheInBoundPeer.GetPeerInfo(VERSION)
+		if MinCacheInBoundPeer != nil {
+			info, err := MinCacheInBoundPeer.GetPeerInfo(VERSION)
 			if err != nil {
-				n.RemoveCachePeer(MixCacheInBoundPeer.Addr())
-				MixCacheInBoundPeer.Close()
+				n.RemoveCachePeer(MinCacheInBoundPeer.Addr())
+				MinCacheInBoundPeer.Close()
 				continue
 			}
 			localBlockHeight, err := p2pcli.GetBlockHeight(n.nodeInfo)
@@ -324,10 +343,10 @@ func (n *Node) nodeReBalance() {
 			}
 			peerBlockHeight := info.GetHeader().GetHeight()
 			if localBlockHeight-peerBlockHeight < 2048 {
-				log.Info("noReBalance", "Repalce node new node", MixCacheInBoundPeer.Addr(), "old node", MaxCacheInBoundPeer.Addr())
-				n.addPeer(MixCacheInBoundPeer)
+				log.Info("noReBalance", "Repalce node new node", MinCacheInBoundPeer.Addr(), "old node", MaxInBoundPeer.Addr())
+				n.addPeer(MinCacheInBoundPeer)
 				n.remove(MaxInBoundPeer.Addr())
-				n.RemoveCachePeer(MixCacheInBoundPeer.Addr())
+				n.RemoveCachePeer(MinCacheInBoundPeer.Addr())
 			}
 		}
 	}
@@ -373,11 +392,13 @@ func (n *Node) monitorPeers() {
 			if n.Size() <= stableBoundNum {
 				continue
 			}
+			//configured seed nodes are never removed
+			if _, ok := n.cfgSeeds.Load(paddr); ok {
+				continue
+			}
 			//remove peers whose own peer count is too low
 			n.remove(paddr)
 			n.nodeInfo.addrBook.RemoveAddr(paddr)
-			//briefly blacklist it for 5 minutes
-			//n.nodeInfo.blacklist.Add(paddr, int64(60*5))
 		}
 	}
@@ -469,7 +490,9 @@ func (n *Node) monitorDialPeers() {
 			if peer != nil {
 				peer.Close()
 			}
-			n.nodeInfo.blacklist.Add(netAddr.String(), int64(60*10))
+			if _, ok := n.cfgSeeds.Load(netAddr.String()); !ok {
+				n.nodeInfo.blacklist.Add(netAddr.String(), int64(60*10))
+			}
 			return
 		}
 		//query the remote peer's load
@@ -532,3 +555,48 @@ func (n *Node) monitorBlackList() {
 func (n *Node) monitorFilter() {
 	Filter.ManageRecvFilter()
 }
+//standalone goroutine that monitors the configured seeds
+func (n *Node) monitorCfgSeeds() {
+	ticker := time.NewTicker(CheckCfgSeedsInterVal)
+	defer ticker.Stop()
+
+	for {
+		if n.isClose() {
+			log.Info("monitorCfgSeeds", "loop", "done")
+			return
+		}
+		<-ticker.C
+		n.cfgSeeds.Range(func(k, v interface{}) bool {
+			if !n.Has(k.(string)) {
+				//try to connect to this seed
+				if n.needMore() { //if more peers are needed
+					n.pubsub.FIFOPub(k.(string), "addr")
+				} else {
+					//make room: swap an existing peer out for the seed
+					peers, _ := n.GetActivePeers()
+					//pick the most loaded of the currently connected peers
+					var MaxInBounds int32
+					var MaxInBoundPeer *Peer
+					for _, peer := range peers {
+						if peer.GetInBouns() > MaxInBounds {
+							MaxInBounds = peer.GetInBouns()
+							MaxInBoundPeer = peer
+						}
+					}
+					n.remove(MaxInBoundPeer.Addr())
+					n.pubsub.FIFOPub(k.(string), "addr")
+				}
+			}
+			return true
+		})
+	}
+}
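
Review note: GetActivePeers can return an empty map here, in which case MaxInBoundPeer stays nil and n.remove(MaxInBoundPeer.Addr()) panics. A guarded variant of the fallback branch (suggestion only, not part of this commit):

```go
if MaxInBoundPeer != nil {
	n.remove(MaxInBoundPeer.Addr())
}
n.pubsub.FIFOPub(k.(string), "addr")
```
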
@@ -172,6 +172,7 @@ func (na *NetAddress) DialTimeout(version int32) (*grpc.ClientConn, error) {
 		ch2 <- P2pComm.GrpcConfig()
 		log.Debug("NetAddress", "Dial with unCompressor", na.String())
 		conn, err = grpc.Dial(na.String(), grpc.WithInsecure(), grpc.WithServiceConfig(ch2), keepaliveOp, timeoutOp)
 	}
+
 	if err != nil {
@@ -184,132 +185,8 @@ func (na *NetAddress) DialTimeout(version int32) (*grpc.ClientConn, error) {
 		}
 		return nil, err
 	}
-	return conn, nil
-}
-
-// Routable returns true if the address is routable.
-func (na *NetAddress) Routable() bool {
-	// TODO(oga) bitcoind doesn't include RFC3849 here, but should we?
-	return na.Valid() && !(na.RFC1918() || na.RFC3927() || na.RFC4862() ||
-		na.RFC4193() || na.RFC4843() || na.Local())
-}
-
-// Valid For IPv4 these are either a 0 or all bits set address. For IPv6 a zero
-// address or one that matches the RFC3849 documentation address format.
-func (na *NetAddress) Valid() bool {
-	return na.IP != nil && !(na.IP.IsUnspecified() || na.RFC3849() ||
-		na.IP.Equal(net.IPv4bcast))
-}
-
-// Local returns true if it is a local address.
-func (na *NetAddress) Local() bool {
-	return na.IP.IsLoopback() || zero4.Contains(na.IP)
-}
-
-// ReachabilityTo checks whenever o can be reached from na.
-func (na *NetAddress) ReachabilityTo(o *NetAddress) int {
-	const (
-		Unreachable = 0
-		Default     = iota
-		Teredo
-		Ipv6Weak
-		Ipv4
-		Ipv6Strong
-	)
-	if !na.Routable() {
-		return Unreachable
-	} else if na.RFC4380() {
-		if !o.Routable() {
-			return Default
-		} else if o.RFC4380() {
-			return Teredo
-		} else if o.IP.To4() != nil {
-			return Ipv4
-		}
-		// ipv6
-		return Ipv6Weak
-	} else if na.IP.To4() != nil {
-		if o.Routable() && o.IP.To4() != nil {
-			return Ipv4
-		}
-		return Default
-	}
-	/* ipv6 */
-	var tunnelled bool
-	// Is our v6 is tunnelled?
-	if o.RFC3964() || o.RFC6052() || o.RFC6145() {
-		tunnelled = true
-	}
-	if !o.Routable() {
-		return Default
-	} else if o.RFC4380() {
-		return Teredo
-	} else if o.IP.To4() != nil {
-		return Ipv4
-	} else if tunnelled {
-		// only prioritise ipv6 if we aren't tunnelling it.
-		return Ipv6Weak
-	}
-	return Ipv6Strong
-}
-
-// RFC1918: IPv4 Private networks (10.0.0.0/8, 192.168.0.0/16, 172.16.0.0/12)
-// RFC3849: IPv6 Documentation address (2001:0DB8::/32)
-// RFC3927: IPv4 Autoconfig (169.254.0.0/16)
-// RFC3964: IPv6 6to4 (2002::/16)
-// RFC4193: IPv6 unique local (FC00::/7)
-// RFC4380: IPv6 Teredo tunneling (2001::/32)
-// RFC4843: IPv6 ORCHID: (2001:10::/28)
-// RFC4862: IPv6 Autoconfig (FE80::/64)
-// RFC6052: IPv6 well known prefix (64:FF9B::/96)
-// RFC6145: IPv6 IPv4 translated address ::FFFF:0:0:0/96
-var (
-	rfc1918_10  = net.IPNet{IP: net.ParseIP("10.0.0.0"), Mask: net.CIDRMask(8, 32)}
-	rfc1918_192 = net.IPNet{IP: net.ParseIP("192.168.0.0"), Mask: net.CIDRMask(16, 32)}
-	rfc1918_172 = net.IPNet{IP: net.ParseIP("172.16.0.0"), Mask: net.CIDRMask(12, 32)}
-	rfc3849     = net.IPNet{IP: net.ParseIP("2001:0DB8::"), Mask: net.CIDRMask(32, 128)}
-	rfc3927     = net.IPNet{IP: net.ParseIP("169.254.0.0"), Mask: net.CIDRMask(16, 32)}
-	rfc3964     = net.IPNet{IP: net.ParseIP("2002::"), Mask: net.CIDRMask(16, 128)}
-	rfc4193     = net.IPNet{IP: net.ParseIP("FC00::"), Mask: net.CIDRMask(7, 128)}
-	rfc4380     = net.IPNet{IP: net.ParseIP("2001::"), Mask: net.CIDRMask(32, 128)}
-	rfc4843     = net.IPNet{IP: net.ParseIP("2001:10::"), Mask: net.CIDRMask(28, 128)}
-	rfc4862     = net.IPNet{IP: net.ParseIP("FE80::"), Mask: net.CIDRMask(64, 128)}
-	rfc6052     = net.IPNet{IP: net.ParseIP("64:FF9B::"), Mask: net.CIDRMask(96, 128)}
-	rfc6145     = net.IPNet{IP: net.ParseIP("::FFFF:0:0:0"), Mask: net.CIDRMask(96, 128)}
-	zero4       = net.IPNet{IP: net.ParseIP("0.0.0.0"), Mask: net.CIDRMask(8, 32)}
-)
-
-// RFC1918 defines ipv4 private network function
-func (na *NetAddress) RFC1918() bool {
-	return rfc1918_10.Contains(na.IP) ||
-		rfc1918_192.Contains(na.IP) ||
-		rfc1918_172.Contains(na.IP)
-}
-
-// RFC3849 defines ipv6 network function
-func (na *NetAddress) RFC3849() bool { return rfc3849.Contains(na.IP) }
-
-// RFC3927 defines ipv4 network function
-func (na *NetAddress) RFC3927() bool { return rfc3927.Contains(na.IP) }
-
-// RFC3964 defines ipv6 6to4 function
-func (na *NetAddress) RFC3964() bool { return rfc3964.Contains(na.IP) }
-
-// RFC4193 defines ipv6 unique local function
-func (na *NetAddress) RFC4193() bool { return rfc4193.Contains(na.IP) }
-
-// RFC4380 defines ipv6 teredo tunneling function
-func (na *NetAddress) RFC4380() bool { return rfc4380.Contains(na.IP) }
-
-// RFC4843 defines ipv6 orchid function
-func (na *NetAddress) RFC4843() bool { return rfc4843.Contains(na.IP) }
-
-// RFC4862 defines ipv6 autoconfig function
-func (na *NetAddress) RFC4862() bool { return rfc4862.Contains(na.IP) }
-
-// RFC6052 defines ipv6 well know prefix function
-func (na *NetAddress) RFC6052() bool { return rfc6052.Contains(na.IP) }
-
-// RFC6145 defines ipv6 ipv4 translated addredd function
-func (na *NetAddress) RFC6145() bool { return rfc6145.Contains(na.IP) }
+	//p2p version check
+	return conn, nil
+}
@@ -67,6 +67,9 @@ type Node struct {
 	cacheBound map[string]*Peer
 	outBound   map[string]*Peer
 	listener   Listener
+	listenPort int
+	innerSeeds sync.Map
+	cfgSeeds   sync.Map
 	closed     int32
 	pubsub     *pubsub.PubSub
 }
@@ -84,21 +87,24 @@ func NewNode(cfg *types.P2P) (*Node, error) {
 		cacheBound: make(map[string]*Peer),
 		pubsub:     pubsub.NewPubSub(10200),
 	}
+	node.listenPort = 13802
 	if cfg.Port != 0 && cfg.Port <= 65535 && cfg.Port > 1024 {
-		defaultPort = int(cfg.Port)
+		node.listenPort = int(cfg.Port)
 	}
-	if cfg.InnerSeedEnable {
-		if types.IsTestNet() {
-			cfg.Seeds = append(cfg.Seeds, TestNetSeeds...)
-		} else {
-			cfg.Seeds = append(cfg.Seeds, InnerSeeds...)
-		}
-	}
+	seeds := MainNetSeeds
+	if types.IsTestNet() {
+		seeds = TestNetSeeds
+	}
+	for _, seed := range seeds {
+		node.innerSeeds.Store(seed, "inner")
+	}
+	for _, seed := range cfg.Seeds {
+		node.cfgSeeds.Store(seed, "cfg")
+	}
 	node.nodeInfo = NewNodeInfo(cfg)
 	if cfg.ServerStart {
 		node.listener = NewListener(protocol, node)
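
Both sync.Maps are used as sets throughout this PR; the stored value ("inner"/"cfg") is never read. The membership test looks like:

```go
// addr is a "host:port" string; configured seeds are exempt from
// blacklisting and pruning elsewhere in this PR
if _, ok := node.cfgSeeds.Load(addr); ok {
	// addr was listed in cfg.Seeds
}
```
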
@@ -133,7 +139,7 @@ func (n *Node) doNat() {
 		}
 		time.Sleep(time.Second)
 	}
-	testExaddr := fmt.Sprintf("%v:%v", n.nodeInfo.GetExternalAddr().IP.String(), defaultPort)
+	testExaddr := fmt.Sprintf("%v:%v", n.nodeInfo.GetExternalAddr().IP.String(), n.listenPort)
 	log.Info("TestNetAddr", "testExaddr", testExaddr)
 	if len(P2pComm.AddrRouteble([]string{testExaddr})) != 0 {
 		log.Info("node outside")
@@ -162,7 +168,7 @@ func (n *Node) doNat() {
 	p2pcli := NewNormalP2PCli()
 	//test whether the mapped port, or the external IP plus the local port, is reachable
 	if p2pcli.CheckPeerNatOk(n.nodeInfo.GetExternalAddr().String()) ||
-		p2pcli.CheckPeerNatOk(fmt.Sprintf("%v:%v", n.nodeInfo.GetExternalAddr().IP.String(), defaultPort)) {
+		p2pcli.CheckPeerNatOk(fmt.Sprintf("%v:%v", n.nodeInfo.GetExternalAddr().IP.String(), n.listenPort)) {
 		n.nodeInfo.SetServiceTy(Service)
 		log.Info("doNat", "NatOk", "Support Service")
@@ -329,6 +335,7 @@ func (n *Node) monitor() {
 	go n.monitorFilter()
 	go n.monitorPeers()
 	go n.nodeReBalance()
+	go n.monitorCfgSeeds()
 }

 func (n *Node) needMore() bool {
@@ -366,7 +373,7 @@ func (n *Node) detectNodeAddr() {
 	var externalPort int
 	if cfg.IsSeed {
-		externalPort = defaultPort
+		externalPort = n.listenPort
 	} else {
 		exportBytes, err := n.nodeInfo.addrBook.bookDb.Get([]byte(externalPortTag))
 		if len(exportBytes) != 0 {
@@ -390,13 +397,11 @@ func (n *Node) detectNodeAddr() {
 			log.Error("DetectionNodeAddr", "error", err.Error())
 		}
-		if listaddr, err := NewNetAddressString(fmt.Sprintf("%v:%v", laddr, defaultPort)); err == nil {
+		if listaddr, err := NewNetAddressString(fmt.Sprintf("%v:%v", laddr, n.listenPort)); err == nil {
 			n.nodeInfo.SetListenAddr(listaddr)
 			n.nodeInfo.addrBook.AddOurAddress(listaddr)
 		}
-		//log.Info("DetectionNodeAddr", "ExternalIp", externalIP, "LocalAddr", LocalAddr, "IsOutSide", n.nodeInfo.OutSide())
 		break
 	}
 }
@@ -417,7 +422,7 @@ func (n *Node) natMapPort() {
 		ok := p2pcli.CheckSelf(n.nodeInfo.GetExternalAddr().String(), n.nodeInfo)
 		if !ok {
 			log.Info("natMapPort", "port is used", n.nodeInfo.GetExternalAddr().String())
-			n.flushNodePort(uint16(defaultPort), uint16(rand.Intn(64512)+1023))
+			n.flushNodePort(uint16(n.listenPort), uint16(rand.Intn(64512)+1023))
 		}
 	}
@@ -425,11 +430,11 @@ func (n *Node) natMapPort() {
 	log.Info("natMapPort", "netport", n.nodeInfo.GetExternalAddr().Port)
 	for i := 0; i < tryMapPortTimes; i++ {
 		//the mapping lasts roughly 48 hours
-		err = nat.Any().AddMapping("TCP", int(n.nodeInfo.GetExternalAddr().Port), defaultPort, nodename[:8], time.Hour*48)
+		err = nat.Any().AddMapping("TCP", int(n.nodeInfo.GetExternalAddr().Port), n.listenPort, nodename[:8], time.Hour*48)
 		if err != nil {
 			if i > tryMapPortTimes/2 { //after more than half the attempts fail in a row, fall back to mapping a random port
 				log.Error("NatMapPort", "err", err.Error())
-				n.flushNodePort(uint16(defaultPort), uint16(rand.Intn(64512)+1023))
+				n.flushNodePort(uint16(n.listenPort), uint16(rand.Intn(64512)+1023))
 			}
 			log.Info("NatMapPort", "External Port", n.nodeInfo.GetExternalAddr().Port)
@@ -442,7 +447,7 @@ func (n *Node) natMapPort() {
 	if err != nil {
 		//mapping failed
 		log.Warn("NatMapPort", "Nat", "Faild")
-		n.flushNodePort(uint16(defaultPort), uint16(defaultPort))
+		n.flushNodePort(uint16(n.listenPort), uint16(n.listenPort))
 		n.nodeInfo.natResultChain <- false
 		return
 	}
@@ -460,7 +465,7 @@ func (n *Node) natMapPort() {
 		<-refresh.C
 		log.Info("NatWorkRefresh")
 		for {
-			if err := nat.Any().AddMapping("TCP", int(n.nodeInfo.GetExternalAddr().Port), defaultPort, nodename[:8], time.Hour*48); err != nil {
+			if err := nat.Any().AddMapping("TCP", int(n.nodeInfo.GetExternalAddr().Port), n.listenPort, nodename[:8], time.Hour*48); err != nil {
 				log.Error("NatMapPort update", "err", err.Error())
 				time.Sleep(time.Second)
 				continue
@@ -476,7 +481,8 @@ func (n *Node) deleteNatMapPort() {
 	if n.nodeInfo.OutSide() {
 		return
 	}
-	err := nat.Any().DeleteMapping("TCP", int(n.nodeInfo.GetExternalAddr().Port), defaultPort)
+
+	err := nat.Any().DeleteMapping("TCP", int(n.nodeInfo.GetExternalAddr().Port), n.listenPort)
 	if err != nil {
 		log.Error("deleteNatMapPort", "DeleteMapping err", err.Error())
 	}
......
@@ -55,8 +55,7 @@ func New(cfg *types.P2P) *P2p {
 	}
 	VERSION = cfg.Version
-	log.Info("p2p", "Version", VERSION)
-
+	log.Info("p2p", "Version", VERSION, "IsTest", types.IsTestNet())
 	if cfg.InnerBounds == 0 {
 		cfg.InnerBounds = 500
 	}
@@ -93,6 +92,7 @@ func (network *P2p) Close() {
 		network.client.Close()
 	}
 	network.node.pubsub.Shutdown()
+
 }

 // SetQueueClient set the queue
@@ -215,6 +215,7 @@ func (network *P2p) subP2pMsg() {
 			}
 		}
+
 		switch msg.Ty {
 		case types.EventTxBroadcast: //broadcast tx
 			go network.p2pCli.BroadCastTx(msg, taskIndex)
 		case types.EventBlockBroadcast: //broadcast block
......
package p2p
import (
"encoding/hex"
"net"
"os"
"sort"
"strings"
"testing"
"time"
l "github.com/33cn/chain33/common/log"
"github.com/33cn/chain33/queue"
"github.com/33cn/chain33/types"
//"github.com/33cn/chain33/util/testnode"
"github.com/stretchr/testify/assert"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
var q queue.Queue
var p2pModule *P2p
var dataDir = "testdata"
func init() {
VERSION = 119
l.SetLogLevel("err")
q = queue.New("channel")
go q.Start()
p2pModule = initP2p(33802, dataDir)
p2pModule.Wait()
go func() {
blockchainKey := "blockchain"
client := q.Client()
client.Sub(blockchainKey)
for msg := range client.Recv() {
switch msg.Ty {
case types.EventGetBlocks:
if req, ok := msg.GetData().(*types.ReqBlocks); ok {
if req.Start == 1 {
msg.Reply(client.NewMessage(blockchainKey, types.EventBlocks, &types.Transaction{}))
} else {
msg.Reply(client.NewMessage(blockchainKey, types.EventBlocks, &types.BlockDetails{}))
}
} else {
msg.ReplyErr("Do not support", types.ErrInvalidParam)
}
case types.EventGetHeaders:
if req, ok := msg.GetData().(*types.ReqBlocks); ok {
if req.Start == 10 {
msg.Reply(client.NewMessage(blockchainKey, types.EventHeaders, &types.Transaction{}))
} else {
msg.Reply(client.NewMessage(blockchainKey, types.EventHeaders, &types.Headers{}))
}
} else {
msg.ReplyErr("Do not support", types.ErrInvalidParam)
}
case types.EventGetLastHeader:
msg.Reply(client.NewMessage("p2p", types.EventHeader, &types.Header{Height: 2019}))
case types.EventGetBlockHeight:
msg.Reply(client.NewMessage("p2p", types.EventReplyBlockHeight, &types.ReplyBlockHeight{Height: 2019}))
}
}
}()
go func() {
mempoolKey := "mempool"
client := q.Client()
client.Sub(mempoolKey)
for msg := range client.Recv() {
switch msg.Ty {
case types.EventGetMempoolSize:
msg.Reply(client.NewMessage("p2p", types.EventMempoolSize, &types.MempoolSize{Size: 0}))
}
}
}()
}
//initialize the p2p module
func initP2p(port int32, dbpath string) *P2p {
cfg := new(types.P2P)
cfg.Port = port
cfg.Enable = true
cfg.DbPath = dbpath
cfg.DbCache = 4
cfg.Version = 119
cfg.ServerStart = true
cfg.Driver = "leveldb"
p2pcli := New(cfg)
p2pcli.SetQueueClient(q.Client())
p2pcli.node.nodeInfo.SetServiceTy(7)
return p2pcli
}
func TestP2PEvent(t *testing.T) {
qcli := q.Client()
msg := qcli.NewMessage("p2p", types.EventBlockBroadcast, &types.Block{})
qcli.Send(msg, false)
msg = qcli.NewMessage("p2p", types.EventTxBroadcast, &types.Transaction{})
qcli.Send(msg, false)
msg = qcli.NewMessage("p2p", types.EventFetchBlocks, &types.ReqBlocks{})
qcli.Send(msg, false)
msg = qcli.NewMessage("p2p", types.EventGetMempool, nil)
qcli.Send(msg, false)
msg = qcli.NewMessage("p2p", types.EventPeerInfo, nil)
qcli.Send(msg, false)
msg = qcli.NewMessage("p2p", types.EventGetNetInfo, nil)
qcli.Send(msg, false)
msg = qcli.NewMessage("p2p", types.EventFetchBlockHeaders, &types.ReqBlocks{})
qcli.Send(msg, false)
}
func TestNetInfo(t *testing.T) {
p2pModule.node.nodeInfo.IsNatDone()
p2pModule.node.nodeInfo.SetNatDone()
p2pModule.node.nodeInfo.Get()
}
//test Peer
func TestPeer(t *testing.T) {
conn, err := grpc.Dial("localhost:33802", grpc.WithInsecure(),
grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip")))
assert.Nil(t, err)
defer conn.Close()
remote, err := NewNetAddressString("127.0.0.1:33802")
assert.Nil(t, err)
localP2P := initP2p(43802, "testdata2")
defer os.RemoveAll("testdata2")
defer localP2P.Close()
t.Log(localP2P.node.CacheBoundsSize())
t.Log(localP2P.node.GetCacheBounds())
localP2P.node.RemoveCachePeer("localhost:12345")
peer, err := P2pComm.dialPeer(remote, localP2P.node)
assert.Nil(t, err)
defer peer.Close()
peer.MakePersistent()
localP2P.node.addPeer(peer)
time.Sleep(time.Second * 5)
t.Log(peer.GetInBouns())
t.Log(peer.version.GetVersion())
assert.IsType(t, "string", peer.GetPeerName())
localP2P.node.AddCachePeer(peer)
//
localP2P.node.natOk()
localP2P.node.flushNodePort(43803, 43802)
p2pcli := NewNormalP2PCli()
localP2P.node.nodeInfo.peerInfos.SetPeerInfo(nil)
localP2P.node.nodeInfo.peerInfos.GetPeerInfo("1222")
t.Log(p2pModule.node.GetRegisterPeer("localhost:43802"))
//test sending a ping message
err = p2pcli.SendPing(peer, localP2P.node.nodeInfo)
assert.Nil(t, err)
//get the peer's inbound connection count
pnum, err := p2pcli.GetInPeersNum(peer)
assert.Nil(t, err)
assert.Equal(t, 1, pnum)
_, err = peer.GetPeerInfo(VERSION)
assert.Nil(t, err)
//get the address list
_, err = p2pcli.GetAddrList(peer)
assert.Nil(t, err)
_, err = p2pcli.SendVersion(peer, localP2P.node.nodeInfo)
assert.Nil(t, err)
t.Log(p2pcli.CheckPeerNatOk("localhost:33802"))
t.Log("checkself:", p2pcli.CheckSelf("loadhost:43803", localP2P.node.nodeInfo))
_, err = p2pcli.GetAddr(peer)
assert.Nil(t, err)
//test fetching the block height
height, err := p2pcli.GetBlockHeight(localP2P.node.nodeInfo)
assert.Nil(t, err)
assert.Equal(t, int(height), 2019)
assert.Equal(t, false, p2pcli.CheckSelf("localhost:33802", localP2P.node.nodeInfo))
//test the download job
job := NewDownloadJob(NewP2PCli(localP2P).(*Cli), []*Peer{peer})
job.GetFreePeer(1)
var ins []*types.Inventory
var bChan = make(chan *types.BlockPid, 256)
respIns := job.DownloadBlock(ins, bChan)
t.Log(respIns)
job.ResetDownloadPeers([]*Peer{peer})
t.Log(job.avalidPeersNum())
job.setBusyPeer(peer.GetPeerName())
job.setFreePeer(peer.GetPeerName())
job.removePeer(peer.GetPeerName())
job.CancelJob()
os.Remove(dataDir)
}
func TestSortArr(t *testing.T) {
var Inventorys = make(Invs, 0)
for i := 100; i >= 0; i-- {
var inv types.Inventory
inv.Ty = 111
inv.Height = int64(i)
Inventorys = append(Inventorys, &inv)
}
sort.Sort(Inventorys)
}
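
TestSortArr exercises sort.Sort but never asserts the resulting order; a stricter check could be appended (suggestion only):

```go
for i := 1; i < len(Inventorys); i++ {
	assert.True(t, Inventorys[i-1].GetHeight() <= Inventorys[i].GetHeight())
}
```
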
//test multiple grpc connections from one IP
func TestGrpcConns(t *testing.T) {
var conns []*grpc.ClientConn
for i := 0; i < maxSamIPNum; i++ {
conn, err := grpc.Dial("localhost:33802", grpc.WithInsecure(),
grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip")))
assert.Nil(t, err)
cli := types.NewP2PgserviceClient(conn)
_, err = cli.GetHeaders(context.Background(), &types.P2PGetHeaders{
StartHeight: 0, EndHeight: 0, Version: 1002}, grpc.FailFast(true))
assert.Equal(t, false, strings.Contains(err.Error(), "no authorized"))
conns = append(conns, conn)
}
conn, err := grpc.Dial("localhost:33802", grpc.WithInsecure(),
grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip")))
assert.Nil(t, err)
cli := types.NewP2PgserviceClient(conn)
_, err = cli.GetHeaders(context.Background(), &types.P2PGetHeaders{
StartHeight: 0, EndHeight: 0, Version: 1002}, grpc.FailFast(true))
assert.Equal(t, true, strings.Contains(err.Error(), "no authorized"))
conn.Close()
for _, conn := range conns {
conn.Close()
}
}
//test multiple grpc stream connections
func TestGrpcStreamConns(t *testing.T) {
conn, err := grpc.Dial("localhost:33802", grpc.WithInsecure(),
grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip")))
assert.Nil(t, err)
cli := types.NewP2PgserviceClient(conn)
var p2pdata types.P2PGetData
resp, err := cli.GetData(context.Background(), &p2pdata)
assert.Nil(t, err)
_, err = resp.Recv()
assert.Equal(t, true, strings.Contains(err.Error(), "no authorized"))
conn.Close()
}
func TestP2pComm(t *testing.T) {
addrs := P2pComm.AddrRouteble([]string{"localhost:33802"})
t.Log(addrs)
i32 := P2pComm.BytesToInt32([]byte{0xff})
t.Log(i32)
_, _, err := P2pComm.GenPrivPubkey()
assert.Nil(t, err)
ping, err := P2pComm.NewPingData(p2pModule.node.nodeInfo)
assert.Nil(t, err)
assert.Equal(t, true, P2pComm.CheckSign(ping))
assert.IsType(t, "string", P2pComm.GetLocalAddr())
assert.Equal(t, 5, len(P2pComm.RandStr(5)))
}
func TestFilter(t *testing.T) {
go Filter.ManageRecvFilter()
defer Filter.Close()
Filter.GetLock()
assert.Equal(t, true, Filter.RegRecvData("key"))
assert.Equal(t, true, Filter.QueryRecvData("key"))
Filter.RemoveRecvData("key")
assert.Equal(t, false, Filter.QueryRecvData("key"))
Filter.ReleaseLock()
}
func TestAddrRouteble(t *testing.T) {
resp := P2pComm.AddrRouteble([]string{"114.55.101.159:13802"})
t.Log(resp)
}
func TestRandStr(t *testing.T) {
t.Log(P2pComm.RandStr(5))
}
func TestGetLocalAddr(t *testing.T) {
t.Log(P2pComm.GetLocalAddr())
}
func TestAddrBook(t *testing.T) {
prv, pub, err := P2pComm.GenPrivPubkey()
if err != nil {
t.Log(err.Error())
return
}
t.Log("priv:", hex.EncodeToString(prv), "pub:", hex.EncodeToString(pub))
pubstr, err := P2pComm.Pubkey(hex.EncodeToString(prv))
if err != nil {
t.Log(err.Error())
return
}
t.Log("GenPubkey:", pubstr)
addrBook := p2pModule.node.nodeInfo.addrBook
addrBook.Size()
addrBook.saveToDb()
addrBook.GetPeerStat("locolhost:43802")
addrBook.genPubkey(hex.EncodeToString(prv))
assert.Equal(t, addrBook.genPubkey(hex.EncodeToString(prv)), pubstr)
addrBook.Save()
addrBook.GetAddrs()
}
func TestBytesToInt32(t *testing.T) {
t.Log(P2pComm.BytesToInt32([]byte{0xff}))
t.Log(P2pComm.Int32ToBytes(255))
}
func TestNetAddress(t *testing.T) {
tcpAddr := new(net.TCPAddr)
tcpAddr.IP = net.ParseIP("localhost")
tcpAddr.Port = 2223
nad := NewNetAddress(tcpAddr)
nad1 := nad.Copy()
nad.Equals(nad1)
nad2s, err := NewNetAddressStrings([]string{"localhost:3306"})
if err != nil {
return
}
nad.Less(nad2s[0])
}
func TestP2pClose(t *testing.T) {
p2pModule.Wait()
p2pModule.Close()
os.RemoveAll(dataDir)
}
@@ -407,12 +407,20 @@ func (m *Cli) GetBlocks(msg *queue.Message, taskindex int64) {
 		msg.Reply(m.network.client.NewMessage("blockchain", pb.EventReply, pb.Reply{Msg: []byte("no peers")}))
 		return
 	}
+	msg.Reply(m.network.client.NewMessage("blockchain", pb.EventReply, pb.Reply{IsOk: true, Msg: []byte("downloading...")}))
 	req := msg.GetData().(*pb.ReqBlocks)
 	log.Info("GetBlocks", "start", req.GetStart(), "end", req.GetEnd())
 	pids := req.GetPid()
-	var MaxInvs = new(pb.P2PInv)
+	var Inventorys = make([]*pb.Inventory, 0)
+	for i := req.GetStart(); i <= req.GetEnd(); i++ {
+		var inv pb.Inventory
+		inv.Ty = msgBlock
+		inv.Height = i
+		Inventorys = append(Inventorys, &inv)
+	}
+	MaxInvs := &pb.P2PInv{Invs: Inventorys}
 	var downloadPeers []*Peer
 	peers, infos := m.network.node.GetActivePeers()
 	if len(pids) > 0 && pids[0] != "" { //download data from the specified pids
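The new code builds the block inventory locally from the requested height range instead of asking a peer for it over gRPC, removing one network round-trip before the download starts. A self-contained sketch of the same loop (the Inventory struct and the msgBlock value are stand-ins for chain33's generated types, not taken from this diff; preallocating capacity is a minor refinement over the diff's plain append loop):

package main

import "fmt"

// Inventory mirrors the relevant fields of chain33's pb.Inventory.
type Inventory struct {
	Ty     int32
	Height int64
}

const msgBlock int32 = 2 // illustrative; chain33 defines its own constant

// buildInvs enumerates block inventories for [start, end] locally,
// as the rewritten GetBlocks now does.
func buildInvs(start, end int64) []*Inventory {
	invs := make([]*Inventory, 0, end-start+1)
	for h := start; h <= end; h++ {
		invs = append(invs, &Inventory{Ty: msgBlock, Height: h})
	}
	return invs
}

func main() {
	fmt.Println(len(buildInvs(100, 109))) // 10
}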
@@ -427,21 +435,6 @@ func (m *Cli) GetBlocks(msg *queue.Message, taskindex int64) {
 			peer, ok := peers[paddr]
 			if ok && peer != nil {
 				downloadPeers = append(downloadPeers, peer)
-				//fetch the invs
-				if len(MaxInvs.GetInvs()) != int(req.GetEnd()-req.GetStart())+1 {
-					var err error
-					MaxInvs, err = peer.mconn.gcli.GetBlocks(context.Background(), &pb.P2PGetBlocks{StartHeight: req.GetStart(), EndHeight: req.GetEnd(),
-						Version: m.network.node.nodeInfo.cfg.Version}, grpc.FailFast(true))
-					P2pComm.CollectPeerStat(err, peer)
-					if err != nil {
-						log.Error("GetBlocks", "Err", err.Error())
-						if err == pb.ErrVersion {
-							peer.version.SetSupport(false)
-							P2pComm.CollectPeerStat(err, peer)
-						}
-						continue
-					}
-				}
 			}
 		}
@@ -449,45 +442,6 @@ func (m *Cli) GetBlocks(msg *queue.Message, taskindex int64) {
 	} else {
 		log.Info("fetch from all peers in pids")
-		for _, peer := range peers { //throttle high-frequency calls to each peer
-			log.Info("peer", "addr", peer.Addr(), "start", req.GetStart(), "end", req.GetEnd())
-			peerinfo, ok := infos[peer.Addr()]
-			if !ok {
-				continue
-			}
-			var pr pb.Peer
-			pr.Addr = peerinfo.GetAddr()
-			pr.Port = peerinfo.GetPort()
-			pr.Name = peerinfo.GetName()
-			pr.MempoolSize = peerinfo.GetMempoolSize()
-			pr.Header = peerinfo.GetHeader()
-			if peerinfo.GetHeader().GetHeight() < req.GetEnd() {
-				continue
-			}
-			invs, err := peer.mconn.gcli.GetBlocks(context.Background(), &pb.P2PGetBlocks{StartHeight: req.GetStart(), EndHeight: req.GetEnd(),
-				Version: m.network.node.nodeInfo.cfg.Version}, grpc.FailFast(true))
-			P2pComm.CollectPeerStat(err, peer)
-			if err != nil {
-				log.Error("GetBlocks", "Err", err.Error())
-				if err == pb.ErrVersion {
-					peer.version.SetSupport(false)
-					P2pComm.CollectPeerStat(err, peer)
-				}
-				continue
-			}
-			if len(invs.Invs) > len(MaxInvs.Invs) {
-				MaxInvs = invs
-				if len(MaxInvs.GetInvs()) == int(req.GetEnd()-req.GetStart())+1 {
-					break
-				}
-			}
-		}
 		for _, peer := range peers {
 			peerinfo, ok := infos[peer.Addr()]
 			if !ok {
@@ -501,18 +455,19 @@ func (m *Cli) GetBlocks(msg *queue.Message, taskindex int64) {
 			}
 		}
-	log.Debug("Invs", "Invs show", MaxInvs.GetInvs())
-	if len(MaxInvs.GetInvs()) == 0 {
-		log.Error("GetBlocks", "getInvs", 0)
+	if len(downloadPeers) == 0 {
+		log.Error("GetBlocks", "downloadPeers", 0)
+		msg.Reply(m.network.client.NewMessage("blockchain", pb.EventReply, pb.Reply{Msg: []byte("no downloadPeers")}))
 		return
 	}
-	msg.Reply(m.network.client.NewMessage("blockchain", pb.EventReply, pb.Reply{IsOk: true, Msg: []byte("downloading...")}))
 	//download with the new download mode
-	var bChan = make(chan *pb.BlockPid, 256)
+	var bChan = make(chan *pb.BlockPid, 512)
 	invs := MaxInvs.GetInvs()
 	job := NewDownloadJob(m, downloadPeers)
 	var jobcancel int32
-	var maxDownloadRetryCount = 100
 	go func(cancel *int32, invs []*pb.Inventory) {
 		for {
 			if atomic.LoadInt32(cancel) == 1 {
@@ -523,15 +478,25 @@ func (m *Cli) GetBlocks(msg *queue.Message, taskindex int64) {
 			if len(invs) == 0 {
 				return
 			}
-			maxDownloadRetryCount--
-			if maxDownloadRetryCount < 0 {
+			if job.avalidPeersNum() <= 0 {
+				job.ResetDownloadPeers(downloadPeers)
+				continue
+			}
+			if job.isCancel() {
 				return
 			}
 		}
 	}(&jobcancel, invs)
 	i := 0
 	for {
-		timeout := time.NewTimer(time.Minute)
+		if job.isCancel() {
+			return
+		}
+		timeout := time.NewTimer(time.Minute * 10)
 		select {
 		case <-timeout.C:
 			atomic.StoreInt32(&jobcancel, 1)
@@ -664,7 +629,7 @@ func (m *Cli) getLocalPeerInfo() (*pb.P2PPeerInfo, error) {
 	localpeerinfo.MempoolSize = int32(meminfo.GetSize())
 	if m.network.node.nodeInfo.GetExternalAddr().IP == nil {
 		localpeerinfo.Addr = LocalAddr
-		localpeerinfo.Port = int32(defaultPort)
+		localpeerinfo.Port = int32(m.network.node.listenPort)
 	} else {
 		localpeerinfo.Addr = m.network.node.nodeInfo.GetExternalAddr().IP.String()
 		localpeerinfo.Port = int32(m.network.node.nodeInfo.GetExternalAddr().Port)
...
@@ -7,7 +7,6 @@ package p2p
 import (
 	"encoding/hex"
 	"fmt"
-	"io"
 	"net"
 	"strconv"
 	"sync"
@@ -17,6 +16,7 @@ import (
 	"github.com/33cn/chain33/common/version"
 	pb "github.com/33cn/chain33/types"
 	"golang.org/x/net/context"
+
 	pr "google.golang.org/grpc/peer"
 )
@@ -65,6 +65,7 @@ func NewP2pServer() *P2pserver {
 // Ping p2pserver ping
 func (s *P2pserver) Ping(ctx context.Context, in *pb.P2PPing) (*pb.P2PPong, error) {
 	log.Debug("ping")
+
 	if !P2pComm.CheckSign(in) {
 		log.Error("Ping", "p2p server", "check sig err")
@@ -96,6 +97,7 @@ func (s *P2pserver) Ping(ctx context.Context, in *pb.P2PPing) (*pb.P2PPong, erro
 // GetAddr get address
 func (s *P2pserver) GetAddr(ctx context.Context, in *pb.P2PGetAddr) (*pb.P2PAddr, error) {
 	log.Debug("GETADDR", "RECV ADDR", in, "OutBound Len", s.node.Size())
+
 	var addrlist []string
 	peers, _ := s.node.GetActivePeers()
@@ -127,6 +129,7 @@ func (s *P2pserver) Version(ctx context.Context, in *pb.P2PVersion) (*pb.P2PVerA
 // Version2 p2pserver version
 func (s *P2pserver) Version2(ctx context.Context, in *pb.P2PVersion) (*pb.P2PVersion, error) {
 	log.Debug("Version2")
+
 	var peerip string
 	var err error
@@ -177,6 +180,7 @@ func (s *P2pserver) SoftVersion(ctx context.Context, in *pb.P2PPing) (*pb.Reply,
 // BroadCastTx broadcast transactions of p2pserver
 func (s *P2pserver) BroadCastTx(ctx context.Context, in *pb.P2PTx) (*pb.Reply, error) {
 	log.Debug("p2pServer RECV TRANSACTION", "in", in)
+
 	client := s.node.nodeInfo.client
 	msg := client.NewMessage("mempool", pb.EventTx, in.Tx)
 	err := client.Send(msg, false)
@@ -223,6 +227,7 @@ func (s *P2pserver) GetMemPool(ctx context.Context, in *pb.P2PGetMempool) (*pb.P
 	if !s.checkVersion(in.GetVersion()) {
 		return nil, pb.ErrVersion
 	}
+
 	memtx, err := s.loadMempool()
 	if err != nil {
 		return nil, err
@@ -244,6 +249,7 @@ func (s *P2pserver) GetData(in *pb.P2PGetData, stream pb.P2Pgservice_GetDataServ
 	if !s.checkVersion(in.GetVersion()) {
 		return pb.ErrVersion
 	}
+
 	invs := in.GetInvs()
 	client := s.node.nodeInfo.client
 	for _, inv := range invs { //filter out data that is not needed
@@ -314,6 +320,7 @@ func (s *P2pserver) GetHeaders(ctx context.Context, in *pb.P2PGetHeaders) (*pb.P
 	if !s.checkVersion(in.GetVersion()) {
 		return nil, pb.ErrVersion
 	}
+
 	if in.GetEndHeight()-in.GetStartHeight() > 2000 || in.GetEndHeight() < in.GetStartHeight() {
 		return nil, fmt.Errorf("out of range")
 	}
@@ -341,6 +348,7 @@ func (s *P2pserver) GetPeerInfo(ctx context.Context, in *pb.P2PGetPeerInfo) (*pb
 	if !s.checkVersion(in.GetVersion()) {
 		return nil, pb.ErrVersion
 	}
+
 	client := s.node.nodeInfo.client
 	log.Debug("GetPeerInfo", "GetMempoolSize", "before")
 	msg := client.NewMessage("mempool", pb.EventGetMempoolSize, nil)
@@ -386,6 +394,7 @@ func (s *P2pserver) GetPeerInfo(ctx context.Context, in *pb.P2PGetPeerInfo) (*pb
 // BroadCastBlock broadcast block of p2pserver
 func (s *P2pserver) BroadCastBlock(ctx context.Context, in *pb.P2PBlock) (*pb.Reply, error) {
 	log.Debug("BroadCastBlock")
+
 	client := s.node.nodeInfo.client
 	msg := client.NewMessage("blockchain", pb.EventBroadcastAddBlock, in.GetBlock())
 	err := client.Send(msg, false)
@@ -401,6 +410,7 @@ func (s *P2pserver) ServerStreamSend(in *pb.P2PPing, stream pb.P2Pgservice_Serve
 	if len(s.getInBoundPeers()) > int(s.node.nodeInfo.cfg.InnerBounds) {
 		return fmt.Errorf("beyond max inbound num")
 	}
+
 	log.Debug("ServerStreamSend")
 	peername := hex.EncodeToString(in.GetSign().GetPubkey())
 	dataChain := s.addStreamHandler(stream)
@@ -446,20 +456,28 @@ func (s *P2pserver) ServerStreamRead(stream pb.P2Pgservice_ServerStreamReadServe
 		return fmt.Errorf("beyond max inbound num:%v>%v", len(s.getInBoundPeers()), int(s.node.nodeInfo.cfg.InnerBounds))
 	}
 	log.Debug("StreamRead")
+	var remoteIP string
+	var err error
+	getctx, ok := pr.FromContext(stream.Context())
+	if ok {
+		remoteIP, _, err = net.SplitHostPort(getctx.Addr.String())
+		if err != nil {
+			return fmt.Errorf("ctx.Addr format err")
+		}
+	} else {
+		return fmt.Errorf("getctx err")
+	}
 	var hash [64]byte
 	var peeraddr, peername string
 	defer s.deleteInBoundPeerInfo(peername)
 	var in = new(pb.BroadCastData)
-	var err error
 	for {
 		if s.IsClose() {
 			return fmt.Errorf("node close")
 		}
 		in, err = stream.Recv()
-		if err == io.EOF {
-			log.Info("ServerStreamRead", "Recv", "EOF")
-			return err
-		}
 		if err != nil {
 			log.Error("ServerStreamRead", "Recv", err)
 			return err
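The added block captures the caller's IP once, up front, from the gRPC peer info attached to the stream context, instead of re-deriving it inside the ping handler. A self-contained sketch of that extraction (the package and function names here are ours):

package p2putil

import (
	"context"
	"fmt"
	"net"

	"google.golang.org/grpc/peer"
)

// RemoteIP extracts the caller's IP address from a gRPC unary or stream
// context, mirroring the block added to ServerStreamRead above.
func RemoteIP(ctx context.Context) (string, error) {
	p, ok := peer.FromContext(ctx)
	if !ok {
		return "", fmt.Errorf("no peer info attached to context")
	}
	host, _, err := net.SplitHostPort(p.Addr.String())
	if err != nil {
		return "", fmt.Errorf("unexpected peer address %q: %v", p.Addr, err)
	}
	return host, nil
}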
@@ -510,25 +528,19 @@ func (s *P2pserver) ServerStreamRead(stream pb.P2Pgservice_ServerStreamReadServe
 		} else if ping := in.GetPing(); ping != nil { ///after a remote node connects for the first time, a ping packet arrives; on receipt the peer is registered into inboundpeers.
 			//Ping package
 			if !P2pComm.CheckSign(ping) {
 				log.Error("ServerStreamRead", "check stream", "check sig err")
 				return pb.ErrStreamPing
 			}
-			getctx, ok := pr.FromContext(stream.Context())
-			if ok && s.node.Size() > 0 {
-				//peerIp := strings.Split(getctx.Addr.String(), ":")[0]
-				peerIP, _, err := net.SplitHostPort(getctx.Addr.String())
-				if err != nil {
-					return fmt.Errorf("ctx.Addr format err")
-				}
-				if peerIP != LocalAddr && peerIP != s.node.nodeInfo.GetExternalAddr().IP.String() {
+			if s.node.Size() > 0 {
+				if remoteIP != LocalAddr && remoteIP != s.node.nodeInfo.GetExternalAddr().IP.String() {
 					s.node.nodeInfo.SetServiceTy(Service)
 				}
 			}
 			peername = hex.EncodeToString(ping.GetSign().GetPubkey())
-			peeraddr = fmt.Sprintf("%s:%v", in.GetPing().GetAddr(), in.GetPing().GetPort())
+			peeraddr = fmt.Sprintf("%s:%v", remoteIP, in.GetPing().GetPort())
 			s.addInBoundPeerInfo(peername, innerpeer{addr: peeraddr, name: peername, timestamp: pb.Now().Unix()})
 		} else if ver := in.GetVersion(); ver != nil {
 			//receive version info
@@ -537,9 +549,15 @@ func (s *P2pserver) ServerStreamRead(stream pb.P2Pgservice_ServerStreamReadServe
 			p2pversion := ver.GetP2Pversion()
 			innerpeer := s.getInBoundPeerInfo(peername)
 			if innerpeer != nil {
+				if !s.checkVersion(p2pversion) {
+					return pb.ErrVersion
+				}
 				innerpeer.p2pversion = p2pversion
 				innerpeer.softversion = softversion
 				s.addInBoundPeerInfo(peername, *innerpeer)
+			} else {
+				//no info recorded for this peername, meaning the ping packet was never received
+				return pb.ErrStreamPing
 			}
 		}
...
@@ -142,8 +142,9 @@ func (p *Peer) heartBeat() {
 			go p.readStream()
 			break
 		} else {
-			time.Sleep(time.Second * 5)
-			continue
+			//version mismatch: close the peer directly
+			p.Close()
+			return
 		}
 	}
@@ -206,6 +207,7 @@ func (p *Peer) sendStream() {
 		p2pdata := new(pb.BroadCastData)
 		p2pdata.Value = &pb.BroadCastData_Ping{Ping: ping}
 		if err := resp.Send(p2pdata); err != nil {
+			P2pComm.CollectPeerStat(err, p)
 			errs := resp.CloseSend()
 			if errs != nil {
 				log.Error("CloseSend", "err", errs)
@@ -222,13 +224,13 @@ func (p *Peer) sendStream() {
 			Softversion: v.GetVersion(), Peername: peername}}
 		if err := resp.Send(p2pdata); err != nil {
+			P2pComm.CollectPeerStat(err, p)
 			errs := resp.CloseSend()
 			if errs != nil {
 				log.Error("CloseSend", "err", errs)
 			}
 			cancel()
-			log.Error("sendStream", "sendping", err)
-			time.Sleep(time.Second)
+			log.Error("sendStream", "sendversion", err)
 			continue
 		}
 		timeout := time.NewTimer(time.Second * 2)
...
@@ -5,7 +5,7 @@
 package p2p

 // VERSION number
-var VERSION int32 = 119
+var VERSION int32

 //update notes:
 // 1. p2p now starts the peer's stream, ping, version, and related features only after NAT traversal finishes
...
@@ -243,7 +243,7 @@ func (c *Chain33) GetTxByAddr(in types.ReqAddr, result *interface{}) error {
 		infos := reply.GetTxInfos()
 		for _, info := range infos {
 			txinfos.TxInfos = append(txinfos.TxInfos, &rpctypes.ReplyTxInfo{Hash: common.ToHex(info.GetHash()),
-				Height: info.GetHeight(), Index: info.GetIndex(), Assets: info.Assets})
+				Height: info.GetHeight(), Index: info.GetIndex(), Assets: fmtAsssets(info.Assets)})
 		}
 		*result = &txinfos
 	}
@@ -336,10 +336,23 @@ func fmtTxDetail(tx *types.TransactionDetail, disableDetail bool) (*rpctypes.Tra
 		Amount:     tx.GetAmount(),
 		Fromaddr:   tx.GetFromaddr(),
 		ActionName: tx.GetActionName(),
-		Assets:     tx.GetAssets(),
+		Assets:     fmtAsssets(tx.GetAssets()),
 	}, nil
 }

+func fmtAsssets(assets []*types.Asset) []*rpctypes.Asset {
+	var result []*rpctypes.Asset
+	for _, a := range assets {
+		asset := &rpctypes.Asset{
+			Exec:   a.Exec,
+			Symbol: a.Symbol,
+			Amount: a.Amount,
+		}
+		result = append(result, asset)
+	}
+	return result
+}
+
 // GetMempool get mempool information
 func (c *Chain33) GetMempool(in *types.ReqNil, result *interface{}) error {
...
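The new fmtAsssets helper copies protobuf types.Asset values into the RPC-layer rpctypes.Asset mirror added below, so the JSON field names are controlled by the RPC package rather than by the generated protobuf struct. A sketch of the resulting wire format, assuming the struct tags shown in the next file ("coins"/"bty" are just sample values):

package main

import (
	"encoding/json"
	"fmt"
)

// Asset mirrors rpctypes.Asset as added in this change.
type Asset struct {
	Exec   string `json:"exec"`
	Symbol string `json:"symbol"`
	Amount int64  `json:"amount"`
}

func main() {
	b, err := json.Marshal(&Asset{Exec: "coins", Symbol: "bty", Amount: 100000000})
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(string(b)) // {"exec":"coins","symbol":"bty","amount":100000000}
}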
@@ -7,8 +7,6 @@ package types
 import (
 	"encoding/json"
-
-	"github.com/33cn/chain33/types"
 )

 // TransParm transport parameter
@@ -135,6 +133,13 @@ type BlockDetails struct {
 	Items []*BlockDetail `json:"items"`
 }

+// Asset asset
+type Asset struct {
+	Exec   string `json:"exec"`
+	Symbol string `json:"symbol"`
+	Amount int64  `json:"amount"`
+}
+
 // TransactionDetail transaction detail
 type TransactionDetail struct {
 	Tx *Transaction `json:"tx"`
@@ -146,7 +151,7 @@ type TransactionDetail struct {
 	Amount     int64  `json:"amount"`
 	Fromaddr   string `json:"fromAddr"`
 	ActionName string `json:"actionName"`
-	Assets     []*types.Asset `json:"assets"`
+	Assets     []*Asset `json:"assets"`
 }

 // ReplyTxInfos reply tx infos
@@ -156,10 +161,10 @@ type ReplyTxInfos struct {
 // ReplyTxInfo reply tx information
 type ReplyTxInfo struct {
 	Hash   string `json:"hash"`
 	Height int64  `json:"height"`
 	Index  int64  `json:"index"`
-	Assets []*types.Asset `json:"assets"`
+	Assets []*Asset `json:"assets"`
 }

 // TransactionDetails transaction details
...
@@ -7,7 +7,6 @@ package types
 import (
 	rpctypes "github.com/33cn/chain33/rpc/types"
-	"github.com/33cn/chain33/types"
 )

 // AccountsResult defines accountsresult command
@@ -85,7 +84,7 @@ type TxDetailResult struct {
 	Amount     string `json:"amount"`
 	Fromaddr   string `json:"fromaddr"`
 	ActionName string `json:"actionname"`
-	Assets     []*types.Asset `json:"assets"`
+	Assets     []*rpctypes.Asset `json:"assets"`
 }

 // TxDetailsResult defines txdetails result command
...
@@ -6,6 +6,7 @@ package mavl
 import (
 	"fmt"
+	"sync"
 	"time"

 	"github.com/33cn/chain33/common"
@@ -26,6 +27,7 @@ type MemTreeOpera interface {
 // TreeMap is the map-based memtree
 type TreeMap struct {
 	mpCache map[interface{}]interface{}
+	lock    sync.RWMutex
 }

 // NewTreeMap new mem tree
@@ -37,6 +39,8 @@ func NewTreeMap(size int) *TreeMap {
 // Add adds an element
 func (tm *TreeMap) Add(key, value interface{}) {
+	tm.lock.Lock()
+	defer tm.lock.Unlock()
 	if _, ok := tm.mpCache[key]; ok {
 		delete(tm.mpCache, key)
 		return
@@ -46,6 +50,8 @@ func (tm *TreeMap) Add(key, value interface{}) {
 // Get fetches an element
 func (tm *TreeMap) Get(key interface{}) (value interface{}, ok bool) {
+	tm.lock.Lock()
+	defer tm.lock.Unlock()
 	if value, ok := tm.mpCache[key]; ok {
 		return value, ok
 	}
@@ -54,6 +60,8 @@ func (tm *TreeMap) Get(key interface{}) (value interface{}, ok bool) {
 // Delete removes an element
 func (tm *TreeMap) Delete(key interface{}) {
+	tm.lock.Lock()
+	defer tm.lock.Unlock()
 	if _, ok := tm.mpCache[key]; ok {
 		delete(tm.mpCache, key)
 	}
@@ -61,6 +69,8 @@ func (tm *TreeMap) Delete(key interface{}) {
 // Contains reports whether the key exists
 func (tm *TreeMap) Contains(key interface{}) bool {
+	tm.lock.Lock()
+	defer tm.lock.Unlock()
 	if _, ok := tm.mpCache[key]; ok {
 		return true
 	}
@@ -69,6 +79,8 @@ func (tm *TreeMap) Contains(key interface{}) bool {
 // Len returns the number of elements
 func (tm *TreeMap) Len() int {
+	tm.lock.Lock()
+	defer tm.lock.Unlock()
 	return len(tm.mpCache)
}
...
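The TreeMap change guards every map access with the new lock, which prevents concurrent map read/write panics. Note that the diff takes the write lock (Lock) even in read-only methods such as Get, Contains, and Len; with a sync.RWMutex, readers can take RLock instead so they do not serialize against each other. A minimal sketch of that variant (ours, not the diff's):

package main

import (
	"fmt"
	"sync"
)

// SafeMap splits read and write locking: writers take Lock,
// read-only paths take RLock and can proceed concurrently.
type SafeMap struct {
	mu sync.RWMutex
	m  map[interface{}]interface{}
}

func NewSafeMap() *SafeMap {
	return &SafeMap{m: make(map[interface{}]interface{})}
}

func (s *SafeMap) Add(key, value interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.m[key] = value
}

func (s *SafeMap) Get(key interface{}) (interface{}, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.m[key]
	return v, ok
}

func (s *SafeMap) Len() int {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return len(s.m)
}

func main() {
	sm := NewSafeMap()
	sm.Add("height", int64(42))
	fmt.Println(sm.Get("height"))
	fmt.Println(sm.Len())
}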