Commit 39bac853 authored by kingwang's avatar kingwang

update chain33 03/25

parent 1d956b38
...@@ -18,7 +18,7 @@ PKG_LIST := `go list ./... | grep -v "vendor" | grep -v "mocks"` ...@@ -18,7 +18,7 @@ PKG_LIST := `go list ./... | grep -v "vendor" | grep -v "mocks"`
PKG_LIST_VET := `go list ./... | grep -v "vendor" | grep -v "common/crypto/sha3" | grep -v "common/log/log15"` PKG_LIST_VET := `go list ./... | grep -v "vendor" | grep -v "common/crypto/sha3" | grep -v "common/log/log15"`
PKG_LIST_INEFFASSIGN= `go list -f {{.Dir}} ./... | grep -v "vendor" | grep -v "common/crypto/sha3" | grep -v "common/log/log15" | grep -v "common/ed25519"` PKG_LIST_INEFFASSIGN= `go list -f {{.Dir}} ./... | grep -v "vendor" | grep -v "common/crypto/sha3" | grep -v "common/log/log15" | grep -v "common/ed25519"`
PKG_LIST_Q := `go list ./... | grep -v "vendor" | grep -v "mocks"` PKG_LIST_Q := `go list ./... | grep -v "vendor" | grep -v "mocks"`
PKG_LIST_GOSEC := `go list ./... | grep -v "vendor" | grep -v "mocks" | grep -v "cmd" | grep -v "types" | grep -v "commands" | grep -v "log15" | grep -v "ed25519" | grep -v "crypto"` PKG_LIST_GOSEC := `go list -f "${GOPATH}/src/{{.ImportPath}}" ./... | grep -v "vendor" | grep -v "mocks" | grep -v "cmd" | grep -v "types" | grep -v "commands" | grep -v "log15" | grep -v "ed25519" | grep -v "crypto"`
BUILD_FLAGS = -ldflags "-X github.com/33cn/chain33/common/version.GitCommit=`git rev-parse --short=8 HEAD`" BUILD_FLAGS = -ldflags "-X github.com/33cn/chain33/common/version.GitCommit=`git rev-parse --short=8 HEAD`"
MKPATH=$(abspath $(lastword $(MAKEFILE_LIST))) MKPATH=$(abspath $(lastword $(MAKEFILE_LIST)))
MKDIR=$(dir $(MKPATH)) MKDIR=$(dir $(MKPATH))
......
...@@ -24,20 +24,19 @@ import ( ...@@ -24,20 +24,19 @@ import (
//var //var
var ( var (
blockLastHeight = []byte("blockLastHeight") blockLastHeight = []byte("blockLastHeight")
bodyPerfix = []byte("Body:") bodyPerfix = []byte("Body:")
LastSequence = []byte("LastSequence") LastSequence = []byte("LastSequence")
headerPerfix = []byte("Header:") headerPerfix = []byte("Header:")
heightToHeaderPerfix = []byte("HH:") heightToHeaderPerfix = []byte("HH:")
hashPerfix = []byte("Hash:") hashPerfix = []byte("Hash:")
tdPerfix = []byte("TD:") tdPerfix = []byte("TD:")
heightToHashKeyPerfix = []byte("Height:") heightToHashKeyPerfix = []byte("Height:")
seqToHashKey = []byte("Seq:") seqToHashKey = []byte("Seq:")
HashToSeqPerfix = []byte("HashToSeq:") HashToSeqPerfix = []byte("HashToSeq:")
seqCBPrefix = []byte("SCB:") seqCBPrefix = []byte("SCB:")
seqCBLastNumPrefix = []byte("SCBL:") seqCBLastNumPrefix = []byte("SCBL:")
storeLog = chainlog.New("submodule", "store") storeLog = chainlog.New("submodule", "store")
lastheaderlock sync.Mutex
AddBlock int64 = 1 AddBlock int64 = 1
DelBlock int64 = 2 DelBlock int64 = 2
) )
...@@ -102,14 +101,16 @@ func calcHashToSequenceKey(hash []byte) []byte { ...@@ -102,14 +101,16 @@ func calcHashToSequenceKey(hash []byte) []byte {
//BlockStore 区块存储 //BlockStore 区块存储
type BlockStore struct { type BlockStore struct {
db dbm.DB db dbm.DB
client queue.Client client queue.Client
height int64 height int64
lastBlock *types.Block lastBlock *types.Block
lastheaderlock sync.Mutex
chain *BlockChain
} }
//NewBlockStore new //NewBlockStore new
func NewBlockStore(db dbm.DB, client queue.Client) *BlockStore { func NewBlockStore(chain *BlockChain, db dbm.DB, client queue.Client) *BlockStore {
height, err := LoadBlockStoreHeight(db) height, err := LoadBlockStoreHeight(db)
if err != nil { if err != nil {
chainlog.Info("init::LoadBlockStoreHeight::database may be crash", "err", err.Error()) chainlog.Info("init::LoadBlockStoreHeight::database may be crash", "err", err.Error())
...@@ -121,6 +122,7 @@ func NewBlockStore(db dbm.DB, client queue.Client) *BlockStore { ...@@ -121,6 +122,7 @@ func NewBlockStore(db dbm.DB, client queue.Client) *BlockStore {
height: height, height: height,
db: db, db: db,
client: client, client: client,
chain: chain,
} }
if height == -1 { if height == -1 {
chainlog.Info("load block height error, may be init database", "height", height) chainlog.Info("load block height error, may be init database", "height", height)
...@@ -354,8 +356,8 @@ func (bs *BlockStore) UpdateHeight2(height int64) { ...@@ -354,8 +356,8 @@ func (bs *BlockStore) UpdateHeight2(height int64) {
//LastHeader 返回BlockStore保存的当前blockheader //LastHeader 返回BlockStore保存的当前blockheader
func (bs *BlockStore) LastHeader() *types.Header { func (bs *BlockStore) LastHeader() *types.Header {
lastheaderlock.Lock() bs.lastheaderlock.Lock()
defer lastheaderlock.Unlock() defer bs.lastheaderlock.Unlock()
// 通过lastBlock获取lastheader // 通过lastBlock获取lastheader
var blockheader = types.Header{} var blockheader = types.Header{}
...@@ -382,8 +384,8 @@ func (bs *BlockStore) UpdateLastBlock(hash []byte) { ...@@ -382,8 +384,8 @@ func (bs *BlockStore) UpdateLastBlock(hash []byte) {
storeLog.Error("UpdateLastBlock", "hash", common.ToHex(hash), "error", err) storeLog.Error("UpdateLastBlock", "hash", common.ToHex(hash), "error", err)
return return
} }
lastheaderlock.Lock() bs.lastheaderlock.Lock()
defer lastheaderlock.Unlock() defer bs.lastheaderlock.Unlock()
if blockdetail != nil { if blockdetail != nil {
bs.lastBlock = blockdetail.Block bs.lastBlock = blockdetail.Block
} }
...@@ -392,16 +394,16 @@ func (bs *BlockStore) UpdateLastBlock(hash []byte) { ...@@ -392,16 +394,16 @@ func (bs *BlockStore) UpdateLastBlock(hash []byte) {
//UpdateLastBlock2 更新LastBlock到缓存中 //UpdateLastBlock2 更新LastBlock到缓存中
func (bs *BlockStore) UpdateLastBlock2(block *types.Block) { func (bs *BlockStore) UpdateLastBlock2(block *types.Block) {
lastheaderlock.Lock() bs.lastheaderlock.Lock()
defer lastheaderlock.Unlock() defer bs.lastheaderlock.Unlock()
bs.lastBlock = block bs.lastBlock = block
storeLog.Debug("UpdateLastBlock", "UpdateLastBlock", block.Height, "LastHederhash", common.ToHex(block.Hash())) storeLog.Debug("UpdateLastBlock", "UpdateLastBlock", block.Height, "LastHederhash", common.ToHex(block.Hash()))
} }
//LastBlock 获取最新的block信息 //LastBlock 获取最新的block信息
func (bs *BlockStore) LastBlock() *types.Block { func (bs *BlockStore) LastBlock() *types.Block {
lastheaderlock.Lock() bs.lastheaderlock.Lock()
defer lastheaderlock.Unlock() defer bs.lastheaderlock.Unlock()
if bs.lastBlock != nil { if bs.lastBlock != nil {
return bs.lastBlock return bs.lastBlock
} }
...@@ -545,7 +547,7 @@ func (bs *BlockStore) SaveBlock(storeBatch dbm.Batch, blockdetail *types.BlockDe ...@@ -545,7 +547,7 @@ func (bs *BlockStore) SaveBlock(storeBatch dbm.Batch, blockdetail *types.BlockDe
//存储block height和block hash的对应关系,便于通过height查询block //存储block height和block hash的对应关系,便于通过height查询block
storeBatch.Set(calcHeightToHashKey(height), hash) storeBatch.Set(calcHeightToHashKey(height), hash)
if isRecordBlockSequence || isParaChain { if bs.chain.isRecordBlockSequence || bs.chain.isParaChain {
//存储记录block序列执行的type add //存储记录block序列执行的type add
lastSequence, err = bs.saveBlockSequence(storeBatch, hash, height, AddBlock, sequence) lastSequence, err = bs.saveBlockSequence(storeBatch, hash, height, AddBlock, sequence)
if err != nil { if err != nil {
...@@ -574,7 +576,7 @@ func (bs *BlockStore) DelBlock(storeBatch dbm.Batch, blockdetail *types.BlockDet ...@@ -574,7 +576,7 @@ func (bs *BlockStore) DelBlock(storeBatch dbm.Batch, blockdetail *types.BlockDet
storeBatch.Delete(calcHeightToHashKey(height)) storeBatch.Delete(calcHeightToHashKey(height))
storeBatch.Delete(calcHeightToBlockHeaderKey(height)) storeBatch.Delete(calcHeightToBlockHeaderKey(height))
if isRecordBlockSequence || isParaChain { if bs.chain.isRecordBlockSequence || bs.chain.isParaChain {
//存储记录block序列执行的type del //存储记录block序列执行的type del
lastSequence, err := bs.saveBlockSequence(storeBatch, hash, height, DelBlock, sequence) lastSequence, err := bs.saveBlockSequence(storeBatch, hash, height, DelBlock, sequence)
if err != nil { if err != nil {
...@@ -944,7 +946,7 @@ func (bs *BlockStore) saveBlockSequence(storeBatch dbm.Batch, hash []byte, heigh ...@@ -944,7 +946,7 @@ func (bs *BlockStore) saveBlockSequence(storeBatch dbm.Batch, hash []byte, heigh
var blockSequence types.BlockSequence var blockSequence types.BlockSequence
var newSequence int64 var newSequence int64
if isRecordBlockSequence { if bs.chain.isRecordBlockSequence {
Sequence, err := bs.LoadBlockLastSequence() Sequence, err := bs.LoadBlockLastSequence()
if err != nil { if err != nil {
storeLog.Error("SaveBlockSequence", "LoadBlockLastSequence err", err) storeLog.Error("SaveBlockSequence", "LoadBlockLastSequence err", err)
...@@ -959,7 +961,7 @@ func (bs *BlockStore) saveBlockSequence(storeBatch dbm.Batch, hash []byte, heigh ...@@ -959,7 +961,7 @@ func (bs *BlockStore) saveBlockSequence(storeBatch dbm.Batch, hash []byte, heigh
storeLog.Error("isRecordBlockSequence is true must Synchronizing data from zero block", "height", height) storeLog.Error("isRecordBlockSequence is true must Synchronizing data from zero block", "height", height)
panic(errors.New("isRecordBlockSequence is true must Synchronizing data from zero block")) panic(errors.New("isRecordBlockSequence is true must Synchronizing data from zero block"))
} }
} else if isParaChain { } else if bs.chain.isParaChain {
newSequence = sequence newSequence = sequence
} }
blockSequence.Hash = hash blockSequence.Hash = hash
...@@ -1091,7 +1093,7 @@ func (bs *BlockStore) SetUpgradeMeta(meta *types.UpgradeMeta) error { ...@@ -1091,7 +1093,7 @@ func (bs *BlockStore) SetUpgradeMeta(meta *types.UpgradeMeta) error {
} }
//isRecordBlockSequence配置的合法性检测 //isRecordBlockSequence配置的合法性检测
func (bs *BlockStore) isRecordBlockSequenceValid() { func (bs *BlockStore) isRecordBlockSequenceValid(chain *BlockChain) {
lastHeight := bs.Height() lastHeight := bs.Height()
lastSequence, err := bs.LoadBlockLastSequence() lastSequence, err := bs.LoadBlockLastSequence()
if err != nil { if err != nil {
...@@ -1101,7 +1103,7 @@ func (bs *BlockStore) isRecordBlockSequenceValid() { ...@@ -1101,7 +1103,7 @@ func (bs *BlockStore) isRecordBlockSequenceValid() {
} }
} }
//使能isRecordBlockSequence时的检测 //使能isRecordBlockSequence时的检测
if isRecordBlockSequence { if chain.isRecordBlockSequence {
//中途开启isRecordBlockSequence报错 //中途开启isRecordBlockSequence报错
if lastSequence == -1 && lastHeight != -1 { if lastSequence == -1 && lastHeight != -1 {
storeLog.Error("isRecordBlockSequenceValid", "lastHeight", lastHeight, "lastSequence", lastSequence) storeLog.Error("isRecordBlockSequenceValid", "lastHeight", lastHeight, "lastSequence", lastSequence)
......
...@@ -24,17 +24,11 @@ import ( ...@@ -24,17 +24,11 @@ import (
//var //var
var ( var (
//cache 存贮的block个数 //cache 存贮的block个数
DefCacheSize int64 = 128 MaxSeqCB int64 = 20
MaxSeqCB int64 = 20 zeroHash [32]byte
cachelock sync.Mutex InitBlockNum int64 = 10240 //节点刚启动时从db向index和bestchain缓存中添加的blocknode数,和blockNodeCacheLimit保持一致
zeroHash [32]byte chainlog = log.New("module", "blockchain")
InitBlockNum int64 = 10240 //节点刚启动时从db向index和bestchain缓存中添加的blocknode数,和blockNodeCacheLimit保持一致 FutureBlockDelayTime int64 = 1
isStrongConsistency = false
chainlog = log.New("module", "blockchain")
FutureBlockDelayTime int64 = 1
isRecordBlockSequence = false //是否记录add或者del block的序列,方便blcokchain的恢复通过记录的序列表
isParaChain = false //是否是平行链。平行链需要记录Sequence信息
) )
const maxFutureBlocks = 256 const maxFutureBlocks = 256
...@@ -99,19 +93,44 @@ type BlockChain struct { ...@@ -99,19 +93,44 @@ type BlockChain struct {
futureBlocks *lru.Cache // future blocks are broadcast later processing futureBlocks *lru.Cache // future blocks are broadcast later processing
//downLoad block info //downLoad block info
downLoadInfo *DownLoadInfo downLoadInfo *DownLoadInfo
downLoadlock sync.Mutex isFastDownloadSync bool //当本节点落后很多时,可以先下载区块到db,启动单独的goroutine去执行block
isRecordBlockSequence bool //是否记录add或者del block的序列,方便blcokchain的恢复通过记录的序列表
isParaChain bool //是否是平行链。平行链需要记录Sequence信息
isStrongConsistency bool
//lock
synBlocklock sync.Mutex
peerMaxBlklock sync.Mutex
castlock sync.Mutex
ntpClockSynclock sync.Mutex
faultpeerlock sync.Mutex
bestpeerlock sync.Mutex
downLoadlock sync.Mutex
fastDownLoadSynLock sync.Mutex
isNtpClockSync bool //ntp时间是否同步
//cfg
MaxFetchBlockNum int64 //一次最多申请获取block个数
TimeoutSeconds int64
blockSynInterVal time.Duration
DefCacheSize int64
failed int32
} }
//New new //New new
func New(cfg *types.BlockChain) *BlockChain { func New(cfg *types.BlockChain) *BlockChain {
initConfig(cfg)
futureBlocks, err := lru.New(maxFutureBlocks) futureBlocks, err := lru.New(maxFutureBlocks)
if err != nil { if err != nil {
panic("when New BlockChain lru.New return err") panic("when New BlockChain lru.New return err")
} }
defCacheSize := int64(128)
if cfg.DefCacheSize > 0 {
defCacheSize = cfg.DefCacheSize
}
blockchain := &BlockChain{ blockchain := &BlockChain{
cache: NewBlockCache(DefCacheSize), cache: NewBlockCache(defCacheSize),
DefCacheSize: defCacheSize,
rcvLastBlockHeight: -1, rcvLastBlockHeight: -1,
synBlockHeight: -1, synBlockHeight: -1,
peerList: nil, peerList: nil,
...@@ -134,30 +153,31 @@ func New(cfg *types.BlockChain) *BlockChain { ...@@ -134,30 +153,31 @@ func New(cfg *types.BlockChain) *BlockChain {
bestChainPeerList: make(map[string]*BestPeerInfo), bestChainPeerList: make(map[string]*BestPeerInfo),
futureBlocks: futureBlocks, futureBlocks: futureBlocks,
downLoadInfo: &DownLoadInfo{}, downLoadInfo: &DownLoadInfo{},
isNtpClockSync: true,
MaxFetchBlockNum: 128 * 6, //一次最多申请获取block个数
TimeoutSeconds: 2,
isFastDownloadSync: true,
} }
blockchain.initConfig(cfg)
return blockchain return blockchain
} }
func initConfig(cfg *types.BlockChain) { func (chain *BlockChain) initConfig(cfg *types.BlockChain) {
if cfg.DefCacheSize > 0 { if types.IsEnable("TxHeight") && chain.DefCacheSize <= (types.LowAllowPackHeight+types.HighAllowPackHeight+1) {
DefCacheSize = cfg.DefCacheSize
}
if types.IsEnable("TxHeight") && DefCacheSize <= (types.LowAllowPackHeight+types.HighAllowPackHeight+1) {
panic("when Enable TxHeight DefCacheSize must big than types.LowAllowPackHeight") panic("when Enable TxHeight DefCacheSize must big than types.LowAllowPackHeight")
} }
if cfg.MaxFetchBlockNum > 0 { if cfg.MaxFetchBlockNum > 0 {
MaxFetchBlockNum = cfg.MaxFetchBlockNum chain.MaxFetchBlockNum = cfg.MaxFetchBlockNum
} }
if cfg.TimeoutSeconds > 0 { if cfg.TimeoutSeconds > 0 {
TimeoutSeconds = cfg.TimeoutSeconds chain.TimeoutSeconds = cfg.TimeoutSeconds
} }
isStrongConsistency = cfg.IsStrongConsistency chain.blockSynInterVal = time.Duration(chain.TimeoutSeconds)
isRecordBlockSequence = cfg.IsRecordBlockSequence chain.isStrongConsistency = cfg.IsStrongConsistency
isParaChain = cfg.IsParaChain chain.isRecordBlockSequence = cfg.IsRecordBlockSequence
chain.isParaChain = cfg.IsParaChain
types.S("quickIndex", cfg.EnableTxQuickIndex) types.S("quickIndex", cfg.EnableTxQuickIndex)
} }
...@@ -193,7 +213,7 @@ func (chain *BlockChain) SetQueueClient(client queue.Client) { ...@@ -193,7 +213,7 @@ func (chain *BlockChain) SetQueueClient(client queue.Client) {
chain.client.Sub("blockchain") chain.client.Sub("blockchain")
blockStoreDB := dbm.NewDB("blockchain", chain.cfg.Driver, chain.cfg.DbPath, chain.cfg.DbCache) blockStoreDB := dbm.NewDB("blockchain", chain.cfg.Driver, chain.cfg.DbPath, chain.cfg.DbCache)
blockStore := NewBlockStore(blockStoreDB, client) blockStore := NewBlockStore(chain, blockStoreDB, client)
chain.blockStore = blockStore chain.blockStore = blockStore
stateHash := chain.getStateHash() stateHash := chain.getStateHash()
chain.query = NewQuery(blockStoreDB, chain.client, stateHash) chain.query = NewQuery(blockStoreDB, chain.client, stateHash)
...@@ -226,7 +246,7 @@ func (chain *BlockChain) GetOrphanPool() *OrphanPool { ...@@ -226,7 +246,7 @@ func (chain *BlockChain) GetOrphanPool() *OrphanPool {
func (chain *BlockChain) InitBlockChain() { func (chain *BlockChain) InitBlockChain() {
//isRecordBlockSequence配置的合法性检测 //isRecordBlockSequence配置的合法性检测
if !chain.cfg.IsParaChain { if !chain.cfg.IsParaChain {
chain.blockStore.isRecordBlockSequenceValid() chain.blockStore.isRecordBlockSequenceValid(chain)
} }
//先缓存最新的128个block信息到cache中 //先缓存最新的128个block信息到cache中
curheight := chain.GetBlockHeight() curheight := chain.GetBlockHeight()
...@@ -394,7 +414,7 @@ func (chain *BlockChain) InitCache(height int64) { ...@@ -394,7 +414,7 @@ func (chain *BlockChain) InitCache(height int64) {
if height < 0 { if height < 0 {
return return
} }
for i := height - DefCacheSize; i <= height; i++ { for i := height - chain.DefCacheSize; i <= height; i++ {
if i < 0 { if i < 0 {
i = 0 i = 0
} }
......
...@@ -507,7 +507,7 @@ func testGetBlocksMsg(t *testing.T, blockchain *blockchain.BlockChain) { ...@@ -507,7 +507,7 @@ func testGetBlocksMsg(t *testing.T, blockchain *blockchain.BlockChain) {
blocks, err := blockchain.ProcGetBlockDetailsMsg(&reqBlock) blocks, err := blockchain.ProcGetBlockDetailsMsg(&reqBlock)
if err == nil && blocks != nil { if err == nil && blocks != nil {
for _, block := range blocks.Items { for _, block := range blocks.Items {
if checkheight != block.Block.Height || block.Receipts == nil { if checkheight != block.Block.Height {
t.Error("TestGetBlocksMsg", "checkheight", checkheight, "block", block) t.Error("TestGetBlocksMsg", "checkheight", checkheight, "block", block)
} }
checkheight++ checkheight++
......
...@@ -6,7 +6,6 @@ package blockchain ...@@ -6,7 +6,6 @@ package blockchain
import ( import (
"fmt" "fmt"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
...@@ -17,10 +16,8 @@ import ( ...@@ -17,10 +16,8 @@ import (
//var //var
var ( var (
tempBlockKey = []byte("TB:") tempBlockKey = []byte("TB:")
lastTempBlockKey = []byte("LTB:") lastTempBlockKey = []byte("LTB:")
isFastDownloadSync = true //当本节点落后很多时,可以先下载区块到db,启动单独的goroutine去执行block
fastDownLoadSynLock sync.Mutex
) )
//const //const
...@@ -56,22 +53,21 @@ func calcLastTempBlockHeightKey() []byte { ...@@ -56,22 +53,21 @@ func calcLastTempBlockHeightKey() []byte {
} }
//GetDownloadSyncStatus 获取下载区块的同步模式 //GetDownloadSyncStatus 获取下载区块的同步模式
func GetDownloadSyncStatus() bool { func (chain *BlockChain) GetDownloadSyncStatus() bool {
fastDownLoadSynLock.Lock() chain.fastDownLoadSynLock.Lock()
defer fastDownLoadSynLock.Unlock() defer chain.fastDownLoadSynLock.Unlock()
return isFastDownloadSync return chain.isFastDownloadSync
} }
//UpdateDownloadSyncStatus 更新下载区块的同步模式 //UpdateDownloadSyncStatus 更新下载区块的同步模式
func UpdateDownloadSyncStatus(Sync bool) { func (chain *BlockChain) UpdateDownloadSyncStatus(Sync bool) {
fastDownLoadSynLock.Lock() chain.fastDownLoadSynLock.Lock()
defer fastDownLoadSynLock.Unlock() defer chain.fastDownLoadSynLock.Unlock()
isFastDownloadSync = Sync chain.isFastDownloadSync = Sync
} }
//FastDownLoadBlocks 开启快速下载区块的模式 //FastDownLoadBlocks 开启快速下载区块的模式
func (chain *BlockChain) FastDownLoadBlocks() { func (chain *BlockChain) FastDownLoadBlocks() {
defer chain.tickerwg.Done()
curHeight := chain.GetBlockHeight() curHeight := chain.GetBlockHeight()
lastTempHight := chain.GetLastTempBlockHeight() lastTempHight := chain.GetLastTempBlockHeight()
...@@ -93,7 +89,7 @@ func (chain *BlockChain) FastDownLoadBlocks() { ...@@ -93,7 +89,7 @@ func (chain *BlockChain) FastDownLoadBlocks() {
pids := chain.GetBestChainPids() pids := chain.GetBestChainPids()
//节点启动时只有落后最优链batchsyncblocknum个区块时才开启这种下载模式 //节点启动时只有落后最优链batchsyncblocknum个区块时才开启这种下载模式
if pids != nil && peerMaxBlkHeight != -1 && curheight+batchsyncblocknum >= peerMaxBlkHeight { if pids != nil && peerMaxBlkHeight != -1 && curheight+batchsyncblocknum >= peerMaxBlkHeight {
UpdateDownloadSyncStatus(false) chain.UpdateDownloadSyncStatus(false)
synlog.Info("FastDownLoadBlocks:quit!", "curheight", curheight, "peerMaxBlkHeight", peerMaxBlkHeight) synlog.Info("FastDownLoadBlocks:quit!", "curheight", curheight, "peerMaxBlkHeight", peerMaxBlkHeight)
break break
} else if curheight+batchsyncblocknum < peerMaxBlkHeight && len(pids) >= bestPeerCount { } else if curheight+batchsyncblocknum < peerMaxBlkHeight && len(pids) >= bestPeerCount {
...@@ -102,7 +98,7 @@ func (chain *BlockChain) FastDownLoadBlocks() { ...@@ -102,7 +98,7 @@ func (chain *BlockChain) FastDownLoadBlocks() {
go chain.ReadBlockToExec(peerMaxBlkHeight, true) go chain.ReadBlockToExec(peerMaxBlkHeight, true)
break break
} else if types.Since(startTime) > waitTimeDownLoad*time.Second || chain.cfg.SingleMode { } else if types.Since(startTime) > waitTimeDownLoad*time.Second || chain.cfg.SingleMode {
UpdateDownloadSyncStatus(false) chain.UpdateDownloadSyncStatus(false)
synlog.Info("FastDownLoadBlocks:waitTimeDownLoad:quit!", "curheight", curheight, "peerMaxBlkHeight", peerMaxBlkHeight, "pids", pids) synlog.Info("FastDownLoadBlocks:waitTimeDownLoad:quit!", "curheight", curheight, "peerMaxBlkHeight", peerMaxBlkHeight, "pids", pids)
break break
} else { } else {
...@@ -183,7 +179,7 @@ func (chain *BlockChain) ReadBlockToExec(height int64, isNewStart bool) { ...@@ -183,7 +179,7 @@ func (chain *BlockChain) ReadBlockToExec(height int64, isNewStart bool) {
//CancelFastDownLoadFlag 清除快速下载模式的一些标志 //CancelFastDownLoadFlag 清除快速下载模式的一些标志
func (chain *BlockChain) cancelFastDownLoadFlag(isNewStart bool) { func (chain *BlockChain) cancelFastDownLoadFlag(isNewStart bool) {
if isNewStart { if isNewStart {
UpdateDownloadSyncStatus(false) chain.UpdateDownloadSyncStatus(false)
} }
chain.DelLastTempBlockHeight() chain.DelLastTempBlockHeight()
synlog.Info("cancelFastDownLoadFlag", "isNewStart", isNewStart) synlog.Info("cancelFastDownLoadFlag", "isNewStart", isNewStart)
...@@ -222,7 +218,7 @@ func (chain *BlockChain) WriteBlockToDbTemp(block *types.Block) error { ...@@ -222,7 +218,7 @@ func (chain *BlockChain) WriteBlockToDbTemp(block *types.Block) error {
defer func() { defer func() {
chainlog.Debug("WriteBlockToDbTemp", "height", block.Height, "sync", sync, "cost", types.Since(beg)) chainlog.Debug("WriteBlockToDbTemp", "height", block.Height, "sync", sync, "cost", types.Since(beg))
}() }()
newbatch := chain.blockStore.NewBatch(false) newbatch := chain.blockStore.NewBatch(sync)
blockByte, err := proto.Marshal(block) blockByte, err := proto.Marshal(block)
if err != nil { if err != nil {
......
...@@ -21,12 +21,11 @@ const ( ...@@ -21,12 +21,11 @@ const (
var ( var (
ntpLog = chainlog.New("submodule", "ntp") ntpLog = chainlog.New("submodule", "ntp")
failed int32
) )
// checkClockDrift queries an NTP server for clock drifts and warns the user if // checkClockDrift queries an NTP server for clock drifts and warns the user if
// one large enough is detected. // one large enough is detected.
func checkClockDrift() { func (chain *BlockChain) checkClockDrift() {
realnow := common.GetRealTimeRetry(types.NtpHosts, 10) realnow := common.GetRealTimeRetry(types.NtpHosts, 10)
if realnow.IsZero() { if realnow.IsZero() {
ntpLog.Info("checkClockDrift", "sntpDrift err", "get ntptime error") ntpLog.Info("checkClockDrift", "sntpDrift err", "get ntptime error")
...@@ -42,14 +41,14 @@ func checkClockDrift() { ...@@ -42,14 +41,14 @@ func checkClockDrift() {
ntpLog.Warn(fmt.Sprint(warning)) ntpLog.Warn(fmt.Sprint(warning))
ntpLog.Warn(fmt.Sprint(howtofix)) ntpLog.Warn(fmt.Sprint(howtofix))
ntpLog.Warn(fmt.Sprint(separator)) ntpLog.Warn(fmt.Sprint(separator))
atomic.AddInt32(&failed, 1) atomic.AddInt32(&chain.failed, 1)
if atomic.LoadInt32(&failed) == ntpChecks { if atomic.LoadInt32(&chain.failed) == ntpChecks {
ntpLog.Error("System clock seems ERROR") ntpLog.Error("System clock seems ERROR")
UpdateNtpClockSyncStatus(false) chain.UpdateNtpClockSyncStatus(false)
} }
} else { } else {
atomic.StoreInt32(&failed, 0) atomic.StoreInt32(&chain.failed, 0)
UpdateNtpClockSyncStatus(true) chain.UpdateNtpClockSyncStatus(true)
ntpLog.Info(fmt.Sprintf("Sanity NTP check reported %v drift, all ok", drift)) ntpLog.Info(fmt.Sprintf("Sanity NTP check reported %v drift, all ok", drift))
} }
} }
...@@ -61,7 +61,7 @@ func (chain *BlockChain) ProcRecvMsg() { ...@@ -61,7 +61,7 @@ func (chain *BlockChain) ProcRecvMsg() {
case types.EventIsSync: case types.EventIsSync:
go chain.processMsg(msg, reqnum, chain.isSync) go chain.processMsg(msg, reqnum, chain.isSync)
case types.EventIsNtpClockSync: case types.EventIsNtpClockSync:
go chain.processMsg(msg, reqnum, chain.isNtpClockSync) go chain.processMsg(msg, reqnum, chain.isNtpClockSyncFunc)
case types.EventGetLastBlockSequence: case types.EventGetLastBlockSequence:
go chain.processMsg(msg, reqnum, chain.getLastBlockSequence) go chain.processMsg(msg, reqnum, chain.getLastBlockSequence)
...@@ -161,17 +161,17 @@ func (chain *BlockChain) addBlock(msg *queue.Message) { ...@@ -161,17 +161,17 @@ func (chain *BlockChain) addBlock(msg *queue.Message) {
reply.IsOk = true reply.IsOk = true
blockpid := msg.Data.(*types.BlockPid) blockpid := msg.Data.(*types.BlockPid)
//chainlog.Error("addBlock", "height", blockpid.Block.Height, "pid", blockpid.Pid) //chainlog.Error("addBlock", "height", blockpid.Block.Height, "pid", blockpid.Pid)
if GetDownloadSyncStatus() { if chain.GetDownloadSyncStatus() {
//downLoadTask 运行时设置对应的blockdone
if chain.downLoadTask.InProgress() {
chain.downLoadTask.Done(blockpid.Block.GetHeight())
}
err := chain.WriteBlockToDbTemp(blockpid.Block) err := chain.WriteBlockToDbTemp(blockpid.Block)
if err != nil { if err != nil {
chainlog.Error("WriteBlockToDbTemp", "height", blockpid.Block.Height, "err", err.Error()) chainlog.Error("WriteBlockToDbTemp", "height", blockpid.Block.Height, "err", err.Error())
reply.IsOk = false reply.IsOk = false
reply.Msg = []byte(err.Error()) reply.Msg = []byte(err.Error())
} }
//downLoadTask 运行时设置对应的blockdone
if chain.downLoadTask.InProgress() {
chain.downLoadTask.Done(blockpid.Block.GetHeight())
}
} else { } else {
_, err := chain.ProcAddBlockMsg(false, &types.BlockDetail{Block: blockpid.Block}, blockpid.Pid) _, err := chain.ProcAddBlockMsg(false, &types.BlockDetail{Block: blockpid.Block}, blockpid.Pid)
if err != nil { if err != nil {
...@@ -366,8 +366,8 @@ func (chain *BlockChain) getLastBlock(msg *queue.Message) { ...@@ -366,8 +366,8 @@ func (chain *BlockChain) getLastBlock(msg *queue.Message) {
} }
} }
func (chain *BlockChain) isNtpClockSync(msg *queue.Message) { func (chain *BlockChain) isNtpClockSyncFunc(msg *queue.Message) {
ok := GetNtpClockSyncStatus() ok := chain.GetNtpClockSyncStatus()
msg.Reply(chain.client.NewMessage("", types.EventReplyIsNtpClockSync, &types.IsNtpClockSync{Isntpclocksync: ok})) msg.Reply(chain.client.NewMessage("", types.EventReplyIsNtpClockSync, &types.IsNtpClockSync{Isntpclocksync: ok}))
} }
......
...@@ -402,7 +402,7 @@ func (b *BlockChain) connectBlock(node *blockNode, blockdetail *types.BlockDetai ...@@ -402,7 +402,7 @@ func (b *BlockChain) connectBlock(node *blockNode, blockdetail *types.BlockDetai
} }
} }
//目前非平行链并开启isRecordBlockSequence功能 //目前非平行链并开启isRecordBlockSequence功能
if isRecordBlockSequence && !isParaChain { if b.isRecordBlockSequence && !b.isParaChain {
b.pushseq.updateSeq(lastSequence) b.pushseq.updateSeq(lastSequence)
} }
return blockdetail, nil return blockdetail, nil
...@@ -471,7 +471,7 @@ func (b *BlockChain) disconnectBlock(node *blockNode, blockdetail *types.BlockDe ...@@ -471,7 +471,7 @@ func (b *BlockChain) disconnectBlock(node *blockNode, blockdetail *types.BlockDe
chainlog.Debug("disconnectBlock success", "newtipnode.hash", common.ToHex(newtipnode.hash), "delblock.parent.hash", common.ToHex(blockdetail.Block.GetParentHash())) chainlog.Debug("disconnectBlock success", "newtipnode.hash", common.ToHex(newtipnode.hash), "delblock.parent.hash", common.ToHex(blockdetail.Block.GetParentHash()))
//目前非平行链并开启isRecordBlockSequence功能 //目前非平行链并开启isRecordBlockSequence功能
if isRecordBlockSequence && !isParaChain { if b.isRecordBlockSequence && !b.isParaChain {
b.pushseq.updateSeq(lastSequence) b.pushseq.updateSeq(lastSequence)
} }
return nil return nil
......
...@@ -96,7 +96,7 @@ func (chain *BlockChain) ProcAddBlockSeqCB(cb *types.BlockSeqCB) error { ...@@ -96,7 +96,7 @@ func (chain *BlockChain) ProcAddBlockSeqCB(cb *types.BlockSeqCB) error {
return types.ErrInvalidParam return types.ErrInvalidParam
} }
if !isRecordBlockSequence { if !chain.isRecordBlockSequence {
return types.ErrRecordBlockSequence return types.ErrRecordBlockSequence
} }
if chain.blockStore.seqCBNum() >= MaxSeqCB && !chain.blockStore.isSeqCBExist(cb.Name) { if chain.blockStore.seqCBNum() >= MaxSeqCB && !chain.blockStore.isSeqCBExist(cb.Name) {
......
...@@ -105,9 +105,11 @@ func (exec *Executor) SetQueueClient(qcli queue.Client) { ...@@ -105,9 +105,11 @@ func (exec *Executor) SetQueueClient(qcli queue.Client) {
if err != nil { if err != nil {
panic(err) panic(err)
} }
exec.grpccli, err = grpcclient.NewMainChainClient("") if types.IsPara() {
if err != nil { exec.grpccli, err = grpcclient.NewMainChainClient("")
panic(err) if err != nil {
panic(err)
}
} }
//recv 消息的处理 //recv 消息的处理
go func() { go func() {
......
...@@ -107,10 +107,9 @@ func (c Comm) newPeerFromConn(rawConn *grpc.ClientConn, remote *NetAddress, node ...@@ -107,10 +107,9 @@ func (c Comm) newPeerFromConn(rawConn *grpc.ClientConn, remote *NetAddress, node
func (c Comm) dialPeer(addr *NetAddress, node *Node) (*Peer, error) { func (c Comm) dialPeer(addr *NetAddress, node *Node) (*Peer, error) {
log.Debug("dialPeer", "will connect", addr.String()) log.Debug("dialPeer", "will connect", addr.String())
var persistent bool var persistent bool
for _, seed := range node.nodeInfo.cfg.Seeds { //TODO待优化
if seed == addr.String() { if _, ok := node.cfgSeeds.Load(addr.String()); ok {
persistent = true //种子节点要一直连接 persistent = true
}
} }
peer, err := c.dialPeerWithAddress(addr, persistent, node) peer, err := c.dialPeerWithAddress(addr, persistent, node)
if err != nil { if err != nil {
...@@ -138,7 +137,7 @@ func (c Comm) GenPrivPubkey() ([]byte, []byte, error) { ...@@ -138,7 +137,7 @@ func (c Comm) GenPrivPubkey() ([]byte, []byte, error) {
return key.Bytes(), key.PubKey().Bytes(), nil return key.Bytes(), key.PubKey().Bytes(), nil
} }
// Pubkey get pubkey by key // Pubkey get pubkey by priv key
func (c Comm) Pubkey(key string) (string, error) { func (c Comm) Pubkey(key string) (string, error) {
cr, err := crypto.New(types.GetSignName("", types.SECP256K1)) cr, err := crypto.New(types.GetSignName("", types.SECP256K1))
...@@ -163,8 +162,8 @@ func (c Comm) Pubkey(key string) (string, error) { ...@@ -163,8 +162,8 @@ func (c Comm) Pubkey(key string) (string, error) {
// NewPingData get ping node ,return p2pping // NewPingData get ping node ,return p2pping
func (c Comm) NewPingData(nodeInfo *NodeInfo) (*types.P2PPing, error) { func (c Comm) NewPingData(nodeInfo *NodeInfo) (*types.P2PPing, error) {
randNonce := rand.Int31n(102040) randNonce := rand.New(rand.NewSource(time.Now().UnixNano())).Int63()
ping := &types.P2PPing{Nonce: int64(randNonce), Addr: nodeInfo.GetExternalAddr().IP.String(), Port: int32(nodeInfo.GetExternalAddr().Port)} ping := &types.P2PPing{Nonce: randNonce, Addr: nodeInfo.GetExternalAddr().IP.String(), Port: int32(nodeInfo.GetExternalAddr().Port)}
var err error var err error
p2pPrivKey, _ := nodeInfo.addrBook.GetPrivPubKey() p2pPrivKey, _ := nodeInfo.addrBook.GetPrivPubKey()
ping, err = c.Signature(p2pPrivKey, ping) ping, err = c.Signature(p2pPrivKey, ping)
...@@ -240,6 +239,9 @@ func (c Comm) CheckSign(in *types.P2PPing) bool { ...@@ -240,6 +239,9 @@ func (c Comm) CheckSign(in *types.P2PPing) bool {
// CollectPeerStat collect peer stat and report // CollectPeerStat collect peer stat and report
func (c Comm) CollectPeerStat(err error, peer *Peer) { func (c Comm) CollectPeerStat(err error, peer *Peer) {
if err != nil { if err != nil {
if err == types.ErrVersion {
peer.version.SetSupport(false)
}
peer.peerStat.NotOk() peer.peerStat.NotOk()
} else { } else {
peer.peerStat.Ok() peer.peerStat.Ok()
......
...@@ -18,24 +18,26 @@ var ( ...@@ -18,24 +18,26 @@ var (
StreamPingTimeout = 20 * time.Second StreamPingTimeout = 20 * time.Second
MonitorPeerInfoInterval = 10 * time.Second MonitorPeerInfoInterval = 10 * time.Second
MonitorPeerNumInterval = 30 * time.Second MonitorPeerNumInterval = 30 * time.Second
MonitorReBalanceInterval = 2 * time.Minute MonitorReBalanceInterval = 15 * time.Minute
GetAddrFromAddrBookInterval = 5 * time.Second GetAddrFromAddrBookInterval = 5 * time.Second
GetAddrFromOnlineInterval = 5 * time.Second GetAddrFromOnlineInterval = 5 * time.Second
GetAddrFromGitHubInterval = 5 * time.Minute GetAddrFromGitHubInterval = 5 * time.Minute
CheckActivePeersInterVal = 5 * time.Second CheckActivePeersInterVal = 5 * time.Second
CheckBlackListInterVal = 30 * time.Second CheckBlackListInterVal = 30 * time.Second
CheckCfgSeedsInterVal = 1 * time.Minute
) )
const ( const (
msgTx = 1 msgTx = 1
msgBlock = 2 msgBlock = 2
tryMapPortTimes = 20 tryMapPortTimes = 20
maxSamIPNum = 20
) )
var ( var (
// LocalAddr local address // LocalAddr local address
LocalAddr string LocalAddr string
defaultPort = 13802 //defaultPort = 13802
) )
const ( const (
...@@ -77,8 +79,8 @@ var TestNetSeeds = []string{ ...@@ -77,8 +79,8 @@ var TestNetSeeds = []string{
"47.104.125.177:13802", "47.104.125.177:13802",
} }
// InnerSeeds built-in list of seed // MainNetSeeds built-in list of seed
var InnerSeeds = []string{ var MainNetSeeds = []string{
"39.107.234.240:13802", "39.107.234.240:13802",
"39.105.88.66:13802", "39.105.88.66:13802",
"39.105.87.114:13802", "39.105.87.114:13802",
......
...@@ -8,14 +8,35 @@ import ( ...@@ -8,14 +8,35 @@ import (
"container/list" "container/list"
"fmt" "fmt"
"io" "io"
"sort"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time"
//"time"
pb "github.com/33cn/chain33/types" pb "github.com/33cn/chain33/types"
"golang.org/x/net/context" "golang.org/x/net/context"
"google.golang.org/grpc" "google.golang.org/grpc"
) )
// Invs datastruct
type Invs []*pb.Inventory
//Len size of the Invs data
func (i Invs) Len() int {
return len(i)
}
//Less Sort from low to high
func (i Invs) Less(a, b int) bool {
return i[a].GetHeight() < i[b].GetHeight()
}
//Swap the param
func (i Invs) Swap(a, b int) {
i[a], i[b] = i[b], i[a]
}
// DownloadJob defines download job type // DownloadJob defines download job type
type DownloadJob struct { type DownloadJob struct {
wg sync.WaitGroup wg sync.WaitGroup
...@@ -25,11 +46,11 @@ type DownloadJob struct { ...@@ -25,11 +46,11 @@ type DownloadJob struct {
mtx sync.Mutex mtx sync.Mutex
busyPeer map[string]*peerJob busyPeer map[string]*peerJob
downloadPeers []*Peer downloadPeers []*Peer
MaxJob int32
} }
type peerJob struct { type peerJob struct {
pbPeer *pb.Peer limit int32
limit int32
} }
// NewDownloadJob create a downloadjob object // NewDownloadJob create a downloadjob object
...@@ -39,6 +60,12 @@ func NewDownloadJob(p2pcli *Cli, peers []*Peer) *DownloadJob { ...@@ -39,6 +60,12 @@ func NewDownloadJob(p2pcli *Cli, peers []*Peer) *DownloadJob {
job.p2pcli = p2pcli job.p2pcli = p2pcli
job.busyPeer = make(map[string]*peerJob) job.busyPeer = make(map[string]*peerJob)
job.downloadPeers = peers job.downloadPeers = peers
job.MaxJob = 2
if len(peers) < 5 {
job.MaxJob = 10
}
//job.okChan = make(chan *pb.Inventory, 512)
return job return job
} }
...@@ -46,23 +73,60 @@ func (d *DownloadJob) isBusyPeer(pid string) bool { ...@@ -46,23 +73,60 @@ func (d *DownloadJob) isBusyPeer(pid string) bool {
d.mtx.Lock() d.mtx.Lock()
defer d.mtx.Unlock() defer d.mtx.Unlock()
if pjob, ok := d.busyPeer[pid]; ok { if pjob, ok := d.busyPeer[pid]; ok {
return atomic.LoadInt32(&pjob.limit) > 10 //每个节点最多同时接受10个下载任务 return atomic.LoadInt32(&pjob.limit) >= d.MaxJob //每个节点最多同时接受10个下载任务
} }
return false return false
} }
func (d *DownloadJob) setBusyPeer(peer *pb.Peer) { func (d *DownloadJob) getJobNum(pid string) int32 {
d.mtx.Lock() d.mtx.Lock()
defer d.mtx.Unlock() defer d.mtx.Unlock()
if pjob, ok := d.busyPeer[peer.GetName()]; ok { if pjob, ok := d.busyPeer[pid]; ok {
return atomic.LoadInt32(&pjob.limit)
}
return 0
}
func (d *DownloadJob) setBusyPeer(pid string) {
d.mtx.Lock()
defer d.mtx.Unlock()
if pjob, ok := d.busyPeer[pid]; ok {
atomic.AddInt32(&pjob.limit, 1) atomic.AddInt32(&pjob.limit, 1)
d.busyPeer[peer.GetName()] = pjob d.busyPeer[pid] = pjob
return return
} }
d.busyPeer[peer.GetName()] = &peerJob{peer, 1} d.busyPeer[pid] = &peerJob{1}
} }
func (d *DownloadJob) removePeer(pid string) {
d.mtx.Lock()
defer d.mtx.Unlock()
for i, pr := range d.downloadPeers {
if pr.GetPeerName() == pid {
if i != len(d.downloadPeers)-1 {
d.downloadPeers = append(d.downloadPeers[:i], d.downloadPeers[i+1:]...)
return
}
d.downloadPeers = d.downloadPeers[:i]
return
}
}
}
// ResetDownloadPeers reset download peers
func (d *DownloadJob) ResetDownloadPeers(peers []*Peer) {
d.mtx.Lock()
defer d.mtx.Unlock()
copy(d.downloadPeers, peers)
}
func (d *DownloadJob) avalidPeersNum() int {
d.mtx.Lock()
defer d.mtx.Unlock()
return len(d.downloadPeers)
}
func (d *DownloadJob) setFreePeer(pid string) { func (d *DownloadJob) setFreePeer(pid string) {
d.mtx.Lock() d.mtx.Lock()
defer d.mtx.Unlock() defer d.mtx.Unlock()
...@@ -77,8 +141,10 @@ func (d *DownloadJob) setFreePeer(pid string) { ...@@ -77,8 +141,10 @@ func (d *DownloadJob) setFreePeer(pid string) {
} }
// GetFreePeer get free peer ,return peer // GetFreePeer get free peer ,return peer
func (d *DownloadJob) GetFreePeer(joblimit int64) *Peer { func (d *DownloadJob) GetFreePeer(blockHeight int64) *Peer {
_, infos := d.p2pcli.network.node.GetActivePeers() _, infos := d.p2pcli.network.node.GetActivePeers()
var jobNum int32 = 10
var bestPeer *Peer
for _, peer := range d.downloadPeers { for _, peer := range d.downloadPeers {
pbpeer, ok := infos[peer.Addr()] pbpeer, ok := infos[peer.Addr()]
if ok { if ok {
...@@ -86,17 +152,24 @@ func (d *DownloadJob) GetFreePeer(joblimit int64) *Peer { ...@@ -86,17 +152,24 @@ func (d *DownloadJob) GetFreePeer(joblimit int64) *Peer {
peer.SetPeerName(pbpeer.GetName()) peer.SetPeerName(pbpeer.GetName())
} }
if pbpeer.GetHeader().GetHeight() >= joblimit { if pbpeer.GetHeader().GetHeight() >= blockHeight {
if d.isBusyPeer(pbpeer.GetName()) { if d.isBusyPeer(pbpeer.GetName()) {
continue continue
} }
d.setBusyPeer(pbpeer) peerJopNum := d.getJobNum(pbpeer.GetName())
return peer if jobNum > peerJopNum {
jobNum = peerJopNum
bestPeer = peer
}
} }
} }
} }
return nil if bestPeer != nil {
d.setBusyPeer(bestPeer.GetPeerName())
}
return bestPeer
} }
// CancelJob cancel the downloadjob object // CancelJob cancel the downloadjob object
...@@ -117,11 +190,11 @@ func (d *DownloadJob) DownloadBlock(invs []*pb.Inventory, ...@@ -117,11 +190,11 @@ func (d *DownloadJob) DownloadBlock(invs []*pb.Inventory,
} }
for _, inv := range invs { //让一个节点一次下载一个区块,下载失败区块,交给下一轮下载 for _, inv := range invs { //让一个节点一次下载一个区块,下载失败区块,交给下一轮下载
freePeer := d.GetFreePeer(inv.GetHeight()) REGET:
freePeer := d.GetFreePeer(inv.GetHeight()) //获取当前任务数最少的节点,相当于 下载速度最快的节点
if freePeer == nil { if freePeer == nil {
//log.Error("DownloadBlock", "freepeer is null", inv.GetHeight()) time.Sleep(time.Millisecond * 100)
d.retryList.PushBack(inv) goto REGET
continue
} }
d.wg.Add(1) d.wg.Add(1)
...@@ -129,7 +202,10 @@ func (d *DownloadJob) DownloadBlock(invs []*pb.Inventory, ...@@ -129,7 +202,10 @@ func (d *DownloadJob) DownloadBlock(invs []*pb.Inventory,
defer d.wg.Done() defer d.wg.Done()
err := d.syncDownloadBlock(peer, inv, bchan) err := d.syncDownloadBlock(peer, inv, bchan)
if err != nil { if err != nil {
d.retryList.PushBack(inv) //失败的下载,放在下一轮ReDownload进行下载 d.removePeer(peer.GetPeerName())
log.Error("DownloadBlock:syncDownloadBlock", "height", inv.GetHeight(), "peer", peer.GetPeerName(), "err", err)
d.retryList.PushFront(inv) //失败的下载,放在下一轮ReDownload进行下载
} else { } else {
d.setFreePeer(peer.GetPeerName()) d.setFreePeer(peer.GetPeerName())
} }
...@@ -152,19 +228,21 @@ func (d *DownloadJob) restOfInvs(bchan chan *pb.BlockPid) []*pb.Inventory { ...@@ -152,19 +228,21 @@ func (d *DownloadJob) restOfInvs(bchan chan *pb.BlockPid) []*pb.Inventory {
return errinvs return errinvs
} }
var invs []*pb.Inventory var invsArr Invs
for e := d.retryList.Front(); e != nil; { for e := d.retryList.Front(); e != nil; {
if e.Value == nil { if e.Value == nil {
continue continue
} }
log.Debug("restofInvs", "inv", e.Value.(*pb.Inventory).GetHeight()) log.Debug("resetofInvs", "inv", e.Value.(*pb.Inventory).GetHeight())
invs = append(invs, e.Value.(*pb.Inventory)) //把下载遗漏的区块,重新组合进行下载 invsArr = append(invsArr, e.Value.(*pb.Inventory)) //把下载遗漏的区块,重新组合进行下载
next := e.Next() next := e.Next()
d.retryList.Remove(e) d.retryList.Remove(e)
e = next e = next
} }
//Sort
return invs sort.Sort(invsArr)
//log.Info("resetOfInvs", "sorted:", invs)
return invsArr
} }
func (d *DownloadJob) syncDownloadBlock(peer *Peer, inv *pb.Inventory, bchan chan *pb.BlockPid) error { func (d *DownloadJob) syncDownloadBlock(peer *Peer, inv *pb.Inventory, bchan chan *pb.BlockPid) error {
...@@ -179,26 +257,33 @@ func (d *DownloadJob) syncDownloadBlock(peer *Peer, inv *pb.Inventory, bchan cha ...@@ -179,26 +257,33 @@ func (d *DownloadJob) syncDownloadBlock(peer *Peer, inv *pb.Inventory, bchan cha
var p2pdata pb.P2PGetData var p2pdata pb.P2PGetData
p2pdata.Version = d.p2pcli.network.node.nodeInfo.cfg.Version p2pdata.Version = d.p2pcli.network.node.nodeInfo.cfg.Version
p2pdata.Invs = []*pb.Inventory{inv} p2pdata.Invs = []*pb.Inventory{inv}
beg := pb.Now()
resp, err := peer.mconn.gcli.GetData(context.Background(), &p2pdata, grpc.FailFast(true)) resp, err := peer.mconn.gcli.GetData(context.Background(), &p2pdata, grpc.FailFast(true))
P2pComm.CollectPeerStat(err, peer) P2pComm.CollectPeerStat(err, peer)
if err != nil { if err != nil {
log.Error("syncDownloadBlock", "GetData err", err.Error()) log.Error("syncDownloadBlock", "GetData err", err.Error())
return err return err
} }
defer func() {
log.Debug("download", "frompeer", peer.Addr(), "blockheight", inv.GetHeight(), "downloadcost", pb.Since(beg))
}()
defer resp.CloseSend() defer resp.CloseSend()
for { for {
invdatas, err := resp.Recv() invdatas, err := resp.Recv()
if err != nil { if err != nil {
if err == io.EOF { if err == io.EOF {
log.Debug("download", "from", peer.Addr(), "block", inv.GetHeight()) if invdatas == nil {
return nil return nil
}
goto RECV
} }
log.Error("download", "resp,Recv err", err.Error(), "download from", peer.Addr()) log.Error("download", "resp,Recv err", err.Error(), "download from", peer.Addr())
return err return err
} }
RECV:
for _, item := range invdatas.Items { for _, item := range invdatas.Items {
bchan <- &pb.BlockPid{Pid: peer.GetPeerName(), Block: item.GetBlock()} //下载完成后插入bchan bchan <- &pb.BlockPid{Pid: peer.GetPeerName(), Block: item.GetBlock()} //下载完成后插入bchan
log.Debug("download", "frompeer", peer.Addr(), "blockheight", inv.GetHeight(), "Blocksize", item.GetBlock().XXX_Size())
} }
} }
} }
...@@ -7,11 +7,15 @@ package p2p ...@@ -7,11 +7,15 @@ package p2p
import ( import (
"fmt" "fmt"
"net" "net"
"sync"
"time" "time"
pb "github.com/33cn/chain33/types" pb "github.com/33cn/chain33/types"
"golang.org/x/net/context"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/keepalive" "google.golang.org/grpc/keepalive"
pr "google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
) )
// Listener the actions // Listener the actions
...@@ -49,8 +53,8 @@ type listener struct { ...@@ -49,8 +53,8 @@ type listener struct {
// NewListener produce a listener object // NewListener produce a listener object
func NewListener(protocol string, node *Node) Listener { func NewListener(protocol string, node *Node) Listener {
log.Debug("NewListener", "localPort", defaultPort) log.Debug("NewListener", "localPort", node.listenPort)
l, err := net.Listen(protocol, fmt.Sprintf(":%v", defaultPort)) l, err := net.Listen(protocol, fmt.Sprintf(":%v", node.listenPort))
if err != nil { if err != nil {
log.Crit("Failed to listen", "Error", err.Error()) log.Crit("Failed to listen", "Error", err.Error())
return nil return nil
...@@ -65,6 +69,56 @@ func NewListener(protocol string, node *Node) Listener { ...@@ -65,6 +69,56 @@ func NewListener(protocol string, node *Node) Listener {
pServer := NewP2pServer() pServer := NewP2pServer()
pServer.node = dl.node pServer.node = dl.node
//一元拦截器 接口调用之前进行校验拦截
interceptor := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
//checkAuth
getctx, ok := pr.FromContext(ctx)
if !ok {
return nil, fmt.Errorf("")
}
ip, _, err := net.SplitHostPort(getctx.Addr.String())
if err != nil {
return nil, err
}
if pServer.node.nodeInfo.blacklist.Has(ip) {
return nil, fmt.Errorf("blacklist %v no authorized", ip)
}
if !auth(ip) {
log.Error("interceptor", "auth faild", ip)
//把相应的IP地址加入黑名单中
pServer.node.nodeInfo.blacklist.Add(ip, int64(3600))
return nil, fmt.Errorf("auth faild %v no authorized", ip)
}
// Continue processing the request
return handler(ctx, req)
}
//流拦截器
interceptorStream := func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
getctx, ok := pr.FromContext(ss.Context())
if !ok {
log.Error("interceptorStream", "FromContext error", "")
return fmt.Errorf("stream Context err")
}
ip, _, err := net.SplitHostPort(getctx.Addr.String())
if err != nil {
return err
}
if pServer.node.nodeInfo.blacklist.Has(ip) {
return fmt.Errorf("blacklist %v no authorized", ip)
}
if !auth(ip) {
log.Error("interceptorStream", "auth faild", ip)
//把相应的IP地址加入黑名单中
pServer.node.nodeInfo.blacklist.Add(ip, int64(3600))
return fmt.Errorf("auth faild %v no authorized", ip)
}
return handler(srv, ss)
}
var opts []grpc.ServerOption
opts = append(opts, grpc.UnaryInterceptor(interceptor), grpc.StreamInterceptor(interceptorStream))
//区块最多10M //区块最多10M
msgRecvOp := grpc.MaxMsgSize(11 * 1024 * 1024) //设置最大接收数据大小位11M msgRecvOp := grpc.MaxMsgSize(11 * 1024 * 1024) //设置最大接收数据大小位11M
msgSendOp := grpc.MaxSendMsgSize(11 * 1024 * 1024) //设置最大发送数据大小为11M msgSendOp := grpc.MaxSendMsgSize(11 * 1024 * 1024) //设置最大发送数据大小为11M
...@@ -74,9 +128,79 @@ func NewListener(protocol string, node *Node) Listener { ...@@ -74,9 +128,79 @@ func NewListener(protocol string, node *Node) Listener {
keepparm.MaxConnectionIdle = 1 * time.Minute keepparm.MaxConnectionIdle = 1 * time.Minute
maxStreams := grpc.MaxConcurrentStreams(1000) maxStreams := grpc.MaxConcurrentStreams(1000)
keepOp := grpc.KeepaliveParams(keepparm) keepOp := grpc.KeepaliveParams(keepparm)
StatsOp := grpc.StatsHandler(&statshandler{})
dl.server = grpc.NewServer(msgRecvOp, msgSendOp, keepOp, maxStreams) opts = append(opts, msgRecvOp, msgSendOp, keepOp, maxStreams, StatsOp)
dl.server = grpc.NewServer(opts...)
dl.p2pserver = pServer dl.p2pserver = pServer
pb.RegisterP2PgserviceServer(dl.server, pServer) pb.RegisterP2PgserviceServer(dl.server, pServer)
return dl return dl
} }
type statshandler struct{}
func (h *statshandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
return context.WithValue(ctx, connCtxKey{}, info)
}
func (h *statshandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
return ctx
}
func (h *statshandler) HandleConn(ctx context.Context, s stats.ConnStats) {
if ctx == nil {
return
}
tag, ok := getConnTagFromContext(ctx)
if !ok {
fmt.Println("can not get conn tag")
return
}
ip, _, err := net.SplitHostPort(tag.RemoteAddr.String())
if err != nil {
return
}
connsMutex.Lock()
defer connsMutex.Unlock()
if _, ok := conns[ip]; !ok {
conns[ip] = 0
}
switch s.(type) {
case *stats.ConnBegin:
conns[ip] = conns[ip] + 1
case *stats.ConnEnd:
conns[ip] = conns[ip] - 1
if conns[ip] <= 0 {
delete(conns, ip)
}
log.Debug("ip connend", "ip", ip, "n", conns[ip])
default:
log.Error("illegal ConnStats type\n")
}
}
// HandleRPC 为空.
func (h *statshandler) HandleRPC(ctx context.Context, s stats.RPCStats) {}
type connCtxKey struct{}
var connsMutex sync.Mutex
var conns = make(map[string]uint)
func getConnTagFromContext(ctx context.Context) (*stats.ConnTagInfo, bool) {
tag, ok := ctx.Value(connCtxKey{}).(*stats.ConnTagInfo)
return tag, ok
}
func auth(checkIP string) bool {
connsMutex.Lock()
defer connsMutex.Unlock()
count, ok := conns[checkIP]
if ok && count > maxSamIPNum {
log.Error("AuthCheck", "sameIP num:", count, "checkIP:", checkIP, "diffIP num:", len(conns))
return false
}
return true
}
...@@ -28,7 +28,7 @@ func (n *Node) monitorErrPeer() { ...@@ -28,7 +28,7 @@ func (n *Node) monitorErrPeer() {
peer := <-n.nodeInfo.monitorChan peer := <-n.nodeInfo.monitorChan
if !peer.version.IsSupport() { if !peer.version.IsSupport() {
//如果版本不支持,直接删除节点 //如果版本不支持,直接删除节点
log.Debug("VersoinMonitor", "NotSupport,addr", peer.Addr()) log.Info("VersoinMonitor", "NotSupport,addr", peer.Addr())
n.destroyPeer(peer) n.destroyPeer(peer)
//加入黑名单12小时 //加入黑名单12小时
n.nodeInfo.blacklist.Add(peer.Addr(), int64(3600*12)) n.nodeInfo.blacklist.Add(peer.Addr(), int64(3600*12))
...@@ -123,6 +123,19 @@ func (n *Node) getAddrFromOnline() { ...@@ -123,6 +123,19 @@ func (n *Node) getAddrFromOnline() {
} }
if rangeCount < maxOutBoundNum {
//从innerSeeds 读取连接
n.innerSeeds.Range(func(k, v interface{}) bool {
rangeCount++
if rangeCount < maxOutBoundNum {
n.pubsub.FIFOPub(k.(string), "addr")
return true
}
return false
})
}
continue continue
} }
...@@ -162,15 +175,21 @@ func (n *Node) getAddrFromOnline() { ...@@ -162,15 +175,21 @@ func (n *Node) getAddrFromOnline() {
if _, ok := seedsMap[addr]; ok { if _, ok := seedsMap[addr]; ok {
continue continue
} }
//随机删除连接的一个种子 //随机删除连接的一个种子
for _, seed := range seedArr {
if n.Has(seed) {
n.remove(seed)
n.nodeInfo.addrBook.RemoveAddr(seed)
break
}
}
n.innerSeeds.Range(func(k, v interface{}) bool {
if n.Has(k.(string)) {
//不能包含在cfgseed中
if _, ok := n.cfgSeeds.Load(k.(string)); ok {
return true
}
n.remove(k.(string))
n.nodeInfo.addrBook.RemoveAddr(k.(string))
return false
}
return true
})
} }
} }
time.Sleep(MonitorPeerInfoInterval) time.Sleep(MonitorPeerInfoInterval)
...@@ -180,7 +199,7 @@ func (n *Node) getAddrFromOnline() { ...@@ -180,7 +199,7 @@ func (n *Node) getAddrFromOnline() {
if !n.nodeInfo.blacklist.Has(addr) || !Filter.QueryRecvData(addr) { if !n.nodeInfo.blacklist.Has(addr) || !Filter.QueryRecvData(addr) {
if ticktimes < 10 { if ticktimes < 10 {
//如果连接了其他节点,优先不连接种子节点 //如果连接了其他节点,优先不连接种子节点
if _, ok := seedsMap[addr]; !ok { if _, ok := n.innerSeeds.Load(addr); !ok {
//先把seed 排除在外 //先把seed 排除在外
n.pubsub.FIFOPub(addr, "addr") n.pubsub.FIFOPub(addr, "addr")
...@@ -263,8 +282,8 @@ func (n *Node) nodeReBalance() { ...@@ -263,8 +282,8 @@ func (n *Node) nodeReBalance() {
//筛选缓存备选节点负载最大和最小的节点 //筛选缓存备选节点负载最大和最小的节点
cachePeers := n.GetCacheBounds() cachePeers := n.GetCacheBounds()
var MixCacheInBounds int32 = 1000 var MinCacheInBounds int32 = 1000
var MixCacheInBoundPeer *Peer var MinCacheInBoundPeer *Peer
var MaxCacheInBounds int32 var MaxCacheInBounds int32
var MaxCacheInBoundPeer *Peer var MaxCacheInBoundPeer *Peer
for _, peer := range cachePeers { for _, peer := range cachePeers {
...@@ -275,9 +294,9 @@ func (n *Node) nodeReBalance() { ...@@ -275,9 +294,9 @@ func (n *Node) nodeReBalance() {
continue continue
} }
//选出最小负载 //选出最小负载
if int32(inbounds) < MixCacheInBounds { if int32(inbounds) < MinCacheInBounds {
MixCacheInBounds = int32(inbounds) MinCacheInBounds = int32(inbounds)
MixCacheInBoundPeer = peer MinCacheInBoundPeer = peer
} }
//选出负载最大 //选出负载最大
...@@ -287,7 +306,7 @@ func (n *Node) nodeReBalance() { ...@@ -287,7 +306,7 @@ func (n *Node) nodeReBalance() {
} }
} }
if MixCacheInBoundPeer == nil || MaxCacheInBoundPeer == nil { if MinCacheInBoundPeer == nil || MaxCacheInBoundPeer == nil {
continue continue
} }
...@@ -297,7 +316,7 @@ func (n *Node) nodeReBalance() { ...@@ -297,7 +316,7 @@ func (n *Node) nodeReBalance() {
MaxCacheInBoundPeer.Close() MaxCacheInBoundPeer.Close()
} }
//如果最大的负载量比缓存中负载最小的小,则删除缓存中所有的节点 //如果最大的负载量比缓存中负载最小的小,则删除缓存中所有的节点
if MaxInBounds < MixCacheInBounds { if MaxInBounds < MinCacheInBounds {
cachePeers := n.GetCacheBounds() cachePeers := n.GetCacheBounds()
for _, peer := range cachePeers { for _, peer := range cachePeers {
n.RemoveCachePeer(peer.Addr()) n.RemoveCachePeer(peer.Addr())
...@@ -306,16 +325,16 @@ func (n *Node) nodeReBalance() { ...@@ -306,16 +325,16 @@ func (n *Node) nodeReBalance() {
continue continue
} }
log.Info("nodeReBalance", "MaxInBounds", MaxInBounds, "MixCacheInBounds", MixCacheInBounds) log.Info("nodeReBalance", "MaxInBounds", MaxInBounds, "MixCacheInBounds", MinCacheInBounds)
if MaxInBounds-MixCacheInBounds < 50 { if MaxInBounds-MinCacheInBounds < 50 {
continue continue
} }
if MixCacheInBoundPeer != nil { if MinCacheInBoundPeer != nil {
info, err := MixCacheInBoundPeer.GetPeerInfo(VERSION) info, err := MinCacheInBoundPeer.GetPeerInfo(VERSION)
if err != nil { if err != nil {
n.RemoveCachePeer(MixCacheInBoundPeer.Addr()) n.RemoveCachePeer(MinCacheInBoundPeer.Addr())
MixCacheInBoundPeer.Close() MinCacheInBoundPeer.Close()
continue continue
} }
localBlockHeight, err := p2pcli.GetBlockHeight(n.nodeInfo) localBlockHeight, err := p2pcli.GetBlockHeight(n.nodeInfo)
...@@ -324,10 +343,10 @@ func (n *Node) nodeReBalance() { ...@@ -324,10 +343,10 @@ func (n *Node) nodeReBalance() {
} }
peerBlockHeight := info.GetHeader().GetHeight() peerBlockHeight := info.GetHeader().GetHeight()
if localBlockHeight-peerBlockHeight < 2048 { if localBlockHeight-peerBlockHeight < 2048 {
log.Info("noReBalance", "Repalce node new node", MixCacheInBoundPeer.Addr(), "old node", MaxCacheInBoundPeer.Addr()) log.Info("noReBalance", "Repalce node new node", MinCacheInBoundPeer.Addr(), "old node", MaxInBoundPeer.Addr())
n.addPeer(MixCacheInBoundPeer) n.addPeer(MinCacheInBoundPeer)
n.remove(MaxInBoundPeer.Addr()) n.remove(MaxInBoundPeer.Addr())
n.RemoveCachePeer(MixCacheInBoundPeer.Addr()) n.RemoveCachePeer(MinCacheInBoundPeer.Addr())
} }
} }
} }
...@@ -373,11 +392,13 @@ func (n *Node) monitorPeers() { ...@@ -373,11 +392,13 @@ func (n *Node) monitorPeers() {
if n.Size() <= stableBoundNum { if n.Size() <= stableBoundNum {
continue continue
} }
//如果是配置节点,则不删除
if _, ok := n.cfgSeeds.Load(paddr); ok {
continue
}
//删除节点数过低的节点 //删除节点数过低的节点
n.remove(paddr) n.remove(paddr)
n.nodeInfo.addrBook.RemoveAddr(paddr) n.nodeInfo.addrBook.RemoveAddr(paddr)
//短暂加入黑名单5分钟
//n.nodeInfo.blacklist.Add(paddr, int64(60*5))
} }
} }
...@@ -469,7 +490,9 @@ func (n *Node) monitorDialPeers() { ...@@ -469,7 +490,9 @@ func (n *Node) monitorDialPeers() {
if peer != nil { if peer != nil {
peer.Close() peer.Close()
} }
n.nodeInfo.blacklist.Add(netAddr.String(), int64(60*10)) if _, ok := n.cfgSeeds.Load(netAddr.String()); !ok {
n.nodeInfo.blacklist.Add(netAddr.String(), int64(60*10))
}
return return
} }
//查询远程节点的负载 //查询远程节点的负载
...@@ -532,3 +555,48 @@ func (n *Node) monitorBlackList() { ...@@ -532,3 +555,48 @@ func (n *Node) monitorBlackList() {
func (n *Node) monitorFilter() { func (n *Node) monitorFilter() {
Filter.ManageRecvFilter() Filter.ManageRecvFilter()
} }
//独立goroutine 监控配置的
func (n *Node) monitorCfgSeeds() {
ticker := time.NewTicker(CheckCfgSeedsInterVal)
defer ticker.Stop()
for {
if n.isClose() {
log.Info("monitorCfgSeeds", "loop", "done")
return
}
<-ticker.C
n.cfgSeeds.Range(func(k, v interface{}) bool {
if !n.Has(k.(string)) {
//尝试连接此节点
if n.needMore() { //如果需要更多的节点
n.pubsub.FIFOPub(k.(string), "addr")
} else {
//腾笼换鸟
peers, _ := n.GetActivePeers()
//选出当前连接的节点中,负载最大的节点
var MaxInBounds int32
var MaxInBoundPeer *Peer
for _, peer := range peers {
if peer.GetInBouns() > MaxInBounds {
MaxInBounds = peer.GetInBouns()
MaxInBoundPeer = peer
}
}
n.remove(MaxInBoundPeer.Addr())
n.pubsub.FIFOPub(k.(string), "addr")
}
}
return true
})
}
}
...@@ -172,6 +172,7 @@ func (na *NetAddress) DialTimeout(version int32) (*grpc.ClientConn, error) { ...@@ -172,6 +172,7 @@ func (na *NetAddress) DialTimeout(version int32) (*grpc.ClientConn, error) {
ch2 <- P2pComm.GrpcConfig() ch2 <- P2pComm.GrpcConfig()
log.Debug("NetAddress", "Dial with unCompressor", na.String()) log.Debug("NetAddress", "Dial with unCompressor", na.String())
conn, err = grpc.Dial(na.String(), grpc.WithInsecure(), grpc.WithServiceConfig(ch2), keepaliveOp, timeoutOp) conn, err = grpc.Dial(na.String(), grpc.WithInsecure(), grpc.WithServiceConfig(ch2), keepaliveOp, timeoutOp)
} }
if err != nil { if err != nil {
...@@ -184,132 +185,8 @@ func (na *NetAddress) DialTimeout(version int32) (*grpc.ClientConn, error) { ...@@ -184,132 +185,8 @@ func (na *NetAddress) DialTimeout(version int32) (*grpc.ClientConn, error) {
} }
return nil, err return nil, err
} }
return conn, nil
}
// Routable returns true if the address is routable.
func (na *NetAddress) Routable() bool {
// TODO(oga) bitcoind doesn't include RFC3849 here, but should we?
return na.Valid() && !(na.RFC1918() || na.RFC3927() || na.RFC4862() ||
na.RFC4193() || na.RFC4843() || na.Local())
}
// Valid For IPv4 these are either a 0 or all bits set address. For IPv6 a zero
// address or one that matches the RFC3849 documentation address format.
func (na *NetAddress) Valid() bool {
return na.IP != nil && !(na.IP.IsUnspecified() || na.RFC3849() ||
na.IP.Equal(net.IPv4bcast))
}
// Local returns true if it is a local address.
func (na *NetAddress) Local() bool {
return na.IP.IsLoopback() || zero4.Contains(na.IP)
}
// ReachabilityTo checks whenever o can be reached from na.
func (na *NetAddress) ReachabilityTo(o *NetAddress) int {
const (
Unreachable = 0
Default = iota
Teredo
Ipv6Weak
Ipv4
Ipv6Strong
)
if !na.Routable() {
return Unreachable
} else if na.RFC4380() {
if !o.Routable() {
return Default
} else if o.RFC4380() {
return Teredo
} else if o.IP.To4() != nil {
return Ipv4
}
// ipv6
return Ipv6Weak
} else if na.IP.To4() != nil {
if o.Routable() && o.IP.To4() != nil {
return Ipv4
}
return Default
}
/* ipv6 */
var tunnelled bool
// Is our v6 is tunnelled?
if o.RFC3964() || o.RFC6052() || o.RFC6145() {
tunnelled = true
}
if !o.Routable() {
return Default
} else if o.RFC4380() {
return Teredo
} else if o.IP.To4() != nil {
return Ipv4
} else if tunnelled {
// only prioritise ipv6 if we aren't tunnelling it.
return Ipv6Weak
}
return Ipv6Strong
}
// RFC1918: IPv4 Private networks (10.0.0.0/8, 192.168.0.0/16, 172.16.0.0/12) //p2p version check
// RFC3849: IPv6 Documentation address (2001:0DB8::/32)
// RFC3927: IPv4 Autoconfig (169.254.0.0/16)
// RFC3964: IPv6 6to4 (2002::/16)
// RFC4193: IPv6 unique local (FC00::/7)
// RFC4380: IPv6 Teredo tunneling (2001::/32)
// RFC4843: IPv6 ORCHID: (2001:10::/28)
// RFC4862: IPv6 Autoconfig (FE80::/64)
// RFC6052: IPv6 well known prefix (64:FF9B::/96)
// RFC6145: IPv6 IPv4 translated address ::FFFF:0:0:0/96
var (
rfc1918_10 = net.IPNet{IP: net.ParseIP("10.0.0.0"), Mask: net.CIDRMask(8, 32)}
rfc1918_192 = net.IPNet{IP: net.ParseIP("192.168.0.0"), Mask: net.CIDRMask(16, 32)}
rfc1918_172 = net.IPNet{IP: net.ParseIP("172.16.0.0"), Mask: net.CIDRMask(12, 32)}
rfc3849 = net.IPNet{IP: net.ParseIP("2001:0DB8::"), Mask: net.CIDRMask(32, 128)}
rfc3927 = net.IPNet{IP: net.ParseIP("169.254.0.0"), Mask: net.CIDRMask(16, 32)}
rfc3964 = net.IPNet{IP: net.ParseIP("2002::"), Mask: net.CIDRMask(16, 128)}
rfc4193 = net.IPNet{IP: net.ParseIP("FC00::"), Mask: net.CIDRMask(7, 128)}
rfc4380 = net.IPNet{IP: net.ParseIP("2001::"), Mask: net.CIDRMask(32, 128)}
rfc4843 = net.IPNet{IP: net.ParseIP("2001:10::"), Mask: net.CIDRMask(28, 128)}
rfc4862 = net.IPNet{IP: net.ParseIP("FE80::"), Mask: net.CIDRMask(64, 128)}
rfc6052 = net.IPNet{IP: net.ParseIP("64:FF9B::"), Mask: net.CIDRMask(96, 128)}
rfc6145 = net.IPNet{IP: net.ParseIP("::FFFF:0:0:0"), Mask: net.CIDRMask(96, 128)}
zero4 = net.IPNet{IP: net.ParseIP("0.0.0.0"), Mask: net.CIDRMask(8, 32)}
)
// RFC1918 defines ipv4 private network function return conn, nil
func (na *NetAddress) RFC1918() bool {
return rfc1918_10.Contains(na.IP) ||
rfc1918_192.Contains(na.IP) ||
rfc1918_172.Contains(na.IP)
} }
// RFC3849 defines ipv6 network function
func (na *NetAddress) RFC3849() bool { return rfc3849.Contains(na.IP) }
// RFC3927 defines ipv4 network function
func (na *NetAddress) RFC3927() bool { return rfc3927.Contains(na.IP) }
// RFC3964 defines ipv6 6to4 function
func (na *NetAddress) RFC3964() bool { return rfc3964.Contains(na.IP) }
// RFC4193 defines ipv6 unique local function
func (na *NetAddress) RFC4193() bool { return rfc4193.Contains(na.IP) }
// RFC4380 defines ipv6 teredo tunneling function
func (na *NetAddress) RFC4380() bool { return rfc4380.Contains(na.IP) }
// RFC4843 defines ipv6 orchid function
func (na *NetAddress) RFC4843() bool { return rfc4843.Contains(na.IP) }
// RFC4862 defines ipv6 autoconfig function
func (na *NetAddress) RFC4862() bool { return rfc4862.Contains(na.IP) }
// RFC6052 defines ipv6 well know prefix function
func (na *NetAddress) RFC6052() bool { return rfc6052.Contains(na.IP) }
// RFC6145 defines ipv6 ipv4 translated addredd function
func (na *NetAddress) RFC6145() bool { return rfc6145.Contains(na.IP) }
...@@ -67,6 +67,9 @@ type Node struct { ...@@ -67,6 +67,9 @@ type Node struct {
cacheBound map[string]*Peer cacheBound map[string]*Peer
outBound map[string]*Peer outBound map[string]*Peer
listener Listener listener Listener
listenPort int
innerSeeds sync.Map
cfgSeeds sync.Map
closed int32 closed int32
pubsub *pubsub.PubSub pubsub *pubsub.PubSub
} }
...@@ -84,21 +87,24 @@ func NewNode(cfg *types.P2P) (*Node, error) { ...@@ -84,21 +87,24 @@ func NewNode(cfg *types.P2P) (*Node, error) {
cacheBound: make(map[string]*Peer), cacheBound: make(map[string]*Peer),
pubsub: pubsub.NewPubSub(10200), pubsub: pubsub.NewPubSub(10200),
} }
node.listenPort = 13802
if cfg.Port != 0 && cfg.Port <= 65535 && cfg.Port > 1024 { if cfg.Port != 0 && cfg.Port <= 65535 && cfg.Port > 1024 {
defaultPort = int(cfg.Port) node.listenPort = int(cfg.Port)
} }
if cfg.InnerSeedEnable { seeds := MainNetSeeds
if types.IsTestNet() { if types.IsTestNet() {
cfg.Seeds = append(cfg.Seeds, TestNetSeeds...) seeds = TestNetSeeds
} else { }
cfg.Seeds = append(cfg.Seeds, InnerSeeds...)
}
for _, seed := range seeds {
node.innerSeeds.Store(seed, "inner")
} }
for _, seed := range cfg.Seeds {
node.cfgSeeds.Store(seed, "cfg")
}
node.nodeInfo = NewNodeInfo(cfg) node.nodeInfo = NewNodeInfo(cfg)
if cfg.ServerStart { if cfg.ServerStart {
node.listener = NewListener(protocol, node) node.listener = NewListener(protocol, node)
...@@ -133,7 +139,7 @@ func (n *Node) doNat() { ...@@ -133,7 +139,7 @@ func (n *Node) doNat() {
} }
time.Sleep(time.Second) time.Sleep(time.Second)
} }
testExaddr := fmt.Sprintf("%v:%v", n.nodeInfo.GetExternalAddr().IP.String(), defaultPort) testExaddr := fmt.Sprintf("%v:%v", n.nodeInfo.GetExternalAddr().IP.String(), n.listenPort)
log.Info("TestNetAddr", "testExaddr", testExaddr) log.Info("TestNetAddr", "testExaddr", testExaddr)
if len(P2pComm.AddrRouteble([]string{testExaddr})) != 0 { if len(P2pComm.AddrRouteble([]string{testExaddr})) != 0 {
log.Info("node outside") log.Info("node outside")
...@@ -162,7 +168,7 @@ func (n *Node) doNat() { ...@@ -162,7 +168,7 @@ func (n *Node) doNat() {
p2pcli := NewNormalP2PCli() p2pcli := NewNormalP2PCli()
//测试映射后的端口能否连通或者外网+本地端口 //测试映射后的端口能否连通或者外网+本地端口
if p2pcli.CheckPeerNatOk(n.nodeInfo.GetExternalAddr().String()) || if p2pcli.CheckPeerNatOk(n.nodeInfo.GetExternalAddr().String()) ||
p2pcli.CheckPeerNatOk(fmt.Sprintf("%v:%v", n.nodeInfo.GetExternalAddr().IP.String(), defaultPort)) { p2pcli.CheckPeerNatOk(fmt.Sprintf("%v:%v", n.nodeInfo.GetExternalAddr().IP.String(), n.listenPort)) {
n.nodeInfo.SetServiceTy(Service) n.nodeInfo.SetServiceTy(Service)
log.Info("doNat", "NatOk", "Support Service") log.Info("doNat", "NatOk", "Support Service")
...@@ -329,6 +335,7 @@ func (n *Node) monitor() { ...@@ -329,6 +335,7 @@ func (n *Node) monitor() {
go n.monitorFilter() go n.monitorFilter()
go n.monitorPeers() go n.monitorPeers()
go n.nodeReBalance() go n.nodeReBalance()
go n.monitorCfgSeeds()
} }
func (n *Node) needMore() bool { func (n *Node) needMore() bool {
...@@ -366,7 +373,7 @@ func (n *Node) detectNodeAddr() { ...@@ -366,7 +373,7 @@ func (n *Node) detectNodeAddr() {
var externalPort int var externalPort int
if cfg.IsSeed { if cfg.IsSeed {
externalPort = defaultPort externalPort = n.listenPort
} else { } else {
exportBytes, err := n.nodeInfo.addrBook.bookDb.Get([]byte(externalPortTag)) exportBytes, err := n.nodeInfo.addrBook.bookDb.Get([]byte(externalPortTag))
if len(exportBytes) != 0 { if len(exportBytes) != 0 {
...@@ -390,13 +397,11 @@ func (n *Node) detectNodeAddr() { ...@@ -390,13 +397,11 @@ func (n *Node) detectNodeAddr() {
log.Error("DetectionNodeAddr", "error", err.Error()) log.Error("DetectionNodeAddr", "error", err.Error())
} }
if listaddr, err := NewNetAddressString(fmt.Sprintf("%v:%v", laddr, defaultPort)); err == nil { if listaddr, err := NewNetAddressString(fmt.Sprintf("%v:%v", laddr, n.listenPort)); err == nil {
n.nodeInfo.SetListenAddr(listaddr) n.nodeInfo.SetListenAddr(listaddr)
n.nodeInfo.addrBook.AddOurAddress(listaddr) n.nodeInfo.addrBook.AddOurAddress(listaddr)
} }
//log.Info("DetectionNodeAddr", "ExternalIp", externalIP, "LocalAddr", LocalAddr, "IsOutSide", n.nodeInfo.OutSide())
break break
} }
} }
...@@ -417,7 +422,7 @@ func (n *Node) natMapPort() { ...@@ -417,7 +422,7 @@ func (n *Node) natMapPort() {
ok := p2pcli.CheckSelf(n.nodeInfo.GetExternalAddr().String(), n.nodeInfo) ok := p2pcli.CheckSelf(n.nodeInfo.GetExternalAddr().String(), n.nodeInfo)
if !ok { if !ok {
log.Info("natMapPort", "port is used", n.nodeInfo.GetExternalAddr().String()) log.Info("natMapPort", "port is used", n.nodeInfo.GetExternalAddr().String())
n.flushNodePort(uint16(defaultPort), uint16(rand.Intn(64512)+1023)) n.flushNodePort(uint16(n.listenPort), uint16(rand.Intn(64512)+1023))
} }
} }
...@@ -425,11 +430,11 @@ func (n *Node) natMapPort() { ...@@ -425,11 +430,11 @@ func (n *Node) natMapPort() {
log.Info("natMapPort", "netport", n.nodeInfo.GetExternalAddr().Port) log.Info("natMapPort", "netport", n.nodeInfo.GetExternalAddr().Port)
for i := 0; i < tryMapPortTimes; i++ { for i := 0; i < tryMapPortTimes; i++ {
//映射事件持续约48小时 //映射事件持续约48小时
err = nat.Any().AddMapping("TCP", int(n.nodeInfo.GetExternalAddr().Port), defaultPort, nodename[:8], time.Hour*48) err = nat.Any().AddMapping("TCP", int(n.nodeInfo.GetExternalAddr().Port), n.listenPort, nodename[:8], time.Hour*48)
if err != nil { if err != nil {
if i > tryMapPortTimes/2 { //如果连续失败次数超过最大限制次数的二分之一则切换为随机端口映射 if i > tryMapPortTimes/2 { //如果连续失败次数超过最大限制次数的二分之一则切换为随机端口映射
log.Error("NatMapPort", "err", err.Error()) log.Error("NatMapPort", "err", err.Error())
n.flushNodePort(uint16(defaultPort), uint16(rand.Intn(64512)+1023)) n.flushNodePort(uint16(n.listenPort), uint16(rand.Intn(64512)+1023))
} }
log.Info("NatMapPort", "External Port", n.nodeInfo.GetExternalAddr().Port) log.Info("NatMapPort", "External Port", n.nodeInfo.GetExternalAddr().Port)
...@@ -442,7 +447,7 @@ func (n *Node) natMapPort() { ...@@ -442,7 +447,7 @@ func (n *Node) natMapPort() {
if err != nil { if err != nil {
//映射失败 //映射失败
log.Warn("NatMapPort", "Nat", "Faild") log.Warn("NatMapPort", "Nat", "Faild")
n.flushNodePort(uint16(defaultPort), uint16(defaultPort)) n.flushNodePort(uint16(n.listenPort), uint16(n.listenPort))
n.nodeInfo.natResultChain <- false n.nodeInfo.natResultChain <- false
return return
} }
...@@ -460,7 +465,7 @@ func (n *Node) natMapPort() { ...@@ -460,7 +465,7 @@ func (n *Node) natMapPort() {
<-refresh.C <-refresh.C
log.Info("NatWorkRefresh") log.Info("NatWorkRefresh")
for { for {
if err := nat.Any().AddMapping("TCP", int(n.nodeInfo.GetExternalAddr().Port), defaultPort, nodename[:8], time.Hour*48); err != nil { if err := nat.Any().AddMapping("TCP", int(n.nodeInfo.GetExternalAddr().Port), n.listenPort, nodename[:8], time.Hour*48); err != nil {
log.Error("NatMapPort update", "err", err.Error()) log.Error("NatMapPort update", "err", err.Error())
time.Sleep(time.Second) time.Sleep(time.Second)
continue continue
...@@ -476,7 +481,8 @@ func (n *Node) deleteNatMapPort() { ...@@ -476,7 +481,8 @@ func (n *Node) deleteNatMapPort() {
if n.nodeInfo.OutSide() { if n.nodeInfo.OutSide() {
return return
} }
err := nat.Any().DeleteMapping("TCP", int(n.nodeInfo.GetExternalAddr().Port), defaultPort)
err := nat.Any().DeleteMapping("TCP", int(n.nodeInfo.GetExternalAddr().Port), n.listenPort)
if err != nil { if err != nil {
log.Error("deleteNatMapPort", "DeleteMapping err", err.Error()) log.Error("deleteNatMapPort", "DeleteMapping err", err.Error())
} }
......
...@@ -55,8 +55,7 @@ func New(cfg *types.P2P) *P2p { ...@@ -55,8 +55,7 @@ func New(cfg *types.P2P) *P2p {
} }
VERSION = cfg.Version VERSION = cfg.Version
log.Info("p2p", "Version", VERSION) log.Info("p2p", "Version", VERSION, "IsTest", types.IsTestNet())
if cfg.InnerBounds == 0 { if cfg.InnerBounds == 0 {
cfg.InnerBounds = 500 cfg.InnerBounds = 500
} }
...@@ -93,6 +92,7 @@ func (network *P2p) Close() { ...@@ -93,6 +92,7 @@ func (network *P2p) Close() {
network.client.Close() network.client.Close()
} }
network.node.pubsub.Shutdown() network.node.pubsub.Shutdown()
} }
// SetQueueClient set the queue // SetQueueClient set the queue
...@@ -215,6 +215,7 @@ func (network *P2p) subP2pMsg() { ...@@ -215,6 +215,7 @@ func (network *P2p) subP2pMsg() {
} }
} }
switch msg.Ty { switch msg.Ty {
case types.EventTxBroadcast: //广播tx case types.EventTxBroadcast: //广播tx
go network.p2pCli.BroadCastTx(msg, taskIndex) go network.p2pCli.BroadCastTx(msg, taskIndex)
case types.EventBlockBroadcast: //广播block case types.EventBlockBroadcast: //广播block
......
package p2p
import (
"encoding/hex"
"net"
"os"
"sort"
"strings"
"testing"
"time"
l "github.com/33cn/chain33/common/log"
"github.com/33cn/chain33/queue"
"github.com/33cn/chain33/types"
//"github.com/33cn/chain33/util/testnode"
"github.com/stretchr/testify/assert"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
var q queue.Queue
var p2pModule *P2p
var dataDir = "testdata"
func init() {
VERSION = 119
l.SetLogLevel("err")
q = queue.New("channel")
go q.Start()
p2pModule = initP2p(33802, dataDir)
p2pModule.Wait()
go func() {
blockchainKey := "blockchain"
client := q.Client()
client.Sub(blockchainKey)
for msg := range client.Recv() {
switch msg.Ty {
case types.EventGetBlocks:
if req, ok := msg.GetData().(*types.ReqBlocks); ok {
if req.Start == 1 {
msg.Reply(client.NewMessage(blockchainKey, types.EventBlocks, &types.Transaction{}))
} else {
msg.Reply(client.NewMessage(blockchainKey, types.EventBlocks, &types.BlockDetails{}))
}
} else {
msg.ReplyErr("Do not support", types.ErrInvalidParam)
}
case types.EventGetHeaders:
if req, ok := msg.GetData().(*types.ReqBlocks); ok {
if req.Start == 10 {
msg.Reply(client.NewMessage(blockchainKey, types.EventHeaders, &types.Transaction{}))
} else {
msg.Reply(client.NewMessage(blockchainKey, types.EventHeaders, &types.Headers{}))
}
} else {
msg.ReplyErr("Do not support", types.ErrInvalidParam)
}
case types.EventGetLastHeader:
msg.Reply(client.NewMessage("p2p", types.EventHeader, &types.Header{Height: 2019}))
case types.EventGetBlockHeight:
msg.Reply(client.NewMessage("p2p", types.EventReplyBlockHeight, &types.ReplyBlockHeight{Height: 2019}))
}
}
}()
go func() {
mempoolKey := "mempool"
client := q.Client()
client.Sub(mempoolKey)
for msg := range client.Recv() {
switch msg.Ty {
case types.EventGetMempoolSize:
msg.Reply(client.NewMessage("p2p", types.EventMempoolSize, &types.MempoolSize{Size: 0}))
}
}
}()
}
//初始化p2p模块
func initP2p(port int32, dbpath string) *P2p {
cfg := new(types.P2P)
cfg.Port = port
cfg.Enable = true
cfg.DbPath = dbpath
cfg.DbCache = 4
cfg.Version = 119
cfg.ServerStart = true
cfg.Driver = "leveldb"
p2pcli := New(cfg)
p2pcli.SetQueueClient(q.Client())
p2pcli.node.nodeInfo.SetServiceTy(7)
return p2pcli
}
func TestP2PEvent(t *testing.T) {
qcli := q.Client()
msg := qcli.NewMessage("p2p", types.EventBlockBroadcast, &types.Block{})
qcli.Send(msg, false)
msg = qcli.NewMessage("p2p", types.EventTxBroadcast, &types.Transaction{})
qcli.Send(msg, false)
msg = qcli.NewMessage("p2p", types.EventFetchBlocks, &types.ReqBlocks{})
qcli.Send(msg, false)
msg = qcli.NewMessage("p2p", types.EventGetMempool, nil)
qcli.Send(msg, false)
msg = qcli.NewMessage("p2p", types.EventPeerInfo, nil)
qcli.Send(msg, false)
msg = qcli.NewMessage("p2p", types.EventGetNetInfo, nil)
qcli.Send(msg, false)
msg = qcli.NewMessage("p2p", types.EventFetchBlockHeaders, &types.ReqBlocks{})
qcli.Send(msg, false)
}
func TestNetInfo(t *testing.T) {
p2pModule.node.nodeInfo.IsNatDone()
p2pModule.node.nodeInfo.SetNatDone()
p2pModule.node.nodeInfo.Get()
}
//测试Peer
func TestPeer(t *testing.T) {
conn, err := grpc.Dial("localhost:33802", grpc.WithInsecure(),
grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip")))
assert.Nil(t, err)
defer conn.Close()
remote, err := NewNetAddressString("127.0.0.1:33802")
assert.Nil(t, err)
localP2P := initP2p(43802, "testdata2")
defer os.RemoveAll("testdata2")
defer localP2P.Close()
t.Log(localP2P.node.CacheBoundsSize())
t.Log(localP2P.node.GetCacheBounds())
localP2P.node.RemoveCachePeer("localhost:12345")
peer, err := P2pComm.dialPeer(remote, localP2P.node)
assert.Nil(t, err)
defer peer.Close()
peer.MakePersistent()
localP2P.node.addPeer(peer)
time.Sleep(time.Second * 5)
t.Log(peer.GetInBouns())
t.Log(peer.version.GetVersion())
assert.IsType(t, "string", peer.GetPeerName())
localP2P.node.AddCachePeer(peer)
//
localP2P.node.natOk()
localP2P.node.flushNodePort(43803, 43802)
p2pcli := NewNormalP2PCli()
localP2P.node.nodeInfo.peerInfos.SetPeerInfo(nil)
localP2P.node.nodeInfo.peerInfos.GetPeerInfo("1222")
t.Log(p2pModule.node.GetRegisterPeer("localhost:43802"))
//测试发送Ping消息
err = p2pcli.SendPing(peer, localP2P.node.nodeInfo)
assert.Nil(t, err)
//获取peer节点的被连接数
pnum, err := p2pcli.GetInPeersNum(peer)
assert.Nil(t, err)
assert.Equal(t, 1, pnum)
_, err = peer.GetPeerInfo(VERSION)
assert.Nil(t, err)
//获取节点列表
_, err = p2pcli.GetAddrList(peer)
assert.Nil(t, err)
_, err = p2pcli.SendVersion(peer, localP2P.node.nodeInfo)
assert.Nil(t, err)
t.Log(p2pcli.CheckPeerNatOk("localhost:33802"))
t.Log("checkself:", p2pcli.CheckSelf("loadhost:43803", localP2P.node.nodeInfo))
_, err = p2pcli.GetAddr(peer)
assert.Nil(t, err)
// //测试获取高度
height, err := p2pcli.GetBlockHeight(localP2P.node.nodeInfo)
assert.Nil(t, err)
assert.Equal(t, int(height), 2019)
assert.Equal(t, false, p2pcli.CheckSelf("localhost:33802", localP2P.node.nodeInfo))
//测试下载
job := NewDownloadJob(NewP2PCli(localP2P).(*Cli), []*Peer{peer})
job.GetFreePeer(1)
var ins []*types.Inventory
var bChan = make(chan *types.BlockPid, 256)
respIns := job.DownloadBlock(ins, bChan)
t.Log(respIns)
job.ResetDownloadPeers([]*Peer{peer})
t.Log(job.avalidPeersNum())
job.setBusyPeer(peer.GetPeerName())
job.setFreePeer(peer.GetPeerName())
job.removePeer(peer.GetPeerName())
job.CancelJob()
os.Remove(dataDir)
}
func TestSortArr(t *testing.T) {
var Inventorys = make(Invs, 0)
for i := 100; i >= 0; i-- {
var inv types.Inventory
inv.Ty = 111
inv.Height = int64(i)
Inventorys = append(Inventorys, &inv)
}
sort.Sort(Inventorys)
}
//测试grpc 多连接
func TestGrpcConns(t *testing.T) {
var conns []*grpc.ClientConn
for i := 0; i < maxSamIPNum; i++ {
conn, err := grpc.Dial("localhost:33802", grpc.WithInsecure(),
grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip")))
assert.Nil(t, err)
cli := types.NewP2PgserviceClient(conn)
_, err = cli.GetHeaders(context.Background(), &types.P2PGetHeaders{
StartHeight: 0, EndHeight: 0, Version: 1002}, grpc.FailFast(true))
assert.Equal(t, false, strings.Contains(err.Error(), "no authorized"))
conns = append(conns, conn)
}
conn, err := grpc.Dial("localhost:33802", grpc.WithInsecure(),
grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip")))
assert.Nil(t, err)
cli := types.NewP2PgserviceClient(conn)
_, err = cli.GetHeaders(context.Background(), &types.P2PGetHeaders{
StartHeight: 0, EndHeight: 0, Version: 1002}, grpc.FailFast(true))
assert.Equal(t, true, strings.Contains(err.Error(), "no authorized"))
conn.Close()
for _, conn := range conns {
conn.Close()
}
}
//测试grpc 流多连接
func TestGrpcStreamConns(t *testing.T) {
conn, err := grpc.Dial("localhost:33802", grpc.WithInsecure(),
grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip")))
assert.Nil(t, err)
cli := types.NewP2PgserviceClient(conn)
var p2pdata types.P2PGetData
resp, err := cli.GetData(context.Background(), &p2pdata)
assert.Nil(t, err)
_, err = resp.Recv()
assert.Equal(t, true, strings.Contains(err.Error(), "no authorized"))
conn.Close()
}
func TestP2pComm(t *testing.T) {
addrs := P2pComm.AddrRouteble([]string{"localhost:33802"})
t.Log(addrs)
i32 := P2pComm.BytesToInt32([]byte{0xff})
t.Log(i32)
_, _, err := P2pComm.GenPrivPubkey()
assert.Nil(t, err)
ping, err := P2pComm.NewPingData(p2pModule.node.nodeInfo)
assert.Nil(t, err)
assert.Equal(t, true, P2pComm.CheckSign(ping))
assert.IsType(t, "string", P2pComm.GetLocalAddr())
assert.Equal(t, 5, len(P2pComm.RandStr(5)))
}
func TestFilter(t *testing.T) {
go Filter.ManageRecvFilter()
defer Filter.Close()
Filter.GetLock()
assert.Equal(t, true, Filter.RegRecvData("key"))
assert.Equal(t, true, Filter.QueryRecvData("key"))
Filter.RemoveRecvData("key")
assert.Equal(t, false, Filter.QueryRecvData("key"))
Filter.ReleaseLock()
}
func TestAddrRouteble(t *testing.T) {
resp := P2pComm.AddrRouteble([]string{"114.55.101.159:13802"})
t.Log(resp)
}
func TestRandStr(t *testing.T) {
t.Log(P2pComm.RandStr(5))
}
func TestGetLocalAddr(t *testing.T) {
t.Log(P2pComm.GetLocalAddr())
}
func TestAddrBook(t *testing.T) {
prv, pub, err := P2pComm.GenPrivPubkey()
if err != nil {
t.Log(err.Error())
return
}
t.Log("priv:", hex.EncodeToString(prv), "pub:", hex.EncodeToString(pub))
pubstr, err := P2pComm.Pubkey(hex.EncodeToString(prv))
if err != nil {
t.Log(err.Error())
return
}
t.Log("GenPubkey:", pubstr)
addrBook := p2pModule.node.nodeInfo.addrBook
addrBook.Size()
addrBook.saveToDb()
addrBook.GetPeerStat("locolhost:43802")
addrBook.genPubkey(hex.EncodeToString(prv))
assert.Equal(t, addrBook.genPubkey(hex.EncodeToString(prv)), pubstr)
addrBook.Save()
addrBook.GetAddrs()
}
func TestBytesToInt32(t *testing.T) {
t.Log(P2pComm.BytesToInt32([]byte{0xff}))
t.Log(P2pComm.Int32ToBytes(255))
}
func TestNetAddress(t *testing.T) {
tcpAddr := new(net.TCPAddr)
tcpAddr.IP = net.ParseIP("localhost")
tcpAddr.Port = 2223
nad := NewNetAddress(tcpAddr)
nad1 := nad.Copy()
nad.Equals(nad1)
nad2s, err := NewNetAddressStrings([]string{"localhost:3306"})
if err != nil {
return
}
nad.Less(nad2s[0])
}
func TestP2pClose(t *testing.T) {
p2pModule.Wait()
p2pModule.Close()
os.RemoveAll(dataDir)
}
...@@ -407,12 +407,20 @@ func (m *Cli) GetBlocks(msg *queue.Message, taskindex int64) { ...@@ -407,12 +407,20 @@ func (m *Cli) GetBlocks(msg *queue.Message, taskindex int64) {
msg.Reply(m.network.client.NewMessage("blockchain", pb.EventReply, pb.Reply{Msg: []byte("no peers")})) msg.Reply(m.network.client.NewMessage("blockchain", pb.EventReply, pb.Reply{Msg: []byte("no peers")}))
return return
} }
msg.Reply(m.network.client.NewMessage("blockchain", pb.EventReply, pb.Reply{IsOk: true, Msg: []byte("downloading...")}))
req := msg.GetData().(*pb.ReqBlocks) req := msg.GetData().(*pb.ReqBlocks)
log.Info("GetBlocks", "start", req.GetStart(), "end", req.GetEnd()) log.Info("GetBlocks", "start", req.GetStart(), "end", req.GetEnd())
pids := req.GetPid() pids := req.GetPid()
var MaxInvs = new(pb.P2PInv) var Inventorys = make([]*pb.Inventory, 0)
for i := req.GetStart(); i <= req.GetEnd(); i++ {
var inv pb.Inventory
inv.Ty = msgBlock
inv.Height = i
Inventorys = append(Inventorys, &inv)
}
MaxInvs := &pb.P2PInv{Invs: Inventorys}
var downloadPeers []*Peer var downloadPeers []*Peer
peers, infos := m.network.node.GetActivePeers() peers, infos := m.network.node.GetActivePeers()
if len(pids) > 0 && pids[0] != "" { //指定Pid 下载数据 if len(pids) > 0 && pids[0] != "" { //指定Pid 下载数据
...@@ -427,21 +435,6 @@ func (m *Cli) GetBlocks(msg *queue.Message, taskindex int64) { ...@@ -427,21 +435,6 @@ func (m *Cli) GetBlocks(msg *queue.Message, taskindex int64) {
peer, ok := peers[paddr] peer, ok := peers[paddr]
if ok && peer != nil { if ok && peer != nil {
downloadPeers = append(downloadPeers, peer) downloadPeers = append(downloadPeers, peer)
//获取Invs
if len(MaxInvs.GetInvs()) != int(req.GetEnd()-req.GetStart())+1 {
var err error
MaxInvs, err = peer.mconn.gcli.GetBlocks(context.Background(), &pb.P2PGetBlocks{StartHeight: req.GetStart(), EndHeight: req.GetEnd(),
Version: m.network.node.nodeInfo.cfg.Version}, grpc.FailFast(true))
P2pComm.CollectPeerStat(err, peer)
if err != nil {
log.Error("GetBlocks", "Err", err.Error())
if err == pb.ErrVersion {
peer.version.SetSupport(false)
P2pComm.CollectPeerStat(err, peer)
}
continue
}
}
} }
} }
...@@ -449,45 +442,6 @@ func (m *Cli) GetBlocks(msg *queue.Message, taskindex int64) { ...@@ -449,45 +442,6 @@ func (m *Cli) GetBlocks(msg *queue.Message, taskindex int64) {
} else { } else {
log.Info("fetch from all peers in pids") log.Info("fetch from all peers in pids")
for _, peer := range peers { //限制对peer 的高频次调用
log.Info("peer", "addr", peer.Addr(), "start", req.GetStart(), "end", req.GetEnd())
peerinfo, ok := infos[peer.Addr()]
if !ok {
continue
}
var pr pb.Peer
pr.Addr = peerinfo.GetAddr()
pr.Port = peerinfo.GetPort()
pr.Name = peerinfo.GetName()
pr.MempoolSize = peerinfo.GetMempoolSize()
pr.Header = peerinfo.GetHeader()
if peerinfo.GetHeader().GetHeight() < req.GetEnd() {
continue
}
invs, err := peer.mconn.gcli.GetBlocks(context.Background(), &pb.P2PGetBlocks{StartHeight: req.GetStart(), EndHeight: req.GetEnd(),
Version: m.network.node.nodeInfo.cfg.Version}, grpc.FailFast(true))
P2pComm.CollectPeerStat(err, peer)
if err != nil {
log.Error("GetBlocks", "Err", err.Error())
if err == pb.ErrVersion {
peer.version.SetSupport(false)
P2pComm.CollectPeerStat(err, peer)
}
continue
}
if len(invs.Invs) > len(MaxInvs.Invs) {
MaxInvs = invs
if len(MaxInvs.GetInvs()) == int(req.GetEnd()-req.GetStart())+1 {
break
}
}
}
for _, peer := range peers { for _, peer := range peers {
peerinfo, ok := infos[peer.Addr()] peerinfo, ok := infos[peer.Addr()]
if !ok { if !ok {
...@@ -501,18 +455,19 @@ func (m *Cli) GetBlocks(msg *queue.Message, taskindex int64) { ...@@ -501,18 +455,19 @@ func (m *Cli) GetBlocks(msg *queue.Message, taskindex int64) {
} }
} }
log.Debug("Invs", "Invs show", MaxInvs.GetInvs()) if len(downloadPeers) == 0 {
if len(MaxInvs.GetInvs()) == 0 { log.Error("GetBlocks", "downloadPeers", 0)
log.Error("GetBlocks", "getInvs", 0) msg.Reply(m.network.client.NewMessage("blockchain", pb.EventReply, pb.Reply{Msg: []byte("no downloadPeers")}))
return return
} }
msg.Reply(m.network.client.NewMessage("blockchain", pb.EventReply, pb.Reply{IsOk: true, Msg: []byte("downloading...")}))
//使用新的下载模式进行下载 //使用新的下载模式进行下载
var bChan = make(chan *pb.BlockPid, 256) var bChan = make(chan *pb.BlockPid, 512)
invs := MaxInvs.GetInvs() invs := MaxInvs.GetInvs()
job := NewDownloadJob(m, downloadPeers) job := NewDownloadJob(m, downloadPeers)
var jobcancel int32 var jobcancel int32
var maxDownloadRetryCount = 100
go func(cancel *int32, invs []*pb.Inventory) { go func(cancel *int32, invs []*pb.Inventory) {
for { for {
if atomic.LoadInt32(cancel) == 1 { if atomic.LoadInt32(cancel) == 1 {
...@@ -523,15 +478,25 @@ func (m *Cli) GetBlocks(msg *queue.Message, taskindex int64) { ...@@ -523,15 +478,25 @@ func (m *Cli) GetBlocks(msg *queue.Message, taskindex int64) {
if len(invs) == 0 { if len(invs) == 0 {
return return
} }
maxDownloadRetryCount--
if maxDownloadRetryCount < 0 { if job.avalidPeersNum() <= 0 {
job.ResetDownloadPeers(downloadPeers)
continue
}
if job.isCancel() {
return return
} }
} }
}(&jobcancel, invs) }(&jobcancel, invs)
i := 0 i := 0
for { for {
timeout := time.NewTimer(time.Minute) if job.isCancel() {
return
}
timeout := time.NewTimer(time.Minute * 10)
select { select {
case <-timeout.C: case <-timeout.C:
atomic.StoreInt32(&jobcancel, 1) atomic.StoreInt32(&jobcancel, 1)
...@@ -664,7 +629,7 @@ func (m *Cli) getLocalPeerInfo() (*pb.P2PPeerInfo, error) { ...@@ -664,7 +629,7 @@ func (m *Cli) getLocalPeerInfo() (*pb.P2PPeerInfo, error) {
localpeerinfo.MempoolSize = int32(meminfo.GetSize()) localpeerinfo.MempoolSize = int32(meminfo.GetSize())
if m.network.node.nodeInfo.GetExternalAddr().IP == nil { if m.network.node.nodeInfo.GetExternalAddr().IP == nil {
localpeerinfo.Addr = LocalAddr localpeerinfo.Addr = LocalAddr
localpeerinfo.Port = int32(defaultPort) localpeerinfo.Port = int32(m.network.node.listenPort)
} else { } else {
localpeerinfo.Addr = m.network.node.nodeInfo.GetExternalAddr().IP.String() localpeerinfo.Addr = m.network.node.nodeInfo.GetExternalAddr().IP.String()
localpeerinfo.Port = int32(m.network.node.nodeInfo.GetExternalAddr().Port) localpeerinfo.Port = int32(m.network.node.nodeInfo.GetExternalAddr().Port)
......
...@@ -7,7 +7,6 @@ package p2p ...@@ -7,7 +7,6 @@ package p2p
import ( import (
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"io"
"net" "net"
"strconv" "strconv"
"sync" "sync"
...@@ -17,6 +16,7 @@ import ( ...@@ -17,6 +16,7 @@ import (
"github.com/33cn/chain33/common/version" "github.com/33cn/chain33/common/version"
pb "github.com/33cn/chain33/types" pb "github.com/33cn/chain33/types"
"golang.org/x/net/context" "golang.org/x/net/context"
pr "google.golang.org/grpc/peer" pr "google.golang.org/grpc/peer"
) )
...@@ -65,6 +65,7 @@ func NewP2pServer() *P2pserver { ...@@ -65,6 +65,7 @@ func NewP2pServer() *P2pserver {
// Ping p2pserver ping // Ping p2pserver ping
func (s *P2pserver) Ping(ctx context.Context, in *pb.P2PPing) (*pb.P2PPong, error) { func (s *P2pserver) Ping(ctx context.Context, in *pb.P2PPing) (*pb.P2PPong, error) {
log.Debug("ping") log.Debug("ping")
if !P2pComm.CheckSign(in) { if !P2pComm.CheckSign(in) {
log.Error("Ping", "p2p server", "check sig err") log.Error("Ping", "p2p server", "check sig err")
...@@ -96,6 +97,7 @@ func (s *P2pserver) Ping(ctx context.Context, in *pb.P2PPing) (*pb.P2PPong, erro ...@@ -96,6 +97,7 @@ func (s *P2pserver) Ping(ctx context.Context, in *pb.P2PPing) (*pb.P2PPong, erro
// GetAddr get address // GetAddr get address
func (s *P2pserver) GetAddr(ctx context.Context, in *pb.P2PGetAddr) (*pb.P2PAddr, error) { func (s *P2pserver) GetAddr(ctx context.Context, in *pb.P2PGetAddr) (*pb.P2PAddr, error) {
log.Debug("GETADDR", "RECV ADDR", in, "OutBound Len", s.node.Size()) log.Debug("GETADDR", "RECV ADDR", in, "OutBound Len", s.node.Size())
var addrlist []string var addrlist []string
peers, _ := s.node.GetActivePeers() peers, _ := s.node.GetActivePeers()
...@@ -127,6 +129,7 @@ func (s *P2pserver) Version(ctx context.Context, in *pb.P2PVersion) (*pb.P2PVerA ...@@ -127,6 +129,7 @@ func (s *P2pserver) Version(ctx context.Context, in *pb.P2PVersion) (*pb.P2PVerA
// Version2 p2pserver version // Version2 p2pserver version
func (s *P2pserver) Version2(ctx context.Context, in *pb.P2PVersion) (*pb.P2PVersion, error) { func (s *P2pserver) Version2(ctx context.Context, in *pb.P2PVersion) (*pb.P2PVersion, error) {
log.Debug("Version2") log.Debug("Version2")
var peerip string var peerip string
var err error var err error
...@@ -177,6 +180,7 @@ func (s *P2pserver) SoftVersion(ctx context.Context, in *pb.P2PPing) (*pb.Reply, ...@@ -177,6 +180,7 @@ func (s *P2pserver) SoftVersion(ctx context.Context, in *pb.P2PPing) (*pb.Reply,
// BroadCastTx broadcast transactions of p2pserver // BroadCastTx broadcast transactions of p2pserver
func (s *P2pserver) BroadCastTx(ctx context.Context, in *pb.P2PTx) (*pb.Reply, error) { func (s *P2pserver) BroadCastTx(ctx context.Context, in *pb.P2PTx) (*pb.Reply, error) {
log.Debug("p2pServer RECV TRANSACTION", "in", in) log.Debug("p2pServer RECV TRANSACTION", "in", in)
client := s.node.nodeInfo.client client := s.node.nodeInfo.client
msg := client.NewMessage("mempool", pb.EventTx, in.Tx) msg := client.NewMessage("mempool", pb.EventTx, in.Tx)
err := client.Send(msg, false) err := client.Send(msg, false)
...@@ -223,6 +227,7 @@ func (s *P2pserver) GetMemPool(ctx context.Context, in *pb.P2PGetMempool) (*pb.P ...@@ -223,6 +227,7 @@ func (s *P2pserver) GetMemPool(ctx context.Context, in *pb.P2PGetMempool) (*pb.P
if !s.checkVersion(in.GetVersion()) { if !s.checkVersion(in.GetVersion()) {
return nil, pb.ErrVersion return nil, pb.ErrVersion
} }
memtx, err := s.loadMempool() memtx, err := s.loadMempool()
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -244,6 +249,7 @@ func (s *P2pserver) GetData(in *pb.P2PGetData, stream pb.P2Pgservice_GetDataServ ...@@ -244,6 +249,7 @@ func (s *P2pserver) GetData(in *pb.P2PGetData, stream pb.P2Pgservice_GetDataServ
if !s.checkVersion(in.GetVersion()) { if !s.checkVersion(in.GetVersion()) {
return pb.ErrVersion return pb.ErrVersion
} }
invs := in.GetInvs() invs := in.GetInvs()
client := s.node.nodeInfo.client client := s.node.nodeInfo.client
for _, inv := range invs { //过滤掉不需要的数据 for _, inv := range invs { //过滤掉不需要的数据
...@@ -314,6 +320,7 @@ func (s *P2pserver) GetHeaders(ctx context.Context, in *pb.P2PGetHeaders) (*pb.P ...@@ -314,6 +320,7 @@ func (s *P2pserver) GetHeaders(ctx context.Context, in *pb.P2PGetHeaders) (*pb.P
if !s.checkVersion(in.GetVersion()) { if !s.checkVersion(in.GetVersion()) {
return nil, pb.ErrVersion return nil, pb.ErrVersion
} }
if in.GetEndHeight()-in.GetStartHeight() > 2000 || in.GetEndHeight() < in.GetStartHeight() { if in.GetEndHeight()-in.GetStartHeight() > 2000 || in.GetEndHeight() < in.GetStartHeight() {
return nil, fmt.Errorf("out of range") return nil, fmt.Errorf("out of range")
} }
...@@ -341,6 +348,7 @@ func (s *P2pserver) GetPeerInfo(ctx context.Context, in *pb.P2PGetPeerInfo) (*pb ...@@ -341,6 +348,7 @@ func (s *P2pserver) GetPeerInfo(ctx context.Context, in *pb.P2PGetPeerInfo) (*pb
if !s.checkVersion(in.GetVersion()) { if !s.checkVersion(in.GetVersion()) {
return nil, pb.ErrVersion return nil, pb.ErrVersion
} }
client := s.node.nodeInfo.client client := s.node.nodeInfo.client
log.Debug("GetPeerInfo", "GetMempoolSize", "befor") log.Debug("GetPeerInfo", "GetMempoolSize", "befor")
msg := client.NewMessage("mempool", pb.EventGetMempoolSize, nil) msg := client.NewMessage("mempool", pb.EventGetMempoolSize, nil)
...@@ -386,6 +394,7 @@ func (s *P2pserver) GetPeerInfo(ctx context.Context, in *pb.P2PGetPeerInfo) (*pb ...@@ -386,6 +394,7 @@ func (s *P2pserver) GetPeerInfo(ctx context.Context, in *pb.P2PGetPeerInfo) (*pb
// BroadCastBlock broadcast block of p2pserver // BroadCastBlock broadcast block of p2pserver
func (s *P2pserver) BroadCastBlock(ctx context.Context, in *pb.P2PBlock) (*pb.Reply, error) { func (s *P2pserver) BroadCastBlock(ctx context.Context, in *pb.P2PBlock) (*pb.Reply, error) {
log.Debug("BroadCastBlock") log.Debug("BroadCastBlock")
client := s.node.nodeInfo.client client := s.node.nodeInfo.client
msg := client.NewMessage("blockchain", pb.EventBroadcastAddBlock, in.GetBlock()) msg := client.NewMessage("blockchain", pb.EventBroadcastAddBlock, in.GetBlock())
err := client.Send(msg, false) err := client.Send(msg, false)
...@@ -401,6 +410,7 @@ func (s *P2pserver) ServerStreamSend(in *pb.P2PPing, stream pb.P2Pgservice_Serve ...@@ -401,6 +410,7 @@ func (s *P2pserver) ServerStreamSend(in *pb.P2PPing, stream pb.P2Pgservice_Serve
if len(s.getInBoundPeers()) > int(s.node.nodeInfo.cfg.InnerBounds) { if len(s.getInBoundPeers()) > int(s.node.nodeInfo.cfg.InnerBounds) {
return fmt.Errorf("beyound max inbound num") return fmt.Errorf("beyound max inbound num")
} }
log.Debug("ServerStreamSend") log.Debug("ServerStreamSend")
peername := hex.EncodeToString(in.GetSign().GetPubkey()) peername := hex.EncodeToString(in.GetSign().GetPubkey())
dataChain := s.addStreamHandler(stream) dataChain := s.addStreamHandler(stream)
...@@ -446,20 +456,28 @@ func (s *P2pserver) ServerStreamRead(stream pb.P2Pgservice_ServerStreamReadServe ...@@ -446,20 +456,28 @@ func (s *P2pserver) ServerStreamRead(stream pb.P2Pgservice_ServerStreamReadServe
return fmt.Errorf("beyound max inbound num:%v>%v", len(s.getInBoundPeers()), int(s.node.nodeInfo.cfg.InnerBounds)) return fmt.Errorf("beyound max inbound num:%v>%v", len(s.getInBoundPeers()), int(s.node.nodeInfo.cfg.InnerBounds))
} }
log.Debug("StreamRead") log.Debug("StreamRead")
var remoteIP string
var err error
getctx, ok := pr.FromContext(stream.Context())
if ok {
remoteIP, _, err = net.SplitHostPort(getctx.Addr.String())
if err != nil {
return fmt.Errorf("ctx.Addr format err")
}
} else {
return fmt.Errorf("getctx err")
}
var hash [64]byte var hash [64]byte
var peeraddr, peername string var peeraddr, peername string
defer s.deleteInBoundPeerInfo(peername) defer s.deleteInBoundPeerInfo(peername)
var in = new(pb.BroadCastData) var in = new(pb.BroadCastData)
var err error
for { for {
if s.IsClose() { if s.IsClose() {
return fmt.Errorf("node close") return fmt.Errorf("node close")
} }
in, err = stream.Recv() in, err = stream.Recv()
if err == io.EOF {
log.Info("ServerStreamRead", "Recv", "EOF")
return err
}
if err != nil { if err != nil {
log.Error("ServerStreamRead", "Recv", err) log.Error("ServerStreamRead", "Recv", err)
return err return err
...@@ -510,25 +528,19 @@ func (s *P2pserver) ServerStreamRead(stream pb.P2Pgservice_ServerStreamReadServe ...@@ -510,25 +528,19 @@ func (s *P2pserver) ServerStreamRead(stream pb.P2Pgservice_ServerStreamReadServe
} else if ping := in.GetPing(); ping != nil { ///被远程节点初次连接后,会收到ping 数据包,收到后注册到inboundpeers. } else if ping := in.GetPing(); ping != nil { ///被远程节点初次连接后,会收到ping 数据包,收到后注册到inboundpeers.
//Ping package //Ping package
if !P2pComm.CheckSign(ping) { if !P2pComm.CheckSign(ping) {
log.Error("ServerStreamRead", "check stream", "check sig err") log.Error("ServerStreamRead", "check stream", "check sig err")
return pb.ErrStreamPing return pb.ErrStreamPing
} }
getctx, ok := pr.FromContext(stream.Context()) if s.node.Size() > 0 {
if ok && s.node.Size() > 0 {
//peerIp := strings.Split(getctx.Addr.String(), ":")[0] if remoteIP != LocalAddr && remoteIP != s.node.nodeInfo.GetExternalAddr().IP.String() {
peerIP, _, err := net.SplitHostPort(getctx.Addr.String())
if err != nil {
return fmt.Errorf("ctx.Addr format err")
}
if peerIP != LocalAddr && peerIP != s.node.nodeInfo.GetExternalAddr().IP.String() {
s.node.nodeInfo.SetServiceTy(Service) s.node.nodeInfo.SetServiceTy(Service)
} }
} }
peername = hex.EncodeToString(ping.GetSign().GetPubkey()) peername = hex.EncodeToString(ping.GetSign().GetPubkey())
peeraddr = fmt.Sprintf("%s:%v", in.GetPing().GetAddr(), in.GetPing().GetPort()) peeraddr = fmt.Sprintf("%s:%v", remoteIP, in.GetPing().GetPort())
s.addInBoundPeerInfo(peername, innerpeer{addr: peeraddr, name: peername, timestamp: pb.Now().Unix()}) s.addInBoundPeerInfo(peername, innerpeer{addr: peeraddr, name: peername, timestamp: pb.Now().Unix()})
} else if ver := in.GetVersion(); ver != nil { } else if ver := in.GetVersion(); ver != nil {
//接收版本信息 //接收版本信息
...@@ -537,9 +549,15 @@ func (s *P2pserver) ServerStreamRead(stream pb.P2Pgservice_ServerStreamReadServe ...@@ -537,9 +549,15 @@ func (s *P2pserver) ServerStreamRead(stream pb.P2Pgservice_ServerStreamReadServe
p2pversion := ver.GetP2Pversion() p2pversion := ver.GetP2Pversion()
innerpeer := s.getInBoundPeerInfo(peername) innerpeer := s.getInBoundPeerInfo(peername)
if innerpeer != nil { if innerpeer != nil {
if !s.checkVersion(p2pversion) {
return pb.ErrVersion
}
innerpeer.p2pversion = p2pversion innerpeer.p2pversion = p2pversion
innerpeer.softversion = softversion innerpeer.softversion = softversion
s.addInBoundPeerInfo(peername, *innerpeer) s.addInBoundPeerInfo(peername, *innerpeer)
} else {
//没有获取到peername 的信息,说明没有获取ping的消息包
return pb.ErrStreamPing
} }
} }
......
...@@ -142,8 +142,9 @@ func (p *Peer) heartBeat() { ...@@ -142,8 +142,9 @@ func (p *Peer) heartBeat() {
go p.readStream() go p.readStream()
break break
} else { } else {
time.Sleep(time.Second * 5) //版本不对,直接关掉
continue p.Close()
return
} }
} }
...@@ -206,6 +207,7 @@ func (p *Peer) sendStream() { ...@@ -206,6 +207,7 @@ func (p *Peer) sendStream() {
p2pdata := new(pb.BroadCastData) p2pdata := new(pb.BroadCastData)
p2pdata.Value = &pb.BroadCastData_Ping{Ping: ping} p2pdata.Value = &pb.BroadCastData_Ping{Ping: ping}
if err := resp.Send(p2pdata); err != nil { if err := resp.Send(p2pdata); err != nil {
P2pComm.CollectPeerStat(err, p)
errs := resp.CloseSend() errs := resp.CloseSend()
if errs != nil { if errs != nil {
log.Error("CloseSend", "err", errs) log.Error("CloseSend", "err", errs)
...@@ -222,13 +224,13 @@ func (p *Peer) sendStream() { ...@@ -222,13 +224,13 @@ func (p *Peer) sendStream() {
Softversion: v.GetVersion(), Peername: peername}} Softversion: v.GetVersion(), Peername: peername}}
if err := resp.Send(p2pdata); err != nil { if err := resp.Send(p2pdata); err != nil {
P2pComm.CollectPeerStat(err, p)
errs := resp.CloseSend() errs := resp.CloseSend()
if errs != nil { if errs != nil {
log.Error("CloseSend", "err", errs) log.Error("CloseSend", "err", errs)
} }
cancel() cancel()
log.Error("sendStream", "sendping", err) log.Error("sendStream", "sendversion", err)
time.Sleep(time.Second)
continue continue
} }
timeout := time.NewTimer(time.Second * 2) timeout := time.NewTimer(time.Second * 2)
......
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
package p2p package p2p
// VERSION number // VERSION number
var VERSION int32 = 119 var VERSION int32
//更新内容: //更新内容:
// 1.p2p 修改为在nat结束后,在启动peer的stream,ping,version 等功能 // 1.p2p 修改为在nat结束后,在启动peer的stream,ping,version 等功能
......
...@@ -243,7 +243,7 @@ func (c *Chain33) GetTxByAddr(in types.ReqAddr, result *interface{}) error { ...@@ -243,7 +243,7 @@ func (c *Chain33) GetTxByAddr(in types.ReqAddr, result *interface{}) error {
infos := reply.GetTxInfos() infos := reply.GetTxInfos()
for _, info := range infos { for _, info := range infos {
txinfos.TxInfos = append(txinfos.TxInfos, &rpctypes.ReplyTxInfo{Hash: common.ToHex(info.GetHash()), txinfos.TxInfos = append(txinfos.TxInfos, &rpctypes.ReplyTxInfo{Hash: common.ToHex(info.GetHash()),
Height: info.GetHeight(), Index: info.GetIndex(), Assets: info.Assets}) Height: info.GetHeight(), Index: info.GetIndex(), Assets: fmtAsssets(info.Assets)})
} }
*result = &txinfos *result = &txinfos
} }
...@@ -336,10 +336,23 @@ func fmtTxDetail(tx *types.TransactionDetail, disableDetail bool) (*rpctypes.Tra ...@@ -336,10 +336,23 @@ func fmtTxDetail(tx *types.TransactionDetail, disableDetail bool) (*rpctypes.Tra
Amount: tx.GetAmount(), Amount: tx.GetAmount(),
Fromaddr: tx.GetFromaddr(), Fromaddr: tx.GetFromaddr(),
ActionName: tx.GetActionName(), ActionName: tx.GetActionName(),
Assets: tx.GetAssets(), Assets: fmtAsssets(tx.GetAssets()),
}, nil }, nil
} }
func fmtAsssets(assets []*types.Asset) []*rpctypes.Asset {
var result []*rpctypes.Asset
for _, a := range assets {
asset := &rpctypes.Asset{
Exec: a.Exec,
Symbol: a.Symbol,
Amount: a.Amount,
}
result = append(result, asset)
}
return result
}
// GetMempool get mempool information // GetMempool get mempool information
func (c *Chain33) GetMempool(in *types.ReqNil, result *interface{}) error { func (c *Chain33) GetMempool(in *types.ReqNil, result *interface{}) error {
......
...@@ -178,14 +178,16 @@ func NewGRpcServer(c queue.Client, api client.QueueProtocolAPI) *Grpcserver { ...@@ -178,14 +178,16 @@ func NewGRpcServer(c queue.Client, api client.QueueProtocolAPI) *Grpcserver {
func NewJSONRPCServer(c queue.Client, api client.QueueProtocolAPI) *JSONRPCServer { func NewJSONRPCServer(c queue.Client, api client.QueueProtocolAPI) *JSONRPCServer {
j := &JSONRPCServer{jrpc: &Chain33{}} j := &JSONRPCServer{jrpc: &Chain33{}}
j.jrpc.cli.Init(c, api) j.jrpc.cli.Init(c, api)
grpcCli, err := grpcclient.NewMainChainClient("") if types.IsPara() {
if err != nil { grpcCli, err := grpcclient.NewMainChainClient("")
panic(err) if err != nil {
panic(err)
}
j.jrpc.mainGrpcCli = grpcCli
} }
j.jrpc.mainGrpcCli = grpcCli
server := rpc.NewServer() server := rpc.NewServer()
j.s = server j.s = server
err = server.RegisterName("Chain33", j.jrpc) err := server.RegisterName("Chain33", j.jrpc)
if err != nil { if err != nil {
return nil return nil
} }
......
...@@ -7,8 +7,6 @@ package types ...@@ -7,8 +7,6 @@ package types
import ( import (
"encoding/json" "encoding/json"
"github.com/33cn/chain33/types"
) )
// TransParm transport parameter // TransParm transport parameter
...@@ -135,6 +133,13 @@ type BlockDetails struct { ...@@ -135,6 +133,13 @@ type BlockDetails struct {
Items []*BlockDetail `json:"items"` Items []*BlockDetail `json:"items"`
} }
// Asset asset
type Asset struct {
Exec string `json:"exec"`
Symbol string `json:"symbol"`
Amount int64 `json:"amount"`
}
// TransactionDetail transaction detail // TransactionDetail transaction detail
type TransactionDetail struct { type TransactionDetail struct {
Tx *Transaction `json:"tx"` Tx *Transaction `json:"tx"`
...@@ -146,7 +151,7 @@ type TransactionDetail struct { ...@@ -146,7 +151,7 @@ type TransactionDetail struct {
Amount int64 `json:"amount"` Amount int64 `json:"amount"`
Fromaddr string `json:"fromAddr"` Fromaddr string `json:"fromAddr"`
ActionName string `json:"actionName"` ActionName string `json:"actionName"`
Assets []*types.Asset `json:"assets"` Assets []*Asset `json:"assets"`
} }
// ReplyTxInfos reply tx infos // ReplyTxInfos reply tx infos
...@@ -156,10 +161,10 @@ type ReplyTxInfos struct { ...@@ -156,10 +161,10 @@ type ReplyTxInfos struct {
// ReplyTxInfo reply tx information // ReplyTxInfo reply tx information
type ReplyTxInfo struct { type ReplyTxInfo struct {
Hash string `json:"hash"` Hash string `json:"hash"`
Height int64 `json:"height"` Height int64 `json:"height"`
Index int64 `json:"index"` Index int64 `json:"index"`
Assets []*types.Asset `json:"assets"` Assets []*Asset `json:"assets"`
} }
// TransactionDetails transaction details // TransactionDetails transaction details
......
...@@ -7,7 +7,6 @@ package types ...@@ -7,7 +7,6 @@ package types
import ( import (
rpctypes "github.com/33cn/chain33/rpc/types" rpctypes "github.com/33cn/chain33/rpc/types"
"github.com/33cn/chain33/types"
) )
// AccountsResult defines accountsresult command // AccountsResult defines accountsresult command
...@@ -85,7 +84,7 @@ type TxDetailResult struct { ...@@ -85,7 +84,7 @@ type TxDetailResult struct {
Amount string `json:"amount"` Amount string `json:"amount"`
Fromaddr string `json:"fromaddr"` Fromaddr string `json:"fromaddr"`
ActionName string `json:"actionname"` ActionName string `json:"actionname"`
Assets []*types.Asset `json:"assets"` Assets []*rpctypes.Asset `json:"assets"`
} }
// TxDetailsResult defines txdetails result command // TxDetailsResult defines txdetails result command
......
...@@ -6,6 +6,7 @@ package mavl ...@@ -6,6 +6,7 @@ package mavl
import ( import (
"fmt" "fmt"
"sync"
"time" "time"
"github.com/33cn/chain33/common" "github.com/33cn/chain33/common"
...@@ -26,6 +27,7 @@ type MemTreeOpera interface { ...@@ -26,6 +27,7 @@ type MemTreeOpera interface {
// TreeMap map形式memtree // TreeMap map形式memtree
type TreeMap struct { type TreeMap struct {
mpCache map[interface{}]interface{} mpCache map[interface{}]interface{}
lock sync.RWMutex
} }
// NewTreeMap new mem tree // NewTreeMap new mem tree
...@@ -37,6 +39,8 @@ func NewTreeMap(size int) *TreeMap { ...@@ -37,6 +39,8 @@ func NewTreeMap(size int) *TreeMap {
// Add 添加元素 // Add 添加元素
func (tm *TreeMap) Add(key, value interface{}) { func (tm *TreeMap) Add(key, value interface{}) {
tm.lock.Lock()
defer tm.lock.Unlock()
if _, ok := tm.mpCache[key]; ok { if _, ok := tm.mpCache[key]; ok {
delete(tm.mpCache, key) delete(tm.mpCache, key)
return return
...@@ -46,6 +50,8 @@ func (tm *TreeMap) Add(key, value interface{}) { ...@@ -46,6 +50,8 @@ func (tm *TreeMap) Add(key, value interface{}) {
// Get 获取元素 // Get 获取元素
func (tm *TreeMap) Get(key interface{}) (value interface{}, ok bool) { func (tm *TreeMap) Get(key interface{}) (value interface{}, ok bool) {
tm.lock.Lock()
defer tm.lock.Unlock()
if value, ok := tm.mpCache[key]; ok { if value, ok := tm.mpCache[key]; ok {
return value, ok return value, ok
} }
...@@ -54,6 +60,8 @@ func (tm *TreeMap) Get(key interface{}) (value interface{}, ok bool) { ...@@ -54,6 +60,8 @@ func (tm *TreeMap) Get(key interface{}) (value interface{}, ok bool) {
// Delete 删除元素 // Delete 删除元素
func (tm *TreeMap) Delete(key interface{}) { func (tm *TreeMap) Delete(key interface{}) {
tm.lock.Lock()
defer tm.lock.Unlock()
if _, ok := tm.mpCache[key]; ok { if _, ok := tm.mpCache[key]; ok {
delete(tm.mpCache, key) delete(tm.mpCache, key)
} }
...@@ -61,6 +69,8 @@ func (tm *TreeMap) Delete(key interface{}) { ...@@ -61,6 +69,8 @@ func (tm *TreeMap) Delete(key interface{}) {
// Contains 查看是否包含元素 // Contains 查看是否包含元素
func (tm *TreeMap) Contains(key interface{}) bool { func (tm *TreeMap) Contains(key interface{}) bool {
tm.lock.Lock()
defer tm.lock.Unlock()
if _, ok := tm.mpCache[key]; ok { if _, ok := tm.mpCache[key]; ok {
return true return true
} }
...@@ -69,6 +79,8 @@ func (tm *TreeMap) Contains(key interface{}) bool { ...@@ -69,6 +79,8 @@ func (tm *TreeMap) Contains(key interface{}) bool {
// Len 元素长度 // Len 元素长度
func (tm *TreeMap) Len() int { func (tm *TreeMap) Len() int {
tm.lock.Lock()
defer tm.lock.Unlock()
return len(tm.mpCache) return len(tm.mpCache)
} }
......
...@@ -224,7 +224,6 @@ func S(key string, value interface{}) { ...@@ -224,7 +224,6 @@ func S(key string, value interface{}) {
} else { } else {
tlog.Error("modify " + key + " is only for test") tlog.Error("modify " + key + " is only for test")
} }
return
} }
setChainConfig(key, value) setChainConfig(key, value)
} }
......
...@@ -12,7 +12,6 @@ import ( ...@@ -12,7 +12,6 @@ import (
"math/rand" "math/rand"
"os" "os"
"strings" "strings"
"sync"
"time" "time"
"github.com/33cn/chain33/account" "github.com/33cn/chain33/account"
...@@ -49,7 +48,6 @@ func init() { ...@@ -49,7 +48,6 @@ func init() {
//保证只有一个chain33 会运行 //保证只有一个chain33 会运行
var lognode = log15.New("module", "lognode") var lognode = log15.New("module", "lognode")
var chain33globalLock sync.Mutex
//Chain33Mock : //Chain33Mock :
type Chain33Mock struct { type Chain33Mock struct {
...@@ -82,7 +80,6 @@ func NewWithConfig(cfg *types.Config, sub *types.ConfigSubModule, mockapi client ...@@ -82,7 +80,6 @@ func NewWithConfig(cfg *types.Config, sub *types.ConfigSubModule, mockapi client
} }
func newWithConfig(cfg *types.Config, sub *types.ConfigSubModule, mockapi client.QueueProtocolAPI) *Chain33Mock { func newWithConfig(cfg *types.Config, sub *types.ConfigSubModule, mockapi client.QueueProtocolAPI) *Chain33Mock {
chain33globalLock.Lock()
return newWithConfigNoLock(cfg, sub, mockapi) return newWithConfigNoLock(cfg, sub, mockapi)
} }
...@@ -172,13 +169,17 @@ func (mock *Chain33Mock) Listen() { ...@@ -172,13 +169,17 @@ func (mock *Chain33Mock) Listen() {
l := len(mock.cfg.RPC.GrpcBindAddr) l := len(mock.cfg.RPC.GrpcBindAddr)
mock.cfg.RPC.GrpcBindAddr = mock.cfg.RPC.GrpcBindAddr[0:l-2] + ":" + fmt.Sprint(portgrpc) mock.cfg.RPC.GrpcBindAddr = mock.cfg.RPC.GrpcBindAddr[0:l-2] + ":" + fmt.Sprint(portgrpc)
} }
if mock.sub.Consensus["para"] != nil { }
data, err := types.ModifySubConfig(mock.sub.Consensus["para"], "ParaRemoteGrpcClient", mock.cfg.RPC.GrpcBindAddr)
//ModifyParaClient modify para config
func ModifyParaClient(sub *types.ConfigSubModule, gaddr string) {
if sub.Consensus["para"] != nil {
data, err := types.ModifySubConfig(sub.Consensus["para"], "ParaRemoteGrpcClient", gaddr)
if err != nil { if err != nil {
panic(err) panic(err)
} }
mock.sub.Consensus["para"] = data sub.Consensus["para"] = data
types.S("config.consensus.sub.para.ParaRemoteGrpcClient", mock.cfg.RPC.GrpcBindAddr) types.S("config.consensus.sub.para.ParaRemoteGrpcClient", gaddr)
} }
} }
...@@ -292,7 +293,6 @@ func (mock *Chain33Mock) GetCfg() *types.Config { ...@@ -292,7 +293,6 @@ func (mock *Chain33Mock) GetCfg() *types.Config {
//Close : //Close :
func (mock *Chain33Mock) Close() { func (mock *Chain33Mock) Close() {
mock.closeNoLock() mock.closeNoLock()
chain33globalLock.Unlock()
} }
func (mock *Chain33Mock) closeNoLock() { func (mock *Chain33Mock) closeNoLock() {
...@@ -373,6 +373,22 @@ func (mock *Chain33Mock) SendTx(tx *types.Transaction) []byte { ...@@ -373,6 +373,22 @@ func (mock *Chain33Mock) SendTx(tx *types.Transaction) []byte {
return reply.GetMsg() return reply.GetMsg()
} }
//SendTxRPC :
func (mock *Chain33Mock) SendTxRPC(tx *types.Transaction) []byte {
var txhash string
hextx := common.ToHex(types.Encode(tx))
err := mock.GetJSONC().Call("Chain33.SendTransaction", &rpctypes.RawParm{Data: hextx}, &txhash)
if err != nil {
panic(err)
}
hash, err := common.FromHex(txhash)
if err != nil {
panic(err)
}
mock.lastsend = hash
return hash
}
//Wait : //Wait :
func (mock *Chain33Mock) Wait() error { func (mock *Chain33Mock) Wait() error {
if mock.lastsend == nil { if mock.lastsend == nil {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment