Commit 1d956b38 authored by kingwang's avatar kingwang Committed by 33cn

update chain33 03-20

parent 5dac10d3
......@@ -41,7 +41,7 @@ type DB struct {
// NewCoinsAccount 新建账户
func NewCoinsAccount() *DB {
prefix := "mavl-coins-bty-"
prefix := "mavl-coins-" + types.GetCoinSymbol() + "-"
return newAccountDB(prefix)
}
......
......@@ -47,7 +47,7 @@ func GetLocalDBKeyList() [][]byte {
return [][]byte{
blockLastHeight, bodyPerfix, LastSequence, headerPerfix, heightToHeaderPerfix,
hashPerfix, tdPerfix, heightToHashKeyPerfix, seqToHashKey, HashToSeqPerfix,
seqCBPrefix, seqCBLastNumPrefix,
seqCBPrefix, seqCBLastNumPrefix, tempBlockKey, lastTempBlockKey,
}
}
......@@ -156,11 +156,12 @@ func NewBlockStore(db dbm.DB, client queue.Client) *BlockStore {
//如果没有,那么进行下面的步骤
//1. 先把hash 都给改成 TX:hash
//2. 把所有的 Tx:hash 都加一个 8字节的index
//3. 10000个区块 处理一次,并且打印进度
//3. 2000个交易处理一次,并且打印进度
//4. 全部处理完成了,添加quickIndex 的标记
func (bs *BlockStore) initQuickIndex(height int64) {
batch := bs.db.NewBatch(true)
var count int
var maxsize = 100 * 1024 * 1024
var count = 0
for i := int64(0); i <= height; i++ {
blockdetail, err := bs.LoadBlockByHeight(i)
if err != nil {
......@@ -172,17 +173,17 @@ func (bs *BlockStore) initQuickIndex(height int64) {
if err != nil {
panic(err)
}
count++
count += len(txresult)
batch.Set(types.CalcTxKey(hash), txresult)
batch.Set(types.CalcTxShortKey(hash), []byte("1"))
}
if count > 100000 {
if count > maxsize {
storeLog.Info("initQuickIndex", "height", i)
err := batch.Write()
if err != nil {
panic(err)
}
batch = bs.db.NewBatch(true)
batch.Reset()
count = 0
}
}
......@@ -192,6 +193,7 @@ func (bs *BlockStore) initQuickIndex(height int64) {
panic(err)
}
storeLog.Info("initQuickIndex", "height", height)
batch.Reset()
}
bs.saveQuickIndexFlag()
}
......
......@@ -42,13 +42,6 @@ var (
synlog = chainlog.New("submodule", "syn")
)
//ForkInfo blockchain模块fork处理结构体
type ForkInfo struct {
ForkStartHeight int64
ForkEndHeight int64
ForkPid string
}
//PeerInfo blockchain模块需要保存的peerinfo
type PeerInfo struct {
Name string
......@@ -128,6 +121,9 @@ func (chain *BlockChain) SynRoutine() {
//2分钟尝试检测一次最优链,确保本节点在最优链
checkBestChainTicker := time.NewTicker(120 * time.Second)
//节点启动后首先尝试开启快速下载模式
chain.tickerwg.Add(1)
go chain.FastDownLoadBlocks()
for {
select {
case <-chain.quit:
......@@ -135,9 +131,9 @@ func (chain *BlockChain) SynRoutine() {
return
case <-blockSynTicker.C:
//synlog.Info("blockSynTicker")
//SynBlocksFromPeers在task任务中也会go线程调用
//WaitGroup不太好处理,暂时不加入WaitGroup中
go chain.SynBlocksFromPeers()
if !GetDownloadSyncStatus() {
go chain.SynBlocksFromPeers()
}
case <-fetchPeerListTicker.C:
//synlog.Info("blockUpdateTicker")
......@@ -207,15 +203,19 @@ func (chain *BlockChain) FetchBlock(start int64, end int64, pid []string, syncOr
var cb func()
if syncOrfork {
//还有区块需要请求,挂接钩子回调函数
if requestblock.End < chain.forkInfo.ForkEndHeight {
if requestblock.End < chain.downLoadInfo.EndHeight {
cb = func() {
chain.ReqForkBlocks()
chain.ReqDownLoadBlocks()
}
chain.UpdateForkStartHeight(requestblock.End + 1)
} else { // 所有fork block已请求结束,恢复forkinfo为默认值
chain.DefaultForkInfo()
chain.UpdateDownLoadStartHeight(requestblock.End + 1)
//快速下载时需要及时更新bestpeer,防止下载侧链的block
if GetDownloadSyncStatus() {
chain.UpdateDownLoadPids()
}
} else { // 所有DownLoad block已请求结束,恢复DownLoadInfo为默认值
chain.DefaultDownLoadInfo()
}
err = chain.forktask.Start(requestblock.Start, requestblock.End, cb)
err = chain.downLoadTask.Start(requestblock.Start, requestblock.End, cb)
if err != nil {
return err
}
......@@ -225,13 +225,13 @@ func (chain *BlockChain) FetchBlock(start int64, end int64, pid []string, syncOr
chain.SynBlocksFromPeers()
}
}
err = chain.task.Start(requestblock.Start, requestblock.End, cb)
err = chain.syncTask.Start(requestblock.Start, requestblock.End, cb)
if err != nil {
return err
}
}
synlog.Debug("FetchBlock", "Start", requestblock.Start, "End", requestblock.End)
synlog.Info("FetchBlock", "Start", requestblock.Start, "End", requestblock.End)
msg := chain.client.NewMessage("p2p", types.EventFetchBlocks, &requestblock)
Err := chain.client.Send(msg, true)
if Err != nil {
......@@ -426,6 +426,20 @@ func (chain *BlockChain) GetPeers() PeerInfoList {
return peers
}
//GetPeersMap 获取peers的map列表方便查找
func (chain *BlockChain) GetPeersMap() map[string]bool {
peerMaxBlklock.Lock()
defer peerMaxBlklock.Unlock()
peersmap := make(map[string]bool)
if chain.peerList != nil {
for _, peer := range chain.peerList {
peersmap[peer.Name] = true
}
}
return peersmap
}
//IsFaultPeer 判断指定pid是否在故障faultPeerList中
func (chain *BlockChain) IsFaultPeer(pid string) bool {
faultpeerlock.Lock()
......@@ -562,11 +576,10 @@ func (chain *BlockChain) SynBlocksFromPeers() {
} else {
atomic.CompareAndSwapInt32(&chain.isbatchsync, 0, 1)
}
//synlog.Info("SynBlocksFromPeers", "isbatchsync", chain.isbatchsync)
//如果任务正常,那么不重复启动任务
if chain.task.InProgress() {
synlog.Info("chain task InProgress")
if chain.syncTask.InProgress() {
synlog.Info("chain syncTask InProgress")
return
}
//获取peers的最新高度.处理没有收到广播block的情况
......@@ -759,78 +772,15 @@ func (chain *BlockChain) ProcBlockHeaders(headers *types.Headers, pid string) er
peermaxheight := peerinfo.Height
//启动一个线程在后台获取分叉的blcok
if chain.forktask.InProgress() {
synlog.Info("ProcBlockHeaders forktask.InProgress")
if chain.downLoadTask.InProgress() {
synlog.Info("ProcBlockHeaders downLoadTask.InProgress")
return nil
}
go chain.ProcBlockChainFork(ForkHeight, peermaxheight, pid)
return nil
}
//ProcBlockChainFork 处理从peer获取的headers消息
func (chain *BlockChain) ProcBlockChainFork(forkStartHeight int64, forkEndHeight int64, pid string) {
forkinfo := chain.GetForkInfo()
//可能存在上次fork 处理过程中下载区块超时,forktask任务退出,但forkinfo没有恢复成默认值
if forkinfo.ForkStartHeight != -1 || forkinfo.ForkEndHeight != -1 {
synlog.Error("ProcBlockChainFork Fork processing", "pid", forkinfo.ForkPid, "ForkStartHeight", forkinfo.ForkStartHeight, "ForkEndHeight", forkinfo.ForkEndHeight)
}
chain.DefaultForkInfo()
chain.InitForkInfo(forkStartHeight, forkEndHeight, pid)
chain.ReqForkBlocks()
}
//InitForkInfo 开始新的fork处理
func (chain *BlockChain) InitForkInfo(forkStartHeight int64, forkEndHeight int64, pid string) {
chain.forklock.Lock()
defer chain.forklock.Unlock()
chain.forkInfo.ForkStartHeight = forkStartHeight
chain.forkInfo.ForkEndHeight = forkEndHeight
chain.forkInfo.ForkPid = pid
synlog.Debug("InitForkInfo Fork process begin", "ForkStartHeight", forkStartHeight, "ForkEndHeight", forkEndHeight, "pid", pid)
}
//DefaultForkInfo 将forkinfo恢复成默认值
func (chain *BlockChain) DefaultForkInfo() {
chain.forklock.Lock()
defer chain.forklock.Unlock()
chain.forkInfo.ForkStartHeight = -1
chain.forkInfo.ForkEndHeight = -1
chain.forkInfo.ForkPid = ""
synlog.Debug("DefaultForkInfo")
}
//GetForkInfo 获取forkinfo
func (chain *BlockChain) GetForkInfo() *ForkInfo {
chain.forklock.Lock()
defer chain.forklock.Unlock()
return chain.forkInfo
}
//UpdateForkStartHeight 更新fork 请求的起始block高度
func (chain *BlockChain) UpdateForkStartHeight(forkStartHeight int64) {
chain.forklock.Lock()
defer chain.forklock.Unlock()
chain.forkInfo.ForkStartHeight = forkStartHeight
synlog.Debug("UpdateForkStartHeight", "ForkStartHeight", chain.forkInfo.ForkStartHeight, "ForkEndHeight", chain.forkInfo.ForkEndHeight, "pid", chain.forkInfo.ForkPid)
}
//ReqForkBlocks 请求fork处理的blocks
func (chain *BlockChain) ReqForkBlocks() {
forkinfo := chain.GetForkInfo()
if forkinfo.ForkStartHeight != -1 && forkinfo.ForkEndHeight != -1 && forkinfo.ForkPid != "" {
synlog.Info("ReqForkBlocks", "ForkStartHeight", forkinfo.ForkStartHeight, "ForkEndHeight", forkinfo.ForkEndHeight, "pid", forkinfo.ForkPid)
err := chain.FetchBlock(forkinfo.ForkStartHeight, forkinfo.ForkEndHeight, []string{forkinfo.ForkPid}, true)
if err != nil {
synlog.Error("ReqForkBlocks FetchBlock ", "err", err)
}
//在快速下载block阶段不处理fork的处理
if !GetDownloadSyncStatus() {
go chain.ProcDownLoadBlocks(ForkHeight, peermaxheight, []string{pid})
}
return nil
}
//ProcAddBlockHeadersMsg 处理从peer获取的headers消息
......@@ -1019,7 +969,13 @@ func (chain *BlockChain) GetBestChainPids() []string {
bestpeerlock.Lock()
defer bestpeerlock.Unlock()
peersmap := chain.GetPeersMap()
for key, value := range chain.bestChainPeerList {
if !peersmap[value.Peer.Name] {
delete(chain.bestChainPeerList, value.Peer.Name)
synlog.Debug("GetBestChainPids:delete", "peer", value.Peer.Name)
continue
}
if value.IsBestChain {
ok := chain.IsFaultPeer(value.Peer.Name)
if !ok {
......@@ -1027,7 +983,7 @@ func (chain *BlockChain) GetBestChainPids() []string {
}
}
}
synlog.Debug("GetBestChainPids ", "pids", PeerPids)
synlog.Debug("GetBestChainPids", "pids", PeerPids)
return PeerPids
}
......
......@@ -47,9 +47,9 @@ type BlockChain struct {
blockStore *BlockStore
pushseq *pushseq
//cache 缓存block方便快速查询
cfg *types.BlockChain
task *Task
forktask *Task
cfg *types.BlockChain
syncTask *Task
downLoadTask *Task
query *Query
......@@ -98,9 +98,9 @@ type BlockChain struct {
//记录futureblocks
futureBlocks *lru.Cache // future blocks are broadcast later processing
//fork block req
forkInfo *ForkInfo
forklock sync.Mutex
//downLoad block info
downLoadInfo *DownLoadInfo
downLoadlock sync.Mutex
}
//New new
......@@ -119,8 +119,8 @@ func New(cfg *types.BlockChain) *BlockChain {
recvwg: &sync.WaitGroup{},
tickerwg: &sync.WaitGroup{},
task: newTask(300 * time.Second), //考虑到区块交易多时执行耗时,需要延长task任务的超时时间
forktask: newTask(300 * time.Second),
syncTask: newTask(300 * time.Second), //考虑到区块交易多时执行耗时,需要延长task任务的超时时间
downLoadTask: newTask(300 * time.Second),
quit: make(chan struct{}),
synblock: make(chan struct{}, 1),
......@@ -133,7 +133,7 @@ func New(cfg *types.BlockChain) *BlockChain {
faultPeerList: make(map[string]*FaultPeerInfo),
bestChainPeerList: make(map[string]*BestPeerInfo),
futureBlocks: futureBlocks,
forkInfo: &ForkInfo{},
downLoadInfo: &DownLoadInfo{},
}
return blockchain
......@@ -257,8 +257,8 @@ func (chain *BlockChain) InitBlockChain() {
// 定时处理futureblock
go chain.UpdateRoutine()
}
//初始化默认forkinfo
chain.DefaultForkInfo()
//初始化默认DownLoadInfo
chain.DefaultDownLoadInfo()
}
func (chain *BlockChain) getStateHash() []byte {
......
......@@ -121,6 +121,12 @@ func TestBlockChain(t *testing.T) {
testDelBlock(t, blockchain, curBlock)
testIsRecordFaultErr(t)
testGetsynBlkHeight(t, blockchain)
testProcDelChainBlockMsg(t, mock33, blockchain)
testFaultPeer(t, blockchain)
testCheckBlock(t, blockchain)
testWriteBlockToDbTemp(t, blockchain)
testReadBlockToExec(t, blockchain)
}
func testProcAddBlockMsg(t *testing.T, mock33 *testnode.Chain33Mock, blockchain *blockchain.BlockChain) {
......@@ -923,11 +929,11 @@ func testProcBlockChainFork(t *testing.T, blockchain *blockchain.BlockChain) {
chainlog.Info("testProcBlockChainFork begin --------------------")
curheight := blockchain.GetBlockHeight()
blockchain.ProcBlockChainFork(curheight-1, curheight+256, "self")
blockchain.ProcDownLoadBlocks(curheight-1, curheight+256, []string{"self"})
chainlog.Info("testProcBlockChainFork end --------------------")
}
func testAddBlockSeqCB(t *testing.T, blockchain *blockchain.BlockChain) {
func testAddBlockSeqCB(t *testing.T, chain *blockchain.BlockChain) {
chainlog.Info("testAddBlockSeqCB begin ---------------------")
cb := &types.BlockSeqCB{
......@@ -935,11 +941,11 @@ func testAddBlockSeqCB(t *testing.T, blockchain *blockchain.BlockChain) {
URL: "http://192.168.1.107:15760",
Encode: "json",
}
err := blockchain.ProcAddBlockSeqCB(cb)
blockchain.MaxSeqCB = 1
err := chain.ProcAddBlockSeqCB(cb)
require.NoError(t, err)
cbs, err := blockchain.ProcListBlockSeqCB()
cbs, err := chain.ProcListBlockSeqCB()
require.NoError(t, err)
exist := false
for _, temcb := range cbs.Items {
......@@ -950,10 +956,22 @@ func testAddBlockSeqCB(t *testing.T, blockchain *blockchain.BlockChain) {
if !exist {
t.Error("testAddBlockSeqCB listSeqCB fail", "cb", cb, "cbs", cbs)
}
num := blockchain.ProcGetSeqCBLastNum(cb.Name)
num := chain.ProcGetSeqCBLastNum(cb.Name)
if num != -1 {
t.Error("testAddBlockSeqCB getSeqCBLastNum", "num", num, "name", cb.Name)
}
cb2 := &types.BlockSeqCB{
Name: "test1",
URL: "http://192.168.1.107:15760",
Encode: "json",
}
err = chain.ProcAddBlockSeqCB(cb2)
if err != types.ErrTooManySeqCB {
t.Error("testAddBlockSeqCB", "cb", cb2, "err", err)
}
chainlog.Info("testAddBlockSeqCB end -------------------------")
}
func testIsRecordFaultErr(t *testing.T) {
......@@ -962,5 +980,132 @@ func testIsRecordFaultErr(t *testing.T) {
if isok {
t.Error("testIsRecordFaultErr IsRecordFaultErr", "isok", isok)
}
chainlog.Info("testIsRecordFaultErr begin ---------------------")
chainlog.Info("testIsRecordFaultErr end ---------------------")
}
func testProcDelChainBlockMsg(t *testing.T, mock33 *testnode.Chain33Mock, blockchain *blockchain.BlockChain) {
chainlog.Info("testProcDelChainBlockMsg begin --------------------")
curheight := blockchain.GetBlockHeight()
block, err := blockchain.GetBlock(curheight)
require.NoError(t, err)
var parablockDetail types.ParaChainBlockDetail
parablockDetail.Blockdetail = block
parablockDetail.Sequence = curheight
msgGen := mock33.GetClient().NewMessage("blockchain", types.EventDelParaChainBlockDetail, &parablockDetail)
mock33.GetClient().Send(msgGen, true)
mock33.GetClient().Wait(msgGen)
chainlog.Info("testProcDelChainBlockMsg end --------------------")
}
func testGetsynBlkHeight(t *testing.T, chain *blockchain.BlockChain) {
chainlog.Info("testGetsynBlkHeight begin --------------------")
curheight := chain.GetBlockHeight()
chain.UpdatesynBlkHeight(curheight)
height := chain.GetsynBlkHeight()
if height != curheight {
chainlog.Error("testGetsynBlkHeight", "curheight", curheight, "height", height)
}
//get peerinfo
peerinfo := chain.GetPeerInfo("self")
if peerinfo != nil {
chainlog.Error("testGetsynBlkHeight:GetPeerInfo", "peerinfo", peerinfo)
}
maxpeer := chain.GetMaxPeerInfo()
if maxpeer != nil {
chainlog.Error("testGetsynBlkHeight:GetMaxPeerInfo", "maxpeer", maxpeer)
}
chainlog.Info("testGetsynBlkHeight end --------------------")
}
func testFaultPeer(t *testing.T, chain *blockchain.BlockChain) {
chainlog.Info("testFaultPeer begin ---------------------")
curheight := chain.GetBlockHeight()
block, err := chain.GetBlock(curheight)
require.NoError(t, err)
isok := chain.IsFaultPeer("self")
if isok {
t.Error("testFaultPeer:IsFaultPeer")
}
//记录故障peer信息
chain.RecordFaultPeer("self", curheight+1, block.Block.Hash(), types.ErrSign)
var faultnode blockchain.FaultPeerInfo
var peerinfo blockchain.PeerInfo
peerinfo.Name = "self"
faultnode.Peer = &peerinfo
faultnode.FaultHeight = curheight + 1
faultnode.FaultHash = block.Block.Hash()
faultnode.ErrInfo = types.ErrSign
faultnode.ReqFlag = false
chain.AddFaultPeer(&faultnode)
peer := chain.GetFaultPeer("self")
if peer == nil {
t.Error("testFaultPeer:GetFaultPeer is nil")
}
chain.UpdateFaultPeer("self", true)
chain.RemoveFaultPeer("self")
chainlog.Info("testFaultPeer end ---------------------")
}
func testCheckBlock(t *testing.T, chain *blockchain.BlockChain) {
curheight := chain.GetBlockHeight()
//chain.CheckHeightNoIncrease()
chain.FetchBlockHeaders(0, curheight, "self")
header, err := chain.ProcGetLastHeaderMsg()
var blockheader types.Header
if header != nil && err == nil {
blockheader.Version = header.Version
blockheader.ParentHash = header.ParentHash
blockheader.TxHash = header.TxHash
blockheader.StateHash = header.StateHash
blockheader.Height = header.Height
blockheader.BlockTime = header.BlockTime
blockheader.TxCount = header.TxCount
blockheader.Hash = header.Hash
blockheader.Difficulty = header.Difficulty
blockheader.Signature = header.Signature
}
var blockheaders types.Headers
blockheaders.Items = append(blockheaders.Items, &blockheader)
chain.ProcAddBlockHeadersMsg(&blockheaders, "self")
chain.ProcBlockHeaders(&blockheaders, "self")
}
func testReadBlockToExec(t *testing.T, chain *blockchain.BlockChain) {
chainlog.Info("testReadBlockToExec begin ---------------------")
curheight := chain.GetBlockHeight()
chain.ReadBlockToExec(curheight+1, false)
chainlog.Info("testReadBlockToExec end ---------------------")
}
func testWriteBlockToDbTemp(t *testing.T, chain *blockchain.BlockChain) {
chainlog.Info("WriteBlockToDbTemp begin ---------------------")
curheight := chain.GetBlockHeight()
block, err := chain.GetBlock(curheight)
if err != nil {
t.Error("testWriteBlockToDbTemp", "err", err)
}
var rawblock types.Block
rawblock.Version = block.Block.Version
rawblock.ParentHash = block.Block.ParentHash
rawblock.TxHash = block.Block.TxHash
rawblock.StateHash = block.Block.StateHash
rawblock.BlockTime = block.Block.BlockTime
rawblock.Difficulty = block.Block.Difficulty
rawblock.MainHash = block.Block.MainHash
rawblock.MainHeight = block.Block.MainHeight
rawblock.Height = block.Block.Height + 1
err = chain.WriteBlockToDbTemp(&rawblock)
if err != nil {
t.Error("testWriteBlockToDbTemp", "err", err)
}
chainlog.Info("WriteBlockToDbTemp end ---------------------")
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package blockchain
import (
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/33cn/chain33/common"
"github.com/33cn/chain33/types"
"github.com/golang/protobuf/proto"
)
//var
var (
tempBlockKey = []byte("TB:")
lastTempBlockKey = []byte("LTB:")
isFastDownloadSync = true //当本节点落后很多时,可以先下载区块到db,启动单独的goroutine去执行block
fastDownLoadSynLock sync.Mutex
)
//const
const (
//waitTimeDownLoad 节点启动之后等待开始快速下载的时间秒,超时就切换到普通同步模式
waitTimeDownLoad = 120
//快速下载时需要的最少peer数量
bestPeerCount = 2
)
//DownLoadInfo blockchain模块下载block处理结构体
type DownLoadInfo struct {
StartHeight int64
EndHeight int64
Pids []string
}
//ErrCountInfo 启动download时read一个block失败等待最长时间为2分钟,120秒
type ErrCountInfo struct {
Height int64
Count int64
}
//存储temp block height 对应的block
func calcHeightToTempBlockKey(height int64) []byte {
return append(tempBlockKey, []byte(fmt.Sprintf("%012d", height))...)
}
//存储last temp block height
func calcLastTempBlockHeightKey() []byte {
return lastTempBlockKey
}
//GetDownloadSyncStatus 获取下载区块的同步模式
func GetDownloadSyncStatus() bool {
fastDownLoadSynLock.Lock()
defer fastDownLoadSynLock.Unlock()
return isFastDownloadSync
}
//UpdateDownloadSyncStatus 更新下载区块的同步模式
func UpdateDownloadSyncStatus(Sync bool) {
fastDownLoadSynLock.Lock()
defer fastDownLoadSynLock.Unlock()
isFastDownloadSync = Sync
}
//FastDownLoadBlocks 开启快速下载区块的模式
func (chain *BlockChain) FastDownLoadBlocks() {
defer chain.tickerwg.Done()
curHeight := chain.GetBlockHeight()
lastTempHight := chain.GetLastTempBlockHeight()
synlog.Info("FastDownLoadBlocks", "curHeight", curHeight, "lastTempHight", lastTempHight)
//需要执行完上次已经下载并临时存贮在db中的blocks
if lastTempHight != -1 && lastTempHight > curHeight {
chain.ReadBlockToExec(lastTempHight, false)
}
//1:满足bestpeer数量,并且落后区块数量大于5000个开启快速同步
//2:落后区块数量小于5000个不开启快速同步,启动普通同步模式
//3:启动二分钟如果还不满足快速下载的条件就直接退出,启动普通同步模式
startTime := types.Now()
for {
curheight := chain.GetBlockHeight()
peerMaxBlkHeight := chain.GetPeerMaxBlkHeight()
pids := chain.GetBestChainPids()
//节点启动时只有落后最优链batchsyncblocknum个区块时才开启这种下载模式
if pids != nil && peerMaxBlkHeight != -1 && curheight+batchsyncblocknum >= peerMaxBlkHeight {
UpdateDownloadSyncStatus(false)
synlog.Info("FastDownLoadBlocks:quit!", "curheight", curheight, "peerMaxBlkHeight", peerMaxBlkHeight)
break
} else if curheight+batchsyncblocknum < peerMaxBlkHeight && len(pids) >= bestPeerCount {
synlog.Info("start download blocks!FastDownLoadBlocks", "curheight", curheight, "peerMaxBlkHeight", peerMaxBlkHeight)
go chain.ProcDownLoadBlocks(curheight, peerMaxBlkHeight, pids)
go chain.ReadBlockToExec(peerMaxBlkHeight, true)
break
} else if types.Since(startTime) > waitTimeDownLoad*time.Second || chain.cfg.SingleMode {
UpdateDownloadSyncStatus(false)
synlog.Info("FastDownLoadBlocks:waitTimeDownLoad:quit!", "curheight", curheight, "peerMaxBlkHeight", peerMaxBlkHeight, "pids", pids)
break
} else {
synlog.Info("FastDownLoadBlocks task sleep 1 second !")
time.Sleep(time.Second)
}
}
}
//ReadBlockToExec 执行快速下载临时存储在db中的block
func (chain *BlockChain) ReadBlockToExec(height int64, isNewStart bool) {
synlog.Info("ReadBlockToExec starting!!!", "height", height, "isNewStart", isNewStart)
var waitCount ErrCountInfo
waitCount.Height = 0
waitCount.Count = 0
for {
curheight := chain.GetBlockHeight()
peerMaxBlkHeight := chain.GetPeerMaxBlkHeight()
// 节点同步阶段自己高度小于最大高度batchsyncblocknum时存储block到db批量处理时不刷盘
if peerMaxBlkHeight > curheight+batchsyncblocknum && !chain.cfgBatchSync {
atomic.CompareAndSwapInt32(&chain.isbatchsync, 1, 0)
} else {
atomic.CompareAndSwapInt32(&chain.isbatchsync, 0, 1)
}
if (curheight >= peerMaxBlkHeight && peerMaxBlkHeight != -1) || curheight >= height {
chain.cancelFastDownLoadFlag(isNewStart)
synlog.Info("ReadBlockToExec complete!", "curheight", curheight, "height", height, "peerMaxBlkHeight", peerMaxBlkHeight)
break
}
block, err := chain.ReadBlockByHeight(curheight + 1)
if err != nil {
//在downLoadTask任务退出后,尝试获取block2分钟,还获取不到就直接退出download下载
if isNewStart {
if !chain.downLoadTask.InProgress() {
if waitCount.Height == curheight+1 {
waitCount.Count++
} else {
waitCount.Height = curheight + 1
waitCount.Count = 1
}
if waitCount.Count >= 120 {
chain.cancelFastDownLoadFlag(isNewStart)
synlog.Error("ReadBlockToExec:ReadBlockByHeight:timeout", "height", curheight+1, "peerMaxBlkHeight", peerMaxBlkHeight, "err", err)
break
}
time.Sleep(time.Second)
continue
} else {
synlog.Info("ReadBlockToExec:ReadBlockByHeight", "height", curheight+1, "peerMaxBlkHeight", peerMaxBlkHeight, "err", err)
time.Sleep(time.Second)
continue
}
} else {
chain.cancelFastDownLoadFlag(isNewStart)
synlog.Error("ReadBlockToExec:ReadBlockByHeight", "height", curheight+1, "peerMaxBlkHeight", peerMaxBlkHeight, "err", err)
break
}
}
_, ismain, isorphan, err := chain.ProcessBlock(false, &types.BlockDetail{Block: block}, "download", true, -1)
if err != nil {
//执行失败强制结束快速下载模式并切换到普通下载模式
if isNewStart && chain.downLoadTask.InProgress() {
Err := chain.downLoadTask.Cancel()
if Err != nil {
synlog.Error("ReadBlockToExec:downLoadTask.Cancel!", "height", block.Height, "hash", common.ToHex(block.Hash()), "isNewStart", isNewStart, "err", Err)
}
chain.DefaultDownLoadInfo()
}
chain.cancelFastDownLoadFlag(isNewStart)
synlog.Error("ReadBlockToExec:ProcessBlock:err!", "height", block.Height, "hash", common.ToHex(block.Hash()), "isNewStart", isNewStart, "err", err)
break
}
synlog.Debug("ReadBlockToExec:ProcessBlock:success!", "height", block.Height, "ismain", ismain, "isorphan", isorphan, "hash", common.ToHex(block.Hash()))
}
}
//CancelFastDownLoadFlag 清除快速下载模式的一些标志
func (chain *BlockChain) cancelFastDownLoadFlag(isNewStart bool) {
if isNewStart {
UpdateDownloadSyncStatus(false)
}
chain.DelLastTempBlockHeight()
synlog.Info("cancelFastDownLoadFlag", "isNewStart", isNewStart)
}
//ReadBlockByHeight 从数据库中读取快速下载临时存储的block信息
func (chain *BlockChain) ReadBlockByHeight(height int64) (*types.Block, error) {
blockByte, err := chain.blockStore.db.Get(calcHeightToTempBlockKey(height))
if blockByte == nil || err != nil {
return nil, types.ErrHeightNotExist
}
var block types.Block
err = proto.Unmarshal(blockByte, &block)
if err != nil {
storeLog.Error("ReadBlockByHeight", "err", err)
return nil, err
}
//读取成功之后将将此临时存贮删除
err = chain.blockStore.db.Delete(calcHeightToTempBlockKey(height - 1))
if err != nil {
storeLog.Error("ReadBlockByHeight:Delete", "height", height, "err", err)
}
return &block, err
}
//WriteBlockToDbTemp 快速下载的block临时存贮到数据库
func (chain *BlockChain) WriteBlockToDbTemp(block *types.Block) error {
if block == nil {
panic("WriteBlockToDbTemp block is nil")
}
sync := true
if atomic.LoadInt32(&chain.isbatchsync) == 0 {
sync = false
}
beg := types.Now()
defer func() {
chainlog.Debug("WriteBlockToDbTemp", "height", block.Height, "sync", sync, "cost", types.Since(beg))
}()
newbatch := chain.blockStore.NewBatch(false)
blockByte, err := proto.Marshal(block)
if err != nil {
chainlog.Error("WriteBlockToDbTemp:Marshal", "height", block.Height)
}
newbatch.Set(calcHeightToTempBlockKey(block.Height), blockByte)
heightbytes := types.Encode(&types.Int64{Data: block.Height})
newbatch.Set(calcLastTempBlockHeightKey(), heightbytes)
return newbatch.Write()
}
//GetLastTempBlockHeight 从数据库中获取快速下载的最新的block高度
func (chain *BlockChain) GetLastTempBlockHeight() int64 {
heightbytes, err := chain.blockStore.db.Get(calcLastTempBlockHeightKey())
if heightbytes == nil || err != nil {
chainlog.Error("GetLastTempBlockHeight", "err", err)
return -1
}
var height types.Int64
err = types.Decode(heightbytes, &height)
if err != nil {
chainlog.Error("GetLastTempBlockHeight:Decode", "err", err)
return -1
}
return height.Data
}
//DelLastTempBlockHeight 快速下载结束时删除此标志位
func (chain *BlockChain) DelLastTempBlockHeight() {
err := chain.blockStore.db.Delete(calcLastTempBlockHeightKey())
if err != nil {
synlog.Error("DelLastTempBlockHeight", "err", err)
}
}
//ProcDownLoadBlocks 处理下载blocks
func (chain *BlockChain) ProcDownLoadBlocks(StartHeight int64, EndHeight int64, pids []string) {
info := chain.GetDownLoadInfo()
//可能存在上次DownLoad处理过程中下载区块超时,DownLoad任务退出,但DownLoad没有恢复成默认值
if info.StartHeight != -1 || info.EndHeight != -1 {
synlog.Info("ProcDownLoadBlocks", "pids", info.Pids, "StartHeight", info.StartHeight, "EndHeight", info.EndHeight)
}
chain.DefaultDownLoadInfo()
chain.InitDownLoadInfo(StartHeight, EndHeight, pids)
chain.ReqDownLoadBlocks()
}
//InitDownLoadInfo 开始新的DownLoad处理
func (chain *BlockChain) InitDownLoadInfo(StartHeight int64, EndHeight int64, pids []string) {
chain.downLoadlock.Lock()
defer chain.downLoadlock.Unlock()
chain.downLoadInfo.StartHeight = StartHeight
chain.downLoadInfo.EndHeight = EndHeight
chain.downLoadInfo.Pids = pids
synlog.Debug("InitDownLoadInfo begin", "StartHeight", StartHeight, "EndHeight", EndHeight, "pids", pids)
}
//DefaultDownLoadInfo 将DownLoadInfo恢复成默认值
func (chain *BlockChain) DefaultDownLoadInfo() {
chain.downLoadlock.Lock()
defer chain.downLoadlock.Unlock()
chain.downLoadInfo.StartHeight = -1
chain.downLoadInfo.EndHeight = -1
chain.downLoadInfo.Pids = nil
synlog.Debug("DefaultDownLoadInfo")
}
//GetDownLoadInfo 获取DownLoadInfo
func (chain *BlockChain) GetDownLoadInfo() *DownLoadInfo {
chain.downLoadlock.Lock()
defer chain.downLoadlock.Unlock()
return chain.downLoadInfo
}
//UpdateDownLoadStartHeight 更新DownLoad请求的起始block高度
func (chain *BlockChain) UpdateDownLoadStartHeight(StartHeight int64) {
chain.downLoadlock.Lock()
defer chain.downLoadlock.Unlock()
chain.downLoadInfo.StartHeight = StartHeight
synlog.Debug("UpdateDownLoadStartHeight", "StartHeight", chain.downLoadInfo.StartHeight, "EndHeight", chain.downLoadInfo.EndHeight, "pids", len(chain.downLoadInfo.Pids))
}
//UpdateDownLoadPids 更新bestpeers列表
func (chain *BlockChain) UpdateDownLoadPids() {
pids := chain.GetBestChainPids()
chain.downLoadlock.Lock()
defer chain.downLoadlock.Unlock()
if len(pids) != 0 {
chain.downLoadInfo.Pids = pids
synlog.Info("UpdateDownLoadPids", "StartHeight", chain.downLoadInfo.StartHeight, "EndHeight", chain.downLoadInfo.EndHeight, "pids", len(chain.downLoadInfo.Pids))
}
}
//ReqDownLoadBlocks 请求DownLoad处理的blocks
func (chain *BlockChain) ReqDownLoadBlocks() {
info := chain.GetDownLoadInfo()
if info.StartHeight != -1 && info.EndHeight != -1 && info.Pids != nil {
synlog.Info("ReqDownLoadBlocks", "StartHeight", info.StartHeight, "EndHeight", info.EndHeight, "pids", len(info.Pids))
err := chain.FetchBlock(info.StartHeight, info.EndHeight, info.Pids, true)
if err != nil {
synlog.Error("ReqDownLoadBlocks:FetchBlock", "err", err)
}
}
}
......@@ -157,19 +157,30 @@ func (chain *BlockChain) getBlocks(msg *queue.Message) {
}
func (chain *BlockChain) addBlock(msg *queue.Message) {
//var block *types.Block
var reply types.Reply
reply.IsOk = true
blockpid := msg.Data.(*types.BlockPid)
_, err := chain.ProcAddBlockMsg(false, &types.BlockDetail{Block: blockpid.Block}, blockpid.Pid)
if err != nil {
chainlog.Error("ProcAddBlockMsg", "height", blockpid.Block.Height, "err", err.Error())
reply.IsOk = false
reply.Msg = []byte(err.Error())
//chainlog.Error("addBlock", "height", blockpid.Block.Height, "pid", blockpid.Pid)
if GetDownloadSyncStatus() {
//downLoadTask 运行时设置对应的blockdone
if chain.downLoadTask.InProgress() {
chain.downLoadTask.Done(blockpid.Block.GetHeight())
}
err := chain.WriteBlockToDbTemp(blockpid.Block)
if err != nil {
chainlog.Error("WriteBlockToDbTemp", "height", blockpid.Block.Height, "err", err.Error())
reply.IsOk = false
reply.Msg = []byte(err.Error())
}
} else {
//chain.notifySync()
_, err := chain.ProcAddBlockMsg(false, &types.BlockDetail{Block: blockpid.Block}, blockpid.Pid)
if err != nil {
chainlog.Error("ProcAddBlockMsg", "height", blockpid.Block.Height, "err", err.Error())
reply.IsOk = false
reply.Msg = []byte(err.Error())
}
chainlog.Debug("EventAddBlock", "height", blockpid.Block.Height, "pid", blockpid.Pid, "success", "ok")
}
chainlog.Debug("EventAddBlock", "height", blockpid.Block.Height, "pid", blockpid.Pid, "success", "ok")
msg.Reply(chain.client.NewMessage("p2p", types.EventReply, &reply))
}
......
......@@ -389,7 +389,7 @@ func (b *BlockChain) connectBlock(node *blockNode, blockdetail *types.BlockDetai
chainlog.Debug("connectBlock SendAddBlockEvent", "err", err)
}
// 通知此block已经处理完,主要处理孤儿节点时需要设置
b.task.Done(blockdetail.Block.GetHeight())
b.syncTask.Done(blockdetail.Block.GetHeight())
//广播此block到全网络
if node.broadcast {
......
......@@ -240,14 +240,14 @@ func (chain *BlockChain) ProcAddBlockMsg(broadcast bool, blockdetail *types.Bloc
blockdetail = b
}
//非孤儿block或者已经存在的block
if chain.task.InProgress() {
if chain.syncTask.InProgress() {
if (!isorphan && err == nil) || (err == types.ErrBlockExist) {
chain.task.Done(blockdetail.Block.GetHeight())
chain.syncTask.Done(blockdetail.Block.GetHeight())
}
}
//forktask 运行时设置对应的blockdone
if chain.forktask.InProgress() {
chain.forktask.Done(blockdetail.Block.GetHeight())
//downLoadTask 运行时设置对应的blockdone
if chain.downLoadTask.InProgress() {
chain.downLoadTask.Done(blockdetail.Block.GetHeight())
}
//此处只更新广播block的高度
if broadcast {
......
......@@ -300,11 +300,23 @@ function transfer() {
done
${CLI} account balance -a 12qyocayNF7Lv6C9qW4avxs2E7U41fKSfv -e coins
balance=$(${CLI} account balance -a 12qyocayNF7Lv6C9qW4avxs2E7U41fKSfv -e coins | jq -r ".balance")
if [ "${balance}" != "10.0000" ]; then
echo "wrong balance=$balance, should not be 10.0000"
exit 1
fi
local times=100
while true; do
balance=$(${CLI} account balance -a 12qyocayNF7Lv6C9qW4avxs2E7U41fKSfv -e coins | jq -r ".balance")
echo "account balance is ${balance}, expect 10.0000 "
if [ "${balance}" != "10.0000" ]; then
block_wait 2
times=$((times - 1))
if [ $times -le 0 ]; then
echo "account balance transfer failed"
exit 1
fi
else
echo "account balance transfer success"
break
fi
done
}
......
Title="chain33"
TestNet=true
CoinSymbol="bty"
[log]
# 日志级别,支持debug(dbug)/info/warn/error(eror)/crit
......@@ -197,4 +198,4 @@ superManager=[
[health]
listenAddr="localhost:8805"
checkInterval=1
unSyncMaxTimes=2
\ No newline at end of file
unSyncMaxTimes=2
Title="chain33"
TestNet=true
FixTime=false
CoinSymbol="bty"
[log]
# 日志级别,支持debug(dbug)/info/warn/error(eror)/crit
......@@ -239,6 +240,7 @@ minerwhitelist=["*"]
isFree=false
#执行器执行所需最小费用,低于Mempool和Wallet设置的MinFee,在minExecFee = 0 的情况下,isFree = true才会生效
minExecFee=100000
maxExecFee=1000000000
#是否开启stat插件
enableStat=false
#是否开启MVCC插件
......
syntax = "proto3";
package calculator;
// calculator 合约交易行为总类型
message CalculatorAction {
oneof value {
Add add = 1;
Subtract sub = 2;
Multiply mul = 3;
Divide div = 4;
}
int32 ty = 5;
}
message Add {
int32 summand = 1; //被加数
int32 addend = 2; //加数
}
message AddLog {
int32 sum = 1; //和
}
message Subtract {
int32 minuend = 1; //被减数
int32 subtrahend = 2; //减数
}
message SubLog {
int32 remainder = 1; //差
}
message Multiply {
int32 faciend = 1; //被乘数
int32 multiplier = 2; //乘数
}
message MultiplyLog {
int32 product = 1; //积
}
message Divide {
int32 dividend = 1; //被除数
int32 divisor = 2; //除数
}
message DivideLog {
int32 quotient = 1; //商
int32 remain = 2; //余数
}
message ReqQueryCalcCount {
string action = 1;
}
message ReplyQueryCalcCount {
int32 count = 1;
}
service calculator {
rpc QueryCalcCount(ReqQueryCalcCount) returns (ReplyQueryCalcCount) {}
}
# calculator generate
基于gendapp自动生成合约命令,介绍合约的完整开发步骤
### 简介
calculator合约支持在区块链上进行整数加减乘除交易操作,同时方便演示
开发,记录运算符参与运算的次数,并提供查询接口
### 编写合约proto
```proto
syntax = "proto3";
package calculator;
//calculator 合约交易行为总类型
message CalculatorAction {
oneof value {
Add add = 1;
Subtract sub = 2;
Multiply mul = 3;
Divide div = 4;
}
int32 ty = 5;
}
message Add { //加法action类型
int32 summand = 1; //被加数
int32 addend = 2; //加数
}
message AddLog { //加法log类型
int32 sum = 1; //和
}
message Subtract {
int32 minuend = 1; //被减数
int32 subtrahend = 2; //减数
}
message SubLog {
int32 remainder = 1; //差
}
message Multiply {
int32 faciend = 1; //被乘数
int32 multiplier = 2; //乘数
}
message MultiplyLog {
int32 product = 1; //积
}
message Divide {
int32 dividend = 1; //被除数
int32 divisor = 2; //除数
}
message DivideLog {
int32 quotient = 1; //商
int32 remain = 2; //余数
}
message ReqQueryCalcCount { //查询计算次数请求结构
string action = 1;
}
message ReplyQueryCalcCount { //查询计算次数响应结构
int32 count = 1;
}
service calculator {
rpc QueryCalcCount(ReqQueryCalcCount) returns (ReplyQueryCalcCount) {}
}
```
主要有以下几个部分:
* 定义交易行为总结构,CalculatorAction,包含加减乘除
* 分别定义涉及的交易行为结构, Add,Sub等
* 定义交易涉及到的日志结构,每种运算除均有对应结果日志
* 如果需要grpc服务,定义service结构,如本例增加了查询次数的rpc
* 定义查询中涉及的request,reply结构
### 代码生成
##### 生成基本代码
>使用chain33-tool,工具使用参考[文档]([开发步骤](https://github.com/33cn/chain33/blob/master/cmd/tools/doc/gendapp.md))
```
//本例默认将calculator生成至官方plugin项目dapp目录下
$ chain33-tool gendapp -n calculator -p calculator.proto
$ cd $GOPATH/src/github.com/33cn/plugin/plugin/dapp/calculator
//显示生成目录结构
$ tree -d
.
├── cmd //官方ci目录
├── commands //命令行模块
├── executor //执行模块
├── proto //proto脚本模块
├── rpc //rpc模块
└── types //类型模块
└── calculator
```
##### 生成pb.go文件
```
//进入生成合约的目录
$ cd $GOPATH/src/github.com/33cn/plugin/plugin/dapp/calculator
//执行脚本生成calculator.pb.go
$ cd proto && chmod +x ./create_protobuf.sh && make
```
### 后续开发
以下将以模块为顺序,依次介绍
#### types类型模块
此目录统一归纳合约类型相关的代码
##### 交易的action和log(types/calculator/calculator.go)
> 每一种交易通常有交易请求(action),交易结果日志(log),
目前框架要求合约开发者自定义aciton和log的id及name,
已经自动生成了这些常量,可以根据需要修改
```go
// action类型id和name,可以自定义修改
const (
TyAddAction= iota + 100
TySubAction
TyMulAction
TyDivAction
NameAddAction = "Add"
NameSubAction = "Sub"
NameMulAction = "Mul"
NameDivAction = "Div"
)
// log类型id值
const (
TyUnknownLog = iota + 100
TyAddLog
TySubLog
TyMulLog
TyDivLog
)
```
> 开发者还需要提供name和id的映射结构,其中actionMap已自动生成,logMap需要自定义编写,
如本例中加减乘除都有对应的log类型,依次按照格式填入即可
```go
//定义action的name和id
actionMap = map[string]int32{
NameAddAction: TyAddAction,
NameSubAction: TySubAction,
NameMulAction: TyMulAction,
NameDivAction: TyDivAction,
}
//定义log的id和具体log类型及名称,填入具体自定义log类型
logMap = map[int64]*types.LogInfo{
TyAddLog: {Ty:reflect.TypeOf(AddLog{}), Name: "AddLog"},
TySubLog: {Ty:reflect.TypeOf(SubLog{}), Name: "SubLog"},
TyMulLog: {Ty:reflect.TypeOf(MultiplyLog{}), Name: "MultiplyLog"},
TyDivLog: {Ty:reflect.TypeOf(DivideLog{}), Name: "DivideLog"},
}
```
##### 注册dapp启用高度(types/calculator/calculator.go)
> 默认生成的代码,启用高度设为0,可以自定义修改
```go
types.RegisterDappFork(CalculatorX, "Enable", 0)
```
##### 实现CreateTx接口(types/calculator/calculator.go)
> CreateTx即根据不同action name创建交易,隶属于框架ExcutorType接口。
合约的CreateTx功能可以通过框架相关接口调用,将在rpc模块开发进行演示,
本例中简单实现了加法和除法的创建逻辑,其余类似
```go
func (t *calculatorType) CreateTx(action string, message json.RawMessage) (*types.Transaction, error) {
var tx *types.Transaction
if action == NameAddAction {
param := &Add{}
err := json.Unmarshal(message, param)
if err != nil {
tlog.Error("CreateTx","UnmarshalErr", err)
return nil, types.ErrUnmarshal
}
tx = &types.Transaction{
Execer: []byte(types.ExecName(CalculatorX)),
Payload: types.Encode(&CalculatorAction{Ty:TyAddAction, Value:&CalculatorAction_Add{Add:param}}),
Nonce: rand.New(rand.NewSource(time.Now().UnixNano())).Int63(),
//"github.com/33cn/chain33/common/address"
To: address.ExecAddress(types.ExecName(CalculatorX)),
}
return tx, nil
} else if action == NameSubAction {
} else if action == NameMulAction {
} else if action == NameDivAction{
param := &Divide{}
err := json.Unmarshal(message, param)
if err != nil {
tlog.Error("CreateTx","UnmarshalErr", err)
return nil, err
}
tx = &types.Transaction{
Execer: []byte(types.ExecName(CalculatorX)),
Payload: types.Encode(&CalculatorAction{Ty:TyDivAction, Value:&CalculatorAction_Div{Div:param}}),
Nonce: rand.New(rand.NewSource(time.Now().UnixNano())).Int63(),
To: address.ExecAddress(types.ExecName(CalculatorX)),
}
return tx, nil
}
return tx, types.ErrNotSupport
}
```
#### executor执行模块
此目录归纳了交易执行逻辑实现代码
##### 实现CheckTx接口(executor/calculator.go)
> CheckTx即检查交易合法性,隶属于框架Driver接口,将在交易执行前被框架调用,
本例简单实现除法非零检测
```go
func (*calculator) CheckTx(tx *types.Transaction, index int) error {
action := &ptypes.CalculatorAction{}
err := types.Decode(tx.GetPayload(), action)
if err != nil {
elog.Error("CheckTx", "DecodeActionErr", err)
return types.ErrDecode
}
//这里只做除法除数零值检查
if action.Ty == ptypes.TyDivAction {
div, ok := action.Value.(*ptypes.CalculatorAction_Div)
if !ok {
return types.ErrTypeAsset
}
if div.Div.Divisor == 0 { //除数不能为零
elog.Error("CheckTx", "Err", "ZeroDivisor")
return types.ErrInvalidParam
}
}
return nil
}
```
##### KV常量(executor/kv.go)
>目前合约进行存取框架KV数据库(stateDB或localDB)时,
其Key的前缀必须满足框架要求规范,已经以常量形式自动生成在代码中
```
var (
//KeyPrefixStateDB state db key必须前缀
KeyPrefixStateDB = "mavl-calculator-"
//KeyPrefixLocalDB local db的key必须前缀
KeyPrefixLocalDB = "LODB-calculator-"
)
```
##### 实现Exec类接口(executor/exec.go)
>Exec类接口是交易链上执行的函数,实现交易执行的业务逻辑,
数据上链也是此部分完成(生成stateDB KV对),以及生成交易日志,以Add交易为例
```go
func (c *calculator) Exec_Add(payload *ptypes.Add, tx *types.Transaction, index int) (*types.Receipt, error) {
var receipt *types.Receipt
sum := payload.Addend + payload.Summand
addLog := &ptypes.AddLog{Sum: sum}
logs := []*types.ReceiptLog{{Ty:ptypes.TyAddLog, Log: types.Encode(addLog)}}
key := fmt.Sprintf("%s-%s-formula", KeyPrefixStateDB, tx.Hash())
val := fmt.Sprintf("%d+%d=%d", payload.Summand, payload.Addend, sum)
receipt = &types.Receipt{
Ty: types.ExecOk,
KV: []*types.KeyValue{{Key:[]byte(key), Value:[]byte(val)}},
Logs: logs,
}
return receipt, nil
}
```
##### 实现ExecLocal类接口(executor/exec_local.go)
>ExecLocal类接口是交易执行成功后本地执行,
主要目的是将辅助性数据进行localDB存取,方便前端查询,
以Add为例,在localDB中存入加法运算的次数,
```go
func (c *calculator) ExecLocal_Add(payload *ptypes.Add, tx *types.Transaction, receiptData *types.ReceiptData, index int) (*types.LocalDBSet, error) {
var dbSet *types.LocalDBSet
var countInfo ptypes.ReplyQueryCalcCount
localKey := []byte(fmt.Sprintf("%s-CalcCount-Add", KeyPrefixLocalDB))
oldVal, err := c.GetLocalDB().Get(localKey)
//此处需要注意,目前db接口,获取key未找到记录,返回空时候也带一个notFound错误,需要特殊处理,而不是直接返回错误
if err != nil && err != types.ErrNotFound{
return nil, err
}
err = types.Decode(oldVal, &countInfo)
if err != nil {
elog.Error("execLocalAdd", "DecodeErr", err)
return nil, types.ErrDecode
}
countInfo.Count++
dbSet = &types.LocalDBSet{KV: []*types.KeyValue{{Key:localKey, Value:types.Encode(&countInfo)}}}
return dbSet, nil
}
```
##### 实现ExecDelLocal类接口(executor/exec_del_local.go)
>ExecDelLocal类接口可以理解为ExecLocal的逆过程,在区块回退时候被调用
```go
func (c *calculator) ExecDelLocal_Add(payload *ptypes.Add, tx *types.Transaction, receiptData *types.ReceiptData, index int) (*types.LocalDBSet, error) {
var dbSet *types.LocalDBSet
var countInfo ptypes.ReplyQueryCalcCount
localKey := []byte(fmt.Sprintf("%s-CalcCount-Add", KeyPrefixLocalDB))
oldVal, err := c.GetLocalDB().Get(localKey)
if err != nil && err != types.ErrNotFound{
return nil, err
}
err = types.Decode(oldVal, &countInfo)
if err != nil {
elog.Error("execDelLocalAdd", "DecodeErr", err)
return nil, types.ErrDecode
}
countInfo.Count--
if countInfo.Count < 0 {
countInfo.Count = 0
}
dbSet = &types.LocalDBSet{KV: []*types.KeyValue{{Key:localKey, Value:types.Encode(&countInfo)}}}
return dbSet, nil
}
```
##### 实现Query类接口(executor/calculator.go)
> Query类接口主要实现查询相关业务逻辑,如访问合约数据库,
Query类接口需要满足框架规范(固定格式函数名称和签名),才能被框架注册和使用,
具体调用方法将在rpc模块介绍,本例实现查询运算符计算次数的接口
```go
func (c *calculator) Query_CalcCount(in *ptypes.ReqQueryCalcCount) (types.Message, error) {
var countInfo ptypes.ReplyQueryCalcCount
localKey := []byte(fmt.Sprintf("%s-CalcCount-%s", KeyPrefixLocalDB, in.Action))
oldVal, err := c.GetLocalDB().Get(localKey)
if err != nil && err != types.ErrNotFound{
return nil, err
}
err = types.Decode(oldVal, &countInfo)
if err != nil {
elog.Error("execLocalAdd", "DecodeErr", err)
return nil, err
}
return &countInfo, nil
}
```
#### rpc模块
此目录归纳了rpc相关类型和具体调用服务端实现的代码
##### 类型(rpc/types.go)
>定义了rpc相关结构和初始化,此部分代码已经自动生成
```go
// 实现grpc的service接口
type channelClient struct { //实现grpc接口的类
rpctypes.ChannelClient
}
// Jrpc 实现json rpc调用实例
type Jrpc struct { //实现json rpc接口的类
cli *channelClient
}
```
##### grpc接口(rpc/rpc.go)
>grpc即实现proto文件中service声明的rpc接口,本例中即查询计算次数的rpc。
此处通过框架Query接口,间接调用之前实现的Query_CalcCount接口
```go
func (c *channelClient)QueryCalcCount(ctx context.Context, in *ptypes.ReqQueryCalcCount) (*ptypes.ReplyQueryCalcCount, error) {
msg, err := c.Query(ptypes.CalculatorX, "CalcCount", in)
if err != nil {
return nil, err
}
if reply, ok := msg.(*ptypes.ReplyQueryCalcCount); ok {
return reply, nil
}
return nil, types.ErrTypeAsset
}
```
##### json rpc相关接口
>json rpc主要给前端相关平台产品调用,本例子涉及创建Add交易和查询计算次数接口。
其中创建交易通过框架的CallCreateTx接口间接调用之前实现的CreateTx接口
```go
func (j *Jrpc)CreateRawAddTx(in *ptypes.Add, result *interface{}) error {
data, err := types.CallCreateTx(ptypes.CalculatorX, ptypes.NameAddAction, in)
if err != nil {
return err
}
//创建交易通常返回十六进制格式原数据
*result = hex.EncodeToString(data)
return nil
}
func (j *Jrpc)QueryCalcCount(in *ptypes.ReqQueryCalcCount, result *interface{}) error {
//这里直接转发至grpc接口
reply, err := j.cli.QueryCalcCount(context.Background(), in)
if err != nil {
return err
}
*result = *reply
return nil
}
```
##### rpc说明
>本例子中涉及的CreateTx和Query类rpc都可以通过框架自有的rpc去调用,
分别是Chain33.CreateTransaction和Chain33.Query,上述代码只是示例如何开发rpc接口,
实际开发中,这两类接口可以不用实现,
而直接调用框架的rpc,当然也支持进行个性化包装,两种调用方式将在commands模块介绍
#### commands命令行模块
如果需要支持命令行交互式访问区块节点,开发者需要实现具体合约的命令,
框架的命令行基于cobra开源库
##### import路径(commands/commands.go)
>涉及框架基础库使用,包括相关类型和网络组件
```go
import (
"github.com/33cn/chain33/rpc/jsonclient"
"github.com/33cn/chain33/types"
"github.com/spf13/cobra"
rpctypes "github.com/33cn/chain33/rpc/types"
ptypes "github.com/33cn/plugin/plugin/dapp/calculator/types/calculator"
)
```
##### 创建交易命令(commands/commands.go)
>前端输入相关参数,调用rpc实现创建原始交易的功能
```go
func createAddCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "add",
Short:"create add calc tx",
Run: createAdd,
}
cmd.Flags().Int32P("summand", "s", 0, "summand integer number")
cmd.Flags().Int32P("addend", "a", 0, "addend integer number")
cmd.MarkFlagRequired("summand")
cmd.MarkFlagRequired("addend")
return cmd
}
func createAdd(cmd *cobra.Command, args []string) {
rpcLaddr, _ := cmd.Flags().GetString("rpc_laddr")
summand, _ := cmd.Flags().GetInt32("summand")
addend, _ := cmd.Flags().GetInt32("addend")
req := ptypes.Add{
Summand: summand,
Addend: addend,
}
chain33Req := rpctypes.CreateTxIn{
Execer: ptypes.CalculatorX,
ActionName: ptypes.NameAddAction,
Payload: types.MustPBToJSON(&req),
}
var res string
//通过框架rpc调用
ctx := jsonclient.NewRPCCtx(rpcLaddr, "Chain33.CreateTransaction", chain33Req, &res)
//通过合约内部实现rpc调用
//ctx := jsonclient.NewRPCCtx(rpcLaddr, "calculator.CreateRawAddTx", req, &res)
ctx.RunWithoutMarshal()
}
```
##### 查询计算次数(commands/commands.go)
```go
func queryCalcCountCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "query_count",
Short: "query calculator count",
Run: queryCalcCount,
}
cmd.Flags().StringP("action", "a", "", "calc action name[Add | Sub | Mul | Div]")
cmd.MarkFlagRequired("action")
return cmd
}
func queryCalcCount(cmd *cobra.Command, args []string) {
rpcLaddr, _ := cmd.Flags().GetString("rpc_laddr")
action, _ := cmd.Flags().GetString("action")
req := ptypes.ReqQueryCalcCount{
Action: action,
}
chain33Req := &rpctypes.Query4Jrpc{
Execer: ptypes.CalculatorX,
FuncName: "CalcCount",
Payload: types.MustPBToJSON(&req),
}
var res interface{}
res = &ptypes.ReplyQueryCalcCount{}
//调用框架Query rpc接口
ctx := jsonclient.NewRPCCtx(rpcLaddr, "Chain33.Query", chain33Req, &res)
//调用合约内部rpc接口
//ctx := jsonclient.NewRPCCtx(rpcLaddr, "calculator.QueryCalcCount", req, &res)
ctx.Run()
}
```
##### 添加到主命令(commands/commands.go)
```go
func Cmd() *cobra.Command {
cmd := &cobra.Command{
Use: "calculator",
Short: "calculator command",
Args: cobra.MinimumNArgs(1),
}
cmd.AddCommand(
//add sub command
createAddCmd(),
queryCalcCountCmd(),
)
return cmd
}
```
#### 合约集成
开发者可以借助官方pugin项目进行合约调试,但需要显示初始化合约
##### 初始化(dapp/init/init.go)
>需要在此文件import目录,新增calculator包导入
```go
import (
_ "github.com/33cn/plugin/plugin/dapp/calculator" //auto gen
```
##### 编译
>直接通过官方makefile文件
```
$ cd $GOPATH/src/github.com/33cn/plugin && make
```
#### 单元测试
为合约代码增加必要的单元测试,提高测试覆盖
\ No newline at end of file
......@@ -5,7 +5,7 @@
### 编译
```
//本地存在chain33代码,该步骤可省略
$ git clone https://github.com/33cn/chain33.git $GOPATH/src/github.com/33cn/chain33
$ go get github.com/33cn/chain33
//编译chain33 tools
$ go build -i -o $GOPATH/bin/chain33-tool github.com/33cn/chain33/cmd/tools
```
......@@ -36,8 +36,6 @@ $ chain33-tool gendapp -n demo -p ./demo.proto
// 指定输出包路径
$ chain33-tool gendapp -n demo -p ./demo.proto -o github.com/33cn/chain33/plugin/dapp/
//生成proto
cd proto && chmod +x ./create_protobuf.sh && make
```
### proto规范
* 定义合约交易行为结构,采用**oneof value**形式,且名称必须为**NameAction**格式,
......@@ -51,6 +49,11 @@ message DemoAction {
int32 ty = 3;
}
```
* package name设为合约名,适配后续生成目录结构
```
package demo;
```
* 定义service,直接以合约名作为名称
```
service demo {
......@@ -61,7 +64,7 @@ service demo {
### 代码
#####目录结构,以demo合约为例
##### 目录结构,以demo合约为例
```
demo
├── cmd //包含官方ci集成相关脚本
......@@ -84,10 +87,18 @@ demo
│   ├── rpc.go
│   └── types.go
└── types //类型模块
└── demo.go
└── demo
└── demo.go
```
##### 生成pb.go文件
```
//进入到上述proto目录执行相关脚本,将会在types目录下生成对应pb.go文件
$ cd proto && chmod +x ./create_protobuf.sh && make
```
##### 后续开发
在生成代码基础上,需要实现交易创建,执行,及所需rpc服务,初次开发可以参考官方的echo合约
> github.com/33cn/plugin/plugin/dapp/echo
在生成代码基础上,需要实现交易创建,执行,及所需rpc服务<br/>
初次开发可以参考官方简单计算器合约
[开发步骤](https://github.com/33cn/chain33/blob/master/cmd/tools/doc/gencalculator.md)
......@@ -14,7 +14,8 @@ type ICodeFile interface {
GetCodeType() string
GetDirName() string
GetFiles() map[string]string //key:filename, val:file content
GetReplaceTags() []string
GetDirReplaceTags() []string
GetFileReplaceTags() []string
}
//RegisterCodeFile regeister code file
......@@ -47,8 +48,14 @@ func (CodeFile) GetFiles() map[string]string {
return nil
}
//GetReplaceTags get replace tags
func (CodeFile) GetReplaceTags() []string {
//GetDirReplaceTags get directory replace tags
func (CodeFile) GetDirReplaceTags() []string {
return nil
}
//GetFileReplaceTags get file replace tags
func (CodeFile) GetFileReplaceTags() []string {
return nil
}
......@@ -30,7 +30,7 @@ func (c commandsCodeFile) GetFiles() map[string]string {
}
}
func (c commandsCodeFile) GetReplaceTags() []string {
func (c commandsCodeFile) GetFileReplaceTags() []string {
return []string{types.TagExecName}
}
......@@ -39,7 +39,9 @@ var (
commandsFileContent = `/*Package commands implement dapp client commands*/
package commands
import "github.com/spf13/cobra"
import (
"github.com/spf13/cobra"
)
/*
* 实现合约对应客户端
......
......@@ -26,7 +26,7 @@ func (execCode) GetFiles() map[string]string {
}
}
func (execCode) GetReplaceTags() []string {
func (execCode) GetFileReplaceTags() []string {
return []string{types.TagExecName, types.TagImportPath, types.TagClassName, types.TagExecFileContent}
}
......@@ -42,7 +42,7 @@ func (execLocalCode) GetFiles() map[string]string {
}
}
func (execLocalCode) GetReplaceTags() []string {
func (execLocalCode) GetFileReplaceTags() []string {
return []string{types.TagExecName, types.TagImportPath, types.TagExecLocalFileContent}
}
......@@ -58,7 +58,7 @@ func (execDelLocalCode) GetFiles() map[string]string {
}
}
func (execDelLocalCode) GetReplaceTags() []string {
func (execDelLocalCode) GetFileReplaceTags() []string {
return []string{types.TagExecName, types.TagImportPath, types.TagExecDelLocalFileContent}
}
......@@ -68,7 +68,7 @@ var (
execContent = `package executor
import (
ptypes "${IMPORTPATH}/${EXECNAME}/types"
ptypes "${IMPORTPATH}/${EXECNAME}/types/${EXECNAME}"
"github.com/33cn/chain33/types"
)
......@@ -83,7 +83,7 @@ ${EXECFILECONTENT}`
execLocalContent = `package executor
import (
ptypes "${IMPORTPATH}/${EXECNAME}/types"
ptypes "${IMPORTPATH}/${EXECNAME}/types/${EXECNAME}"
"github.com/33cn/chain33/types"
)
......@@ -98,7 +98,7 @@ ${EXECLOCALFILECONTENT}`
execDelContent = `package executor
import (
ptypes "${IMPORTPATH}/${EXECNAME}/types"
ptypes "${IMPORTPATH}/${EXECNAME}/types/${EXECNAME}"
"github.com/33cn/chain33/types"
)
......
......@@ -31,7 +31,7 @@ func (c executorCodeFile) GetFiles() map[string]string {
}
}
func (c executorCodeFile) GetReplaceTags() []string {
func (c executorCodeFile) GetFileReplaceTags() []string {
return []string{types.TagExecName, types.TagImportPath, types.TagClassName}
}
......@@ -42,7 +42,7 @@ var (
import (
log "github.com/33cn/chain33/common/log/log15"
ptypes "${IMPORTPATH}/${EXECNAME}/types"
ptypes "${IMPORTPATH}/${EXECNAME}/types/${EXECNAME}"
drivers "github.com/33cn/chain33/system/dapp"
"github.com/33cn/chain33/types"
)
......@@ -55,7 +55,7 @@ import (
var (
//日志
elog = log.New("module", "execs.${EXECNAME}")
elog = log.New("module", "${EXECNAME}.executor")
)
var driverName = ptypes.${CLASSNAME}X
......
......@@ -25,7 +25,7 @@ func (c pluginCodeFile) GetFiles() map[string]string {
}
}
func (c pluginCodeFile) GetReplaceTags() []string {
func (c pluginCodeFile) GetFileReplaceTags() []string {
return []string{types.TagExecName, types.TagImportPath, types.TagClassName}
}
......@@ -37,7 +37,7 @@ package ${EXECNAME}
import (
"${IMPORTPATH}/${EXECNAME}/commands"
"${IMPORTPATH}/${EXECNAME}/types"
ptypes "${IMPORTPATH}/${EXECNAME}/types/${EXECNAME}"
"${IMPORTPATH}/${EXECNAME}/executor"
"${IMPORTPATH}/${EXECNAME}/rpc"
"github.com/33cn/chain33/pluginmgr"
......@@ -49,7 +49,7 @@ import (
func init() {
pluginmgr.Register(&pluginmgr.PluginBase{
Name: types.${CLASSNAME}X,
Name: ptypes.${CLASSNAME}X,
ExecName: executor.GetName(),
Exec: executor.Init,
Cmd: commands.Cmd,
......
......@@ -32,6 +32,10 @@ func (protoBase) GetFiles() map[string]string {
}
}
func (protoBase) GetFileReplaceTags() []string {
return []string{types.TagExecName}
}
type protoFile struct {
protoBase
}
......@@ -42,15 +46,15 @@ func (protoFile) GetFiles() map[string]string {
}
}
func (protoFile) GetReplaceTags() []string {
func (protoFile) GetFileReplaceTags() []string {
return []string{types.TagProtoFileContent, types.TagProtoFileAppend, types.TagExecName}
}
var (
protoShellName = "create_protobuf.sh"
protoShellContent = `#!/bin/sh
# proto生成命令,将pb.go文件生成到types目录下
protoc --go_out=plugins=grpc:../types ./*.proto
# proto生成命令,将pb.go文件生成到types/${EXECNAME}目录下
protoc --go_out=plugins=grpc:../types/${EXECNAME} ./*.proto
`
makeName = "Makefile"
......
......@@ -31,7 +31,7 @@ func (c rpcCodeFile) GetFiles() map[string]string {
}
}
func (c rpcCodeFile) GetReplaceTags() []string {
func (c rpcCodeFile) GetFileReplaceTags() []string {
return []string{types.TagExecName, types.TagImportPath, types.TagClassName}
}
......@@ -53,7 +53,7 @@ var (
typesContent = `package rpc
import (
ptypes "${IMPORTPATH}/${EXECNAME}/types"
ptypes "${IMPORTPATH}/${EXECNAME}/types/${EXECNAME}"
rpctypes "github.com/33cn/chain33/rpc/types"
)
......
......@@ -5,6 +5,8 @@
package types
import (
"os"
"github.com/33cn/chain33/cmd/tools/gencode/base"
"github.com/33cn/chain33/cmd/tools/types"
)
......@@ -20,7 +22,7 @@ type typesCode struct {
func (c typesCode) GetDirName() string {
return "types"
return "types" + string(os.PathSeparator) + "${EXECNAME}"
}
func (c typesCode) GetFiles() map[string]string {
......@@ -30,7 +32,11 @@ func (c typesCode) GetFiles() map[string]string {
}
}
func (c typesCode) GetReplaceTags() []string {
func (c typesCode) GetDirReplaceTags() []string {
return []string{types.TagExecName}
}
func (c typesCode) GetFileReplaceTags() []string {
return []string{types.TagExecName, types.TagClassName,
types.TagActionIDText, types.TagTyLogActionType,
......@@ -39,11 +45,11 @@ func (c typesCode) GetReplaceTags() []string {
var (
typesName = "${EXECNAME}.go"
typesContent = `package types
typesContent = `package ${EXECNAME}
import (
"encoding/json"
log "github.com/33cn/chain33/common/log/log15"
"github.com/33cn/chain33/types"
)
......@@ -54,7 +60,7 @@ import (
*/
// action类型id
// action类型id和name,这些常量可以自定义修改
${ACTIONIDTEXT}
// log类型id值
......@@ -63,15 +69,18 @@ ${TYLOGACTIONTYPE}
var (
//${CLASSNAME}X 执行器名称定义
${CLASSNAME}X = "${EXECNAME}"
//定义action的name和id
//定义actionMap
actionMap = ${TYPEMAPTEXT}
//定义log的id和具体log类型及名称,填入具体自定义log类型
logMap = ${LOGMAPTEXT}
tlog = log.New("module", "${EXECNAME}.types")
)
func init() {
types.AllowUserExec = append(types.AllowUserExec, []byte(${CLASSNAME}X))
types.RegistorExecutor(${CLASSNAME}X, newType())
//注册合约启用高度
types.RegisterDappFork(${CLASSNAME}X, "Enable", 0)
}
type ${EXECNAME}Type struct {
......@@ -102,7 +111,6 @@ func (t *${EXECNAME}Type) GetLogMap() map[int64]*types.LogInfo {
// CreateTx 重载基类接口,实现本合约交易创建,供框架调用
func (t *${EXECNAME}Type) CreateTx(action string, message json.RawMessage) (*types.Transaction, error) {
var tx *types.Transaction
// pseudo code
//if action == someAction
//return new tx
return tx, types.ErrNotSupport
......
......@@ -101,10 +101,18 @@ func (c *GenDappCodeTask) genDappCode() error {
for _, code := range codeTypes {
dirPath := filepath.Join(c.DappDir, code.GetDirName())
_ = os.Mkdir(dirPath, os.ModePerm)
dirName := code.GetDirName()
for _, tag := range code.GetDirReplaceTags() {
dirName = strings.Replace(dirName, tag, c.replacePairs[tag], -1)
}
dirPath := filepath.Join(c.DappDir, dirName)
err := os.MkdirAll(dirPath, os.ModePerm)
if err != nil {
mlog.Error("MakeCodeDir", "Err", err.Error(), "DirPath", dirPath)
return err
}
files := code.GetFiles()
tags := code.GetReplaceTags()
tags := code.GetFileReplaceTags()
for name, content := range files {
......@@ -113,7 +121,7 @@ func (c *GenDappCodeTask) genDappCode() error {
content = strings.Replace(content, tag, c.replacePairs[tag], -1)
}
_, err := util.WriteStringToFile(filepath.Join(dirPath, name), content)
_, err = util.WriteStringToFile(filepath.Join(dirPath, name), content)
if err != nil {
mlog.Error("GenNewCodeFile", "Err", err.Error(), "CodeFile", filepath.Join(dirPath, name))
......
......@@ -60,8 +60,9 @@ func readDappActionFromProto(protoContent, actionName string) ([]*actionInfoItem
func formatExecContent(infos []*actionInfoItem, dappName string) string {
fnFmtStr := `func (c *%s) Exec_%s(payload *ptypes.%s, tx *types.Transaction, index int) (*types.Receipt, error) {
var receipt *types.Receipt
//implement code
return &types.Receipt{}, nil
return receipt, nil
}
`
......@@ -76,8 +77,9 @@ func formatExecContent(infos []*actionInfoItem, dappName string) string {
func formatExecLocalContent(infos []*actionInfoItem, dappName string) string {
fnFmtStr := `func (c *%s) ExecLocal_%s(payload *ptypes.%s, tx *types.Transaction, receiptData *types.ReceiptData, index int) (*types.LocalDBSet, error) {
var dbSet *types.LocalDBSet
//implement code
return &types.LocalDBSet{}, nil
return dbSet, nil
}
`
......@@ -92,8 +94,9 @@ func formatExecLocalContent(infos []*actionInfoItem, dappName string) string {
func formatExecDelLocalContent(infos []*actionInfoItem, dappName string) string {
fnFmtStr := `func (c *%s) ExecDelLocal_%s(payload *ptypes.%s, tx *types.Transaction, receiptData *types.ReceiptData, index int) (*types.LocalDBSet, error) {
var dbSet *types.LocalDBSet
//implement code
return &types.LocalDBSet{}, nil
return dbSet, nil
}
`
......@@ -107,9 +110,9 @@ func formatExecDelLocalContent(infos []*actionInfoItem, dappName string) string
// 组成规则是 TyLog+ActionName + ActionMemberName
func buildActionLogTypeText(infos []*actionInfoItem, className string) (text string) {
items := fmt.Sprintf("TyLog%sUnknown = iota\n", className)
items := fmt.Sprintf("TyUnknownLog = iota + 100\n")
for _, info := range infos {
items += fmt.Sprintf("TyLog%s%s\n", className, info.memberName)
items += fmt.Sprintf("Ty%sLog\n", info.memberName)
}
text = fmt.Sprintf("const (\n%s)\n", items)
return
......@@ -117,10 +120,16 @@ func buildActionLogTypeText(infos []*actionInfoItem, className string) (text str
// 组成规则是 ActionName + ActionMemberName
func buildActionIDText(infos []*actionInfoItem, className string) (text string) {
var items string
for index, info := range infos {
items += fmt.Sprintf("%sAction%s = %d\n", className, info.memberName, index)
items := fmt.Sprintf("TyUnknowAction = iota + 100\n")
for _, info := range infos {
items += fmt.Sprintf("Ty%sAction\n", info.memberName)
}
items += "\n"
for _, info := range infos {
items += fmt.Sprintf("Name%sAction = \"%s\"\n", info.memberName, info.memberName)
}
text = fmt.Sprintf("const (\n%s)\n", items)
return
}
......@@ -129,7 +138,7 @@ func buildActionIDText(infos []*actionInfoItem, className string) (text string)
func buildTypeMapText(infos []*actionInfoItem, className string) (text string) {
var items string
for _, info := range infos {
items += fmt.Sprintf("\"%s\": %sAction%s,\n", info.memberName, className, info.memberName)
items += fmt.Sprintf("Name%sAction: Ty%sAction,\n", info.memberName, info.memberName)
}
text = fmt.Sprintf("map[string]int32{\n%s}", items)
return
......@@ -137,6 +146,6 @@ func buildTypeMapText(infos []*actionInfoItem, className string) (text string) {
// 返回 map[string]*types.LogInfo
func buildLogMapText() (text string) {
text = fmt.Sprintf("map[int64]*types.LogInfo{\n\t//pseudo code\n\t//LogID: {Ty: refelct.TypeOf(LogStruct), Name: LogName},\n}")
text = fmt.Sprintf("map[int64]*types.LogInfo{\n\t//LogID: {Ty: reflect.TypeOf(LogStruct), Name: LogName},\n}")
return
}
......@@ -203,7 +203,7 @@ func (e *executor) Exec(tx *types.Transaction, index int) (*types.Receipt, error
if err := drivers.CheckAddress(tx.GetRealToAddr(), e.height); err != nil {
return nil, err
}
if e.localDB != nil {
if e.localDB != nil && types.IsFork(e.height, "ForkLocalDBAccess") {
e.localDB.(*LocalDB).DisableWrite()
if exec.ExecutorOrder() != drivers.ExecLocalSameTime {
e.localDB.(*LocalDB).DisableRead()
......
......@@ -10,7 +10,6 @@ import (
"strings"
"sync"
"github.com/33cn/chain33/account"
"github.com/33cn/chain33/client/api"
dbm "github.com/33cn/chain33/common/db"
clog "github.com/33cn/chain33/common/log"
......@@ -26,7 +25,7 @@ import (
)
var elog = log.New("module", "execs")
var coinsAccount = account.NewCoinsAccount()
var coinsAccount *dbm.DB
// SetLogLevel set log level
func SetLogLevel(level string) {
......
......@@ -1184,3 +1184,12 @@ func fmtAccount(balances []*types.Account) []*rpctypes.Account {
}
return accounts
}
// GetCoinSymbol get coin symbol
func (c *Chain33) GetCoinSymbol(in types.ReqNil, result *interface{}) error {
symbol := types.GetCoinSymbol()
resp := types.ReplyString{Data: symbol}
log.Warn("GetCoinSymbol", "Symbol", symbol)
*result = &resp
return nil
}
......@@ -140,7 +140,7 @@ func (c *CoinsType) GetAssets(tx *types.Transaction) ([]*types.Asset, error) {
return nil, err
}
if assetlist[0].Symbol == "" {
assetlist[0].Symbol = types.BTY
assetlist[0].Symbol = types.GetCoinSymbol()
}
return assetlist, nil
}
......@@ -22,6 +22,7 @@ type Config struct {
Pprof *Pprof `protobuf:"bytes,14,opt,name=pprof" json:"pprof,omitempty"`
Fork *ForkList `protobuf:"bytes,15,opt,name=fork" json:"fork,omitempty"`
Health *HealthCheck `protobuf:"bytes,16,opt,name=health" json:"health,omitempty"`
CoinSymbol string `protobuf:"bytes,16,opt,name=coinSymbol" json:"coinSymbol,omitempty"`
}
// ForkList fork列表配置
......
......@@ -28,6 +28,7 @@ var (
titles = map[string]bool{}
chainConfig = make(map[string]interface{})
mver = make(map[string]*mversion)
coinSymbol = "bty"
)
// coin conversation
......@@ -247,6 +248,7 @@ func Init(t string, cfg *Config) {
}
titles[t] = true
title = t
if cfg != nil {
if isLocal() {
setTestNet(true)
......@@ -264,6 +266,12 @@ func Init(t string, cfg *Config) {
if cfg.Exec.MaxExecFee > 0 {
setChainConfig("MaxFee", cfg.Exec.MaxExecFee)
}
if cfg.CoinSymbol != "" {
if strings.Contains(cfg.CoinSymbol, "-") {
panic("config CoinSymbol must without '-'")
}
coinSymbol = cfg.CoinSymbol
}
}
//local 只用于单元测试
if isLocal() {
......@@ -301,6 +309,14 @@ func GetTitle() string {
return s
}
// GetCoinSymbol 获取 coin symbol
func GetCoinSymbol() string {
mu.Lock()
s := coinSymbol
mu.Unlock()
return s
}
func isLocal() bool {
return title == "local"
}
......
......@@ -100,6 +100,7 @@ driver="leveldb"
[mempool]
poolCacheSize=102400
minTxFee=100000
maxTxFee=1000000000
[consensus]
name="ticket"
......@@ -186,6 +187,7 @@ signType="secp256k1"
[exec]
isFree=false
minExecFee=100000
maxExecFee=1000000000
[exec.sub.token]
#配置一个空值,防止配置文件被覆盖
......@@ -219,6 +221,7 @@ ForkTxHeight= -1
ForkTxGroupPara= -1
ForkChainParamV2= -1
ForkBlockCheck=1725000
ForkLocalDBAccess=1
[fork.sub.coins]
Enable=0
......
......@@ -210,8 +210,9 @@ func SetTestNetFork() {
systemFork.SetFork("chain33", "ForkTxGroupPara", 806578)
systemFork.SetFork("chain33", "ForkCheckBlockTime", 1200000)
systemFork.SetFork("chain33", "ForkMultiSignAddress", 1298600)
systemFork.SetFork("chain33", "ForkStateDBSet", MaxHeight)
systemFork.SetFork("chain33", "ForkStateDBSet", 1572391)
systemFork.SetFork("chain33", "ForkBlockCheck", 1560000)
systemFork.SetFork("chain33", "ForkLocalDBAccess", 1572391)
}
func setLocalFork() {
......
......@@ -173,6 +173,7 @@ ForkTxGroupPara= -1
ForkCheckBlockTime=1200000
ForkMultiSignAddress=1298600
ForkBlockCheck=1725000
ForkLocalDBAccess=1
[fork.sub.coins]
Enable=0
......
......@@ -189,6 +189,7 @@ ForkTxGroupPara= -1
ForkCheckBlockTime=1200000
ForkMultiSignAddress=1298600
ForkBlockCheck=1
ForkLocalDBAccess=0
[fork.sub.coins]
Enable=0
......
......@@ -105,7 +105,7 @@ func (s *HealthCheckServer) getHealth(sync bool) (bool, error) {
return false, err
}
log.Info("healthCheck tick", "peers", len(peerList.Peers), "isCaughtUp", reply.IsOk,
log.Debug("healthCheck tick", "peers", len(peerList.Peers), "isCaughtUp", reply.IsOk,
"health", len(peerList.Peers) > 1 && reply.IsOk, "listen", sync)
return len(peerList.Peers) > 1 && reply.IsOk, nil
......
......@@ -20,8 +20,8 @@ func TestStart(t *testing.T) {
health := NewHealthCheckServer(q.Client())
api := new(mocks.QueueProtocolAPI)
reply := &types.Reply{IsOk: true}
api.On("IsSync").Return(reply, nil)
api.On("IsSync").Return(&types.Reply{IsOk: true}, nil).Times(2)
api.On("IsSync").Return(&types.Reply{IsOk: false}, nil)
peer1 := &types.Peer{Addr: "addr1"}
peer2 := &types.Peer{Addr: "addr2"}
peers := &types.PeerList{Peers: []*types.Peer{peer1, peer2}}
......@@ -31,12 +31,11 @@ func TestStart(t *testing.T) {
cfg, _ := types.InitCfg("../cmd/chain33/chain33.test.toml")
health.Start(cfg.Health)
time.Sleep(time.Second * 3)
api.On("IsSync").Return(&types.Reply{IsOk: false}, nil)
health.Start(cfg.Health)
time.Sleep(time.Second * 3)
time.Sleep(time.Second * 6)
health.Close()
time.Sleep(time.Second * 1)
}
func TestGetHealth(t *testing.T) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment