Commit e0e4222b authored by lynAzrael's avatar lynAzrael

Merge remote-tracking branch 'upstream/master'

parents 2b6b5872 a9e3544c
......@@ -12,10 +12,10 @@ import (
"sync"
"time"
log "github.com/33cn/chain33/common/log/log15"
//"github.com/33cn/chain33/common"
"encoding/hex"
log "github.com/33cn/chain33/common/log/log15"
"github.com/33cn/chain33/client/api"
"github.com/33cn/chain33/common"
"github.com/33cn/chain33/common/crypto"
......@@ -733,7 +733,10 @@ func (client *client) WriteBlock(prev []byte, paraBlock *types.Block, seq int64)
parablockDetail := &types.ParaChainBlockDetail{Blockdetail: blockDetail, Sequence: seq}
msg := client.GetQueueClient().NewMessage("blockchain", types.EventAddParaChainBlockDetail, parablockDetail)
client.GetQueueClient().Send(msg, true)
err := client.GetQueueClient().Send(msg, true)
if err != nil {
return err
}
resp, err := client.GetQueueClient().Wait(msg)
if err != nil {
return err
......@@ -760,7 +763,10 @@ func (client *client) DelBlock(block *types.Block, seq int64) error {
panic("Parachain attempt to Delete GenesisBlock !")
}
msg := client.GetQueueClient().NewMessage("blockchain", types.EventGetBlocks, &types.ReqBlocks{Start: start, End: start, IsDetail: true, Pid: []string{""}})
client.GetQueueClient().Send(msg, true)
err := client.GetQueueClient().Send(msg, true)
if err != nil {
return err
}
resp, err := client.GetQueueClient().Wait(msg)
if err != nil {
return err
......@@ -769,7 +775,10 @@ func (client *client) DelBlock(block *types.Block, seq int64) error {
parablockDetail := &types.ParaChainBlockDetail{Blockdetail: blocks.Items[0], Sequence: seq}
msg = client.GetQueueClient().NewMessage("blockchain", types.EventDelParaChainBlockDetail, parablockDetail)
client.GetQueueClient().Send(msg, true)
err = client.GetQueueClient().Send(msg, true)
if err != nil {
return err
}
resp, err = client.GetQueueClient().Wait(msg)
if err != nil {
return err
......
......@@ -551,7 +551,11 @@ out:
break out
case <-time.NewTimer(time.Second * 2).C:
msg := client.paraClient.GetQueueClient().NewMessage("wallet", types.EventDumpPrivkey, req)
client.paraClient.GetQueueClient().Send(msg, true)
err := client.paraClient.GetQueueClient().Send(msg, true)
if err != nil {
plog.Error("para commit send msg", "err", err.Error())
break out
}
resp, err := client.paraClient.GetQueueClient().Wait(msg)
if err != nil {
plog.Error("para commit msg sign to wallet", "err", err.Error())
......
......@@ -485,6 +485,9 @@ func (client *Client) searchTargetTicket(parent, block *types.Block) (*ty.Ticket
client.ticketmu.Lock()
defer client.ticketmu.Unlock()
for ticketID, ticket := range client.ticketsMap {
if client.IsClosed() {
return nil, nil, nil, nil, "", nil
}
if ticket == nil {
tlog.Warn("Client searchTargetTicket ticket is nil", "ticketID", ticketID)
continue
......@@ -668,6 +671,10 @@ func (client *Client) updateBlock(block *types.Block, txHashList [][]byte) (*typ
// CreateBlock ticket create block func
func (client *Client) CreateBlock() {
for {
if client.IsClosed() {
tlog.Info("create block stop")
break
}
if !client.IsMining() || !(client.IsCaughtUp() || client.Cfg.ForceMining) {
tlog.Debug("createblock.ismining is disable or client is caughtup is false")
time.Sleep(time.Second)
......
......@@ -21,8 +21,11 @@ import (
"github.com/stretchr/testify/assert"
)
// 执行: go test -cover
func TestTicket(t *testing.T) {
testTicket(t)
}
func testTicket(t *testing.T) {
mock33 := testnode.New("testdata/chain33.cfg.toml", nil)
defer mock33.Close()
mock33.Listen()
......@@ -61,7 +64,7 @@ func TestTicket(t *testing.T) {
status, err = mock33.GetAPI().GetWalletStatus()
assert.Nil(t, err)
assert.Equal(t, true, status.IsAutoMining)
err = mock33.WaitHeight(100)
err = mock33.WaitHeight(50)
assert.Nil(t, err)
//查询票是否自动close,并且购买了新的票
req := &types.ReqWalletTransactionList{Count: 1000}
......
......@@ -434,3 +434,41 @@ func genPrefixEdge(prefix []byte) (r []byte) {
return r
}
// Mint 铸币
func (acc *DB) Mint(addr string, amount int64) (*types.Receipt, error) {
if !types.CheckAmount(amount) {
return nil, types.ErrAmount
}
accTo := acc.LoadAccount(addr)
balance, err := safeAdd(accTo.Balance, amount)
if err != nil {
return nil, err
}
copyAcc := *accTo
accTo.Balance = balance
receipt := &types.ReceiptAccountMint{
Prev: &copyAcc,
Current: accTo,
}
kv := acc.GetKVSet(accTo)
acc.SaveKVSet(kv)
return acc.mintReceipt(kv, receipt), nil
}
func (acc *DB) mintReceipt(kv []*types.KeyValue, receipt proto.Message) *types.Receipt {
ty := int32(types.TyLogMint)
log1 := &types.ReceiptLog{
Ty: ty,
Log: types.Encode(receipt),
}
return &types.Receipt{
Ty: types.ExecOk,
KV: kv,
Logs: []*types.ReceiptLog{log1},
}
}
......@@ -554,3 +554,13 @@ func TestGetExecBalance2(t *testing.T) {
assert.Equal(t, 1, len(reply.Items))
*/
}
func TestDB_Mint(t *testing.T) {
_, tokenCoin := GenerAccDb()
tokenCoin.GenerAccData()
_, err := tokenCoin.Mint(addr1, 10*1e8)
require.NoError(t, err)
t.Logf("Token mint addr balance [%d]", tokenCoin.LoadAccount(addr1).Balance)
require.Equal(t, int64(1000*1e8+10*1e8), tokenCoin.LoadAccount(addr1).Balance)
}
......@@ -5,9 +5,6 @@
package blockchain
import (
"bytes"
"github.com/33cn/chain33/common/merkle"
"github.com/33cn/chain33/queue"
"github.com/33cn/chain33/types"
"github.com/33cn/chain33/util"
......@@ -15,127 +12,5 @@ import (
//执行区块将变成一个私有的函数
func execBlock(client queue.Client, prevStateRoot []byte, block *types.Block, errReturn bool, sync bool) (*types.BlockDetail, []*types.Transaction, error) {
//发送执行交易给execs模块
//通过consensus module 再次检查
chainlog.Debug("ExecBlock", "height------->", block.Height, "ntx", len(block.Txs))
beg := types.Now()
defer func() {
chainlog.Info("ExecBlock", "height", block.Height, "ntx", len(block.Txs), "writebatchsync", sync, "cost", types.Since(beg))
}()
if errReturn && block.Height > 0 && !block.CheckSign() {
//block的来源不是自己的mempool,而是别人的区块
return nil, nil, types.ErrSign
}
//tx交易去重处理, 这个地方要查询数据库,需要一个更快的办法
cacheTxs := types.TxsToCache(block.Txs)
oldtxscount := len(cacheTxs)
var err error
cacheTxs, err = util.CheckTxDup(client, cacheTxs, block.Height)
if err != nil {
return nil, nil, err
}
newtxscount := len(cacheTxs)
if oldtxscount != newtxscount && errReturn {
return nil, nil, types.ErrTxDup
}
chainlog.Debug("ExecBlock", "prevtx", oldtxscount, "newtx", newtxscount)
block.TxHash = merkle.CalcMerkleRootCache(cacheTxs)
block.Txs = types.CacheToTxs(cacheTxs)
//println("1")
receipts, err := util.ExecTx(client, prevStateRoot, block)
if err != nil {
return nil, nil, err
}
var maplist = make(map[string]*types.KeyValue)
var kvset []*types.KeyValue
var deltxlist = make(map[int]bool)
var rdata []*types.ReceiptData //save to db receipt log
for i := 0; i < len(receipts.Receipts); i++ {
receipt := receipts.Receipts[i]
if receipt.Ty == types.ExecErr {
chainlog.Error("exec tx err", "err", receipt)
if errReturn { //认为这个是一个错误的区块
return nil, nil, types.ErrBlockExec
}
deltxlist[i] = true
continue
}
rdata = append(rdata, &types.ReceiptData{Ty: receipt.Ty, Logs: receipt.Logs})
//处理KV
kvs := receipt.KV
for _, kv := range kvs {
if item, ok := maplist[string(kv.Key)]; ok {
item.Value = kv.Value //更新item 的value
} else {
maplist[string(kv.Key)] = kv
kvset = append(kvset, kv)
}
}
}
//check TxHash
calcHash := merkle.CalcMerkleRoot(block.Txs)
if errReturn && !bytes.Equal(calcHash, block.TxHash) {
return nil, nil, types.ErrCheckTxHash
}
block.TxHash = calcHash
//删除无效的交易
var deltx []*types.Transaction
if len(deltxlist) > 0 {
var newtx []*types.Transaction
for i := 0; i < len(block.Txs); i++ {
if deltxlist[i] {
deltx = append(deltx, block.Txs[i])
} else {
newtx = append(newtx, block.Txs[i])
}
}
block.Txs = newtx
block.TxHash = merkle.CalcMerkleRoot(block.Txs)
}
var detail types.BlockDetail
calcHash, err = util.ExecKVMemSet(client, prevStateRoot, block.Height, kvset, sync)
if err != nil {
return nil, nil, err
}
//println("2")
if errReturn && !bytes.Equal(block.StateHash, calcHash) {
err = util.ExecKVSetRollback(client, calcHash)
if err != nil {
chainlog.Error("execBlock-->ExecKVSetRollback", "err", err)
}
if len(rdata) > 0 {
for i, rd := range rdata {
rd.OutputReceiptDetails(block.Txs[i].Execer, chainlog)
}
}
return nil, nil, types.ErrCheckStateHash
}
block.StateHash = calcHash
detail.Block = block
detail.Receipts = rdata
if detail.Block.Height > 0 {
err := util.CheckBlock(client, &detail)
if err != nil {
chainlog.Debug("CheckBlock-->", "err=", err)
return nil, deltx, err
}
}
//println("3")
//save to db
// 写数据库失败时需要及时返回错误,防止错误数据被写入localdb中CHAIN33-567
err = util.ExecKVSetCommit(client, block.StateHash)
if err != nil {
return nil, nil, err
}
detail.KV = kvset
detail.PrevStatusHash = prevStateRoot
//get receipts
//save kvset and get state hash
//ulog.Debug("blockdetail-->", "detail=", detail)
//println("4")
return &detail, deltx, nil
return util.ExecBlock(client, prevStateRoot, block, errReturn, sync, true)
}
......@@ -226,6 +226,10 @@ func (chain *BlockChain) ProcGetBlockDetailsMsg(requestblock *types.ReqBlocks) (
//ProcAddBlockMsg 处理从peer对端同步过来的block消息
func (chain *BlockChain) ProcAddBlockMsg(broadcast bool, blockdetail *types.BlockDetail, pid string) (*types.BlockDetail, error) {
beg := types.Now()
defer func() {
chainlog.Info("ProcAddBlockMsg", "cost", types.Since(beg))
}()
block := blockdetail.Block
if block == nil {
chainlog.Error("ProcAddBlockMsg input block is null")
......
......@@ -25,7 +25,7 @@ func TestReindex(t *testing.T) {
chain := mock33.GetBlockChain()
db := chain.GetDB()
kvs := getAllKeys(db)
assert.Equal(t, len(kvs), 20)
assert.Equal(t, len(kvs), 22)
defer mock33.Close()
txs := util.GenCoinsTxs(mock33.GetGenesisKey(), 10)
for i := 0; i < len(txs); i++ {
......
......@@ -19,7 +19,7 @@ wget 127.0.0.1:8866 --no-proxy --post-data='{"id" : 1 , "method" : "ShowMinerAcc
8. expectIncrease 预期挖到多少币
9. expectMinerBlocks 预期间隔多少个块能挖到币
10. minerBtyDuring 在预期能挖到币的两倍时间间隔内,挖到多少币
11. expectTotalIncrease 一个小时内预期挖矿增加
11. expectTotalIncrease 一个小时内预期挖矿增加
需要监控
1. 挖矿总量异常: 根据总体币的挖矿情况报警 (不再根据原来的固定值)
......
......@@ -97,11 +97,31 @@ func pow2(d int) (p int) {
return p
}
func calcLevel(n int) int {
if n == 1 {
return 1
}
level := 0
for n > 1 {
if n&1 != 0 {
n++
}
n = n / 2
level++
}
return level
}
func getMerkleRootPad(hashes [][]byte, step int) []byte {
level1 := log2(len(hashes))
level1 := calcLevel(len(hashes))
level2 := log2(step)
root := getMerkleRoot(hashes)
var root []byte
cache := make([]byte, 64)
if len(hashes) == 1 {
root = GetHashFromTwoHash(cache, hashes[0], hashes[0])
} else {
root = getMerkleRoot(hashes)
}
for i := 0; i < level2-level1; i++ {
root = GetHashFromTwoHash(cache, root, root)
}
......
......@@ -331,9 +331,17 @@ func BenchmarkGetMerkelRoot2(b *testing.B) {
}
}
var testlen = 14
func TestGetMerkelRoot1(t *testing.T) {
for i := 0; i < 2000; i++ {
ok := testGetMerkelRoot1(t, i)
if !ok {
t.Error("calc merkel root error", i)
return
}
}
}
func testGetMerkelRoot1(t *testing.T, testlen int) bool {
var hashlist [][]byte
for i := 0; i < testlen; i++ {
key := sha256.Sum256([]byte(fmt.Sprint(i)))
......@@ -347,7 +355,10 @@ func TestGetMerkelRoot1(t *testing.T) {
hashlist = append(hashlist, key[:])
}
hash2 := getMerkleRoot(hashlist)
assert.Equal(t, hash1, hash2)
if !bytes.Equal(hash1, hash2) {
println("failed1")
return false
}
hashlist = nil
for i := 0; i < testlen; i++ {
......@@ -355,7 +366,11 @@ func TestGetMerkelRoot1(t *testing.T) {
hashlist = append(hashlist, key[:])
}
hash3, _, _ := Computation(hashlist, 1, 0)
assert.Equal(t, hash1, hash3)
if !bytes.Equal(hash1, hash3) {
println("failed2")
return false
}
return true
}
func TestLog2(t *testing.T) {
......
......@@ -448,4 +448,7 @@ func (exec *Executor) procExecDelBlock(msg *queue.Message) {
// Close close executor
func (exec *Executor) Close() {
elog.Info("exec module closed")
if exec.client != nil {
exec.client.Close()
}
}
......@@ -188,7 +188,7 @@ func TestExecBlock2(t *testing.T) {
txs := util.GenCoinsTxs(genkey, 2)
block2 := util.CreateNewBlock(block, txs)
detail, _, err := util.ExecBlock(mock33.GetClient(), block.StateHash, block2, false, true)
detail, _, err := util.ExecBlock(mock33.GetClient(), block.StateHash, block2, false, true, false)
if err != nil {
t.Error(err)
return
......@@ -205,7 +205,7 @@ func TestExecBlock2(t *testing.T) {
go func() {
txs := util.GenCoinsTxs(genkey, 2)
block3 := util.CreateNewBlock(block2, txs)
detail, _, err := util.ExecBlock(mock33.GetClient(), block2.StateHash, block3, false, true)
detail, _, err := util.ExecBlock(mock33.GetClient(), block2.StateHash, block3, false, true, false)
assert.Nil(t, err)
for _, Receipt := range detail.Receipts {
if Receipt.GetTy() != 2 {
......@@ -234,7 +234,7 @@ func TestSameTx(t *testing.T) {
newblock.Txs = append(newblock.Txs, newblock.Txs[2])
newblock.TxHash = merkle.CalcMerkleRoot(newblock.Txs)
assert.Equal(t, hash1, newblock.TxHash)
_, _, err := util.ExecBlock(mock33.GetClient(), nil, newblock, true, true)
_, _, err := util.ExecBlock(mock33.GetClient(), nil, newblock, true, true, false)
assert.Equal(t, types.ErrTxDup, err)
//情况2
......@@ -244,15 +244,17 @@ func TestSameTx(t *testing.T) {
newblock.Txs = append(newblock.Txs, newblock.Txs[4:]...)
newblock.TxHash = merkle.CalcMerkleRoot(newblock.Txs)
assert.Equal(t, hash1, newblock.TxHash)
_, _, err = util.ExecBlock(mock33.GetClient(), nil, newblock, true, true)
_, _, err = util.ExecBlock(mock33.GetClient(), nil, newblock, true, true, false)
assert.Equal(t, types.ErrTxDup, err)
}
func TestExecBlock(t *testing.T) {
mock33 := newMockNode()
defer mock33.Close()
block := util.CreateCoinsBlock(mock33.GetGenesisKey(), 1)
util.ExecBlock(mock33.GetClient(), nil, block, false, true)
mock33.WaitHeight(0)
block0 := mock33.GetBlock(0)
block := util.CreateCoinsBlock(mock33.GetGenesisKey(), 10)
util.ExecBlock(mock33.GetClient(), block0.StateHash, block, false, true, false)
}
//区块执行性能更好的一个测试
......@@ -274,7 +276,7 @@ func BenchmarkExecBlock(b *testing.B) {
assert.Equal(b, int64(10000000000000000), account.Balance)
b.ResetTimer()
for i := 0; i < b.N; i++ {
util.ExecBlock(mock33.GetClient(), block0.StateHash, block, false, true)
util.ExecBlock(mock33.GetClient(), block0.StateHash, block, false, true, false)
}
}
......@@ -363,7 +365,7 @@ func TestExecLocalSameTime1(t *testing.T) {
txs = append(txs, util.CreateTxWithExecer(priv1, "demo2"))
txs = append(txs, util.CreateTxWithExecer(priv1, "demo2"))
block2 := util.CreateNewBlock(block, txs)
detail, _, err := util.ExecBlock(mock33.GetClient(), block.StateHash, block2, false, true)
detail, _, err := util.ExecBlock(mock33.GetClient(), block.StateHash, block2, false, true, false)
if err != nil {
t.Error(err)
return
......@@ -393,7 +395,7 @@ func TestExecLocalSameTime0(t *testing.T) {
txs = append(txs, util.CreateTxWithExecer(priv1, "demo2"))
txs = append(txs, util.CreateTxWithExecer(priv1, "demo2"))
block2 := util.CreateNewBlock(block, txs)
detail, _, err := util.ExecBlock(mock33.GetClient(), block.StateHash, block2, false, true)
detail, _, err := util.ExecBlock(mock33.GetClient(), block.StateHash, block2, false, true, false)
if err != nil {
t.Error(err)
return
......@@ -426,7 +428,7 @@ func TestExecLocalSameTimeSetErrKey(t *testing.T) {
txs = append(txs, util.CreateTxWithExecer(priv1, "demo2"))
txs = append(txs, util.CreateTxWithExecer(priv1, "demo2"))
block2 := util.CreateNewBlock(block, txs)
detail, _, err := util.ExecBlock(mock33.GetClient(), block.StateHash, block2, false, true)
detail, _, err := util.ExecBlock(mock33.GetClient(), block.StateHash, block2, false, true, false)
if err != nil {
t.Error(err)
return
......
......@@ -128,7 +128,10 @@ func (s *StateDB) get(key []byte) ([]byte, error) {
}
query := &types.StoreGet{StateHash: s.stateHash, Keys: [][]byte{key}}
msg := s.client.NewMessage("store", types.EventStoreGet, query)
s.client.Send(msg, true)
err := s.client.Send(msg, true)
if err != nil {
return nil, err
}
resp, err := s.client.Wait(msg)
if err != nil {
panic(err) //no happen for ever
......
......@@ -176,6 +176,9 @@ func (client *client) Close() {
client.wg.Wait()
atomic.StoreInt32(&client.isClosed, 1)
close(client.Recv())
for msg := range client.Recv() {
msg.ReplyErr("client.close", types.ErrChannelClosed)
}
}
// CloseQueue 关闭消息队列
......
......@@ -423,3 +423,50 @@ func BenchmarkChanSubCallback2(b *testing.B) {
client.Reply(msg)
}
}
func TestChannelClose(t *testing.T) {
//send timeout and recv timeout
q := New("channel")
//mempool
done := make(chan struct{}, 1)
go func() {
client := q.Client()
client.Sub("mempool")
for {
select {
case msg := <-client.Recv():
if msg == nil {
return
}
if msg.Ty == types.EventTx {
msg.Reply(client.NewMessage("mempool", types.EventReply, types.Reply{IsOk: true, Msg: []byte("word")}))
}
case <-done:
client.Close()
return
}
}
}()
client := q.Client()
go q.Start()
//rpc 模块 会向其他模块发送消息,自己本身不需要订阅消息
go func() {
done <- struct{}{}
}()
for i := 0; i < 10000; i++ {
msg := client.NewMessage("mempool", types.EventTx, "hello")
err := client.SendTimeout(msg, true, 0)
if err == types.ErrChannelClosed {
return
}
if err != nil {
t.Error(err)
return
}
_, err = client.Wait(msg)
if err != nil {
t.Error(err)
}
}
}
......@@ -45,6 +45,7 @@ type BaseClient struct {
client queue.Client
api client.QueueProtocolAPI
minerStart int32
isclosed int32
once sync.Once
Cfg *types.Consensus
currentBlock *types.Block
......@@ -146,10 +147,16 @@ func (bc *BaseClient) InitBlock() {
//Close 关闭
func (bc *BaseClient) Close() {
atomic.StoreInt32(&bc.minerStart, 0)
atomic.StoreInt32(&bc.isclosed, 1)
bc.client.Close()
log.Info("consensus base closed")
}
//IsClosed 是否已经关闭
func (bc *BaseClient) IsClosed() bool {
return atomic.LoadInt32(&bc.isclosed) == 1
}
//CheckTxDup 为了不引起交易检查时候产生的无序
func (bc *BaseClient) CheckTxDup(txs []*types.Transaction) (transactions []*types.Transaction) {
cacheTxs := types.TxsToCache(txs)
......@@ -172,7 +179,10 @@ func (bc *BaseClient) IsCaughtUp() bool {
panic("bc not bind message queue.")
}
msg := bc.client.NewMessage("blockchain", types.EventIsSync, nil)
bc.client.Send(msg, true)
err := bc.client.Send(msg, true)
if err != nil {
return false
}
resp, err := bc.client.Wait(msg)
if err != nil {
return false
......@@ -273,7 +283,10 @@ func (bc *BaseClient) RequestTx(listSize int, txHashList [][]byte) []*types.Tran
panic("bc not bind message queue.")
}
msg := bc.client.NewMessage("mempool", types.EventTxList, &types.TxHashList{Hashes: txHashList, Count: int64(listSize)})
bc.client.Send(msg, true)
err := bc.client.Send(msg, true)
if err != nil {
return nil
}
resp, err := bc.client.Wait(msg)
if err != nil {
return nil
......@@ -288,7 +301,10 @@ func (bc *BaseClient) RequestBlock(start int64) (*types.Block, error) {
}
reqblock := &types.ReqBlocks{Start: start, End: start, IsDetail: false, Pid: []string{""}}
msg := bc.client.NewMessage("blockchain", types.EventGetBlocks, reqblock)
bc.client.Send(msg, true)
err := bc.client.Send(msg, true)
if err != nil {
return nil, err
}
resp, err := bc.client.Wait(msg)
if err != nil {
return nil, err
......@@ -303,7 +319,10 @@ func (bc *BaseClient) RequestLastBlock() (*types.Block, error) {
panic("client not bind message queue.")
}
msg := bc.client.NewMessage("blockchain", types.EventGetLastBlock, nil)
bc.client.Send(msg, true)
err := bc.client.Send(msg, true)
if err != nil {
return nil, err
}
resp, err := bc.client.Wait(msg)
if err != nil {
return nil, err
......@@ -316,7 +335,10 @@ func (bc *BaseClient) RequestLastBlock() (*types.Block, error) {
func (bc *BaseClient) delMempoolTx(deltx []*types.Transaction) error {
hashList := buildHashList(deltx)
msg := bc.client.NewMessage("mempool", types.EventDelTxList, hashList)
bc.client.Send(msg, true)
err := bc.client.Send(msg, true)
if err != nil {
return err
}
resp, err := bc.client.Wait(msg)
if err != nil {
return err
......@@ -343,7 +365,10 @@ func (bc *BaseClient) WriteBlock(prev []byte, block *types.Block) error {
blockdetail := &types.BlockDetail{Block: block}
msg := bc.client.NewMessage("blockchain", types.EventAddBlockDetail, blockdetail)
bc.client.Send(msg, true)
err := bc.client.Send(msg, true)
if err != nil {
return err
}
resp, err := bc.client.Wait(msg)
if err != nil {
return err
......
......@@ -96,6 +96,9 @@ func (client *Client) CheckBlock(parent *types.Block, current *types.BlockDetail
func (client *Client) CreateBlock() {
issleep := true
for {
if client.IsClosed() {
break
}
if !client.IsMining() || !client.IsCaughtUp() {
time.Sleep(client.sleepTime)
continue
......
......@@ -5,10 +5,13 @@
package solo
import (
"sync"
"testing"
"github.com/33cn/chain33/types"
"github.com/33cn/chain33/util"
"github.com/33cn/chain33/util/testnode"
"github.com/stretchr/testify/assert"
//加载系统内置store, 不要依赖plugin
_ "github.com/33cn/chain33/system/dapp/init"
......@@ -31,3 +34,35 @@ func TestSolo(t *testing.T) {
}
mock33.WaitHeight(2)
}
func BenchmarkSolo(b *testing.B) {
cfg, subcfg := testnode.GetDefaultConfig()
solocfg, err := types.ModifySubConfig(subcfg.Consensus["solo"], "waitTxMs", 1000)
assert.Nil(b, err)
subcfg.Consensus["solo"] = solocfg
mock33 := testnode.NewWithConfig(cfg, subcfg, nil)
defer mock33.Close()
txs := util.GenCoinsTxs(mock33.GetGenesisKey(), int64(b.N))
var last []byte
var mu sync.Mutex
b.ResetTimer()
done := make(chan struct{}, 10)
for i := 0; i < 10; i++ {
go func(index int) {
for n := index; n < b.N; n += 10 {
reply, err := mock33.GetAPI().SendTx(txs[n])
if err != nil {
assert.Nil(b, err)
}
mu.Lock()
last = reply.GetMsg()
mu.Unlock()
}
done <- struct{}{}
}(i)
}
for i := 0; i < 10; i++ {
<-done
}
mock33.WaitTx(last)
}
......@@ -76,13 +76,7 @@ func (c *CoinsType) GetTypeMap() map[string]int32 {
//DecodePayloadValue 为了性能考虑,coins 是最常用的合约,我们这里不用反射吗,做了特殊化的优化
func (c *CoinsType) DecodePayloadValue(tx *types.Transaction) (string, reflect.Value, error) {
if txc, ok := types.TxCacheGet(tx); ok {
return txc.GetPayloadValue()
}
txc := types.NewTransactionCache(tx)
name, value, err := c.decodePayloadValue(tx)
txc.SetPayloadValue(name, value, err)
types.TxCacheSet(tx, txc)
return name, value, err
}
......
......@@ -198,6 +198,56 @@ func (m *ReceiptAccountTransfer) GetCurrent() *Account {
return nil
}
//铸币账户余额增加
type ReceiptAccountMint struct {
//铸币前
Prev *Account `protobuf:"bytes,1,opt,name=prev,proto3" json:"prev,omitempty"`
//铸币后
Current *Account `protobuf:"bytes,2,opt,name=current,proto3" json:"current,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *ReceiptAccountMint) Reset() { *m = ReceiptAccountMint{} }
func (m *ReceiptAccountMint) String() string { return proto.CompactTextString(m) }
func (*ReceiptAccountMint) ProtoMessage() {}
func (*ReceiptAccountMint) Descriptor() ([]byte, []int) {
return fileDescriptor_8e28828dcb8d24f0, []int{3}
}
func (m *ReceiptAccountMint) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ReceiptAccountMint.Unmarshal(m, b)
}
func (m *ReceiptAccountMint) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_ReceiptAccountMint.Marshal(b, m, deterministic)
}
func (m *ReceiptAccountMint) XXX_Merge(src proto.Message) {
xxx_messageInfo_ReceiptAccountMint.Merge(m, src)
}
func (m *ReceiptAccountMint) XXX_Size() int {
return xxx_messageInfo_ReceiptAccountMint.Size(m)
}
func (m *ReceiptAccountMint) XXX_DiscardUnknown() {
xxx_messageInfo_ReceiptAccountMint.DiscardUnknown(m)
}
var xxx_messageInfo_ReceiptAccountMint proto.InternalMessageInfo
func (m *ReceiptAccountMint) GetPrev() *Account {
if m != nil {
return m.Prev
}
return nil
}
func (m *ReceiptAccountMint) GetCurrent() *Account {
if m != nil {
return m.Current
}
return nil
}
//查询一个地址列表在某个执行器中余额
type ReqBalance struct {
//地址列表
......@@ -216,7 +266,7 @@ func (m *ReqBalance) Reset() { *m = ReqBalance{} }
func (m *ReqBalance) String() string { return proto.CompactTextString(m) }
func (*ReqBalance) ProtoMessage() {}
func (*ReqBalance) Descriptor() ([]byte, []int) {
return fileDescriptor_8e28828dcb8d24f0, []int{3}
return fileDescriptor_8e28828dcb8d24f0, []int{4}
}
func (m *ReqBalance) XXX_Unmarshal(b []byte) error {
......@@ -284,7 +334,7 @@ func (m *Accounts) Reset() { *m = Accounts{} }
func (m *Accounts) String() string { return proto.CompactTextString(m) }
func (*Accounts) ProtoMessage() {}
func (*Accounts) Descriptor() ([]byte, []int) {
return fileDescriptor_8e28828dcb8d24f0, []int{4}
return fileDescriptor_8e28828dcb8d24f0, []int{5}
}
func (m *Accounts) XXX_Unmarshal(b []byte) error {
......@@ -324,7 +374,7 @@ func (m *ExecAccount) Reset() { *m = ExecAccount{} }
func (m *ExecAccount) String() string { return proto.CompactTextString(m) }
func (*ExecAccount) ProtoMessage() {}
func (*ExecAccount) Descriptor() ([]byte, []int) {
return fileDescriptor_8e28828dcb8d24f0, []int{5}
return fileDescriptor_8e28828dcb8d24f0, []int{6}
}
func (m *ExecAccount) XXX_Unmarshal(b []byte) error {
......@@ -371,7 +421,7 @@ func (m *AllExecBalance) Reset() { *m = AllExecBalance{} }
func (m *AllExecBalance) String() string { return proto.CompactTextString(m) }
func (*AllExecBalance) ProtoMessage() {}
func (*AllExecBalance) Descriptor() ([]byte, []int) {
return fileDescriptor_8e28828dcb8d24f0, []int{6}
return fileDescriptor_8e28828dcb8d24f0, []int{7}
}
func (m *AllExecBalance) XXX_Unmarshal(b []byte) error {
......@@ -423,7 +473,7 @@ func (m *ReqAllExecBalance) Reset() { *m = ReqAllExecBalance{} }
func (m *ReqAllExecBalance) String() string { return proto.CompactTextString(m) }
func (*ReqAllExecBalance) ProtoMessage() {}
func (*ReqAllExecBalance) Descriptor() ([]byte, []int) {
return fileDescriptor_8e28828dcb8d24f0, []int{7}
return fileDescriptor_8e28828dcb8d24f0, []int{8}
}
func (m *ReqAllExecBalance) XXX_Unmarshal(b []byte) error {
......@@ -483,6 +533,7 @@ func init() {
proto.RegisterType((*Account)(nil), "types.Account")
proto.RegisterType((*ReceiptExecAccountTransfer)(nil), "types.ReceiptExecAccountTransfer")
proto.RegisterType((*ReceiptAccountTransfer)(nil), "types.ReceiptAccountTransfer")
proto.RegisterType((*ReceiptAccountMint)(nil), "types.ReceiptAccountMint")
proto.RegisterType((*ReqBalance)(nil), "types.ReqBalance")
proto.RegisterType((*Accounts)(nil), "types.Accounts")
proto.RegisterType((*ExecAccount)(nil), "types.ExecAccount")
......@@ -493,31 +544,32 @@ func init() {
func init() { proto.RegisterFile("account.proto", fileDescriptor_8e28828dcb8d24f0) }
var fileDescriptor_8e28828dcb8d24f0 = []byte{
// 414 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xc4, 0x53, 0xc1, 0x6e, 0xd3, 0x40,
0x10, 0xd5, 0xc6, 0x49, 0x5d, 0x4f, 0xa0, 0x12, 0x7b, 0xa8, 0x56, 0x15, 0x15, 0x66, 0x4f, 0x3e,
0x20, 0x47, 0xc2, 0xfc, 0x40, 0x2b, 0x21, 0x71, 0x43, 0x5a, 0x38, 0xf5, 0x82, 0xd6, 0xdb, 0x09,
0x89, 0x70, 0x6d, 0x77, 0x77, 0x83, 0x08, 0x1f, 0xc0, 0x6f, 0x20, 0xf1, 0xa5, 0x68, 0x27, 0xeb,
0xc6, 0x25, 0x02, 0xe5, 0xc6, 0x2d, 0xf3, 0xde, 0xec, 0xbc, 0x37, 0x2f, 0x63, 0x78, 0xaa, 0x8d,
0xe9, 0x36, 0xad, 0x2f, 0x7b, 0xdb, 0xf9, 0x8e, 0xcf, 0xfc, 0xb6, 0x47, 0x27, 0xbf, 0x40, 0x7a,
0xb5, 0xc3, 0xf9, 0x05, 0x9c, 0x9a, 0x8d, 0xb5, 0xd8, 0x9a, 0xad, 0x60, 0x39, 0x2b, 0x66, 0xea,
0xa1, 0xe6, 0x02, 0xd2, 0x5a, 0x37, 0xba, 0x35, 0x28, 0x26, 0x39, 0x2b, 0x12, 0x35, 0x94, 0xfc,
0x1c, 0x4e, 0x96, 0xb6, 0xfb, 0x8e, 0xad, 0x48, 0x88, 0x88, 0x15, 0xe7, 0x30, 0xd5, 0xb7, 0xb7,
0x56, 0x4c, 0x73, 0x56, 0x64, 0x8a, 0x7e, 0xcb, 0x1f, 0x0c, 0x2e, 0x14, 0x1a, 0x5c, 0xf7, 0xfe,
0xed, 0x37, 0x34, 0x51, 0xf8, 0xa3, 0xd5, 0xad, 0x5b, 0xa2, 0x0d, 0x06, 0x30, 0xc0, 0xe1, 0x19,
0xa3, 0x67, 0x0f, 0x35, 0x97, 0x30, 0xed, 0x2d, 0x7e, 0x25, 0xf5, 0xf9, 0xeb, 0xb3, 0x92, 0xdc,
0x97, 0x71, 0x82, 0x22, 0x8e, 0x17, 0x90, 0xee, 0x0c, 0x7b, 0xf2, 0x72, 0xd8, 0x36, 0xd0, 0x72,
0x09, 0xe7, 0xd1, 0xc7, 0x9f, 0x1e, 0x06, 0x1d, 0x76, 0x9c, 0xce, 0xe4, 0xdf, 0x3a, 0xbf, 0x18,
0x80, 0xc2, 0xfb, 0xeb, 0x98, 0xd5, 0x73, 0xc8, 0x42, 0x0e, 0xe8, 0x1c, 0x3a, 0xc1, 0xf2, 0xa4,
0xc8, 0xd4, 0x1e, 0x08, 0x49, 0x86, 0x75, 0xd1, 0xd2, 0xd4, 0x4c, 0xc5, 0x2a, 0xbc, 0x72, 0x5e,
0x7b, 0x7c, 0xa7, 0xdd, 0x8a, 0x16, 0xcb, 0xd4, 0x1e, 0xe0, 0x97, 0x00, 0xda, 0x39, 0xf4, 0x9f,
0x42, 0x77, 0x4c, 0x3b, 0x23, 0x24, 0x44, 0xcc, 0x5f, 0xc2, 0x93, 0x1d, 0xed, 0xb6, 0x77, 0x75,
0xd7, 0x88, 0x19, 0x35, 0xcc, 0x09, 0xfb, 0x40, 0x90, 0x7c, 0x05, 0xa7, 0xd1, 0xb8, 0xe3, 0x39,
0x24, 0xda, 0x18, 0xf2, 0x76, 0xb8, 0x56, 0xa0, 0xe4, 0x7b, 0x98, 0x8f, 0xfe, 0xbb, 0x91, 0x69,
0xf6, 0xc8, 0x74, 0x01, 0x69, 0xbc, 0xb7, 0xbf, 0x65, 0x14, 0x69, 0x79, 0x03, 0x67, 0x57, 0x4d,
0x13, 0x66, 0x0e, 0x31, 0x0d, 0xa7, 0xc3, 0xf6, 0xa7, 0xc3, 0xdf, 0x3c, 0x92, 0x15, 0x13, 0x32,
0xc8, 0xe3, 0xcc, 0x11, 0xa3, 0xc6, 0x6d, 0xf2, 0x27, 0x83, 0x67, 0x0a, 0xef, 0x8f, 0x98, 0xff,
0x9f, 0xc2, 0xbf, 0x7e, 0x71, 0x73, 0xf9, 0x79, 0xed, 0x57, 0x9b, 0xba, 0x34, 0xdd, 0xdd, 0xa2,
0xaa, 0x4c, 0xbb, 0x30, 0x2b, 0xbd, 0x6e, 0xab, 0x6a, 0x41, 0xbb, 0xd5, 0x27, 0xf4, 0xb9, 0x56,
0xbf, 0x03, 0x00, 0x00, 0xff, 0xff, 0xf3, 0x95, 0x98, 0xa9, 0xbf, 0x03, 0x00, 0x00,
// 427 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xc4, 0x54, 0xcd, 0x6a, 0x14, 0x41,
0x10, 0xa6, 0xf7, 0x27, 0x9b, 0xa9, 0xd5, 0x80, 0x7d, 0x08, 0x4d, 0x30, 0x38, 0xf6, 0x69, 0x0e,
0xb2, 0x0b, 0x8e, 0x2f, 0x90, 0x80, 0xe0, 0x45, 0x84, 0xd6, 0x53, 0x2e, 0xd2, 0xd3, 0xa9, 0x75,
0x07, 0x27, 0x3d, 0x93, 0xee, 0x5e, 0x71, 0x7d, 0x00, 0x5f, 0x43, 0xf0, 0x49, 0xa5, 0x6b, 0x7b,
0xb2, 0xb3, 0x06, 0x25, 0x07, 0xc1, 0xdb, 0xd4, 0xf7, 0x55, 0xd5, 0xf7, 0x75, 0x55, 0x31, 0xf0,
0x58, 0x1b, 0xd3, 0x6e, 0x6c, 0x58, 0x74, 0xae, 0x0d, 0x2d, 0x9f, 0x86, 0x6d, 0x87, 0x5e, 0x7e,
0x86, 0xd9, 0xc5, 0x0e, 0xe7, 0x67, 0x70, 0x6c, 0x36, 0xce, 0xa1, 0x35, 0x5b, 0xc1, 0x72, 0x56,
0x4c, 0xd5, 0x5d, 0xcc, 0x05, 0xcc, 0x2a, 0xdd, 0x68, 0x6b, 0x50, 0x8c, 0x72, 0x56, 0x8c, 0x55,
0x1f, 0xf2, 0x53, 0x38, 0x5a, 0xb9, 0xf6, 0x1b, 0x5a, 0x31, 0x26, 0x22, 0x45, 0x9c, 0xc3, 0x44,
0x5f, 0x5f, 0x3b, 0x31, 0xc9, 0x59, 0x91, 0x29, 0xfa, 0x96, 0xdf, 0x19, 0x9c, 0x29, 0x34, 0x58,
0x77, 0xe1, 0xf5, 0x57, 0x34, 0x49, 0xf8, 0x83, 0xd3, 0xd6, 0xaf, 0xd0, 0x45, 0x03, 0x18, 0xe1,
0x58, 0xc6, 0xa8, 0xec, 0x2e, 0xe6, 0x12, 0x26, 0x9d, 0xc3, 0x2f, 0xa4, 0x3e, 0x7f, 0x79, 0xb2,
0x20, 0xf7, 0x8b, 0xd4, 0x41, 0x11, 0xc7, 0x0b, 0x98, 0xed, 0x0c, 0x07, 0xf2, 0x72, 0x3f, 0xad,
0xa7, 0xe5, 0x0a, 0x4e, 0x93, 0x8f, 0xdf, 0x3d, 0xf4, 0x3a, 0xec, 0x61, 0x3a, 0xa3, 0xbf, 0xeb,
0x54, 0xc0, 0x0f, 0x75, 0xde, 0xd6, 0x36, 0xfc, 0x63, 0x8d, 0x9f, 0x0c, 0x40, 0xe1, 0xed, 0x65,
0xda, 0xc7, 0x53, 0xc8, 0xe2, 0xac, 0xd1, 0x7b, 0xf4, 0x82, 0xe5, 0xe3, 0x22, 0x53, 0x7b, 0x20,
0x6e, 0x2b, 0x8e, 0x14, 0x1d, 0x75, 0xcd, 0x54, 0x8a, 0x62, 0x95, 0x0f, 0x3a, 0xe0, 0x1b, 0xed,
0xd7, 0x34, 0xbc, 0x4c, 0xed, 0x01, 0x7e, 0x0e, 0xa0, 0xbd, 0xc7, 0xf0, 0x31, 0x66, 0xa7, 0x8d,
0x66, 0x84, 0xc4, 0x35, 0xf2, 0xe7, 0xf0, 0x68, 0x47, 0xfb, 0xed, 0x4d, 0xd5, 0x36, 0x62, 0x4a,
0x09, 0x73, 0xc2, 0xde, 0x13, 0x24, 0x5f, 0xc0, 0x71, 0x32, 0xee, 0x79, 0x0e, 0x63, 0x6d, 0x0c,
0x79, 0xbb, 0xff, 0xac, 0x48, 0xc9, 0x77, 0x30, 0x1f, 0xdc, 0xc7, 0xc0, 0x34, 0x3b, 0x30, 0x5d,
0xc0, 0x2c, 0xdd, 0xf4, 0x9f, 0x66, 0x94, 0x68, 0x79, 0x05, 0x27, 0x17, 0x4d, 0x13, 0x7b, 0xf6,
0x63, 0xea, 0xcf, 0x93, 0xed, 0xcf, 0x93, 0xbf, 0x3a, 0x90, 0x15, 0x23, 0x32, 0xc8, 0x53, 0xcf,
0x01, 0xa3, 0x86, 0x69, 0xf2, 0x07, 0x83, 0x27, 0x0a, 0x6f, 0x1f, 0xd0, 0xff, 0x3f, 0x0d, 0xff,
0xf2, 0xd9, 0xd5, 0xf9, 0xa7, 0x3a, 0xac, 0x37, 0xd5, 0xc2, 0xb4, 0x37, 0xcb, 0xb2, 0x34, 0x76,
0x69, 0xd6, 0xba, 0xb6, 0x65, 0xb9, 0xa4, 0xb7, 0x55, 0x47, 0xf4, 0x4b, 0x28, 0x7f, 0x05, 0x00,
0x00, 0xff, 0xff, 0x1b, 0xce, 0xc2, 0x14, 0x23, 0x04, 0x00, 0x00,
}
package types
var gbuffer *buffer
//系统并非线程安全
func init() {
gbuffer = newBuffer(20 * 1024 * 1024)
}
//Buffer 一个连续的byte 空间,永远不会被释放
type buffer struct {
data []byte
offset int
}
//新建一个buffer 对象
func newBuffer(total int) *buffer {
return &buffer{data: make([]byte, total), offset: 0}
}
//BufferReset 重置buffer
func BufferReset() {
gbuffer.offset = 0
}
//BufferAlloc 部分空间
func BufferAlloc(size int) []byte {
if gbuffer.offset+size > 0 {
return make([]byte, size)
}
b := gbuffer.data[gbuffer.offset : gbuffer.offset+size]
gbuffer.offset += size
return b
}
//BufferAllocCap alloc cap
func BufferAllocCap(size int) []byte {
if gbuffer.offset+size > 0 {
return make([]byte, 0, size)
}
b := gbuffer.data[gbuffer.offset:gbuffer.offset]
gbuffer.offset += size
return b
}
package types
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestAlloc(t *testing.T) {
BufferReset()
data := BufferAlloc(10)
assert.Equal(t, 10, len(data))
data2 := BufferAlloc(10)
assert.Equal(t, 10, len(data2))
data3 := BufferAllocCap(10)
assert.Equal(t, 0, len(data3))
data4 := BufferAllocCap(10)
assert.Equal(t, 0, len(data4))
for i := range data {
data[i] = 1
}
for i := range data2 {
data2[i] = 2
}
for i := range data {
assert.Equal(t, byte(1), data[i])
}
for i := range data2 {
assert.Equal(t, byte(2), data2[i])
}
}
func BenchmarkAlloc(b *testing.B) {
BufferReset()
data := make([][]byte, b.N)
for i := 0; i < b.N; i++ {
a := BufferAlloc(10)
if a == nil {
panic("alloc")
}
data[i] = a
}
}
func BenchmarkAllocMake(b *testing.B) {
data := make([][]byte, b.N)
for i := 0; i < b.N; i++ {
a := make([]byte, 10)
if a == nil {
panic("alloc")
}
data[i] = a
}
}
......@@ -103,6 +103,7 @@ const (
TyLogGenesisTransfer = 11
TyLogGenesisDeposit = 12
TyLogRollback = 13
TyLogMint = 14
)
//SystemLog 系统log日志
......
......@@ -508,13 +508,7 @@ func (base *ExecTypeBase) DecodePayload(tx *Transaction) (Message, error) {
//DecodePayloadValue 解析tx交易中的payload具体Value值
func (base *ExecTypeBase) DecodePayloadValue(tx *Transaction) (string, reflect.Value, error) {
if txc, ok := txCache.Get(tx); ok {
return txc.(*TransactionCache).GetPayloadValue()
}
txc := NewTransactionCache(tx)
name, value, err := base.decodePayloadValue(tx)
txc.SetPayloadValue(name, value, err)
txCache.Add(tx, txc)
return name, value, err
}
......
......@@ -33,6 +33,14 @@ message ReceiptAccountTransfer {
Account current = 2;
}
//铸币账户余额增加
message ReceiptAccountMint {
//铸币前
Account prev = 1;
//铸币后
Account current = 2;
}
//查询一个地址列表在某个执行器中余额
message ReqBalance {
//地址列表
......
......@@ -207,3 +207,10 @@ func TestParseExpire(t *testing.T) {
assert.Equal(t, int64(123000000000), exp)
}
func BenchmarkHash(b *testing.B) {
tx := &Transaction{Payload: []byte("xxxxxxxxxxxxdggrgrgrgrgrgrgrrhthththhth"), Execer: []byte("hello")}
for i := 0; i < b.N; i++ {
tx.Hash()
}
}
......@@ -17,7 +17,10 @@ import (
func CheckBlock(client queue.Client, block *types.BlockDetail) error {
req := block
msg := client.NewMessage("consensus", types.EventCheckBlock, req)
client.Send(msg, true)
err := client.Send(msg, true)
if err != nil {
return err
}
resp, err := client.Wait(msg)
if err != nil {
return err
......@@ -43,7 +46,10 @@ func ExecTx(client queue.Client, prevStateRoot []byte, block *types.Block) (*typ
IsMempool: false,
}
msg := client.NewMessage("execs", types.EventExecTxList, list)
client.Send(msg, true)
err := client.Send(msg, true)
if err != nil {
return nil, err
}
resp, err := client.Wait(msg)
if err != nil {
return nil, err
......@@ -58,7 +64,10 @@ func ExecKVMemSet(client queue.Client, prevStateRoot []byte, height int64, kvset
setwithsync := &types.StoreSetWithSync{Storeset: set, Sync: sync}
msg := client.NewMessage("store", types.EventStoreMemSet, setwithsync)
client.Send(msg, true)
err := client.Send(msg, true)
if err != nil {
return nil, err
}
resp, err := client.Wait(msg)
if err != nil {
return nil, err
......@@ -71,8 +80,11 @@ func ExecKVMemSet(client queue.Client, prevStateRoot []byte, height int64, kvset
func ExecKVSetCommit(client queue.Client, hash []byte) error {
req := &types.ReqHash{Hash: hash}
msg := client.NewMessage("store", types.EventStoreCommit, req)
client.Send(msg, true)
msg, err := client.Wait(msg)
err := client.Send(msg, true)
if err != nil {
return err
}
msg, err = client.Wait(msg)
if err != nil {
return err
}
......@@ -85,8 +97,11 @@ func ExecKVSetCommit(client queue.Client, hash []byte) error {
func ExecKVSetRollback(client queue.Client, hash []byte) error {
req := &types.ReqHash{Hash: hash}
msg := client.NewMessage("store", types.EventStoreRollback, req)
client.Send(msg, true)
msg, err := client.Wait(msg)
err := client.Send(msg, true)
if err != nil {
return err
}
msg, err = client.Wait(msg)
if err != nil {
return err
}
......
......@@ -43,7 +43,7 @@ singleMode=true
batchsync=false
isRecordBlockSequence=true
isParaChain=false
enableTxQuickIndex=false
enableTxQuickIndex=true
[p2p]
seeds=[]
......@@ -68,9 +68,9 @@ grpcFuncWhitelist=["*"]
[mempool]
name="timeline"
poolCacheSize=10240
poolCacheSize=102400
minTxFee=100000
maxTxNumPerAccount=10000
maxTxNumPerAccount=100000
[consensus]
name="solo"
......@@ -89,7 +89,7 @@ futureBlockTime = 16
ticketFrozenTime = 5
ticketWithdrawTime = 10
ticketMinerWaitTime = 2
maxTxNumber = 1600
maxTxNumber = 10000
targetTimespan = 2304
targetTimePerBlock = 16
......
......@@ -289,14 +289,23 @@ func (mock *Chain33Mock) Close() {
}
func (mock *Chain33Mock) closeNoLock() {
lognode.Info("network close")
mock.network.Close()
lognode.Info("network close")
mock.rpc.Close()
lognode.Info("rpc close")
mock.mem.Close()
lognode.Info("mem close")
mock.exec.Close()
lognode.Info("exec close")
mock.cs.Close()
lognode.Info("cs close")
mock.wallet.Close()
lognode.Info("wallet close")
mock.chain.Close()
lognode.Info("chain close")
mock.store.Close()
lognode.Info("store close")
mock.client.Close()
os.RemoveAll(mock.datadir)
}
......
......@@ -32,7 +32,7 @@ func init() {
rand.Seed(types.Now().UnixNano())
}
var chainlog = log15.New("module", "util")
var ulog = log15.New("module", "util")
//GetParaExecName : 如果 name 没有 paraName 前缀,那么加上这个前缀
func GetParaExecName(paraName string, name string) string {
......@@ -220,15 +220,15 @@ func CreateCoinsBlock(priv crypto.PrivKey, n int64) *types.Block {
}
// ExecBlock : just exec block
func ExecBlock(client queue.Client, prevStateRoot []byte, block *types.Block, errReturn bool, sync bool) (*types.BlockDetail, []*types.Transaction, error) {
func ExecBlock(client queue.Client, prevStateRoot []byte, block *types.Block, errReturn, sync, checkblock bool) (*types.BlockDetail, []*types.Transaction, error) {
//发送执行交易给execs模块
//通过consensus module 再次检查
ulog := chainlog
ulog.Debug("ExecBlock", "height------->", block.Height, "ntx", len(block.Txs))
beg := types.Now()
defer func() {
ulog.Info("ExecBlock", "height", block.Height, "ntx", len(block.Txs), "writebatchsync", sync, "cost", types.Since(beg))
}()
if errReturn && block.Height > 0 && !block.CheckSign() {
//block的来源不是自己的mempool,而是别人的区块
return nil, nil, types.ErrSign
......@@ -241,18 +241,21 @@ func ExecBlock(client queue.Client, prevStateRoot []byte, block *types.Block, er
if err != nil {
return nil, nil, err
}
ulog.Info("ExecBlock", "CheckTxDup", types.Since(beg))
beg = types.Now()
newtxscount := len(cacheTxs)
if oldtxscount != newtxscount && errReturn {
return nil, nil, types.ErrTxDup
}
ulog.Debug("ExecBlock", "prevtx", oldtxscount, "newtx", newtxscount)
block.TxHash = merkle.CalcMerkleRootCache(cacheTxs)
block.Txs = types.CacheToTxs(cacheTxs)
//println("1")
receipts, err := ExecTx(client, prevStateRoot, block)
if err != nil {
return nil, nil, err
}
ulog.Info("ExecBlock", "ExecTx", types.Since(beg))
beg = types.Now()
var kvset []*types.KeyValue
var deltxlist = make(map[int]bool)
var rdata []*types.ReceiptData //save to db receipt log
......@@ -270,34 +273,45 @@ func ExecBlock(client queue.Client, prevStateRoot []byte, block *types.Block, er
kvset = append(kvset, receipt.KV...)
}
kvset = DelDupKey(kvset)
//check TxHash
calcHash := merkle.CalcMerkleRoot(block.Txs)
if errReturn && !bytes.Equal(calcHash, block.TxHash) {
return nil, nil, types.ErrCheckTxHash
}
block.TxHash = calcHash
//删除无效的交易
var deltx []*types.Transaction
if len(deltxlist) > 0 {
var newtx []*types.Transaction
index := 0
for i := 0; i < len(block.Txs); i++ {
if deltxlist[i] {
deltx = append(deltx, block.Txs[i])
} else {
newtx = append(newtx, block.Txs[i])
continue
}
block.Txs[index] = block.Txs[i]
cacheTxs[index] = cacheTxs[i]
index++
}
block.Txs = newtx
block.TxHash = merkle.CalcMerkleRoot(block.Txs)
block.Txs = block.Txs[0:index]
cacheTxs = cacheTxs[0:index]
}
//交易有执行不成功的,报错(TxHash一定不同)
if len(deltx) > 0 && errReturn {
return nil, nil, types.ErrCheckTxHash
}
//检查block的txhash值
calcHash := merkle.CalcMerkleRootCache(cacheTxs)
if errReturn && !bytes.Equal(calcHash, block.TxHash) {
return nil, nil, types.ErrCheckTxHash
}
ulog.Info("ExecBlock", "CalcMerkleRootCache", types.Since(beg))
beg = types.Now()
block.TxHash = calcHash
var detail types.BlockDetail
calcHash, err = ExecKVMemSet(client, prevStateRoot, block.Height, kvset, sync)
if err != nil {
return nil, nil, err
}
//println("2")
if errReturn && !bytes.Equal(block.StateHash, calcHash) {
ExecKVSetRollback(client, calcHash)
err = ExecKVSetRollback(client, calcHash)
if err != nil {
ulog.Error("execBlock-->ExecKVSetRollback", "err", err)
}
if len(rdata) > 0 {
for i, rd := range rdata {
rd.OutputReceiptDetails(block.Txs[i].Execer, ulog)
......@@ -308,7 +322,22 @@ func ExecBlock(client queue.Client, prevStateRoot []byte, block *types.Block, er
block.StateHash = calcHash
detail.Block = block
detail.Receipts = rdata
ExecKVSetCommit(client, block.StateHash)
if detail.Block.Height > 0 && checkblock {
err := CheckBlock(client, &detail)
if err != nil {
ulog.Debug("CheckBlock-->", "err=", err)
return nil, deltx, err
}
}
ulog.Info("ExecBlock", "CheckBlock", types.Since(beg))
beg = types.Now()
// 写数据库失败时需要及时返回错误,防止错误数据被写入localdb中CHAIN33-567
err = ExecKVSetCommit(client, block.StateHash)
if err != nil {
return nil, nil, err
}
detail.KV = kvset
detail.PrevStatusHash = prevStateRoot
return &detail, deltx, nil
}
......@@ -362,7 +391,7 @@ func ExecAndCheckBlock2(qclient queue.Client, block *types.Block, txs []*types.T
//ExecAndCheckBlockCB :
func ExecAndCheckBlockCB(qclient queue.Client, block *types.Block, txs []*types.Transaction, cb func(int, *types.ReceiptData) error) (*types.Block, error) {
block2 := CreateNewBlock(block, txs)
detail, deltx, err := ExecBlock(qclient, block.StateHash, block2, false, true)
detail, deltx, err := ExecBlock(qclient, block.StateHash, block2, false, true, false)
if err != nil {
return nil, err
}
......@@ -410,7 +439,7 @@ func ResetDatadir(cfg *types.Config, datadir string) string {
}
datadir = filepath.Join(dir, datadir[6:])
}
chainlog.Info("current user data dir is ", "dir", datadir)
ulog.Info("current user data dir is ", "dir", datadir)
cfg.Log.LogFile = filepath.Join(datadir, cfg.Log.LogFile)
cfg.BlockChain.DbPath = filepath.Join(datadir, cfg.BlockChain.DbPath)
cfg.P2P.DbPath = filepath.Join(datadir, cfg.P2P.DbPath)
......
......@@ -40,31 +40,35 @@ func (c *Cache) Purge() {
// Add adds a value to the cache. Returns true if an eviction occurred.
func (c *Cache) Add(key, value interface{}) (evicted bool) {
c.lock.Lock()
defer c.lock.Unlock()
return c.lru.Add(key, value)
evicted = c.lru.Add(key, value)
c.lock.Unlock()
return evicted
}
// Get looks up a key's value from the cache.
func (c *Cache) Get(key interface{}) (value interface{}, ok bool) {
c.lock.Lock()
defer c.lock.Unlock()
return c.lru.Get(key)
value, ok = c.lru.Get(key)
c.lock.Unlock()
return value, ok
}
// Contains checks if a key is in the cache, without updating the
// recent-ness or deleting it for being stale.
func (c *Cache) Contains(key interface{}) bool {
c.lock.RLock()
defer c.lock.RUnlock()
return c.lru.Contains(key)
containKey := c.lru.Contains(key)
c.lock.RUnlock()
return containKey
}
// Peek returns the key value (or undefined if not found) without updating
// the "recently used"-ness of the key.
func (c *Cache) Peek(key interface{}) (value interface{}, ok bool) {
c.lock.RLock()
defer c.lock.RUnlock()
return c.lru.Peek(key)
value, ok = c.lru.Peek(key)
c.lock.RUnlock()
return value, ok
}
// ContainsOrAdd checks if a key is in the cache without updating the
......@@ -98,13 +102,15 @@ func (c *Cache) RemoveOldest() {
// Keys returns a slice of the keys in the cache, from oldest to newest.
func (c *Cache) Keys() []interface{} {
c.lock.RLock()
defer c.lock.RUnlock()
return c.lru.Keys()
keys := c.lru.Keys()
c.lock.RUnlock()
return keys
}
// Len returns the number of items in the cache.
func (c *Cache) Len() int {
c.lock.RLock()
defer c.lock.RUnlock()
return c.lru.Len()
length := c.lru.Len()
c.lock.RUnlock()
return length
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment