Commit 1ae720fe authored by vipwzw's avatar vipwzw Committed by mdj33

修改blockchain 模块的全局变量,尽量减少全局变量,防止多个实例启动后出问题

parent 3507c095
......@@ -24,20 +24,19 @@ import (
//var
var (
blockLastHeight = []byte("blockLastHeight")
bodyPerfix = []byte("Body:")
LastSequence = []byte("LastSequence")
headerPerfix = []byte("Header:")
heightToHeaderPerfix = []byte("HH:")
hashPerfix = []byte("Hash:")
tdPerfix = []byte("TD:")
heightToHashKeyPerfix = []byte("Height:")
seqToHashKey = []byte("Seq:")
HashToSeqPerfix = []byte("HashToSeq:")
seqCBPrefix = []byte("SCB:")
seqCBLastNumPrefix = []byte("SCBL:")
storeLog = chainlog.New("submodule", "store")
lastheaderlock sync.Mutex
blockLastHeight = []byte("blockLastHeight")
bodyPerfix = []byte("Body:")
LastSequence = []byte("LastSequence")
headerPerfix = []byte("Header:")
heightToHeaderPerfix = []byte("HH:")
hashPerfix = []byte("Hash:")
tdPerfix = []byte("TD:")
heightToHashKeyPerfix = []byte("Height:")
seqToHashKey = []byte("Seq:")
HashToSeqPerfix = []byte("HashToSeq:")
seqCBPrefix = []byte("SCB:")
seqCBLastNumPrefix = []byte("SCBL:")
storeLog = chainlog.New("submodule", "store")
AddBlock int64 = 1
DelBlock int64 = 2
)
......@@ -102,14 +101,16 @@ func calcHashToSequenceKey(hash []byte) []byte {
//BlockStore 区块存储
type BlockStore struct {
db dbm.DB
client queue.Client
height int64
lastBlock *types.Block
db dbm.DB
client queue.Client
height int64
lastBlock *types.Block
lastheaderlock sync.Mutex
chain *BlockChain
}
//NewBlockStore new
func NewBlockStore(db dbm.DB, client queue.Client) *BlockStore {
func NewBlockStore(chain *BlockChain, db dbm.DB, client queue.Client) *BlockStore {
height, err := LoadBlockStoreHeight(db)
if err != nil {
chainlog.Info("init::LoadBlockStoreHeight::database may be crash", "err", err.Error())
......@@ -121,6 +122,7 @@ func NewBlockStore(db dbm.DB, client queue.Client) *BlockStore {
height: height,
db: db,
client: client,
chain: chain,
}
if height == -1 {
chainlog.Info("load block height error, may be init database", "height", height)
......@@ -354,8 +356,8 @@ func (bs *BlockStore) UpdateHeight2(height int64) {
//LastHeader 返回BlockStore保存的当前blockheader
func (bs *BlockStore) LastHeader() *types.Header {
lastheaderlock.Lock()
defer lastheaderlock.Unlock()
bs.lastheaderlock.Lock()
defer bs.lastheaderlock.Unlock()
// 通过lastBlock获取lastheader
var blockheader = types.Header{}
......@@ -382,8 +384,8 @@ func (bs *BlockStore) UpdateLastBlock(hash []byte) {
storeLog.Error("UpdateLastBlock", "hash", common.ToHex(hash), "error", err)
return
}
lastheaderlock.Lock()
defer lastheaderlock.Unlock()
bs.lastheaderlock.Lock()
defer bs.lastheaderlock.Unlock()
if blockdetail != nil {
bs.lastBlock = blockdetail.Block
}
......@@ -392,16 +394,16 @@ func (bs *BlockStore) UpdateLastBlock(hash []byte) {
//UpdateLastBlock2 更新LastBlock到缓存中
func (bs *BlockStore) UpdateLastBlock2(block *types.Block) {
lastheaderlock.Lock()
defer lastheaderlock.Unlock()
bs.lastheaderlock.Lock()
defer bs.lastheaderlock.Unlock()
bs.lastBlock = block
storeLog.Debug("UpdateLastBlock", "UpdateLastBlock", block.Height, "LastHederhash", common.ToHex(block.Hash()))
}
//LastBlock 获取最新的block信息
func (bs *BlockStore) LastBlock() *types.Block {
lastheaderlock.Lock()
defer lastheaderlock.Unlock()
bs.lastheaderlock.Lock()
defer bs.lastheaderlock.Unlock()
if bs.lastBlock != nil {
return bs.lastBlock
}
......@@ -545,7 +547,7 @@ func (bs *BlockStore) SaveBlock(storeBatch dbm.Batch, blockdetail *types.BlockDe
//存储block height和block hash的对应关系,便于通过height查询block
storeBatch.Set(calcHeightToHashKey(height), hash)
if isRecordBlockSequence || isParaChain {
if bs.chain.isRecordBlockSequence || bs.chain.isParaChain {
//存储记录block序列执行的type add
lastSequence, err = bs.saveBlockSequence(storeBatch, hash, height, AddBlock, sequence)
if err != nil {
......@@ -574,7 +576,7 @@ func (bs *BlockStore) DelBlock(storeBatch dbm.Batch, blockdetail *types.BlockDet
storeBatch.Delete(calcHeightToHashKey(height))
storeBatch.Delete(calcHeightToBlockHeaderKey(height))
if isRecordBlockSequence || isParaChain {
if bs.chain.isRecordBlockSequence || bs.chain.isParaChain {
//存储记录block序列执行的type del
lastSequence, err := bs.saveBlockSequence(storeBatch, hash, height, DelBlock, sequence)
if err != nil {
......@@ -944,7 +946,7 @@ func (bs *BlockStore) saveBlockSequence(storeBatch dbm.Batch, hash []byte, heigh
var blockSequence types.BlockSequence
var newSequence int64
if isRecordBlockSequence {
if bs.chain.isRecordBlockSequence {
Sequence, err := bs.LoadBlockLastSequence()
if err != nil {
storeLog.Error("SaveBlockSequence", "LoadBlockLastSequence err", err)
......@@ -959,7 +961,7 @@ func (bs *BlockStore) saveBlockSequence(storeBatch dbm.Batch, hash []byte, heigh
storeLog.Error("isRecordBlockSequence is true must Synchronizing data from zero block", "height", height)
panic(errors.New("isRecordBlockSequence is true must Synchronizing data from zero block"))
}
} else if isParaChain {
} else if bs.chain.isParaChain {
newSequence = sequence
}
blockSequence.Hash = hash
......@@ -1091,7 +1093,7 @@ func (bs *BlockStore) SetUpgradeMeta(meta *types.UpgradeMeta) error {
}
//isRecordBlockSequence配置的合法性检测
func (bs *BlockStore) isRecordBlockSequenceValid() {
func (bs *BlockStore) isRecordBlockSequenceValid(chain *BlockChain) {
lastHeight := bs.Height()
lastSequence, err := bs.LoadBlockLastSequence()
if err != nil {
......@@ -1101,7 +1103,7 @@ func (bs *BlockStore) isRecordBlockSequenceValid() {
}
}
//使能isRecordBlockSequence时的检测
if isRecordBlockSequence {
if chain.isRecordBlockSequence {
//中途开启isRecordBlockSequence报错
if lastSequence == -1 && lastHeight != -1 {
storeLog.Error("isRecordBlockSequenceValid", "lastHeight", lastHeight, "lastSequence", lastSequence)
......
......@@ -24,17 +24,11 @@ import (
//var
var (
//cache 存贮的block个数
DefCacheSize int64 = 128
MaxSeqCB int64 = 20
cachelock sync.Mutex
zeroHash [32]byte
InitBlockNum int64 = 10240 //节点刚启动时从db向index和bestchain缓存中添加的blocknode数,和blockNodeCacheLimit保持一致
isStrongConsistency = false
chainlog = log.New("module", "blockchain")
FutureBlockDelayTime int64 = 1
isRecordBlockSequence = false //是否记录add或者del block的序列,方便blcokchain的恢复通过记录的序列表
isParaChain = false //是否是平行链。平行链需要记录Sequence信息
MaxSeqCB int64 = 20
zeroHash [32]byte
InitBlockNum int64 = 10240 //节点刚启动时从db向index和bestchain缓存中添加的blocknode数,和blockNodeCacheLimit保持一致
chainlog = log.New("module", "blockchain")
FutureBlockDelayTime int64 = 1
)
const maxFutureBlocks = 256
......@@ -99,19 +93,45 @@ type BlockChain struct {
futureBlocks *lru.Cache // future blocks are broadcast later processing
//downLoad block info
downLoadInfo *DownLoadInfo
downLoadlock sync.Mutex
downLoadInfo *DownLoadInfo
isFastDownloadSync bool //当本节点落后很多时,可以先下载区块到db,启动单独的goroutine去执行block
isRecordBlockSequence bool //是否记录add或者del block的序列,方便blcokchain的恢复通过记录的序列表
isParaChain bool //是否是平行链。平行链需要记录Sequence信息
isStrongConsistency bool
//lock
cachelock sync.Mutex
synBlocklock sync.Mutex
peerMaxBlklock sync.Mutex
castlock sync.Mutex
ntpClockSynclock sync.Mutex
faultpeerlock sync.Mutex
bestpeerlock sync.Mutex
downLoadlock sync.Mutex
fastDownLoadSynLock sync.Mutex
isNtpClockSync bool //ntp时间是否同步
//cfg
MaxFetchBlockNum int64 //一次最多申请获取block个数
TimeoutSeconds int64
blockSynInterVal time.Duration
DefCacheSize int64
failed int32
}
//New new
func New(cfg *types.BlockChain) *BlockChain {
initConfig(cfg)
futureBlocks, err := lru.New(maxFutureBlocks)
if err != nil {
panic("when New BlockChain lru.New return err")
}
defCacheSize := int64(128)
if cfg.DefCacheSize > 0 {
defCacheSize = cfg.DefCacheSize
}
blockchain := &BlockChain{
cache: NewBlockCache(DefCacheSize),
cache: NewBlockCache(defCacheSize),
DefCacheSize: defCacheSize,
rcvLastBlockHeight: -1,
synBlockHeight: -1,
peerList: nil,
......@@ -134,30 +154,31 @@ func New(cfg *types.BlockChain) *BlockChain {
bestChainPeerList: make(map[string]*BestPeerInfo),
futureBlocks: futureBlocks,
downLoadInfo: &DownLoadInfo{},
isNtpClockSync: true,
MaxFetchBlockNum: 128 * 6, //一次最多申请获取block个数
TimeoutSeconds: 2,
isFastDownloadSync: true,
}
blockchain.initConfig(cfg)
return blockchain
}
func initConfig(cfg *types.BlockChain) {
if cfg.DefCacheSize > 0 {
DefCacheSize = cfg.DefCacheSize
}
if types.IsEnable("TxHeight") && DefCacheSize <= (types.LowAllowPackHeight+types.HighAllowPackHeight+1) {
func (chain *BlockChain) initConfig(cfg *types.BlockChain) {
if types.IsEnable("TxHeight") && chain.DefCacheSize <= (types.LowAllowPackHeight+types.HighAllowPackHeight+1) {
panic("when Enable TxHeight DefCacheSize must big than types.LowAllowPackHeight")
}
if cfg.MaxFetchBlockNum > 0 {
MaxFetchBlockNum = cfg.MaxFetchBlockNum
chain.MaxFetchBlockNum = cfg.MaxFetchBlockNum
}
if cfg.TimeoutSeconds > 0 {
TimeoutSeconds = cfg.TimeoutSeconds
chain.TimeoutSeconds = cfg.TimeoutSeconds
}
isStrongConsistency = cfg.IsStrongConsistency
isRecordBlockSequence = cfg.IsRecordBlockSequence
isParaChain = cfg.IsParaChain
chain.blockSynInterVal = time.Duration(chain.TimeoutSeconds)
chain.isStrongConsistency = cfg.IsStrongConsistency
chain.isRecordBlockSequence = cfg.IsRecordBlockSequence
chain.isParaChain = cfg.IsParaChain
types.S("quickIndex", cfg.EnableTxQuickIndex)
}
......@@ -193,7 +214,7 @@ func (chain *BlockChain) SetQueueClient(client queue.Client) {
chain.client.Sub("blockchain")
blockStoreDB := dbm.NewDB("blockchain", chain.cfg.Driver, chain.cfg.DbPath, chain.cfg.DbCache)
blockStore := NewBlockStore(blockStoreDB, client)
blockStore := NewBlockStore(chain, blockStoreDB, client)
chain.blockStore = blockStore
stateHash := chain.getStateHash()
chain.query = NewQuery(blockStoreDB, chain.client, stateHash)
......@@ -226,7 +247,7 @@ func (chain *BlockChain) GetOrphanPool() *OrphanPool {
func (chain *BlockChain) InitBlockChain() {
//isRecordBlockSequence配置的合法性检测
if !chain.cfg.IsParaChain {
chain.blockStore.isRecordBlockSequenceValid()
chain.blockStore.isRecordBlockSequenceValid(chain)
}
//先缓存最新的128个block信息到cache中
curheight := chain.GetBlockHeight()
......@@ -394,7 +415,7 @@ func (chain *BlockChain) InitCache(height int64) {
if height < 0 {
return
}
for i := height - DefCacheSize; i <= height; i++ {
for i := height - chain.DefCacheSize; i <= height; i++ {
if i < 0 {
i = 0
}
......
......@@ -6,7 +6,6 @@ package blockchain
import (
"fmt"
"sync"
"sync/atomic"
"time"
......@@ -17,10 +16,8 @@ import (
//var
var (
tempBlockKey = []byte("TB:")
lastTempBlockKey = []byte("LTB:")
isFastDownloadSync = true //当本节点落后很多时,可以先下载区块到db,启动单独的goroutine去执行block
fastDownLoadSynLock sync.Mutex
tempBlockKey = []byte("TB:")
lastTempBlockKey = []byte("LTB:")
)
//const
......@@ -56,17 +53,17 @@ func calcLastTempBlockHeightKey() []byte {
}
//GetDownloadSyncStatus 获取下载区块的同步模式
func GetDownloadSyncStatus() bool {
fastDownLoadSynLock.Lock()
defer fastDownLoadSynLock.Unlock()
return isFastDownloadSync
func (chain *BlockChain) GetDownloadSyncStatus() bool {
chain.fastDownLoadSynLock.Lock()
defer chain.fastDownLoadSynLock.Unlock()
return chain.isFastDownloadSync
}
//UpdateDownloadSyncStatus 更新下载区块的同步模式
func UpdateDownloadSyncStatus(Sync bool) {
fastDownLoadSynLock.Lock()
defer fastDownLoadSynLock.Unlock()
isFastDownloadSync = Sync
func (chain *BlockChain) UpdateDownloadSyncStatus(Sync bool) {
chain.fastDownLoadSynLock.Lock()
defer chain.fastDownLoadSynLock.Unlock()
chain.isFastDownloadSync = Sync
}
//FastDownLoadBlocks 开启快速下载区块的模式
......@@ -93,7 +90,7 @@ func (chain *BlockChain) FastDownLoadBlocks() {
pids := chain.GetBestChainPids()
//节点启动时只有落后最优链batchsyncblocknum个区块时才开启这种下载模式
if pids != nil && peerMaxBlkHeight != -1 && curheight+batchsyncblocknum >= peerMaxBlkHeight {
UpdateDownloadSyncStatus(false)
chain.UpdateDownloadSyncStatus(false)
synlog.Info("FastDownLoadBlocks:quit!", "curheight", curheight, "peerMaxBlkHeight", peerMaxBlkHeight)
break
} else if curheight+batchsyncblocknum < peerMaxBlkHeight && len(pids) >= bestPeerCount {
......@@ -102,7 +99,7 @@ func (chain *BlockChain) FastDownLoadBlocks() {
go chain.ReadBlockToExec(peerMaxBlkHeight, true)
break
} else if types.Since(startTime) > waitTimeDownLoad*time.Second || chain.cfg.SingleMode {
UpdateDownloadSyncStatus(false)
chain.UpdateDownloadSyncStatus(false)
synlog.Info("FastDownLoadBlocks:waitTimeDownLoad:quit!", "curheight", curheight, "peerMaxBlkHeight", peerMaxBlkHeight, "pids", pids)
break
} else {
......@@ -183,7 +180,7 @@ func (chain *BlockChain) ReadBlockToExec(height int64, isNewStart bool) {
//CancelFastDownLoadFlag 清除快速下载模式的一些标志
func (chain *BlockChain) cancelFastDownLoadFlag(isNewStart bool) {
if isNewStart {
UpdateDownloadSyncStatus(false)
chain.UpdateDownloadSyncStatus(false)
}
chain.DelLastTempBlockHeight()
synlog.Info("cancelFastDownLoadFlag", "isNewStart", isNewStart)
......
......@@ -21,12 +21,11 @@ const (
var (
ntpLog = chainlog.New("submodule", "ntp")
failed int32
)
// checkClockDrift queries an NTP server for clock drifts and warns the user if
// one large enough is detected.
func checkClockDrift() {
func (chain *BlockChain) checkClockDrift() {
realnow := common.GetRealTimeRetry(types.NtpHosts, 10)
if realnow.IsZero() {
ntpLog.Info("checkClockDrift", "sntpDrift err", "get ntptime error")
......@@ -42,14 +41,14 @@ func checkClockDrift() {
ntpLog.Warn(fmt.Sprint(warning))
ntpLog.Warn(fmt.Sprint(howtofix))
ntpLog.Warn(fmt.Sprint(separator))
atomic.AddInt32(&failed, 1)
if atomic.LoadInt32(&failed) == ntpChecks {
atomic.AddInt32(&chain.failed, 1)
if atomic.LoadInt32(&chain.failed) == ntpChecks {
ntpLog.Error("System clock seems ERROR")
UpdateNtpClockSyncStatus(false)
chain.UpdateNtpClockSyncStatus(false)
}
} else {
atomic.StoreInt32(&failed, 0)
UpdateNtpClockSyncStatus(true)
atomic.StoreInt32(&chain.failed, 0)
chain.UpdateNtpClockSyncStatus(true)
ntpLog.Info(fmt.Sprintf("Sanity NTP check reported %v drift, all ok", drift))
}
}
......@@ -61,7 +61,7 @@ func (chain *BlockChain) ProcRecvMsg() {
case types.EventIsSync:
go chain.processMsg(msg, reqnum, chain.isSync)
case types.EventIsNtpClockSync:
go chain.processMsg(msg, reqnum, chain.isNtpClockSync)
go chain.processMsg(msg, reqnum, chain.isNtpClockSyncFunc)
case types.EventGetLastBlockSequence:
go chain.processMsg(msg, reqnum, chain.getLastBlockSequence)
......@@ -161,7 +161,7 @@ func (chain *BlockChain) addBlock(msg *queue.Message) {
reply.IsOk = true
blockpid := msg.Data.(*types.BlockPid)
//chainlog.Error("addBlock", "height", blockpid.Block.Height, "pid", blockpid.Pid)
if GetDownloadSyncStatus() {
if chain.GetDownloadSyncStatus() {
//downLoadTask 运行时设置对应的blockdone
if chain.downLoadTask.InProgress() {
chain.downLoadTask.Done(blockpid.Block.GetHeight())
......@@ -366,8 +366,8 @@ func (chain *BlockChain) getLastBlock(msg *queue.Message) {
}
}
func (chain *BlockChain) isNtpClockSync(msg *queue.Message) {
ok := GetNtpClockSyncStatus()
func (chain *BlockChain) isNtpClockSyncFunc(msg *queue.Message) {
ok := chain.GetNtpClockSyncStatus()
msg.Reply(chain.client.NewMessage("", types.EventReplyIsNtpClockSync, &types.IsNtpClockSync{Isntpclocksync: ok}))
}
......
......@@ -402,7 +402,7 @@ func (b *BlockChain) connectBlock(node *blockNode, blockdetail *types.BlockDetai
}
}
//目前非平行链并开启isRecordBlockSequence功能
if isRecordBlockSequence && !isParaChain {
if b.isRecordBlockSequence && !b.isParaChain {
b.pushseq.updateSeq(lastSequence)
}
return blockdetail, nil
......@@ -471,7 +471,7 @@ func (b *BlockChain) disconnectBlock(node *blockNode, blockdetail *types.BlockDe
chainlog.Debug("disconnectBlock success", "newtipnode.hash", common.ToHex(newtipnode.hash), "delblock.parent.hash", common.ToHex(blockdetail.Block.GetParentHash()))
//目前非平行链并开启isRecordBlockSequence功能
if isRecordBlockSequence && !isParaChain {
if b.isRecordBlockSequence && !b.isParaChain {
b.pushseq.updateSeq(lastSequence)
}
return nil
......
......@@ -96,7 +96,7 @@ func (chain *BlockChain) ProcAddBlockSeqCB(cb *types.BlockSeqCB) error {
return types.ErrInvalidParam
}
if !isRecordBlockSequence {
if !chain.isRecordBlockSequence {
return types.ErrRecordBlockSequence
}
if chain.blockStore.seqCBNum() >= MaxSeqCB && !chain.blockStore.isSeqCBExist(cb.Name) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment