Commit acb6a5c9 authored by vipwzw's avatar vipwzw Committed by 33cn

update chain33 01 11

parent e26a5b79
......@@ -22,8 +22,19 @@ chain33背后故事: [chain33诞生记](https://mp.weixin.qq.com/s/9g5ZFDKJi9uzR
# 感谢
[腾讯玄武实验室](https://github.com/33cn/chain33/issues?utf8=%E2%9C%93&q=label%3A%E8%85%BE%E8%AE%AF%E7%8E%84%E6%AD%A6%E5%AE%9E%E9%AA%8C%E5%AE%A4)
[腾讯安全玄武实验室](https://github.com/33cn/chain33/issues?utf8=%E2%9C%93&q=label%3A%E8%85%BE%E8%AE%AF%E7%8E%84%E6%AD%A6%E5%AE%9E%E9%AA%8C%E5%AE%A4)
# bug 奖励
我们会对bug 评价4个等级(不会奖励人民币,等值虚拟资产)。
只有影响现有在线运行系统的,并且会产生严重分叉等行为的,才会评价为 L3
```
L0 1000
L1 3000
L2 10000
L3 20000
```
## Building from source
......
......@@ -53,7 +53,7 @@ type client struct {
recv chan Message
done chan struct{}
wg *sync.WaitGroup
topic string
topic unsafe.Pointer
isClosed int32
isCloseing int32
}
......@@ -140,13 +140,11 @@ func (client *client) Recv() chan Message {
}
func (client *client) getTopic() string {
address := unsafe.Pointer(&(client.topic))
return *(*string)(atomic.LoadPointer(&address))
return *(*string)(atomic.LoadPointer(&client.topic))
}
func (client *client) setTopic(topic string) {
address := unsafe.Pointer(&(client.topic))
atomic.StorePointer(&address, unsafe.Pointer(&topic))
atomic.StorePointer(&client.topic, unsafe.Pointer(&topic))
}
func (client *client) isClose() bool {
......@@ -159,7 +157,7 @@ func (client *client) isInClose() bool {
// Close 关闭client
func (client *client) Close() {
if atomic.LoadInt32(&client.isClosed) == 1 {
if atomic.LoadInt32(&client.isClosed) == 1 || client.topic == nil {
return
}
topic := client.getTopic()
......
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package queue
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestSetTopic(t *testing.T) {
client := &client{}
hi := "hello"
client.setTopic(hi)
ret := client.getTopic()
assert.Equal(t, hi, ret)
}
......@@ -301,49 +301,3 @@ func (c *channelClient) GetExecBalance(in *types.ReqGetExecBalance) (*types.Repl
}
return resp, nil
}
// GetAssetBalance 通用的获得资产的接口
func (c *channelClient) GetAssetBalance(in *types.ReqBalance) ([]*types.Account, error) {
if in.AssetSymbol == "" || in.AssetExec == "" {
return nil, types.ErrInvalidParam
}
acc, err := account.NewAccountDB(in.AssetExec, in.AssetSymbol, nil)
if err != nil {
return nil, err
}
// load balance
if in.AssetExec == in.Execer || in.Execer == "" {
addrs := in.GetAddresses()
var queryAddrs []string
for _, addr := range addrs {
if err := address.CheckAddress(addr); err != nil {
addr = string(acc.AccountKey(addr))
}
queryAddrs = append(queryAddrs, addr)
}
accounts, err := acc.LoadAccounts(c.QueueProtocolAPI, queryAddrs)
if err != nil {
log.Error("GetAssetBalance", "err", err.Error(), "exec", in.AssetExec, "symbol", in.AssetSymbol,
"address", queryAddrs)
return nil, err
}
return accounts, nil
}
// load exec balance
execaddress := address.ExecAddress(in.GetExecer())
addrs := in.GetAddresses()
var accounts []*types.Account
for _, addr := range addrs {
acc, err := acc.LoadExecAccountQueue(c.QueueProtocolAPI, addr, execaddress)
if err != nil {
log.Error("GetAssetBalance for exector", "err", err.Error(), "exec", in.AssetExec,
"symbol", in.AssetSymbol, "address", addr, "where", in.Execer)
continue
}
accounts = append(accounts, acc)
}
return accounts, nil
}
......@@ -361,66 +361,3 @@ func TestChannelClient_GetBalance(t *testing.T) {
// assert.NotNil(t, data)
// assert.Nil(t, err)
// }
func testChannelClient_GetAssetBalanceCoin(t *testing.T) {
api := new(mocks.QueueProtocolAPI)
db := new(account.DB)
client := &channelClient{
QueueProtocolAPI: api,
accountdb: db,
}
head := &types.Header{StateHash: []byte("sdfadasds")}
api.On("GetLastHeader").Return(head, nil)
var acc = &types.Account{Addr: "1Jn2qu84Z1SUUosWjySggBS9pKWdAP3tZt", Balance: 100}
accv := types.Encode(acc)
storevalue := &types.StoreReplyValue{}
storevalue.Values = append(storevalue.Values, accv)
api.On("StoreGet", mock.Anything).Return(storevalue, nil)
var addrs = []string{"1Jn2qu84Z1SUUosWjySggBS9pKWdAP3tZt"}
var in = &types.ReqBalance{
AssetSymbol: "bty",
AssetExec: "coins",
Execer: "coins",
Addresses: addrs,
}
data, err := client.GetAssetBalance(in)
assert.Nil(t, err)
assert.Equal(t, acc.Addr, data[0].Addr)
}
func testChannelClient_GetAssetBalanceOther(t *testing.T) {
api := new(mocks.QueueProtocolAPI)
db := new(account.DB)
client := &channelClient{
QueueProtocolAPI: api,
accountdb: db,
}
head := &types.Header{StateHash: []byte("sdfadasds")}
api.On("GetLastHeader").Return(head, nil)
var acc = &types.Account{Addr: "1Jn2qu84Z1SUUosWjySggBS9pKWdAP3tZt", Balance: 100}
accv := types.Encode(acc)
storevalue := &types.StoreReplyValue{}
storevalue.Values = append(storevalue.Values, accv)
api.On("StoreGet", mock.Anything).Return(storevalue, nil)
var addrs = []string{"1Jn2qu84Z1SUUosWjySggBS9pKWdAP3tZt"}
var in = &types.ReqBalance{
AssetSymbol: "bty",
AssetExec: "coins",
Execer: types.ExecName("ticket"),
Addresses: addrs,
}
data, err := client.GetAssetBalance(in)
assert.Nil(t, err)
assert.Equal(t, acc.Addr, data[0].Addr)
}
func TestChannelClient_GetAssetBalance(t *testing.T) {
testChannelClient_GetAssetBalanceCoin(t)
testChannelClient_GetAssetBalanceOther(t)
}
......@@ -144,4 +144,13 @@ func TestBaseStore_Queue(t *testing.T) {
assert.NotNil(t, resp)
assert.Equal(t, int64(types.EventGetTotalCoinsReply), resp.Ty)
del := &types.StoreDel{StateHash: EmptyRoot[:], Height: 0}
msg = queueClinet.NewMessage("store", types.EventStoreDel, del)
err = queueClinet.Send(msg, true)
assert.Nil(t, err)
resp, err = queueClinet.Wait(msg)
assert.Nil(t, err)
assert.NotNil(t, resp)
assert.Equal(t, int64(types.EventStoreDel), resp.Ty)
}
......@@ -7,6 +7,9 @@ package mavl
import (
"bytes"
"fmt"
"github.com/33cn/chain33/common"
"github.com/33cn/chain33/types"
"github.com/golang/protobuf/proto"
)
......@@ -115,6 +118,26 @@ func (node *Node) get(t *Tree, key []byte) (index int32, value []byte, exists bo
return index, value, exists
}
func (node *Node) getHash(t *Tree, key []byte) (index int32, hash []byte, exists bool) {
if node.height == 0 {
cmp := bytes.Compare(node.key, key)
if cmp == 0 {
return 0, node.hash, true
} else if cmp == -1 {
return 1, nil, false
} else {
return 0, nil, false
}
}
if bytes.Compare(key, node.key) < 0 {
return node.getLeftNode(t).getHash(t, key)
}
rightNode := node.getRightNode(t)
index, hash, exists = rightNode.getHash(t, key)
index += node.size - rightNode.size
return index, hash, exists
}
//通过index获取leaf节点信息
func (node *Node) getByIndex(t *Tree, index int32) (key []byte, value []byte) {
if node.height == 0 {
......@@ -223,6 +246,21 @@ func (node *Node) save(t *Tree) int64 {
return leftsaveNodeNo + rightsaveNodeNo + 1
}
// 保存root节点hash以及区块高度
func (node *Node) saveRootHash(t *Tree) (err error) {
if node.hash == nil || t.ndb == nil || t.ndb.db == nil {
return
}
h := &types.Int64{}
h.Data = t.blockHeight
value, err := proto.Marshal(h)
if err != nil {
return err
}
t.ndb.batch.Set(genRootHashHeight(t.blockHeight, node.hash), value)
return nil
}
//将内存中的node转换成存储到db中的格式
func (node *Node) storeNode(t *Tree) []byte {
var storeNode types.StoreNode
......@@ -309,7 +347,7 @@ func (node *Node) getLeftNode(t *Tree) *Node {
}
leftNode, err := t.ndb.GetNode(t, node.leftHash)
if err != nil {
panic(err) //数据库已经损坏
panic(fmt.Sprintln("left hash", common.ToHex(node.leftHash), err)) //数据库已经损坏
}
return leftNode
}
......@@ -320,7 +358,7 @@ func (node *Node) getRightNode(t *Tree) *Node {
}
rightNode, err := t.ndb.GetNode(t, node.rightHash)
if err != nil {
panic(err)
panic(fmt.Sprintln("right hash", common.ToHex(node.rightHash), err))
}
return rightNode
}
......
......@@ -25,7 +25,6 @@ const (
leafKeyCountPrefix = "..mk.."
oldLeafKeyCountPrefix = "..mok.."
secLvlPruningHeightKey = "_..mslphk.._"
delMapPoolPrefix = "_..md.._"
blockHeightStrLen = 10
hashLenStr = 3
pruningStateStart = 1
......@@ -175,9 +174,12 @@ func setSecLvlPruningHeight(db dbm.DB, height int64) error {
return db.Set([]byte(secLvlPruningHeightKey), value)
}
func pruning(db dbm.DB, curHeight int64) {
defer wg.Done()
pruningTree(db, curHeight)
}
func pruningTree(db dbm.DB, curHeight int64) {
wg.Add(1)
defer wg.Add(-1)
setPruning(pruningStateStart)
// 一级遍历
pruningFirstLevel(db, curHeight)
......@@ -423,20 +425,140 @@ func PruningTreePrintDB(db dbm.DB, prefix []byte) {
treelog.Debug("pruningTree:", "key:", string(it.Key()))
} else if bytes.Equal(prefix, []byte(leafNodePrefix)) {
treelog.Debug("pruningTree:", "key:", string(it.Key()))
} else if bytes.Equal(prefix, []byte(delMapPoolPrefix)) {
value := it.Value()
var pData types.StoreValuePool
}
count++
}
fmt.Printf("prefix %s All count:%d \n", string(prefix), count)
treelog.Info("pruningTree:", "prefix:", string(prefix), "All count", count)
}
// PrintSameLeafKey 查询相同的叶子节点
func PrintSameLeafKey(db dbm.DB, key string) {
printSameLeafKeyDB(db, key, false)
printSameLeafKeyDB(db, key, true)
}
func printSameLeafKeyDB(db dbm.DB, key string, isold bool) {
var prifex string
if isold {
prifex = oldLeafKeyCountPrefix
} else {
prifex = leafKeyCountPrefix
}
priKey := []byte(fmt.Sprintf("%s%s", prifex, key))
it := db.Iterator(priKey, nil, true)
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
hashK := make([]byte, len(it.Key()))
copy(hashK, it.Key())
key, height, hash, err := getKeyHeightFromLeafCountKey(hashK)
if err == nil {
pri := ""
if len(hash) > 32 {
pri = string(hash[:16])
}
treelog.Info("leaf node", "height", height, "pri", pri, "hash", common.Bytes2Hex(hash), "key", string(key))
}
}
}
// PrintLeafNodeParent 查询叶子节点的父节点
func PrintLeafNodeParent(db dbm.DB, key, hash []byte, height int64) {
var isHave bool
leafCountKey := genLeafCountKey(key, hash, height, len(hash))
value, err := db.Get(leafCountKey)
if err == nil {
var pData types.PruneData
err := proto.Unmarshal(value, &pData)
if err == nil {
for _, k := range pData.Values {
treelog.Debug("delMapPool value ", "hash:", common.Bytes2Hex(k[:2]))
for _, hash := range pData.Hashs {
var pri string
if len(hash) > 32 {
pri = string(hash[:16])
}
treelog.Info("hash node", "hash pri", pri, "hash", common.Bytes2Hex(hash))
}
}
count++
isHave = true
}
if !isHave {
oldLeafCountKey := genOldLeafCountKey(key, hash, height, len(hash))
value, err = db.Get(oldLeafCountKey)
if err == nil {
var pData types.PruneData
err := proto.Unmarshal(value, &pData)
if err == nil {
for _, hash := range pData.Hashs {
var pri string
if len(hash) > 32 {
pri = string(hash[:16])
}
treelog.Info("hash node", "hash pri", pri, "hash", common.Bytes2Hex(hash))
}
}
isHave = true
}
}
if !isHave {
treelog.Info("err", "get db", "not exist in db")
}
}
// PrintNodeDb 查询hash节点以及其子节点
func PrintNodeDb(db dbm.DB, hash []byte) {
nDb := newNodeDB(db, true)
node, err := nDb.GetNode(nil, hash)
if err != nil {
fmt.Println("err", err)
return
}
pri := ""
if len(node.hash) > 32 {
pri = string(node.hash[:16])
}
treelog.Info("hash node", "hash pri", pri, "hash", common.Bytes2Hex(node.hash), "height", node.height)
node.printNodeInfo(nDb)
}
func (node *Node) printNodeInfo(db *nodeDB) {
if node.height == 0 {
pri := ""
if len(node.hash) > 32 {
pri = string(node.hash[:16])
}
treelog.Info("leaf node", "hash pri", pri, "hash", common.Bytes2Hex(node.hash), "key", string(node.key))
return
}
if node.leftHash != nil {
left, err := db.GetNode(nil, node.leftHash)
if err != nil {
return
}
pri := ""
if len(left.hash) > 32 {
pri = string(left.hash[:16])
}
treelog.Debug("hash node", "hash pri", pri, "hash", common.Bytes2Hex(left.hash), "height", left.height)
left.printNodeInfo(db)
}
if node.rightHash != nil {
right, err := db.GetNode(nil, node.rightHash)
if err != nil {
return
}
pri := ""
if len(right.hash) > 32 {
pri = string(right.hash[:16])
}
treelog.Debug("hash node", "hash pri", pri, "hash", common.Bytes2Hex(right.hash), "height", right.height)
right.printNodeInfo(db)
}
fmt.Printf("prefix %s All count:%d \n", string(prefix), count)
treelog.Info("pruningTree:", "prefix:", string(prefix), "All count", count)
}
// PruningTree 裁剪树
......
......@@ -11,6 +11,7 @@ import (
"fmt"
"sync"
"github.com/33cn/chain33/common"
dbm "github.com/33cn/chain33/common/db"
log "github.com/33cn/chain33/common/log/log15"
"github.com/33cn/chain33/types"
......@@ -21,6 +22,8 @@ import (
const (
hashNodePrefix = "_mh_"
leafNodePrefix = "_mb_"
curMaxBlockHeight = "_..mcmbh.._"
rootHashHeightPrefix = "_mrhp_"
)
var (
......@@ -31,6 +34,9 @@ var (
enableMavlPrefix bool
// 是否开启MVCC
enableMvcc bool
// 当前树的最大高度
maxBlockHeight int64
heightMtx sync.Mutex
)
// EnableMavlPrefix 使能mavl加前缀
......@@ -132,8 +138,16 @@ func (t *Tree) Save() []byte {
return nil
}
if t.ndb != nil {
if t.isRemoveLeafCountKey() {
//DelLeafCountKV 需要先提前将leafcoutkey删除,这里需先于t.ndb.Commit()
DelLeafCountKV(t.ndb.db, t.blockHeight)
}
saveNodeNo := t.root.save(t)
treelog.Debug("Tree.Save", "saveNodeNo", saveNodeNo, "tree height", t.blockHeight)
// 保存每个高度的roothash
if enablePrune {
t.root.saveRootHash(t)
}
beg := types.Now()
err := t.ndb.Commit()
treelog.Info("tree.commit", "cost", types.Since(beg))
......@@ -142,9 +156,11 @@ func (t *Tree) Save() []byte {
}
// 该线程应只允许一个
if enablePrune && !isPruning() &&
pruneHeight != 0 &&
t.blockHeight%int64(pruneHeight) == 0 &&
t.blockHeight/int64(pruneHeight) > 1 {
go pruningTree(t.ndb.db, t.blockHeight)
wg.Add(1)
go pruning(t.ndb.db, t.blockHeight)
}
}
return t.root.hash
......@@ -175,6 +191,14 @@ func (t *Tree) Get(key []byte) (index int32, value []byte, exists bool) {
return t.root.get(t, key)
}
// GetHash 通过key获取leaf节点hash信息
func (t *Tree) GetHash(key []byte) (index int32, hash []byte, exists bool) {
if t.root == nil {
return 0, nil, false
}
return t.root.getHash(t, key)
}
// GetByIndex 通过index获取leaf节点信息
func (t *Tree) GetByIndex(index int32) (key []byte, value []byte) {
if t.root == nil {
......@@ -220,6 +244,85 @@ func (t *Tree) Remove(key []byte) (value []byte, removed bool) {
return value, true
}
func (t *Tree) getMaxBlockHeight() int64 {
if t.ndb == nil || t.ndb.db == nil {
return 0
}
value, err := t.ndb.db.Get([]byte(curMaxBlockHeight))
if len(value) == 0 || err != nil {
return 0
}
h := &types.Int64{}
err = proto.Unmarshal(value, h)
if err != nil {
return 0
}
return h.Data
}
func (t *Tree) setMaxBlockHeight(height int64) error {
if t.ndb == nil || t.ndb.batch == nil {
return fmt.Errorf("ndb is nil")
}
h := &types.Int64{}
h.Data = height
value, err := proto.Marshal(h)
if err != nil {
return err
}
t.ndb.batch.Set([]byte(curMaxBlockHeight), value)
return nil
}
func (t *Tree) isRemoveLeafCountKey() bool {
if t.ndb == nil || t.ndb.db == nil {
return false
}
heightMtx.Lock()
defer heightMtx.Unlock()
if maxBlockHeight == 0 {
maxBlockHeight = t.getMaxBlockHeight()
}
if t.blockHeight > maxBlockHeight {
t.setMaxBlockHeight(t.blockHeight)
maxBlockHeight = t.blockHeight
return false
}
return true
}
// RemoveLeafCountKey 删除叶子节点的索引节点(防止裁剪时候回退产生的误删除)
func (t *Tree) RemoveLeafCountKey(height int64) {
if t.root == nil || t.ndb == nil {
return
}
prefix := genPrefixHashKey(&Node{}, height)
it := t.ndb.db.Iterator(prefix, nil, true)
defer it.Close()
var keys [][]byte
for it.Rewind(); it.Valid(); it.Next() {
value := make([]byte, len(it.Value()))
copy(value, it.Value())
pData := &types.StoreNode{}
err := proto.Unmarshal(value, pData)
if err == nil {
keys = append(keys, pData.Key)
}
}
batch := t.ndb.db.NewBatch(true)
for _, k := range keys {
_, hash, exits := t.GetHash(k)
if exits {
batch.Delete(genLeafCountKey(k, hash, height, len(hash)))
treelog.Debug("RemoveLeafCountKey:", "height", height, "key:", string(k), "hash:", common.ToHex(hash))
}
}
batch.Write()
}
// Iterate 依次迭代遍历树的所有键
func (t *Tree) Iterate(fn func(key []byte, value []byte) bool) (stopped bool) {
if t.root == nil {
......@@ -465,6 +568,26 @@ func DelKVPair(db dbm.DB, storeDel *types.StoreGet) ([]byte, [][]byte, error) {
return tree.Save(), values, nil
}
// DelLeafCountKV 回退时候用于删除叶子节点的索引节点
func DelLeafCountKV(db dbm.DB, blockHeight int64) error {
treelog.Debug("RemoveLeafCountKey:", "height", blockHeight)
prefix := genRootHashPrefix(blockHeight)
it := db.Iterator(prefix, nil, true)
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
hash, err := getRootHash(it.Key())
if err == nil {
tree := NewTree(db, true)
err := tree.Load(hash)
if err == nil {
treelog.Debug("RemoveLeafCountKey:", "height", blockHeight, "root hash:", common.ToHex(hash))
tree.RemoveLeafCountKey(blockHeight)
}
}
}
return nil
}
// VerifyKVPairProof 验证KVPair 的证明
func VerifyKVPairProof(db dbm.DB, roothash []byte, keyvalue types.KeyValue, proof []byte) bool {
......@@ -516,3 +639,24 @@ func genPrefixHashKey(node *Node, blockHeight int64) (key []byte) {
}
return key
}
func genRootHashPrefix(blockHeight int64) (key []byte) {
key = []byte(fmt.Sprintf("%s%010d", rootHashHeightPrefix, blockHeight))
return key
}
func genRootHashHeight(blockHeight int64, hash []byte) (key []byte) {
key = []byte(fmt.Sprintf("%s%010d%s", rootHashHeightPrefix, blockHeight, string(hash)))
return key
}
func getRootHash(hashKey []byte) (hash []byte, err error) {
if len(hashKey) < len(rootHashHeightPrefix)+blockHeightStrLen+len(common.Hash{}) {
return nil, types.ErrSize
}
if !bytes.Contains(hashKey, []byte(rootHashHeightPrefix)) {
return nil, types.ErrSize
}
hash = hashKey[len(hashKey)-len(common.Hash{}):]
return hash, nil
}
......@@ -176,7 +176,6 @@ func TestKvdbIterate(t *testing.T) {
assert.Len(t, checkKVResult, 2)
assert.Equal(t, []byte("v1"), checkKVResult[0].Value)
assert.Equal(t, []byte("v2"), checkKVResult[1].Value)
}
type StatTool struct {
......
......@@ -176,6 +176,8 @@ func RunChain33(name string) {
health.Start(cfg.Health)
defer func() {
//close all module,clean some resource
log.Info("begin close health module")
health.Close()
log.Info("begin close blockchain module")
chain.Close()
log.Info("begin close mempool module")
......@@ -192,8 +194,6 @@ func RunChain33(name string) {
rpcapi.Close()
log.Info("begin close wallet module")
walletm.Close()
log.Info("begin close health module")
health.Close()
log.Info("begin close queue module")
q.Close()
......
......@@ -98,7 +98,8 @@ func (s *HealthCheckServer) getHealth(sync bool) (bool, error) {
return false, err
}
log.Info("healthCheck tick", "peers", len(peerList.Peers), "isSync", reply.IsOk, "sync", sync)
log.Info("healthCheck tick", "peers", len(peerList.Peers), "isCaughtUp", reply.IsOk,
"health", len(peerList.Peers) > 1 && reply.IsOk, "listen", sync)
return len(peerList.Peers) > 1 && reply.IsOk, nil
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment