Commit e9ed2067 authored by 张振华's avatar 张振华

Merge branch 'master' into guess

parents 2c249090 9eac9d7d
...@@ -349,11 +349,11 @@ func (client *client) getLastBlockInfo() (int64, *types.Block, []byte, int64, er ...@@ -349,11 +349,11 @@ func (client *client) getLastBlockInfo() (int64, *types.Block, []byte, int64, er
if seq == -1 { if seq == -1 {
seq = 0 seq = 0
} }
savedBlockOnMain, _, err := client.GetBlockOnMainBySeq(seq) main, err := client.GetBlockOnMainBySeq(seq)
if err != nil { if err != nil {
return -2, nil, nil, -2, err return -2, nil, nil, -2, err
} }
return blockedSeq, lastBlock, savedBlockOnMain.Block.Hash(), savedBlockOnMain.Block.Height, nil return blockedSeq, lastBlock, main.Seq.Hash, main.Detail.Block.Height, nil
} }
...@@ -404,72 +404,35 @@ func (client *client) GetSeqByHashOnMainChain(hash []byte) (int64, error) { ...@@ -404,72 +404,35 @@ func (client *client) GetSeqByHashOnMainChain(hash []byte) (int64, error) {
return seq.Data, nil return seq.Data, nil
} }
func (client *client) GetBlocksByHashesFromMainChain(hashes [][]byte) (*types.BlockDetails, error) { func (client *client) GetBlockOnMainBySeq(seq int64) (*types.BlockSeq, error) {
req := &types.ReqHashes{Hashes: hashes} blockSeq, err := client.grpcClient.GetBlockBySeq(context.Background(), &types.Int64{Data: seq})
blocks, err := client.grpcClient.GetBlockByHashes(context.Background(), req)
if err != nil { if err != nil {
plog.Error("GetBlocksByHashesFromMainChain", "Error", err.Error()) plog.Error("Not found block on main", "seq", seq)
return nil, err return nil, err
} }
return blocks, nil return blockSeq, nil
}
func (client *client) GetBlockHashFromMainChain(start int64, end int64) (*types.BlockSequences, error) {
req := &types.ReqBlocks{Start: start, End: end, IsDetail: true, Pid: []string{}}
blockSeqs, err := client.grpcClient.GetBlockSequences(context.Background(), req)
if err != nil {
plog.Error("GetBlockHashFromMainChain", "Error", err.Error())
return nil, err
}
return blockSeqs, nil
}
func (client *client) GetBlockOnMainBySeq(seq int64) (*types.BlockDetail, int64, error) {
blockSeqs, err := client.GetBlockHashFromMainChain(seq, seq)
if err != nil {
plog.Error("Not found block hash on seq", "start", seq, "end", seq)
return nil, -1, err
}
var hashes [][]byte
for _, item := range blockSeqs.Items {
hashes = append(hashes, item.Hash)
}
blockDetails, err := client.GetBlocksByHashesFromMainChain(hashes)
if err != nil {
return nil, -1, err
}
//protect the boundary
if len(blockSeqs.Items) != len(blockDetails.Items) {
panic("Inconsistency between GetBlockSequences and GetBlockByHashes")
}
return blockDetails.Items[0], blockSeqs.Items[0].Type, nil
} }
// preBlockHash to identify the same main node // preBlockHash to identify the same main node
func (client *client) RequestTx(currSeq int64, preMainBlockHash []byte) ([]*types.Transaction, *types.Block, int64, error) { func (client *client) RequestTx(currSeq int64, preMainBlockHash []byte) ([]*types.Transaction, *types.BlockSeq, error) {
plog.Debug("Para consensus RequestTx") plog.Debug("Para consensus RequestTx")
lastSeq, err := client.GetLastSeqOnMainChain() lastSeq, err := client.GetLastSeqOnMainChain()
if err != nil { if err != nil {
return nil, nil, -1, err return nil, nil, err
} }
plog.Info("RequestTx", "LastMainSeq", lastSeq, "CurrSeq", currSeq) plog.Info("RequestTx", "LastMainSeq", lastSeq, "CurrSeq", currSeq)
if lastSeq >= currSeq { if lastSeq >= currSeq {
blockDetail, seqTy, err := client.GetBlockOnMainBySeq(currSeq) blockSeq, err := client.GetBlockOnMainBySeq(currSeq)
if err != nil { if err != nil {
return nil, nil, -1, err return nil, nil, err
} }
//genesis block start with seq=-1 not check //genesis block start with seq=-1 not check
if currSeq == 0 || if currSeq == 0 ||
(bytes.Equal(preMainBlockHash, blockDetail.Block.ParentHash) && seqTy == addAct) || (bytes.Equal(preMainBlockHash, blockSeq.Detail.Block.ParentHash) && blockSeq.Seq.Type == addAct) ||
(bytes.Equal(preMainBlockHash, blockDetail.Block.Hash()) && seqTy == delAct) { (bytes.Equal(preMainBlockHash, blockSeq.Seq.Hash) && blockSeq.Seq.Type == delAct) {
txs := client.FilterTxsForPara(blockDetail) txs := client.FilterTxsForPara(blockSeq.Detail)
plog.Info("GetCurrentSeq", "Len of txs", len(txs), "seqTy", seqTy) plog.Info("GetCurrentSeq", "Len of txs", len(txs), "seqTy", blockSeq.Seq.Type)
if lastSeq-currSeq > emptyBlockInterval { if lastSeq-currSeq > emptyBlockInterval {
client.isCaughtUp = false client.isCaughtUp = false
...@@ -478,26 +441,26 @@ func (client *client) RequestTx(currSeq int64, preMainBlockHash []byte) ([]*type ...@@ -478,26 +441,26 @@ func (client *client) RequestTx(currSeq int64, preMainBlockHash []byte) ([]*type
} }
if client.authAccount != "" { if client.authAccount != "" {
client.commitMsgClient.onMainBlockAdded(blockDetail) client.commitMsgClient.onMainBlockAdded(blockSeq.Detail)
} }
return txs, blockDetail.Block, seqTy, nil return txs, blockSeq, nil
} }
//not consistent case be processed at below //not consistent case be processed at below
plog.Error("RequestTx", "preMainHash", common.Bytes2Hex(preMainBlockHash), "currSeq preMainHash", common.Bytes2Hex(blockDetail.Block.ParentHash), plog.Error("RequestTx", "preMainHash", common.Bytes2Hex(preMainBlockHash), "currSeq preMainHash", common.Bytes2Hex(blockSeq.Detail.Block.ParentHash),
"currSeq mainHash", common.Bytes2Hex(blockDetail.Block.Hash()), "curr seq", currSeq, "ty", seqTy, "currSeq Mainheight", blockDetail.Block.Height) "currSeq mainHash", common.Bytes2Hex(blockSeq.Seq.Hash), "curr seq", currSeq, "ty", blockSeq.Seq.Type, "currSeq Mainheight", blockSeq.Detail.Block.Height)
return nil, nil, -1, paracross.ErrParaCurHashNotMatch return nil, nil, paracross.ErrParaCurHashNotMatch
} }
//lastSeq < CurrSeq case: //lastSeq < CurrSeq case:
//lastSeq = currSeq-1, main node not update //lastSeq = currSeq-1, main node not update
if lastSeq+1 == currSeq { if lastSeq+1 == currSeq {
plog.Debug("Waiting new sequence from main chain") plog.Debug("Waiting new sequence from main chain")
return nil, nil, -1, paracross.ErrParaWaitingNewSeq return nil, nil, paracross.ErrParaWaitingNewSeq
} }
// 1. lastSeq < currSeq-1 // 1. lastSeq < currSeq-1
// 2. lastSeq >= currSeq and seq not consistent or fork case // 2. lastSeq >= currSeq and seq not consistent or fork case
return nil, nil, -1, paracross.ErrParaCurHashNotMatch return nil, nil, paracross.ErrParaCurHashNotMatch
} }
//genesis block scenario, new main node's blockHash as preMainHash, genesis sequence+1 as currSeq //genesis block scenario, new main node's blockHash as preMainHash, genesis sequence+1 as currSeq
...@@ -620,7 +583,7 @@ func (client *client) CreateBlock() { ...@@ -620,7 +583,7 @@ func (client *client) CreateBlock() {
currSeq++ currSeq++
} }
txs, blockOnMain, seqTy, err := client.RequestTx(currSeq, lastSeqMainHash) txs, blockOnMain, err := client.RequestTx(currSeq, lastSeqMainHash)
if err != nil { if err != nil {
incSeqFlag = false incSeqFlag = false
if err == paracross.ErrParaCurHashNotMatch { if err == paracross.ErrParaCurHashNotMatch {
...@@ -635,10 +598,10 @@ func (client *client) CreateBlock() { ...@@ -635,10 +598,10 @@ func (client *client) CreateBlock() {
continue continue
} }
lastSeqMainHeight := blockOnMain.Height lastSeqMainHeight := blockOnMain.Detail.Block.Height
lastSeqMainHash = blockOnMain.Hash() lastSeqMainHash = blockOnMain.Seq.Hash
if seqTy == delAct { if blockOnMain.Seq.Type == delAct {
lastSeqMainHash = blockOnMain.ParentHash lastSeqMainHash = blockOnMain.Detail.Block.ParentHash
} }
_, lastBlock, lastBlockMainHash, lastBlockMainHeight, err := client.getLastBlockInfo() _, lastBlock, lastBlockMainHash, lastBlockMainHeight, err := client.getLastBlockInfo()
...@@ -650,9 +613,9 @@ func (client *client) CreateBlock() { ...@@ -650,9 +613,9 @@ func (client *client) CreateBlock() {
plog.Info("Parachain process block", "lastBlockSeq", lastSeq, "curSeq", currSeq, plog.Info("Parachain process block", "lastBlockSeq", lastSeq, "curSeq", currSeq,
"currSeqMainHeight", lastSeqMainHeight, "currSeqMainHash", common.ToHex(lastSeqMainHash), "currSeqMainHeight", lastSeqMainHeight, "currSeqMainHash", common.ToHex(lastSeqMainHash),
"lastBlockMainHeight", lastBlockMainHeight, "lastBlockMainHash", common.ToHex(lastBlockMainHash), "seqTy", seqTy) "lastBlockMainHeight", lastBlockMainHeight, "lastBlockMainHash", common.ToHex(lastBlockMainHash), "seqTy", blockOnMain.Seq.Type)
if seqTy == delAct { if blockOnMain.Seq.Type == delAct {
if len(txs) == 0 { if len(txs) == 0 {
if lastSeqMainHeight > lastBlockMainHeight { if lastSeqMainHeight > lastBlockMainHeight {
incSeqFlag = true incSeqFlag = true
...@@ -665,7 +628,7 @@ func (client *client) CreateBlock() { ...@@ -665,7 +628,7 @@ func (client *client) CreateBlock() {
if err != nil { if err != nil {
plog.Error(fmt.Sprintf("********************err:%v", err.Error())) plog.Error(fmt.Sprintf("********************err:%v", err.Error()))
} }
} else if seqTy == addAct { } else if blockOnMain.Seq.Type == addAct {
if len(txs) == 0 { if len(txs) == 0 {
if lastSeqMainHeight-lastBlockMainHeight < emptyBlockInterval { if lastSeqMainHeight-lastBlockMainHeight < emptyBlockInterval {
incSeqFlag = true incSeqFlag = true
...@@ -689,14 +652,14 @@ func (client *client) CreateBlock() { ...@@ -689,14 +652,14 @@ func (client *client) CreateBlock() {
} }
// miner tx need all para node create, but not all node has auth account, here just not sign to keep align // miner tx need all para node create, but not all node has auth account, here just not sign to keep align
func (client *client) addMinerTx(preStateHash []byte, block *types.Block, main *types.Block) error { func (client *client) addMinerTx(preStateHash []byte, block *types.Block, main *types.BlockSeq) error {
status := &pt.ParacrossNodeStatus{ status := &pt.ParacrossNodeStatus{
Title: types.GetTitle(), Title: types.GetTitle(),
Height: block.Height, Height: block.Height,
PreBlockHash: block.ParentHash, PreBlockHash: block.ParentHash,
PreStateHash: preStateHash, PreStateHash: preStateHash,
MainBlockHash: main.Hash(), MainBlockHash: main.Seq.Hash,
MainBlockHeight: main.Height, MainBlockHeight: main.Detail.Block.Height,
} }
tx, err := paracross.CreateRawMinerTx(status) tx, err := paracross.CreateRawMinerTx(status)
...@@ -710,7 +673,7 @@ func (client *client) addMinerTx(preStateHash []byte, block *types.Block, main * ...@@ -710,7 +673,7 @@ func (client *client) addMinerTx(preStateHash []byte, block *types.Block, main *
} }
func (client *client) createBlock(lastBlock *types.Block, txs []*types.Transaction, seq int64, mainBlock *types.Block) error { func (client *client) createBlock(lastBlock *types.Block, txs []*types.Transaction, seq int64, mainBlock *types.BlockSeq) error {
var newblock types.Block var newblock types.Block
plog.Debug(fmt.Sprintf("the len txs is: %v", len(txs))) plog.Debug(fmt.Sprintf("the len txs is: %v", len(txs)))
newblock.ParentHash = lastBlock.Hash() newblock.ParentHash = lastBlock.Hash()
...@@ -719,7 +682,7 @@ func (client *client) createBlock(lastBlock *types.Block, txs []*types.Transacti ...@@ -719,7 +682,7 @@ func (client *client) createBlock(lastBlock *types.Block, txs []*types.Transacti
//挖矿固定难度 //挖矿固定难度
newblock.Difficulty = types.GetP(0).PowLimitBits newblock.Difficulty = types.GetP(0).PowLimitBits
newblock.TxHash = merkle.CalcMerkleRoot(newblock.Txs) newblock.TxHash = merkle.CalcMerkleRoot(newblock.Txs)
newblock.BlockTime = mainBlock.BlockTime newblock.BlockTime = mainBlock.Detail.Block.BlockTime
err := client.addMinerTx(lastBlock.StateHash, &newblock, mainBlock) err := client.addMinerTx(lastBlock.StateHash, &newblock, mainBlock)
if err != nil { if err != nil {
......
...@@ -7,7 +7,6 @@ package para ...@@ -7,7 +7,6 @@ package para
import ( import (
"math/rand" "math/rand"
"testing" "testing"
"time"
"github.com/33cn/chain33/blockchain" "github.com/33cn/chain33/blockchain"
"github.com/33cn/chain33/common/log" "github.com/33cn/chain33/common/log"
...@@ -36,7 +35,7 @@ func init() { ...@@ -36,7 +35,7 @@ func init() {
pp.Init("paracross", nil) pp.Init("paracross", nil)
random = rand.New(rand.NewSource(types.Now().UnixNano())) random = rand.New(rand.NewSource(types.Now().UnixNano()))
consensusInterval = 2 consensusInterval = 2
log.SetLogLevel("debug") log.SetLogLevel("error")
} }
type suiteParaCommitMsg struct { type suiteParaCommitMsg struct {
...@@ -72,19 +71,11 @@ func (s *suiteParaCommitMsg) initEnv(cfg *types.Config, sub *types.ConfigSubModu ...@@ -72,19 +71,11 @@ func (s *suiteParaCommitMsg) initEnv(cfg *types.Config, sub *types.ConfigSubModu
s.store.SetQueueClient(q.Client()) s.store.SetQueueClient(q.Client())
s.para = New(cfg.Consensus, sub.Consensus["para"]).(*client) s.para = New(cfg.Consensus, sub.Consensus["para"]).(*client)
s.grpcCli = &typesmocks.Chain33Client{} s.grpcCli = &typesmocks.Chain33Client{}
blockHash := &types.BlockSequence{
Hash: []byte("1"),
Type: 1,
}
blockSeqs := &types.BlockSequences{Items: []*types.BlockSequence{blockHash}}
s.grpcCli.On("GetBlockSequences", mock.Anything, mock.Anything).Return(blockSeqs, errors.New("nil")) // GetBlockBySeq return error to stop create's for cycle to request tx
block := &types.Block{} s.grpcCli.On("GetBlockBySeq", mock.Anything, mock.Anything).Return(nil, errors.New("quit create"))
blockDetail := &types.BlockDetail{Block: block}
blockDetails := &types.BlockDetails{Items: []*types.BlockDetail{blockDetail}}
s.grpcCli.On("GetBlockByHashes", mock.Anything, mock.Anything).Return(blockDetails, errors.New("nil"))
//data := &types.Int64{1} //data := &types.Int64{1}
s.grpcCli.On("GetLastBlockSequence", mock.Anything, mock.Anything).Return(nil, errors.New("nil")) s.grpcCli.On("GetLastBlockSequence", mock.Anything, mock.Anything).Return(nil, errors.New("nil")).Maybe()
reply := &types.Reply{IsOk: true} reply := &types.Reply{IsOk: true}
s.grpcCli.On("IsSync", mock.Anything, mock.Anything).Return(reply, nil) s.grpcCli.On("IsSync", mock.Anything, mock.Anything).Return(reply, nil)
result := &pt.ParacrossStatus{Height: -1} result := &pt.ParacrossStatus{Height: -1}
...@@ -132,24 +123,55 @@ func (s *suiteParaCommitMsg) SetupSuite() { ...@@ -132,24 +123,55 @@ func (s *suiteParaCommitMsg) SetupSuite() {
s.initEnv(initConfigFile()) s.initEnv(initConfigFile())
} }
func (s *suiteParaCommitMsg) TestRun_1() { func (s *suiteParaCommitMsg) createBlock() {
//s.testGetBlock() var i int64
for i = 0; i < 3; i++ {
lastBlock, err := s.para.RequestLastBlock() lastBlock, err := s.para.RequestLastBlock()
if err != nil { if err != nil {
plog.Error("para test", "err", err.Error()) plog.Error("para test", "err", err.Error())
} }
plog.Info("para test---------", "last height", lastBlock.Height) s.Equal(int64(i), lastBlock.Height)
s.para.createBlock(lastBlock, nil, 0, getMainBlock(1, lastBlock.BlockTime+1)) s.para.createBlock(lastBlock, nil, i, getMainBlock(i+1, lastBlock.BlockTime+1))
lastBlock, err = s.para.RequestLastBlock() }
if err != nil { }
plog.Error("para test--2", "err", err.Error())
func (s *suiteParaCommitMsg) TestRun_1() {
s.createBlock()
s.testRunGetMinerTxInfo()
s.testRunRmvBlock()
lastBlock, _ := s.para.RequestLastBlock()
if lastBlock.Height > 0 {
s.para.DelBlock(lastBlock, 1)
} }
plog.Info("para test---------", "last height", lastBlock.Height)
s.para.createBlock(lastBlock, nil, 1, getMainBlock(2, lastBlock.BlockTime+1)) }
time.Sleep(time.Second * 3)
func (s *suiteParaCommitMsg) testRunGetMinerTxInfo() {
lastBlock, err := s.para.RequestLastBlock()
s.Nil(err)
plog.Info("para test testRunGetMinerTxInfo--------------", "last height", lastBlock.Height)
s.True(lastBlock.Height > 1)
status, err := getMinerTxInfo(lastBlock)
s.Nil(err)
s.Equal(int64(3), status.MainBlockHeight)
}
func (s *suiteParaCommitMsg) testRunRmvBlock() {
lastBlock, err := s.para.RequestLastBlock()
s.Nil(err)
plog.Info("para test testRunRmvBlock------------pre", "last height", lastBlock.Height)
s.True(lastBlock.Height > 1)
s.para.removeBlocks(1)
lastBlock, err = s.para.RequestLastBlock() lastBlock, err = s.para.RequestLastBlock()
s.para.DelBlock(lastBlock, 2) s.Nil(err)
time.Sleep(time.Second * 3) plog.Info("para test testRunRmvBlock----------after", "last height", lastBlock.Height)
s.Equal(int64(1), lastBlock.Height)
} }
func TestRunSuiteParaCommitMsg(t *testing.T) { func TestRunSuiteParaCommitMsg(t *testing.T) {
...@@ -158,7 +180,7 @@ func TestRunSuiteParaCommitMsg(t *testing.T) { ...@@ -158,7 +180,7 @@ func TestRunSuiteParaCommitMsg(t *testing.T) {
} }
func (s *suiteParaCommitMsg) TearDownSuite() { func (s *suiteParaCommitMsg) TearDownSuite() {
time.Sleep(time.Second * 5) //time.Sleep(time.Second * 1)
s.block.Close() s.block.Close()
s.para.Close() s.para.Close()
s.exec.Close() s.exec.Close()
...@@ -169,9 +191,17 @@ func (s *suiteParaCommitMsg) TearDownSuite() { ...@@ -169,9 +191,17 @@ func (s *suiteParaCommitMsg) TearDownSuite() {
} }
func getMainBlock(height int64, BlockTime int64) *types.Block { func getMainBlock(height int64, BlockTime int64) *types.BlockSeq {
return &types.Block{
return &types.BlockSeq{
Num: height,
Seq: &types.BlockSequence{Hash: []byte(string(height)), Type: addAct},
Detail: &types.BlockDetail{
Block: &types.Block{
ParentHash: []byte(string(height - 1)),
Height: height, Height: height,
BlockTime: BlockTime, BlockTime: BlockTime,
},
},
} }
} }
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
package para package para
import ( import (
"errors"
"testing" "testing"
"github.com/33cn/chain33/blockchain" "github.com/33cn/chain33/blockchain"
...@@ -15,9 +16,6 @@ import ( ...@@ -15,9 +16,6 @@ import (
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
//"github.com/33cn/plugin/plugin/dapp/paracross/rpc"
"time"
"github.com/33cn/chain33/queue" "github.com/33cn/chain33/queue"
"github.com/33cn/chain33/store" "github.com/33cn/chain33/store"
_ "github.com/33cn/chain33/system" _ "github.com/33cn/chain33/system"
...@@ -28,7 +26,7 @@ import ( ...@@ -28,7 +26,7 @@ import (
func init() { func init() {
//types.Init("user.p.para.", nil) //types.Init("user.p.para.", nil)
log.SetLogLevel("debug") log.SetLogLevel("error")
} }
type suiteParaClient struct { type suiteParaClient struct {
...@@ -59,42 +57,13 @@ func (s *suiteParaClient) initEnv(cfg *types.Config, sub *types.ConfigSubModule) ...@@ -59,42 +57,13 @@ func (s *suiteParaClient) initEnv(cfg *types.Config, sub *types.ConfigSubModule)
s.store.SetQueueClient(q.Client()) s.store.SetQueueClient(q.Client())
//cfg.Consensus.StartHeight = 0 //cfg.Consensus.StartHeight = 0
cfg.Consensus.EmptyBlockInterval = 1 //add block by UT below
cfg.Consensus.EmptyBlockInterval = 100
s.para = New(cfg.Consensus, sub.Consensus["para"]).(*client) s.para = New(cfg.Consensus, sub.Consensus["para"]).(*client)
s.grpcCli = &typesmocks.Chain33Client{} s.grpcCli = &typesmocks.Chain33Client{}
blockHash := &types.BlockSequence{
Hash: []byte("1"),
Type: 1,
}
blockSeqs := &types.BlockSequences{Items: []*types.BlockSequence{blockHash}}
s.grpcCli.On("GetBlockSequences", mock.Anything, mock.Anything).Return(blockSeqs, nil)
block := &types.Block{Height: 0}
blockDetail := &types.BlockDetail{Block: block}
blockDetails := &types.BlockDetails{Items: []*types.BlockDetail{blockDetail}}
s.grpcCli.On("GetBlockByHashes", mock.Anything, mock.Anything).Return(blockDetails, nil).Once()
block = &types.Block{Height: 6, BlockTime: 8888888888}
blockDetail = &types.BlockDetail{Block: block}
blockDetails = &types.BlockDetails{Items: []*types.BlockDetail{blockDetail}}
s.grpcCli.On("GetBlockByHashes", mock.Anything, mock.Anything).Return(blockDetails, nil).Once()
block = &types.Block{Height: 0}
blockDetail = &types.BlockDetail{Block: block}
blockDetails = &types.BlockDetails{Items: []*types.BlockDetail{blockDetail}}
s.grpcCli.On("GetBlockByHashes", mock.Anything, mock.Anything).Return(blockDetails, nil).Once()
block = &types.Block{Height: 0}
blockDetail = &types.BlockDetail{Block: block}
blockDetails = &types.BlockDetails{Items: []*types.BlockDetail{blockDetail}}
s.grpcCli.On("GetBlockByHashes", mock.Anything, mock.Anything).Return(blockDetails, nil)
seq := &types.Int64{Data: 1}
s.grpcCli.On("GetLastBlockSequence", mock.Anything, mock.Anything).Return(seq, nil).Once()
seq = &types.Int64{Data: 2}
s.grpcCli.On("GetLastBlockSequence", mock.Anything, mock.Anything).Return(seq, nil).Once()
seq = &types.Int64{Data: 3}
s.grpcCli.On("GetLastBlockSequence", mock.Anything, mock.Anything).Return(seq, nil)
seq = &types.Int64{Data: 1} s.createBlockMock()
s.grpcCli.On("GetSequenceByHash", mock.Anything, mock.Anything).Return(seq, nil)
reply := &types.Reply{IsOk: true} reply := &types.Reply{IsOk: true}
s.grpcCli.On("IsSync", mock.Anything, mock.Anything).Return(reply, nil) s.grpcCli.On("IsSync", mock.Anything, mock.Anything).Return(reply, nil)
...@@ -115,52 +84,74 @@ func (s *suiteParaClient) initEnv(cfg *types.Config, sub *types.ConfigSubModule) ...@@ -115,52 +84,74 @@ func (s *suiteParaClient) initEnv(cfg *types.Config, sub *types.ConfigSubModule)
s.network = p2p.New(cfg.P2P) s.network = p2p.New(cfg.P2P)
s.network.SetQueueClient(q.Client()) s.network.SetQueueClient(q.Client())
//create block self
s.createBlock()
} }
func (s *suiteParaClient) TestRun_Test() { func (s *suiteParaClient) createBlockMock() {
//s.testGetBlock() var i, hashdata int64
lastBlock, err := s.para.RequestLastBlock() for i = 0; i < 3; i++ {
if err != nil { hashdata = i
plog.Error("para test", "err", err.Error()) if i > 0 {
hashdata = i - 1
} }
plog.Info("para test---------1", "last height", lastBlock.Height)
s.para.createBlock(lastBlock, nil, 0, getMainBlock(2, lastBlock.BlockTime+1)) block := &types.Block{
lastBlock, err = s.para.RequestLastBlock() Height: i,
if err != nil { ParentHash: []byte(string(hashdata)),
plog.Error("para test--2", "err", err.Error()) }
blockSeq := &types.BlockSeq{
Seq: &types.BlockSequence{
Hash: []byte(string(i)),
Type: 1,
},
Detail: &types.BlockDetail{Block: block},
} }
plog.Info("para test---------", "last height", lastBlock.Height)
s.para.createBlock(lastBlock, nil, 1, getMainBlock(3, lastBlock.BlockTime+1))
time.Sleep(time.Second * 1)
s.testRunGetMinerTxInfo() s.grpcCli.On("GetBlockBySeq", mock.Anything, &types.Int64{Data: i}).Return(blockSeq, nil)
s.testRunRmvBlock() }
} // set block 3's parentHasn not equal, enter switch
block3 := &types.Block{
Height: 3,
ParentHash: []byte(string(1)),
}
blockSeq3 := &types.BlockSeq{
Seq: &types.BlockSequence{
Hash: []byte(string(3)),
Type: 1,
},
Detail: &types.BlockDetail{Block: block3},
}
s.grpcCli.On("GetBlockBySeq", mock.Anything, &types.Int64{Data: 3}).Return(blockSeq3, nil)
func (s *suiteParaClient) testRunGetMinerTxInfo() { // RequestTx GetLastSeqOnMainChain
lastBlock, err := s.para.RequestLastBlock() seq := &types.Int64{Data: 1}
s.Nil(err) s.grpcCli.On("GetLastBlockSequence", mock.Anything, mock.Anything).Return(seq, nil).Once()
plog.Info("para test testRunGetMinerTxInfo", "last height", lastBlock.Height) seq = &types.Int64{Data: 2}
s.True(lastBlock.Height > 1) s.grpcCli.On("GetLastBlockSequence", mock.Anything, mock.Anything).Return(seq, nil).Once()
status, err := getMinerTxInfo(lastBlock) seq = &types.Int64{Data: 3}
s.Nil(err) s.grpcCli.On("GetLastBlockSequence", mock.Anything, mock.Anything).Return(seq, nil)
s.Equal(int64(3), status.MainBlockHeight)
// mock for switchHashMatchedBlock
s.grpcCli.On("GetSequenceByHash", mock.Anything, &types.ReqHash{Hash: []byte(string(3))}).Return(nil, errors.New("hash err")).Once()
s.grpcCli.On("GetSequenceByHash", mock.Anything, &types.ReqHash{Hash: []byte(string(2))}).Return(nil, errors.New("hash err")).Once()
// mock for removeBlocks
seq = &types.Int64{Data: 1}
s.grpcCli.On("GetSequenceByHash", mock.Anything, mock.Anything).Return(seq, nil)
} }
func (s *suiteParaClient) testRunRmvBlock() { func (s *suiteParaClient) createBlock() {
var i int64
for i = 0; i < 3; i++ {
lastBlock, err := s.para.RequestLastBlock() lastBlock, err := s.para.RequestLastBlock()
s.Nil(err) if err != nil {
plog.Info("para test testRunGetMinerTxInfo", "last height", lastBlock.Height) plog.Error("para test", "err", err.Error())
s.True(lastBlock.Height > 1) }
s.para.removeBlocks(1) plog.Info("para test---------1", "last height", lastBlock.Height)
s.para.createBlock(lastBlock, nil, i, getMainBlock(i+1, lastBlock.BlockTime+1))
lastBlock, err = s.para.RequestLastBlock() }
s.Nil(err)
plog.Info("para test testRunGetMinerTxInfo", "last height", lastBlock.Height)
s.Equal(int64(1), lastBlock.Height)
} }
func (s *suiteParaClient) SetupSuite() { func (s *suiteParaClient) SetupSuite() {
...@@ -173,7 +164,7 @@ func TestRunSuiteParaClient(t *testing.T) { ...@@ -173,7 +164,7 @@ func TestRunSuiteParaClient(t *testing.T) {
} }
func (s *suiteParaClient) TearDownSuite() { func (s *suiteParaClient) TearDownSuite() {
time.Sleep(time.Second * 5) //time.Sleep(time.Second * 2)
s.block.Close() s.block.Close()
s.para.Close() s.para.Close()
s.network.Close() s.network.Close()
......
...@@ -22,8 +22,19 @@ chain33背后故事: [chain33诞生记](https://mp.weixin.qq.com/s/9g5ZFDKJi9uzR ...@@ -22,8 +22,19 @@ chain33背后故事: [chain33诞生记](https://mp.weixin.qq.com/s/9g5ZFDKJi9uzR
# 感谢 # 感谢
[腾讯玄武实验室](https://github.com/33cn/chain33/issues?utf8=%E2%9C%93&q=label%3A%E8%85%BE%E8%AE%AF%E7%8E%84%E6%AD%A6%E5%AE%9E%E9%AA%8C%E5%AE%A4) [腾讯玄武安全实验室](https://github.com/33cn/chain33/issues?utf8=%E2%9C%93&q=label%3A%E8%85%BE%E8%AE%AF%E7%8E%84%E6%AD%A6%E5%AE%9E%E9%AA%8C%E5%AE%A4)
# bug 奖励
我们会对bug 评价4个等级(不会奖励人民币,等值虚拟资产)。
只有影响现有在线运行系统的,并且会产生严重分叉等行为的,才会评价为 L3
```
L0 1000
L1 3000
L2 10000
L3 20000
```
## Building from source ## Building from source
......
...@@ -116,6 +116,7 @@ func TestBlockChain(t *testing.T) { ...@@ -116,6 +116,7 @@ func TestBlockChain(t *testing.T) {
testProcDelParaChainBlockMsg(t, mock33, blockchain) testProcDelParaChainBlockMsg(t, mock33, blockchain)
testProcAddParaChainBlockMsg(t, mock33, blockchain) testProcAddParaChainBlockMsg(t, mock33, blockchain)
testProcGetBlockBySeqMsg(t, mock33, blockchain)
testProcBlockChainFork(t, blockchain) testProcBlockChainFork(t, blockchain)
testDelBlock(t, blockchain, curBlock) testDelBlock(t, blockchain, curBlock)
...@@ -895,6 +896,28 @@ func testProcAddParaChainBlockMsg(t *testing.T, mock33 *testnode.Chain33Mock, bl ...@@ -895,6 +896,28 @@ func testProcAddParaChainBlockMsg(t *testing.T, mock33 *testnode.Chain33Mock, bl
chainlog.Info("testProcAddParaChainBlockMsg end --------------------") chainlog.Info("testProcAddParaChainBlockMsg end --------------------")
} }
func testProcGetBlockBySeqMsg(t *testing.T, mock33 *testnode.Chain33Mock, blockchain *blockchain.BlockChain) {
chainlog.Info("testProcGetBlockBySeqMsg begin --------------------")
seq, err := blockchain.GetStore().LoadBlockLastSequence()
assert.Nil(t, err)
//block, err := blockchain.GetBlock(curheight)
//require.NoError(t, err)
msgGen := mock33.GetClient().NewMessage("blockchain", types.EventGetBlockBySeq, &types.Int64{Data: seq})
mock33.GetClient().Send(msgGen, true)
msg, err := mock33.GetClient().Wait(msgGen)
if err != nil {
t.Log(err)
//t.Error("testProcAddParaChainBlockMsg only in parachain ")
}
blockseq := msg.Data.(*types.BlockSeq)
assert.Equal(t, seq, blockseq.Num)
chainlog.Info("testProcGetBlockBySeqMsg end --------------------")
}
func testProcBlockChainFork(t *testing.T, blockchain *blockchain.BlockChain) { func testProcBlockChainFork(t *testing.T, blockchain *blockchain.BlockChain) {
chainlog.Info("testProcBlockChainFork begin --------------------") chainlog.Info("testProcBlockChainFork begin --------------------")
......
...@@ -73,6 +73,9 @@ func (chain *BlockChain) ProcRecvMsg() { ...@@ -73,6 +73,9 @@ func (chain *BlockChain) ProcRecvMsg() {
case types.EventGetBlockByHashes: case types.EventGetBlockByHashes:
go chain.processMsg(msg, reqnum, chain.getBlockByHashes) go chain.processMsg(msg, reqnum, chain.getBlockByHashes)
case types.EventGetBlockBySeq:
go chain.processMsg(msg, reqnum, chain.getBlockBySeq)
case types.EventDelParaChainBlockDetail: case types.EventDelParaChainBlockDetail:
go chain.processMsg(msg, reqnum, chain.delParaChainBlockDetail) go chain.processMsg(msg, reqnum, chain.delParaChainBlockDetail)
...@@ -418,6 +421,32 @@ func (chain *BlockChain) getBlockByHashes(msg queue.Message) { ...@@ -418,6 +421,32 @@ func (chain *BlockChain) getBlockByHashes(msg queue.Message) {
} }
} }
func (chain *BlockChain) getBlockBySeq(msg queue.Message) {
seq := (msg.Data).(*types.Int64)
req := &types.ReqBlocks{Start: seq.Data, End: seq.Data, IsDetail: false, Pid: []string{}}
sequences, err := chain.GetBlockSequences(req)
if err != nil {
chainlog.Error("getBlockBySeq", "seq err", err.Error())
msg.Reply(chain.client.NewMessage("rpc", types.EventGetBlockBySeq, err))
return
}
reqHashes := &types.ReqHashes{Hashes: [][]byte{sequences.Items[0].Hash}}
blocks, err := chain.GetBlockByHashes(reqHashes.Hashes)
if err != nil {
chainlog.Error("getBlockBySeq", "hash err", err.Error())
msg.Reply(chain.client.NewMessage("rpc", types.EventGetBlockBySeq, err))
return
}
blockSeq := &types.BlockSeq{
Num: seq.Data,
Seq: sequences.Items[0],
Detail: blocks.Items[0],
}
msg.Reply(chain.client.NewMessage("rpc", types.EventGetBlockBySeq, blockSeq))
}
//平行链del block的处理 //平行链del block的处理
func (chain *BlockChain) delParaChainBlockDetail(msg queue.Message) { func (chain *BlockChain) delParaChainBlockDetail(msg queue.Message) {
var parablockDetail *types.ParaChainBlockDetail var parablockDetail *types.ParaChainBlockDetail
......
...@@ -104,6 +104,14 @@ func (m *mockBlockChain) SetQueueClient(q queue.Queue) { ...@@ -104,6 +104,14 @@ func (m *mockBlockChain) SetQueueClient(q queue.Queue) {
case types.EventGetSeqByHash: case types.EventGetSeqByHash:
msg.Reply(client.NewMessage(blockchainKey, types.EventReplyQuery, &types.Int64{Data: 1})) msg.Reply(client.NewMessage(blockchainKey, types.EventReplyQuery, &types.Int64{Data: 1}))
case types.EventGetBlockBySeq:
if req, ok := msg.GetData().(*types.Int64); ok {
// just for cover
if req.Data == 10 {
msg.Reply(client.NewMessage(blockchainKey, types.EventReplyQuery, &types.Reply{IsOk: false, Msg: []byte("not support")}))
}
msg.Reply(client.NewMessage(blockchainKey, types.EventReplyQuery, &types.BlockSeq{Num: 1}))
}
case types.EventIsSync: case types.EventIsSync:
msg.Reply(client.NewMessage(blockchainKey, types.EventReplyIsSync, &types.IsCaughtUp{})) msg.Reply(client.NewMessage(blockchainKey, types.EventReplyIsSync, &types.IsCaughtUp{}))
case types.EventIsNtpClockSync: case types.EventIsNtpClockSync:
......
...@@ -200,21 +200,21 @@ func (_m *QueueProtocolAPI) GetBlockByHashes(param *types.ReqHashes) (*types.Blo ...@@ -200,21 +200,21 @@ func (_m *QueueProtocolAPI) GetBlockByHashes(param *types.ReqHashes) (*types.Blo
return r0, r1 return r0, r1
} }
// GetBlockHash provides a mock function with given fields: param // GetBlockBySeq provides a mock function with given fields: param
func (_m *QueueProtocolAPI) GetBlockHash(param *types.ReqInt) (*types.ReplyHash, error) { func (_m *QueueProtocolAPI) GetBlockBySeq(param *types.Int64) (*types.BlockSeq, error) {
ret := _m.Called(param) ret := _m.Called(param)
var r0 *types.ReplyHash var r0 *types.BlockSeq
if rf, ok := ret.Get(0).(func(*types.ReqInt) *types.ReplyHash); ok { if rf, ok := ret.Get(0).(func(*types.Int64) *types.BlockSeq); ok {
r0 = rf(param) r0 = rf(param)
} else { } else {
if ret.Get(0) != nil { if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.ReplyHash) r0 = ret.Get(0).(*types.BlockSeq)
} }
} }
var r1 error var r1 error
if rf, ok := ret.Get(1).(func(*types.ReqInt) error); ok { if rf, ok := ret.Get(1).(func(*types.Int64) error); ok {
r1 = rf(param) r1 = rf(param)
} else { } else {
r1 = ret.Error(1) r1 = ret.Error(1)
...@@ -223,21 +223,21 @@ func (_m *QueueProtocolAPI) GetBlockHash(param *types.ReqInt) (*types.ReplyHash, ...@@ -223,21 +223,21 @@ func (_m *QueueProtocolAPI) GetBlockHash(param *types.ReqInt) (*types.ReplyHash,
return r0, r1 return r0, r1
} }
// GetBlockOverview provides a mock function with given fields: param // GetBlockHash provides a mock function with given fields: param
func (_m *QueueProtocolAPI) GetBlockOverview(param *types.ReqHash) (*types.BlockOverview, error) { func (_m *QueueProtocolAPI) GetBlockHash(param *types.ReqInt) (*types.ReplyHash, error) {
ret := _m.Called(param) ret := _m.Called(param)
var r0 *types.BlockOverview var r0 *types.ReplyHash
if rf, ok := ret.Get(0).(func(*types.ReqHash) *types.BlockOverview); ok { if rf, ok := ret.Get(0).(func(*types.ReqInt) *types.ReplyHash); ok {
r0 = rf(param) r0 = rf(param)
} else { } else {
if ret.Get(0) != nil { if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.BlockOverview) r0 = ret.Get(0).(*types.ReplyHash)
} }
} }
var r1 error var r1 error
if rf, ok := ret.Get(1).(func(*types.ReqHash) error); ok { if rf, ok := ret.Get(1).(func(*types.ReqInt) error); ok {
r1 = rf(param) r1 = rf(param)
} else { } else {
r1 = ret.Error(1) r1 = ret.Error(1)
...@@ -246,21 +246,21 @@ func (_m *QueueProtocolAPI) GetBlockOverview(param *types.ReqHash) (*types.Block ...@@ -246,21 +246,21 @@ func (_m *QueueProtocolAPI) GetBlockOverview(param *types.ReqHash) (*types.Block
return r0, r1 return r0, r1
} }
// GetBlockSequences provides a mock function with given fields: param // GetBlockOverview provides a mock function with given fields: param
func (_m *QueueProtocolAPI) GetBlockSequences(param *types.ReqBlocks) (*types.BlockSequences, error) { func (_m *QueueProtocolAPI) GetBlockOverview(param *types.ReqHash) (*types.BlockOverview, error) {
ret := _m.Called(param) ret := _m.Called(param)
var r0 *types.BlockSequences var r0 *types.BlockOverview
if rf, ok := ret.Get(0).(func(*types.ReqBlocks) *types.BlockSequences); ok { if rf, ok := ret.Get(0).(func(*types.ReqHash) *types.BlockOverview); ok {
r0 = rf(param) r0 = rf(param)
} else { } else {
if ret.Get(0) != nil { if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.BlockSequences) r0 = ret.Get(0).(*types.BlockOverview)
} }
} }
var r1 error var r1 error
if rf, ok := ret.Get(1).(func(*types.ReqBlocks) error); ok { if rf, ok := ret.Get(1).(func(*types.ReqHash) error); ok {
r1 = rf(param) r1 = rf(param)
} else { } else {
r1 = ret.Error(1) r1 = ret.Error(1)
......
...@@ -950,24 +950,22 @@ func (q *QueueProtocol) GetBlockByHashes(param *types.ReqHashes) (*types.BlockDe ...@@ -950,24 +950,22 @@ func (q *QueueProtocol) GetBlockByHashes(param *types.ReqHashes) (*types.BlockDe
return nil, err return nil, err
} }
// GetBlockSequences block执行序列号 // GetBlockBySeq get block detail and hash by seq
func (q *QueueProtocol) GetBlockSequences(param *types.ReqBlocks) (*types.BlockSequences, error) { func (q *QueueProtocol) GetBlockBySeq(param *types.Int64) (*types.BlockSeq, error) {
if param == nil { if param == nil {
err := types.ErrInvalidParam err := types.ErrInvalidParam
log.Error("GetBlockSequences", "Error", err) log.Error("GetBlockBySeq", "Error", err)
return nil, err return nil, err
} }
msg, err := q.query(blockchainKey, types.EventGetBlockSequences, param) msg, err := q.query(blockchainKey, types.EventGetBlockBySeq, param)
if err != nil { if err != nil {
log.Error("GetBlockSequences", "Error", err.Error()) log.Error("GetBlockBySeq", "Error", err.Error())
return nil, err return nil, err
} }
if reply, ok := msg.GetData().(*types.BlockSequences); ok { if reply, ok := msg.GetData().(*types.BlockSeq); ok {
return reply, nil return reply, nil
} }
err = types.ErrTypeAsset return nil, types.ErrTypeAsset
log.Error("GetBlockSequences", "Error", err)
return nil, err
} }
// QueryChain query chain // QueryChain query chain
......
...@@ -807,6 +807,7 @@ func TestGRPC(t *testing.T) { ...@@ -807,6 +807,7 @@ func TestGRPC(t *testing.T) {
testGetAddrOverviewGRPC(t, &grpcMock) testGetAddrOverviewGRPC(t, &grpcMock)
testGetBlockHashGRPC(t, &grpcMock) testGetBlockHashGRPC(t, &grpcMock)
testGetSequenceByHashGRPC(t, &grpcMock) testGetSequenceByHashGRPC(t, &grpcMock)
testGetBlockBySeqGRPC(t, &grpcMock)
testGenSeedGRPC(t, &grpcMock) testGenSeedGRPC(t, &grpcMock)
testGetSeedGRPC(t, &grpcMock) testGetSeedGRPC(t, &grpcMock)
testSaveSeedGRPC(t, &grpcMock) testSaveSeedGRPC(t, &grpcMock)
...@@ -818,6 +819,7 @@ func TestGRPC(t *testing.T) { ...@@ -818,6 +819,7 @@ func TestGRPC(t *testing.T) {
testIsSyncGRPC(t, &grpcMock) testIsSyncGRPC(t, &grpcMock)
testIsNtpClockSyncGRPC(t, &grpcMock) testIsNtpClockSyncGRPC(t, &grpcMock)
testNetInfoGRPC(t, &grpcMock) testNetInfoGRPC(t, &grpcMock)
} }
func testNetInfoGRPC(t *testing.T, rpc *mockGRPCSystem) { func testNetInfoGRPC(t *testing.T, rpc *mockGRPCSystem) {
...@@ -1140,3 +1142,21 @@ func testGetSequenceByHashGRPC(t *testing.T, rpc *mockGRPCSystem) { ...@@ -1140,3 +1142,21 @@ func testGetSequenceByHashGRPC(t *testing.T, rpc *mockGRPCSystem) {
t.Error("Call GetSequenceByHash Failed.", err) t.Error("Call GetSequenceByHash Failed.", err)
} }
} }
func testGetBlockBySeqGRPC(t *testing.T, rpc *mockGRPCSystem) {
var res types.BlockSeq
//just for coverage
err := rpc.newRpcCtx("GetBlockBySeq", &types.Int64{Data: 1}, &res)
assert.Nil(t, err)
err = rpc.newRpcCtx("GetBlockBySeq", &types.Int64{Data: 10}, &res)
assert.NotNil(t, err)
}
func TestGetBlockBySeq(t *testing.T) {
q := client.QueueProtocol{}
_, err := q.GetBlockBySeq(nil)
assert.NotNil(t, err)
}
...@@ -107,10 +107,11 @@ type QueueProtocolAPI interface { ...@@ -107,10 +107,11 @@ type QueueProtocolAPI interface {
//types.EventGetLastBlockSequence: //types.EventGetLastBlockSequence:
GetLastBlockSequence() (*types.Int64, error) GetLastBlockSequence() (*types.Int64, error)
//types.EventGetBlockSequences:
GetBlockSequences(param *types.ReqBlocks) (*types.BlockSequences, error)
//types.EventGetBlockByHashes: //types.EventGetBlockByHashes:
GetBlockByHashes(param *types.ReqHashes) (*types.BlockDetails, error) GetBlockByHashes(param *types.ReqHashes) (*types.BlockDetails, error)
//types.EventGetBlockBySeq:
GetBlockBySeq(param *types.Int64) (*types.BlockSeq, error)
//types.EventGetSequenceByHash: //types.EventGetSequenceByHash:
GetSequenceByHash(param *types.ReqHash) (*types.Int64, error) GetSequenceByHash(param *types.ReqHash) (*types.Int64, error)
......
...@@ -321,6 +321,12 @@ func (c *GrpcCtx) Run() (err error) { ...@@ -321,6 +321,12 @@ func (c *GrpcCtx) Run() (err error) {
*c.Res.(*types.Int64) = *reply *c.Res.(*types.Int64) = *reply
} }
errRet = err errRet = err
case "GetBlockBySeq":
reply, err := rpc.GetBlockBySeq(context.Background(), c.Params.(*types.Int64))
if err == nil {
*c.Res.(*types.BlockSeq) = *reply
}
errRet = err
default: default:
errRet = errors.New(fmt.Sprintf("Unsupport method %v", c.Method)) errRet = errors.New(fmt.Sprintf("Unsupport method %v", c.Method))
} }
......
...@@ -53,7 +53,7 @@ type client struct { ...@@ -53,7 +53,7 @@ type client struct {
recv chan Message recv chan Message
done chan struct{} done chan struct{}
wg *sync.WaitGroup wg *sync.WaitGroup
topic string topic unsafe.Pointer
isClosed int32 isClosed int32
isCloseing int32 isCloseing int32
} }
...@@ -140,13 +140,11 @@ func (client *client) Recv() chan Message { ...@@ -140,13 +140,11 @@ func (client *client) Recv() chan Message {
} }
func (client *client) getTopic() string { func (client *client) getTopic() string {
address := unsafe.Pointer(&(client.topic)) return *(*string)(atomic.LoadPointer(&client.topic))
return *(*string)(atomic.LoadPointer(&address))
} }
func (client *client) setTopic(topic string) { func (client *client) setTopic(topic string) {
address := unsafe.Pointer(&(client.topic)) atomic.StorePointer(&client.topic, unsafe.Pointer(&topic))
atomic.StorePointer(&address, unsafe.Pointer(&topic))
} }
func (client *client) isClose() bool { func (client *client) isClose() bool {
...@@ -159,7 +157,7 @@ func (client *client) isInClose() bool { ...@@ -159,7 +157,7 @@ func (client *client) isInClose() bool {
// Close 关闭client // Close 关闭client
func (client *client) Close() { func (client *client) Close() {
if atomic.LoadInt32(&client.isClosed) == 1 { if atomic.LoadInt32(&client.isClosed) == 1 || client.topic == nil {
return return
} }
topic := client.getTopic() topic := client.getTopic()
......
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package queue
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestSetTopic(t *testing.T) {
client := &client{}
hi := "hello"
client.setTopic(hi)
ret := client.getTopic()
assert.Equal(t, hi, ret)
}
...@@ -301,49 +301,3 @@ func (c *channelClient) GetExecBalance(in *types.ReqGetExecBalance) (*types.Repl ...@@ -301,49 +301,3 @@ func (c *channelClient) GetExecBalance(in *types.ReqGetExecBalance) (*types.Repl
} }
return resp, nil return resp, nil
} }
// GetAssetBalance 通用的获得资产的接口
func (c *channelClient) GetAssetBalance(in *types.ReqBalance) ([]*types.Account, error) {
if in.AssetSymbol == "" || in.AssetExec == "" {
return nil, types.ErrInvalidParam
}
acc, err := account.NewAccountDB(in.AssetExec, in.AssetSymbol, nil)
if err != nil {
return nil, err
}
// load balance
if in.AssetExec == in.Execer || in.Execer == "" {
addrs := in.GetAddresses()
var queryAddrs []string
for _, addr := range addrs {
if err := address.CheckAddress(addr); err != nil {
addr = string(acc.AccountKey(addr))
}
queryAddrs = append(queryAddrs, addr)
}
accounts, err := acc.LoadAccounts(c.QueueProtocolAPI, queryAddrs)
if err != nil {
log.Error("GetAssetBalance", "err", err.Error(), "exec", in.AssetExec, "symbol", in.AssetSymbol,
"address", queryAddrs)
return nil, err
}
return accounts, nil
}
// load exec balance
execaddress := address.ExecAddress(in.GetExecer())
addrs := in.GetAddresses()
var accounts []*types.Account
for _, addr := range addrs {
acc, err := acc.LoadExecAccountQueue(c.QueueProtocolAPI, addr, execaddress)
if err != nil {
log.Error("GetAssetBalance for exector", "err", err.Error(), "exec", in.AssetExec,
"symbol", in.AssetSymbol, "address", addr, "where", in.Execer)
continue
}
accounts = append(accounts, acc)
}
return accounts, nil
}
...@@ -361,66 +361,3 @@ func TestChannelClient_GetBalance(t *testing.T) { ...@@ -361,66 +361,3 @@ func TestChannelClient_GetBalance(t *testing.T) {
// assert.NotNil(t, data) // assert.NotNil(t, data)
// assert.Nil(t, err) // assert.Nil(t, err)
// } // }
func testChannelClient_GetAssetBalanceCoin(t *testing.T) {
api := new(mocks.QueueProtocolAPI)
db := new(account.DB)
client := &channelClient{
QueueProtocolAPI: api,
accountdb: db,
}
head := &types.Header{StateHash: []byte("sdfadasds")}
api.On("GetLastHeader").Return(head, nil)
var acc = &types.Account{Addr: "1Jn2qu84Z1SUUosWjySggBS9pKWdAP3tZt", Balance: 100}
accv := types.Encode(acc)
storevalue := &types.StoreReplyValue{}
storevalue.Values = append(storevalue.Values, accv)
api.On("StoreGet", mock.Anything).Return(storevalue, nil)
var addrs = []string{"1Jn2qu84Z1SUUosWjySggBS9pKWdAP3tZt"}
var in = &types.ReqBalance{
AssetSymbol: "bty",
AssetExec: "coins",
Execer: "coins",
Addresses: addrs,
}
data, err := client.GetAssetBalance(in)
assert.Nil(t, err)
assert.Equal(t, acc.Addr, data[0].Addr)
}
func testChannelClient_GetAssetBalanceOther(t *testing.T) {
api := new(mocks.QueueProtocolAPI)
db := new(account.DB)
client := &channelClient{
QueueProtocolAPI: api,
accountdb: db,
}
head := &types.Header{StateHash: []byte("sdfadasds")}
api.On("GetLastHeader").Return(head, nil)
var acc = &types.Account{Addr: "1Jn2qu84Z1SUUosWjySggBS9pKWdAP3tZt", Balance: 100}
accv := types.Encode(acc)
storevalue := &types.StoreReplyValue{}
storevalue.Values = append(storevalue.Values, accv)
api.On("StoreGet", mock.Anything).Return(storevalue, nil)
var addrs = []string{"1Jn2qu84Z1SUUosWjySggBS9pKWdAP3tZt"}
var in = &types.ReqBalance{
AssetSymbol: "bty",
AssetExec: "coins",
Execer: types.ExecName("ticket"),
Addresses: addrs,
}
data, err := client.GetAssetBalance(in)
assert.Nil(t, err)
assert.Equal(t, acc.Addr, data[0].Addr)
}
func TestChannelClient_GetAssetBalance(t *testing.T) {
testChannelClient_GetAssetBalanceCoin(t)
testChannelClient_GetAssetBalanceOther(t)
}
...@@ -344,11 +344,6 @@ func (g *Grpc) GetLastBlockSequence(ctx context.Context, in *pb.ReqNil) (*pb.Int ...@@ -344,11 +344,6 @@ func (g *Grpc) GetLastBlockSequence(ctx context.Context, in *pb.ReqNil) (*pb.Int
return g.cli.GetLastBlockSequence() return g.cli.GetLastBlockSequence()
} }
// GetBlockSequences get block sequeces
func (g *Grpc) GetBlockSequences(ctx context.Context, in *pb.ReqBlocks) (*pb.BlockSequences, error) {
return g.cli.GetBlockSequences(in)
}
// GetBlockByHashes get block by hashes // GetBlockByHashes get block by hashes
func (g *Grpc) GetBlockByHashes(ctx context.Context, in *pb.ReqHashes) (*pb.BlockDetails, error) { func (g *Grpc) GetBlockByHashes(ctx context.Context, in *pb.ReqHashes) (*pb.BlockDetails, error) {
return g.cli.GetBlockByHashes(in) return g.cli.GetBlockByHashes(in)
...@@ -359,6 +354,11 @@ func (g *Grpc) GetSequenceByHash(ctx context.Context, in *pb.ReqHash) (*pb.Int64 ...@@ -359,6 +354,11 @@ func (g *Grpc) GetSequenceByHash(ctx context.Context, in *pb.ReqHash) (*pb.Int64
return g.cli.GetSequenceByHash(in) return g.cli.GetSequenceByHash(in)
} }
// GetBlockBySeq get block with hash by seq
func (g *Grpc) GetBlockBySeq(ctx context.Context, in *pb.Int64) (*pb.BlockSeq, error) {
return g.cli.GetBlockBySeq(in)
}
// SignRawTx signature rawtransaction // SignRawTx signature rawtransaction
func (g *Grpc) SignRawTx(ctx context.Context, in *pb.ReqSignRawTx) (*pb.ReplySignRawTx, error) { func (g *Grpc) SignRawTx(ctx context.Context, in *pb.ReqSignRawTx) (*pb.ReplySignRawTx, error) {
return g.cli.SignRawTx(in) return g.cli.SignRawTx(in)
...@@ -372,3 +372,8 @@ func (g *Grpc) QueryRandNum(ctx context.Context, in *pb.ReqRandHash) (*pb.ReplyH ...@@ -372,3 +372,8 @@ func (g *Grpc) QueryRandNum(ctx context.Context, in *pb.ReqRandHash) (*pb.ReplyH
} }
return reply.(*pb.ReplyHash), nil return reply.(*pb.ReplyHash), nil
} }
// GetFork get fork height by fork key
func (g *Grpc) GetFork(ctx context.Context, in *pb.ReqKey) (*pb.Int64, error) {
return &pb.Int64{Data: pb.GetFork(string(in.Key))}, nil
}
...@@ -1051,22 +1051,6 @@ func (c *Chain33) GetLastBlockSequence(in *types.ReqNil, result *interface{}) er ...@@ -1051,22 +1051,6 @@ func (c *Chain33) GetLastBlockSequence(in *types.ReqNil, result *interface{}) er
return nil return nil
} }
// GetBlockSequences get the block loading sequence number information for the specified interval
func (c *Chain33) GetBlockSequences(in rpctypes.BlockParam, result *interface{}) error {
resp, err := c.cli.GetBlockSequences(&types.ReqBlocks{Start: in.Start, End: in.End, IsDetail: in.Isdetail, Pid: []string{""}})
if err != nil {
return err
}
var BlkSeqs rpctypes.ReplyBlkSeqs
items := resp.GetItems()
for _, item := range items {
BlkSeqs.BlkSeqInfos = append(BlkSeqs.BlkSeqInfos, &rpctypes.ReplyBlkSeq{Hash: common.ToHex(item.GetHash()),
Type: item.GetType()})
}
*result = &BlkSeqs
return nil
}
// GetBlockByHashes get block information by hashes // GetBlockByHashes get block information by hashes
func (c *Chain33) GetBlockByHashes(in rpctypes.ReqHashes, result *interface{}) error { func (c *Chain33) GetBlockByHashes(in rpctypes.ReqHashes, result *interface{}) error {
log.Warn("GetBlockByHashes", "hashes", in) log.Warn("GetBlockByHashes", "hashes", in)
......
...@@ -1214,26 +1214,6 @@ func TestChain33_GetLastBlockSequence(t *testing.T) { ...@@ -1214,26 +1214,6 @@ func TestChain33_GetLastBlockSequence(t *testing.T) {
assert.Equal(t, int64(1), result2) assert.Equal(t, int64(1), result2)
} }
func TestChain33_GetBlockSequences(t *testing.T) {
api := new(mocks.QueueProtocolAPI)
client := newTestChain33(api)
var result interface{}
api.On("GetBlockSequences", mock.Anything).Return(nil, types.ErrInvalidParam)
err := client.GetBlockSequences(rpctypes.BlockParam{}, &result)
assert.NotNil(t, err)
api = new(mocks.QueueProtocolAPI)
client = newTestChain33(api)
var result2 interface{}
blocks := types.BlockSequences{}
blocks.Items = make([]*types.BlockSequence, 0)
blocks.Items = append(blocks.Items, &types.BlockSequence{Hash: []byte("h1"), Type: 1})
api.On("GetBlockSequences", mock.Anything).Return(&blocks, nil)
err = client.GetBlockSequences(rpctypes.BlockParam{}, &result2)
assert.Nil(t, err)
assert.Equal(t, 1, len(result2.(*rpctypes.ReplyBlkSeqs).BlkSeqInfos))
}
func TestChain33_GetBlockByHashes(t *testing.T) { func TestChain33_GetBlockByHashes(t *testing.T) {
api := new(mocks.QueueProtocolAPI) api := new(mocks.QueueProtocolAPI)
client := newTestChain33(api) client := newTestChain33(api)
......
...@@ -175,6 +175,15 @@ func TestGrpc_Call(t *testing.T) { ...@@ -175,6 +175,15 @@ func TestGrpc_Call(t *testing.T) {
assert.Equal(t, ret.IsOk, result.IsOk) assert.Equal(t, ret.IsOk, result.IsOk)
assert.Equal(t, ret.Msg, result.Msg) assert.Equal(t, ret.Msg, result.Msg)
rst, err := client.GetFork(ctx, &types.ReqKey{Key: []byte("ForkBlockHash")})
assert.Nil(t, err)
assert.Equal(t, int64(1), rst.Data)
api.On("GetBlockBySeq", mock.Anything).Return(&types.BlockSeq{}, nil)
blockSeq, err := client.GetBlockBySeq(ctx, &types.Int64{Data: 1})
assert.Nil(t, err)
assert.Equal(t, &types.BlockSeq{}, blockSeq)
server.Close() server.Close()
mock.AssertExpectationsForObjects(t, api) mock.AssertExpectationsForObjects(t, api)
} }
...@@ -109,7 +109,6 @@ func (store *BaseStore) processMessage(msg queue.Message) { ...@@ -109,7 +109,6 @@ func (store *BaseStore) processMessage(msg queue.Message) {
go func() { go func() {
datas := msg.GetData().(*types.StoreSetWithSync) datas := msg.GetData().(*types.StoreSetWithSync)
hash, err := store.child.MemSet(datas.Storeset, datas.Sync) hash, err := store.child.MemSet(datas.Storeset, datas.Sync)
println("EventStoreMemSet", string(hash))
if err != nil { if err != nil {
msg.Reply(client.NewMessage("", types.EventStoreSetReply, err)) msg.Reply(client.NewMessage("", types.EventStoreSetReply, err))
return return
...@@ -119,7 +118,6 @@ func (store *BaseStore) processMessage(msg queue.Message) { ...@@ -119,7 +118,6 @@ func (store *BaseStore) processMessage(msg queue.Message) {
} else if msg.Ty == types.EventStoreCommit { //把内存中set 的交易 commit } else if msg.Ty == types.EventStoreCommit { //把内存中set 的交易 commit
go func() { go func() {
req := msg.GetData().(*types.ReqHash) req := msg.GetData().(*types.ReqHash)
println("EventStoreCommit", string(req.Hash))
hash, err := store.child.Commit(req) hash, err := store.child.Commit(req)
if hash == nil { if hash == nil {
msg.Reply(client.NewMessage("", types.EventStoreCommit, types.ErrHashNotFound)) msg.Reply(client.NewMessage("", types.EventStoreCommit, types.ErrHashNotFound))
......
...@@ -144,4 +144,13 @@ func TestBaseStore_Queue(t *testing.T) { ...@@ -144,4 +144,13 @@ func TestBaseStore_Queue(t *testing.T) {
assert.NotNil(t, resp) assert.NotNil(t, resp)
assert.Equal(t, int64(types.EventGetTotalCoinsReply), resp.Ty) assert.Equal(t, int64(types.EventGetTotalCoinsReply), resp.Ty)
del := &types.StoreDel{StateHash: EmptyRoot[:], Height: 0}
msg = queueClinet.NewMessage("store", types.EventStoreDel, del)
err = queueClinet.Send(msg, true)
assert.Nil(t, err)
resp, err = queueClinet.Wait(msg)
assert.Nil(t, err)
assert.NotNil(t, resp)
assert.Equal(t, int64(types.EventStoreDel), resp.Ty)
} }
...@@ -7,6 +7,9 @@ package mavl ...@@ -7,6 +7,9 @@ package mavl
import ( import (
"bytes" "bytes"
"fmt"
"github.com/33cn/chain33/common"
"github.com/33cn/chain33/types" "github.com/33cn/chain33/types"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
) )
...@@ -115,6 +118,26 @@ func (node *Node) get(t *Tree, key []byte) (index int32, value []byte, exists bo ...@@ -115,6 +118,26 @@ func (node *Node) get(t *Tree, key []byte) (index int32, value []byte, exists bo
return index, value, exists return index, value, exists
} }
func (node *Node) getHash(t *Tree, key []byte) (index int32, hash []byte, exists bool) {
if node.height == 0 {
cmp := bytes.Compare(node.key, key)
if cmp == 0 {
return 0, node.hash, true
} else if cmp == -1 {
return 1, nil, false
} else {
return 0, nil, false
}
}
if bytes.Compare(key, node.key) < 0 {
return node.getLeftNode(t).getHash(t, key)
}
rightNode := node.getRightNode(t)
index, hash, exists = rightNode.getHash(t, key)
index += node.size - rightNode.size
return index, hash, exists
}
//通过index获取leaf节点信息 //通过index获取leaf节点信息
func (node *Node) getByIndex(t *Tree, index int32) (key []byte, value []byte) { func (node *Node) getByIndex(t *Tree, index int32) (key []byte, value []byte) {
if node.height == 0 { if node.height == 0 {
...@@ -223,6 +246,21 @@ func (node *Node) save(t *Tree) int64 { ...@@ -223,6 +246,21 @@ func (node *Node) save(t *Tree) int64 {
return leftsaveNodeNo + rightsaveNodeNo + 1 return leftsaveNodeNo + rightsaveNodeNo + 1
} }
// 保存root节点hash以及区块高度
func (node *Node) saveRootHash(t *Tree) (err error) {
if node.hash == nil || t.ndb == nil || t.ndb.db == nil {
return
}
h := &types.Int64{}
h.Data = t.blockHeight
value, err := proto.Marshal(h)
if err != nil {
return err
}
t.ndb.batch.Set(genRootHashHeight(t.blockHeight, node.hash), value)
return nil
}
//将内存中的node转换成存储到db中的格式 //将内存中的node转换成存储到db中的格式
func (node *Node) storeNode(t *Tree) []byte { func (node *Node) storeNode(t *Tree) []byte {
var storeNode types.StoreNode var storeNode types.StoreNode
...@@ -309,7 +347,7 @@ func (node *Node) getLeftNode(t *Tree) *Node { ...@@ -309,7 +347,7 @@ func (node *Node) getLeftNode(t *Tree) *Node {
} }
leftNode, err := t.ndb.GetNode(t, node.leftHash) leftNode, err := t.ndb.GetNode(t, node.leftHash)
if err != nil { if err != nil {
panic(err) //数据库已经损坏 panic(fmt.Sprintln("left hash", common.ToHex(node.leftHash), err)) //数据库已经损坏
} }
return leftNode return leftNode
} }
...@@ -320,7 +358,7 @@ func (node *Node) getRightNode(t *Tree) *Node { ...@@ -320,7 +358,7 @@ func (node *Node) getRightNode(t *Tree) *Node {
} }
rightNode, err := t.ndb.GetNode(t, node.rightHash) rightNode, err := t.ndb.GetNode(t, node.rightHash)
if err != nil { if err != nil {
panic(err) panic(fmt.Sprintln("right hash", common.ToHex(node.rightHash), err))
} }
return rightNode return rightNode
} }
......
...@@ -25,7 +25,6 @@ const ( ...@@ -25,7 +25,6 @@ const (
leafKeyCountPrefix = "..mk.." leafKeyCountPrefix = "..mk.."
oldLeafKeyCountPrefix = "..mok.." oldLeafKeyCountPrefix = "..mok.."
secLvlPruningHeightKey = "_..mslphk.._" secLvlPruningHeightKey = "_..mslphk.._"
delMapPoolPrefix = "_..md.._"
blockHeightStrLen = 10 blockHeightStrLen = 10
hashLenStr = 3 hashLenStr = 3
pruningStateStart = 1 pruningStateStart = 1
...@@ -175,9 +174,12 @@ func setSecLvlPruningHeight(db dbm.DB, height int64) error { ...@@ -175,9 +174,12 @@ func setSecLvlPruningHeight(db dbm.DB, height int64) error {
return db.Set([]byte(secLvlPruningHeightKey), value) return db.Set([]byte(secLvlPruningHeightKey), value)
} }
func pruning(db dbm.DB, curHeight int64) {
defer wg.Done()
pruningTree(db, curHeight)
}
func pruningTree(db dbm.DB, curHeight int64) { func pruningTree(db dbm.DB, curHeight int64) {
wg.Add(1)
defer wg.Add(-1)
setPruning(pruningStateStart) setPruning(pruningStateStart)
// 一级遍历 // 一级遍历
pruningFirstLevel(db, curHeight) pruningFirstLevel(db, curHeight)
...@@ -423,20 +425,140 @@ func PruningTreePrintDB(db dbm.DB, prefix []byte) { ...@@ -423,20 +425,140 @@ func PruningTreePrintDB(db dbm.DB, prefix []byte) {
treelog.Debug("pruningTree:", "key:", string(it.Key())) treelog.Debug("pruningTree:", "key:", string(it.Key()))
} else if bytes.Equal(prefix, []byte(leafNodePrefix)) { } else if bytes.Equal(prefix, []byte(leafNodePrefix)) {
treelog.Debug("pruningTree:", "key:", string(it.Key())) treelog.Debug("pruningTree:", "key:", string(it.Key()))
} else if bytes.Equal(prefix, []byte(delMapPoolPrefix)) { }
value := it.Value() count++
var pData types.StoreValuePool }
fmt.Printf("prefix %s All count:%d \n", string(prefix), count)
treelog.Info("pruningTree:", "prefix:", string(prefix), "All count", count)
}
// PrintSameLeafKey 查询相同的叶子节点
func PrintSameLeafKey(db dbm.DB, key string) {
printSameLeafKeyDB(db, key, false)
printSameLeafKeyDB(db, key, true)
}
func printSameLeafKeyDB(db dbm.DB, key string, isold bool) {
var prifex string
if isold {
prifex = oldLeafKeyCountPrefix
} else {
prifex = leafKeyCountPrefix
}
priKey := []byte(fmt.Sprintf("%s%s", prifex, key))
it := db.Iterator(priKey, nil, true)
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
hashK := make([]byte, len(it.Key()))
copy(hashK, it.Key())
key, height, hash, err := getKeyHeightFromLeafCountKey(hashK)
if err == nil {
pri := ""
if len(hash) > 32 {
pri = string(hash[:16])
}
treelog.Info("leaf node", "height", height, "pri", pri, "hash", common.Bytes2Hex(hash), "key", string(key))
}
}
}
// PrintLeafNodeParent 查询叶子节点的父节点
func PrintLeafNodeParent(db dbm.DB, key, hash []byte, height int64) {
var isHave bool
leafCountKey := genLeafCountKey(key, hash, height, len(hash))
value, err := db.Get(leafCountKey)
if err == nil {
var pData types.PruneData
err := proto.Unmarshal(value, &pData) err := proto.Unmarshal(value, &pData)
if err == nil { if err == nil {
for _, k := range pData.Values { for _, hash := range pData.Hashs {
treelog.Debug("delMapPool value ", "hash:", common.Bytes2Hex(k[:2])) var pri string
if len(hash) > 32 {
pri = string(hash[:16])
} }
treelog.Info("hash node", "hash pri", pri, "hash", common.Bytes2Hex(hash))
} }
} }
count++ isHave = true
}
if !isHave {
oldLeafCountKey := genOldLeafCountKey(key, hash, height, len(hash))
value, err = db.Get(oldLeafCountKey)
if err == nil {
var pData types.PruneData
err := proto.Unmarshal(value, &pData)
if err == nil {
for _, hash := range pData.Hashs {
var pri string
if len(hash) > 32 {
pri = string(hash[:16])
}
treelog.Info("hash node", "hash pri", pri, "hash", common.Bytes2Hex(hash))
}
}
isHave = true
}
}
if !isHave {
treelog.Info("err", "get db", "not exist in db")
}
}
// PrintNodeDb 查询hash节点以及其子节点
func PrintNodeDb(db dbm.DB, hash []byte) {
nDb := newNodeDB(db, true)
node, err := nDb.GetNode(nil, hash)
if err != nil {
fmt.Println("err", err)
return
}
pri := ""
if len(node.hash) > 32 {
pri = string(node.hash[:16])
}
treelog.Info("hash node", "hash pri", pri, "hash", common.Bytes2Hex(node.hash), "height", node.height)
node.printNodeInfo(nDb)
}
func (node *Node) printNodeInfo(db *nodeDB) {
if node.height == 0 {
pri := ""
if len(node.hash) > 32 {
pri = string(node.hash[:16])
}
treelog.Info("leaf node", "hash pri", pri, "hash", common.Bytes2Hex(node.hash), "key", string(node.key))
return
}
if node.leftHash != nil {
left, err := db.GetNode(nil, node.leftHash)
if err != nil {
return
}
pri := ""
if len(left.hash) > 32 {
pri = string(left.hash[:16])
}
treelog.Debug("hash node", "hash pri", pri, "hash", common.Bytes2Hex(left.hash), "height", left.height)
left.printNodeInfo(db)
}
if node.rightHash != nil {
right, err := db.GetNode(nil, node.rightHash)
if err != nil {
return
}
pri := ""
if len(right.hash) > 32 {
pri = string(right.hash[:16])
}
treelog.Debug("hash node", "hash pri", pri, "hash", common.Bytes2Hex(right.hash), "height", right.height)
right.printNodeInfo(db)
} }
fmt.Printf("prefix %s All count:%d \n", string(prefix), count)
treelog.Info("pruningTree:", "prefix:", string(prefix), "All count", count)
} }
// PruningTree 裁剪树 // PruningTree 裁剪树
......
...@@ -11,6 +11,7 @@ import ( ...@@ -11,6 +11,7 @@ import (
"fmt" "fmt"
"sync" "sync"
"github.com/33cn/chain33/common"
dbm "github.com/33cn/chain33/common/db" dbm "github.com/33cn/chain33/common/db"
log "github.com/33cn/chain33/common/log/log15" log "github.com/33cn/chain33/common/log/log15"
"github.com/33cn/chain33/types" "github.com/33cn/chain33/types"
...@@ -21,6 +22,8 @@ import ( ...@@ -21,6 +22,8 @@ import (
const ( const (
hashNodePrefix = "_mh_" hashNodePrefix = "_mh_"
leafNodePrefix = "_mb_" leafNodePrefix = "_mb_"
curMaxBlockHeight = "_..mcmbh.._"
rootHashHeightPrefix = "_mrhp_"
) )
var ( var (
...@@ -31,6 +34,9 @@ var ( ...@@ -31,6 +34,9 @@ var (
enableMavlPrefix bool enableMavlPrefix bool
// 是否开启MVCC // 是否开启MVCC
enableMvcc bool enableMvcc bool
// 当前树的最大高度
maxBlockHeight int64
heightMtx sync.Mutex
) )
// EnableMavlPrefix 使能mavl加前缀 // EnableMavlPrefix 使能mavl加前缀
...@@ -132,8 +138,16 @@ func (t *Tree) Save() []byte { ...@@ -132,8 +138,16 @@ func (t *Tree) Save() []byte {
return nil return nil
} }
if t.ndb != nil { if t.ndb != nil {
if t.isRemoveLeafCountKey() {
//DelLeafCountKV 需要先提前将leafcoutkey删除,这里需先于t.ndb.Commit()
DelLeafCountKV(t.ndb.db, t.blockHeight)
}
saveNodeNo := t.root.save(t) saveNodeNo := t.root.save(t)
treelog.Debug("Tree.Save", "saveNodeNo", saveNodeNo, "tree height", t.blockHeight) treelog.Debug("Tree.Save", "saveNodeNo", saveNodeNo, "tree height", t.blockHeight)
// 保存每个高度的roothash
if enablePrune {
t.root.saveRootHash(t)
}
beg := types.Now() beg := types.Now()
err := t.ndb.Commit() err := t.ndb.Commit()
treelog.Info("tree.commit", "cost", types.Since(beg)) treelog.Info("tree.commit", "cost", types.Since(beg))
...@@ -142,9 +156,11 @@ func (t *Tree) Save() []byte { ...@@ -142,9 +156,11 @@ func (t *Tree) Save() []byte {
} }
// 该线程应只允许一个 // 该线程应只允许一个
if enablePrune && !isPruning() && if enablePrune && !isPruning() &&
pruneHeight != 0 &&
t.blockHeight%int64(pruneHeight) == 0 && t.blockHeight%int64(pruneHeight) == 0 &&
t.blockHeight/int64(pruneHeight) > 1 { t.blockHeight/int64(pruneHeight) > 1 {
go pruningTree(t.ndb.db, t.blockHeight) wg.Add(1)
go pruning(t.ndb.db, t.blockHeight)
} }
} }
return t.root.hash return t.root.hash
...@@ -175,6 +191,14 @@ func (t *Tree) Get(key []byte) (index int32, value []byte, exists bool) { ...@@ -175,6 +191,14 @@ func (t *Tree) Get(key []byte) (index int32, value []byte, exists bool) {
return t.root.get(t, key) return t.root.get(t, key)
} }
// GetHash 通过key获取leaf节点hash信息
func (t *Tree) GetHash(key []byte) (index int32, hash []byte, exists bool) {
if t.root == nil {
return 0, nil, false
}
return t.root.getHash(t, key)
}
// GetByIndex 通过index获取leaf节点信息 // GetByIndex 通过index获取leaf节点信息
func (t *Tree) GetByIndex(index int32) (key []byte, value []byte) { func (t *Tree) GetByIndex(index int32) (key []byte, value []byte) {
if t.root == nil { if t.root == nil {
...@@ -220,6 +244,85 @@ func (t *Tree) Remove(key []byte) (value []byte, removed bool) { ...@@ -220,6 +244,85 @@ func (t *Tree) Remove(key []byte) (value []byte, removed bool) {
return value, true return value, true
} }
func (t *Tree) getMaxBlockHeight() int64 {
if t.ndb == nil || t.ndb.db == nil {
return 0
}
value, err := t.ndb.db.Get([]byte(curMaxBlockHeight))
if len(value) == 0 || err != nil {
return 0
}
h := &types.Int64{}
err = proto.Unmarshal(value, h)
if err != nil {
return 0
}
return h.Data
}
func (t *Tree) setMaxBlockHeight(height int64) error {
if t.ndb == nil || t.ndb.batch == nil {
return fmt.Errorf("ndb is nil")
}
h := &types.Int64{}
h.Data = height
value, err := proto.Marshal(h)
if err != nil {
return err
}
t.ndb.batch.Set([]byte(curMaxBlockHeight), value)
return nil
}
func (t *Tree) isRemoveLeafCountKey() bool {
if t.ndb == nil || t.ndb.db == nil {
return false
}
heightMtx.Lock()
defer heightMtx.Unlock()
if maxBlockHeight == 0 {
maxBlockHeight = t.getMaxBlockHeight()
}
if t.blockHeight > maxBlockHeight {
t.setMaxBlockHeight(t.blockHeight)
maxBlockHeight = t.blockHeight
return false
}
return true
}
// RemoveLeafCountKey 删除叶子节点的索引节点(防止裁剪时候回退产生的误删除)
func (t *Tree) RemoveLeafCountKey(height int64) {
if t.root == nil || t.ndb == nil {
return
}
prefix := genPrefixHashKey(&Node{}, height)
it := t.ndb.db.Iterator(prefix, nil, true)
defer it.Close()
var keys [][]byte
for it.Rewind(); it.Valid(); it.Next() {
value := make([]byte, len(it.Value()))
copy(value, it.Value())
pData := &types.StoreNode{}
err := proto.Unmarshal(value, pData)
if err == nil {
keys = append(keys, pData.Key)
}
}
batch := t.ndb.db.NewBatch(true)
for _, k := range keys {
_, hash, exits := t.GetHash(k)
if exits {
batch.Delete(genLeafCountKey(k, hash, height, len(hash)))
treelog.Debug("RemoveLeafCountKey:", "height", height, "key:", string(k), "hash:", common.ToHex(hash))
}
}
batch.Write()
}
// Iterate 依次迭代遍历树的所有键 // Iterate 依次迭代遍历树的所有键
func (t *Tree) Iterate(fn func(key []byte, value []byte) bool) (stopped bool) { func (t *Tree) Iterate(fn func(key []byte, value []byte) bool) (stopped bool) {
if t.root == nil { if t.root == nil {
...@@ -465,6 +568,26 @@ func DelKVPair(db dbm.DB, storeDel *types.StoreGet) ([]byte, [][]byte, error) { ...@@ -465,6 +568,26 @@ func DelKVPair(db dbm.DB, storeDel *types.StoreGet) ([]byte, [][]byte, error) {
return tree.Save(), values, nil return tree.Save(), values, nil
} }
// DelLeafCountKV 回退时候用于删除叶子节点的索引节点
func DelLeafCountKV(db dbm.DB, blockHeight int64) error {
treelog.Debug("RemoveLeafCountKey:", "height", blockHeight)
prefix := genRootHashPrefix(blockHeight)
it := db.Iterator(prefix, nil, true)
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
hash, err := getRootHash(it.Key())
if err == nil {
tree := NewTree(db, true)
err := tree.Load(hash)
if err == nil {
treelog.Debug("RemoveLeafCountKey:", "height", blockHeight, "root hash:", common.ToHex(hash))
tree.RemoveLeafCountKey(blockHeight)
}
}
}
return nil
}
// VerifyKVPairProof 验证KVPair 的证明 // VerifyKVPairProof 验证KVPair 的证明
func VerifyKVPairProof(db dbm.DB, roothash []byte, keyvalue types.KeyValue, proof []byte) bool { func VerifyKVPairProof(db dbm.DB, roothash []byte, keyvalue types.KeyValue, proof []byte) bool {
...@@ -516,3 +639,24 @@ func genPrefixHashKey(node *Node, blockHeight int64) (key []byte) { ...@@ -516,3 +639,24 @@ func genPrefixHashKey(node *Node, blockHeight int64) (key []byte) {
} }
return key return key
} }
func genRootHashPrefix(blockHeight int64) (key []byte) {
key = []byte(fmt.Sprintf("%s%010d", rootHashHeightPrefix, blockHeight))
return key
}
func genRootHashHeight(blockHeight int64, hash []byte) (key []byte) {
key = []byte(fmt.Sprintf("%s%010d%s", rootHashHeightPrefix, blockHeight, string(hash)))
return key
}
func getRootHash(hashKey []byte) (hash []byte, err error) {
if len(hashKey) < len(rootHashHeightPrefix)+blockHeightStrLen+len(common.Hash{}) {
return nil, types.ErrSize
}
if !bytes.Contains(hashKey, []byte(rootHashHeightPrefix)) {
return nil, types.ErrSize
}
hash = hashKey[len(hashKey)-len(common.Hash{}):]
return hash, nil
}
...@@ -176,7 +176,6 @@ func TestKvdbIterate(t *testing.T) { ...@@ -176,7 +176,6 @@ func TestKvdbIterate(t *testing.T) {
assert.Len(t, checkKVResult, 2) assert.Len(t, checkKVResult, 2)
assert.Equal(t, []byte("v1"), checkKVResult[0].Value) assert.Equal(t, []byte("v1"), checkKVResult[0].Value)
assert.Equal(t, []byte("v2"), checkKVResult[1].Value) assert.Equal(t, []byte("v2"), checkKVResult[1].Value)
} }
type StatTool struct { type StatTool struct {
......
...@@ -138,6 +138,7 @@ const ( ...@@ -138,6 +138,7 @@ const (
EventStoreListReply = 131 EventStoreListReply = 131
EventListBlockSeqCB = 132 EventListBlockSeqCB = 132
EventGetSeqCBLastNum = 133 EventGetSeqCBLastNum = 133
EventGetBlockBySeq = 134
//exec //exec
EventBlockChainQuery = 212 EventBlockChainQuery = 212
...@@ -276,4 +277,6 @@ var eventName = map[int]string{ ...@@ -276,4 +277,6 @@ var eventName = map[int]string{
// Token // Token
EventBlockChainQuery: "EventBlockChainQuery", EventBlockChainQuery: "EventBlockChainQuery",
EventConsensusQuery: "EventConsensusQuery", EventConsensusQuery: "EventConsensusQuery",
EventGetBlockBySeq: "EventGetBlockBySeq",
} }
...@@ -402,6 +402,36 @@ func (_m *Chain33Client) GetBlockByHashes(ctx context.Context, in *types.ReqHash ...@@ -402,6 +402,36 @@ func (_m *Chain33Client) GetBlockByHashes(ctx context.Context, in *types.ReqHash
return r0, r1 return r0, r1
} }
// GetBlockBySeq provides a mock function with given fields: ctx, in, opts
func (_m *Chain33Client) GetBlockBySeq(ctx context.Context, in *types.Int64, opts ...grpc.CallOption) (*types.BlockSeq, error) {
_va := make([]interface{}, len(opts))
for _i := range opts {
_va[_i] = opts[_i]
}
var _ca []interface{}
_ca = append(_ca, ctx, in)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 *types.BlockSeq
if rf, ok := ret.Get(0).(func(context.Context, *types.Int64, ...grpc.CallOption) *types.BlockSeq); ok {
r0 = rf(ctx, in, opts...)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.BlockSeq)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *types.Int64, ...grpc.CallOption) error); ok {
r1 = rf(ctx, in, opts...)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// GetBlockHash provides a mock function with given fields: ctx, in, opts // GetBlockHash provides a mock function with given fields: ctx, in, opts
func (_m *Chain33Client) GetBlockHash(ctx context.Context, in *types.ReqInt, opts ...grpc.CallOption) (*types.ReplyHash, error) { func (_m *Chain33Client) GetBlockHash(ctx context.Context, in *types.ReqInt, opts ...grpc.CallOption) (*types.ReplyHash, error) {
_va := make([]interface{}, len(opts)) _va := make([]interface{}, len(opts))
...@@ -462,8 +492,8 @@ func (_m *Chain33Client) GetBlockOverview(ctx context.Context, in *types.ReqHash ...@@ -462,8 +492,8 @@ func (_m *Chain33Client) GetBlockOverview(ctx context.Context, in *types.ReqHash
return r0, r1 return r0, r1
} }
// GetBlockSequences provides a mock function with given fields: ctx, in, opts // GetBlocks provides a mock function with given fields: ctx, in, opts
func (_m *Chain33Client) GetBlockSequences(ctx context.Context, in *types.ReqBlocks, opts ...grpc.CallOption) (*types.BlockSequences, error) { func (_m *Chain33Client) GetBlocks(ctx context.Context, in *types.ReqBlocks, opts ...grpc.CallOption) (*types.Reply, error) {
_va := make([]interface{}, len(opts)) _va := make([]interface{}, len(opts))
for _i := range opts { for _i := range opts {
_va[_i] = opts[_i] _va[_i] = opts[_i]
...@@ -473,12 +503,12 @@ func (_m *Chain33Client) GetBlockSequences(ctx context.Context, in *types.ReqBlo ...@@ -473,12 +503,12 @@ func (_m *Chain33Client) GetBlockSequences(ctx context.Context, in *types.ReqBlo
_ca = append(_ca, _va...) _ca = append(_ca, _va...)
ret := _m.Called(_ca...) ret := _m.Called(_ca...)
var r0 *types.BlockSequences var r0 *types.Reply
if rf, ok := ret.Get(0).(func(context.Context, *types.ReqBlocks, ...grpc.CallOption) *types.BlockSequences); ok { if rf, ok := ret.Get(0).(func(context.Context, *types.ReqBlocks, ...grpc.CallOption) *types.Reply); ok {
r0 = rf(ctx, in, opts...) r0 = rf(ctx, in, opts...)
} else { } else {
if ret.Get(0) != nil { if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.BlockSequences) r0 = ret.Get(0).(*types.Reply)
} }
} }
...@@ -492,8 +522,8 @@ func (_m *Chain33Client) GetBlockSequences(ctx context.Context, in *types.ReqBlo ...@@ -492,8 +522,8 @@ func (_m *Chain33Client) GetBlockSequences(ctx context.Context, in *types.ReqBlo
return r0, r1 return r0, r1
} }
// GetBlocks provides a mock function with given fields: ctx, in, opts // GetFatalFailure provides a mock function with given fields: ctx, in, opts
func (_m *Chain33Client) GetBlocks(ctx context.Context, in *types.ReqBlocks, opts ...grpc.CallOption) (*types.Reply, error) { func (_m *Chain33Client) GetFatalFailure(ctx context.Context, in *types.ReqNil, opts ...grpc.CallOption) (*types.Int32, error) {
_va := make([]interface{}, len(opts)) _va := make([]interface{}, len(opts))
for _i := range opts { for _i := range opts {
_va[_i] = opts[_i] _va[_i] = opts[_i]
...@@ -503,17 +533,17 @@ func (_m *Chain33Client) GetBlocks(ctx context.Context, in *types.ReqBlocks, opt ...@@ -503,17 +533,17 @@ func (_m *Chain33Client) GetBlocks(ctx context.Context, in *types.ReqBlocks, opt
_ca = append(_ca, _va...) _ca = append(_ca, _va...)
ret := _m.Called(_ca...) ret := _m.Called(_ca...)
var r0 *types.Reply var r0 *types.Int32
if rf, ok := ret.Get(0).(func(context.Context, *types.ReqBlocks, ...grpc.CallOption) *types.Reply); ok { if rf, ok := ret.Get(0).(func(context.Context, *types.ReqNil, ...grpc.CallOption) *types.Int32); ok {
r0 = rf(ctx, in, opts...) r0 = rf(ctx, in, opts...)
} else { } else {
if ret.Get(0) != nil { if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.Reply) r0 = ret.Get(0).(*types.Int32)
} }
} }
var r1 error var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *types.ReqBlocks, ...grpc.CallOption) error); ok { if rf, ok := ret.Get(1).(func(context.Context, *types.ReqNil, ...grpc.CallOption) error); ok {
r1 = rf(ctx, in, opts...) r1 = rf(ctx, in, opts...)
} else { } else {
r1 = ret.Error(1) r1 = ret.Error(1)
...@@ -522,8 +552,8 @@ func (_m *Chain33Client) GetBlocks(ctx context.Context, in *types.ReqBlocks, opt ...@@ -522,8 +552,8 @@ func (_m *Chain33Client) GetBlocks(ctx context.Context, in *types.ReqBlocks, opt
return r0, r1 return r0, r1
} }
// GetFatalFailure provides a mock function with given fields: ctx, in, opts // GetFork provides a mock function with given fields: ctx, in, opts
func (_m *Chain33Client) GetFatalFailure(ctx context.Context, in *types.ReqNil, opts ...grpc.CallOption) (*types.Int32, error) { func (_m *Chain33Client) GetFork(ctx context.Context, in *types.ReqKey, opts ...grpc.CallOption) (*types.Int64, error) {
_va := make([]interface{}, len(opts)) _va := make([]interface{}, len(opts))
for _i := range opts { for _i := range opts {
_va[_i] = opts[_i] _va[_i] = opts[_i]
...@@ -533,17 +563,17 @@ func (_m *Chain33Client) GetFatalFailure(ctx context.Context, in *types.ReqNil, ...@@ -533,17 +563,17 @@ func (_m *Chain33Client) GetFatalFailure(ctx context.Context, in *types.ReqNil,
_ca = append(_ca, _va...) _ca = append(_ca, _va...)
ret := _m.Called(_ca...) ret := _m.Called(_ca...)
var r0 *types.Int32 var r0 *types.Int64
if rf, ok := ret.Get(0).(func(context.Context, *types.ReqNil, ...grpc.CallOption) *types.Int32); ok { if rf, ok := ret.Get(0).(func(context.Context, *types.ReqKey, ...grpc.CallOption) *types.Int64); ok {
r0 = rf(ctx, in, opts...) r0 = rf(ctx, in, opts...)
} else { } else {
if ret.Get(0) != nil { if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.Int32) r0 = ret.Get(0).(*types.Int64)
} }
} }
var r1 error var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *types.ReqNil, ...grpc.CallOption) error); ok { if rf, ok := ret.Get(1).(func(context.Context, *types.ReqKey, ...grpc.CallOption) error); ok {
r1 = rf(ctx, in, opts...) r1 = rf(ctx, in, opts...)
} else { } else {
r1 = ret.Error(1) r1 = ret.Error(1)
......
...@@ -122,14 +122,15 @@ service chain33 { ...@@ -122,14 +122,15 @@ service chain33 {
rpc GetFatalFailure(types.ReqNil) returns (Int32) {} rpc GetFatalFailure(types.ReqNil) returns (Int32) {}
rpc GetLastBlockSequence(ReqNil) returns (Int64) {} rpc GetLastBlockSequence(ReqNil) returns (Int64) {}
//获取指定区间的block加载序列号信息
rpc GetBlockSequences(ReqBlocks) returns (BlockSequences) {}
// get add block's sequence by hash // get add block's sequence by hash
rpc GetSequenceByHash(ReqHash) returns (Int64) {} rpc GetSequenceByHash(ReqHash) returns (Int64) {}
//通过block hash 获取对应的blocks信息 //通过block hash 获取对应的blocks信息
rpc GetBlockByHashes(ReqHashes) returns (BlockDetails) {} rpc GetBlockByHashes(ReqHashes) returns (BlockDetails) {}
//通过block seq 获取对应的blocks hash 信息
rpc GetBlockBySeq(Int64) returns (BlockSeq) {}
//关闭chain33 //关闭chain33
rpc CloseQueue(ReqNil) returns (Reply) {} rpc CloseQueue(ReqNil) returns (Reply) {}
...@@ -142,4 +143,7 @@ service chain33 { ...@@ -142,4 +143,7 @@ service chain33 {
// 获取随机HASH // 获取随机HASH
rpc QueryRandNum(ReqRandHash) returns (ReplyHash) {} rpc QueryRandNum(ReqRandHash) returns (ReplyHash) {}
// 获取是否达到fork高度
rpc GetFork(ReqKey) returns (Int64) {}
} }
...@@ -176,6 +176,8 @@ func RunChain33(name string) { ...@@ -176,6 +176,8 @@ func RunChain33(name string) {
health.Start(cfg.Health) health.Start(cfg.Health)
defer func() { defer func() {
//close all module,clean some resource //close all module,clean some resource
log.Info("begin close health module")
health.Close()
log.Info("begin close blockchain module") log.Info("begin close blockchain module")
chain.Close() chain.Close()
log.Info("begin close mempool module") log.Info("begin close mempool module")
...@@ -192,8 +194,6 @@ func RunChain33(name string) { ...@@ -192,8 +194,6 @@ func RunChain33(name string) {
rpcapi.Close() rpcapi.Close()
log.Info("begin close wallet module") log.Info("begin close wallet module")
walletm.Close() walletm.Close()
log.Info("begin close health module")
health.Close()
log.Info("begin close queue module") log.Info("begin close queue module")
q.Close() q.Close()
......
...@@ -98,7 +98,8 @@ func (s *HealthCheckServer) getHealth(sync bool) (bool, error) { ...@@ -98,7 +98,8 @@ func (s *HealthCheckServer) getHealth(sync bool) (bool, error) {
return false, err return false, err
} }
log.Info("healthCheck tick", "peers", len(peerList.Peers), "isSync", reply.IsOk, "sync", sync) log.Info("healthCheck tick", "peers", len(peerList.Peers), "isCaughtUp", reply.IsOk,
"health", len(peerList.Peers) > 1 && reply.IsOk, "listen", sync)
return len(peerList.Peers) > 1 && reply.IsOk, nil return len(peerList.Peers) > 1 && reply.IsOk, nil
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment