Commit b9e80d54 authored by vipwzw's avatar vipwzw

update chain33

parent 6d72a861
...@@ -2,4 +2,6 @@ ...@@ -2,4 +2,6 @@
while :; do while :; do
./chain33-cli net time ./chain33-cli net time
#nc -vz localhost 8805
sleep 1
done done
...@@ -190,3 +190,8 @@ superManager=[ ...@@ -190,3 +190,8 @@ superManager=[
"12qyocayNF7Lv6C9qW4avxs2E7U41fKSfv", "12qyocayNF7Lv6C9qW4avxs2E7U41fKSfv",
"1Q8hGLfoGe63efeWa8fJ4Pnukhkngt6poK" "1Q8hGLfoGe63efeWa8fJ4Pnukhkngt6poK"
] ]
[health]
listenAddr="localhost:8805"
checkInterval=1
unSyncMaxTimes=2
\ No newline at end of file
...@@ -90,6 +90,7 @@ func (store *BaseStore) Wait() {} ...@@ -90,6 +90,7 @@ func (store *BaseStore) Wait() {}
func (store *BaseStore) processMessage(msg queue.Message) { func (store *BaseStore) processMessage(msg queue.Message) {
client := store.qclient client := store.qclient
if msg.Ty == types.EventStoreSet { if msg.Ty == types.EventStoreSet {
go func() {
datas := msg.GetData().(*types.StoreSetWithSync) datas := msg.GetData().(*types.StoreSetWithSync)
hash, err := store.child.Set(datas.Storeset, datas.Sync) hash, err := store.child.Set(datas.Storeset, datas.Sync)
if err != nil { if err != nil {
...@@ -97,20 +98,28 @@ func (store *BaseStore) processMessage(msg queue.Message) { ...@@ -97,20 +98,28 @@ func (store *BaseStore) processMessage(msg queue.Message) {
return return
} }
msg.Reply(client.NewMessage("", types.EventStoreSetReply, &types.ReplyHash{Hash: hash})) msg.Reply(client.NewMessage("", types.EventStoreSetReply, &types.ReplyHash{Hash: hash}))
}()
} else if msg.Ty == types.EventStoreGet { } else if msg.Ty == types.EventStoreGet {
go func() {
datas := msg.GetData().(*types.StoreGet) datas := msg.GetData().(*types.StoreGet)
values := store.child.Get(datas) values := store.child.Get(datas)
msg.Reply(client.NewMessage("", types.EventStoreGetReply, &types.StoreReplyValue{Values: values})) msg.Reply(client.NewMessage("", types.EventStoreGetReply, &types.StoreReplyValue{Values: values}))
}()
} else if msg.Ty == types.EventStoreMemSet { //只是在内存中set 一下,并不改变状态 } else if msg.Ty == types.EventStoreMemSet { //只是在内存中set 一下,并不改变状态
go func() {
datas := msg.GetData().(*types.StoreSetWithSync) datas := msg.GetData().(*types.StoreSetWithSync)
hash, err := store.child.MemSet(datas.Storeset, datas.Sync) hash, err := store.child.MemSet(datas.Storeset, datas.Sync)
println("EventStoreMemSet", string(hash))
if err != nil { if err != nil {
msg.Reply(client.NewMessage("", types.EventStoreSetReply, err)) msg.Reply(client.NewMessage("", types.EventStoreSetReply, err))
return return
} }
msg.Reply(client.NewMessage("", types.EventStoreSetReply, &types.ReplyHash{Hash: hash})) msg.Reply(client.NewMessage("", types.EventStoreSetReply, &types.ReplyHash{Hash: hash}))
}()
} else if msg.Ty == types.EventStoreCommit { //把内存中set 的交易 commit } else if msg.Ty == types.EventStoreCommit { //把内存中set 的交易 commit
go func() {
req := msg.GetData().(*types.ReqHash) req := msg.GetData().(*types.ReqHash)
println("EventStoreCommit", string(req.Hash))
hash, err := store.child.Commit(req) hash, err := store.child.Commit(req)
if hash == nil { if hash == nil {
msg.Reply(client.NewMessage("", types.EventStoreCommit, types.ErrHashNotFound)) msg.Reply(client.NewMessage("", types.EventStoreCommit, types.ErrHashNotFound))
...@@ -120,7 +129,9 @@ func (store *BaseStore) processMessage(msg queue.Message) { ...@@ -120,7 +129,9 @@ func (store *BaseStore) processMessage(msg queue.Message) {
} else { } else {
msg.Reply(client.NewMessage("", types.EventStoreCommit, &types.ReplyHash{Hash: hash})) msg.Reply(client.NewMessage("", types.EventStoreCommit, &types.ReplyHash{Hash: hash}))
} }
}()
} else if msg.Ty == types.EventStoreRollback { } else if msg.Ty == types.EventStoreRollback {
go func() {
req := msg.GetData().(*types.ReqHash) req := msg.GetData().(*types.ReqHash)
hash, err := store.child.Rollback(req) hash, err := store.child.Rollback(req)
if err != nil { if err != nil {
...@@ -128,13 +139,17 @@ func (store *BaseStore) processMessage(msg queue.Message) { ...@@ -128,13 +139,17 @@ func (store *BaseStore) processMessage(msg queue.Message) {
} else { } else {
msg.Reply(client.NewMessage("", types.EventStoreRollback, &types.ReplyHash{Hash: hash})) msg.Reply(client.NewMessage("", types.EventStoreRollback, &types.ReplyHash{Hash: hash}))
} }
}()
} else if msg.Ty == types.EventStoreGetTotalCoins { } else if msg.Ty == types.EventStoreGetTotalCoins {
go func() {
req := msg.GetData().(*types.IterateRangeByStateHash) req := msg.GetData().(*types.IterateRangeByStateHash)
resp := &types.ReplyGetTotalCoins{} resp := &types.ReplyGetTotalCoins{}
resp.Count = req.Count resp.Count = req.Count
store.child.IterateRangeByStateHash(req.StateHash, req.Start, req.End, true, resp.IterateRangeByStateHash) store.child.IterateRangeByStateHash(req.StateHash, req.Start, req.End, true, resp.IterateRangeByStateHash)
msg.Reply(client.NewMessage("", types.EventGetTotalCoinsReply, resp)) msg.Reply(client.NewMessage("", types.EventGetTotalCoinsReply, resp))
}()
} else if msg.Ty == types.EventStoreDel { } else if msg.Ty == types.EventStoreDel {
go func() {
req := msg.GetData().(*types.StoreDel) req := msg.GetData().(*types.StoreDel)
hash, err := store.child.Del(req) hash, err := store.child.Del(req)
if err != nil { if err != nil {
...@@ -142,12 +157,15 @@ func (store *BaseStore) processMessage(msg queue.Message) { ...@@ -142,12 +157,15 @@ func (store *BaseStore) processMessage(msg queue.Message) {
} else { } else {
msg.Reply(client.NewMessage("", types.EventStoreDel, &types.ReplyHash{Hash: hash})) msg.Reply(client.NewMessage("", types.EventStoreDel, &types.ReplyHash{Hash: hash}))
} }
}()
} else if msg.Ty == types.EventStoreList { } else if msg.Ty == types.EventStoreList {
go func() {
req := msg.GetData().(*types.StoreList) req := msg.GetData().(*types.StoreList)
query := NewStoreListQuery(store.child, req) query := NewStoreListQuery(store.child, req)
msg.Reply(client.NewMessage("", types.EventStoreListReply, query.Run())) msg.Reply(client.NewMessage("", types.EventStoreListReply, query.Run()))
}()
} else { } else {
store.child.ProcEvent(msg) go store.child.ProcEvent(msg)
} }
} }
......
...@@ -47,7 +47,6 @@ func EnableMVCC(enable bool) { ...@@ -47,7 +47,6 @@ func EnableMVCC(enable bool) {
type Tree struct { type Tree struct {
root *Node root *Node
ndb *nodeDB ndb *nodeDB
//batch *nodeBatch
blockHeight int64 blockHeight int64
} }
...@@ -135,7 +134,9 @@ func (t *Tree) Save() []byte { ...@@ -135,7 +134,9 @@ func (t *Tree) Save() []byte {
if t.ndb != nil { if t.ndb != nil {
saveNodeNo := t.root.save(t) saveNodeNo := t.root.save(t)
treelog.Debug("Tree.Save", "saveNodeNo", saveNodeNo, "tree height", t.blockHeight) treelog.Debug("Tree.Save", "saveNodeNo", saveNodeNo, "tree height", t.blockHeight)
beg := types.Now()
err := t.ndb.Commit() err := t.ndb.Commit()
treelog.Info("tree.commit", "cost", types.Since(beg))
if err != nil { if err != nil {
return nil return nil
} }
...@@ -268,10 +269,6 @@ type nodeDB struct { ...@@ -268,10 +269,6 @@ type nodeDB struct {
orphans map[string]struct{} orphans map[string]struct{}
} }
type nodeBatch struct {
batch dbm.Batch
}
func newNodeDB(db dbm.DB, sync bool) *nodeDB { func newNodeDB(db dbm.DB, sync bool) *nodeDB {
ndb := &nodeDB{ ndb := &nodeDB{
cache: db.GetCache(), cache: db.GetCache(),
...@@ -312,11 +309,6 @@ func (ndb *nodeDB) GetNode(t *Tree, hash []byte) (*Node, error) { ...@@ -312,11 +309,6 @@ func (ndb *nodeDB) GetNode(t *Tree, hash []byte) (*Node, error) {
return node, nil return node, nil
} }
// GetBatch get db batch handle
func (ndb *nodeDB) GetBatch(sync bool) *nodeBatch {
return &nodeBatch{ndb.db.NewBatch(sync)}
}
// 获取叶子节点的所有父节点 // 获取叶子节点的所有父节点
func getHashNode(node *Node) (hashs [][]byte) { func getHashNode(node *Node) (hashs [][]byte) {
for { for {
......
...@@ -6,6 +6,8 @@ ...@@ -6,6 +6,8 @@
package mavl package mavl
import ( import (
"sync"
"github.com/33cn/chain33/common" "github.com/33cn/chain33/common"
clog "github.com/33cn/chain33/common/log" clog "github.com/33cn/chain33/common/log"
log "github.com/33cn/chain33/common/log/log15" log "github.com/33cn/chain33/common/log/log15"
...@@ -13,7 +15,6 @@ import ( ...@@ -13,7 +15,6 @@ import (
drivers "github.com/33cn/chain33/system/store" drivers "github.com/33cn/chain33/system/store"
mavl "github.com/33cn/chain33/system/store/mavl/db" mavl "github.com/33cn/chain33/system/store/mavl/db"
"github.com/33cn/chain33/types" "github.com/33cn/chain33/types"
lru "github.com/hashicorp/golang-lru"
) )
var mlog = log.New("module", "mavl") var mlog = log.New("module", "mavl")
...@@ -31,8 +32,7 @@ func DisableLog() { ...@@ -31,8 +32,7 @@ func DisableLog() {
// Store mavl store struct // Store mavl store struct
type Store struct { type Store struct {
*drivers.BaseStore *drivers.BaseStore
trees map[string]*mavl.Tree trees *sync.Map
cache *lru.Cache
enableMavlPrefix bool enableMavlPrefix bool
enableMVCC bool enableMVCC bool
enableMavlPrune bool enableMavlPrune bool
...@@ -57,10 +57,7 @@ func New(cfg *types.Store, sub []byte) queue.Module { ...@@ -57,10 +57,7 @@ func New(cfg *types.Store, sub []byte) queue.Module {
if sub != nil { if sub != nil {
types.MustDecode(sub, &subcfg) types.MustDecode(sub, &subcfg)
} }
mavls := &Store{bs, make(map[string]*mavl.Tree), nil, subcfg.EnableMavlPrefix, subcfg.EnableMVCC, subcfg.EnableMavlPrune, subcfg.PruneHeight} mavls := &Store{bs, &sync.Map{}, subcfg.EnableMavlPrefix, subcfg.EnableMVCC, subcfg.EnableMavlPrune, subcfg.PruneHeight}
mavls.cache, _ = lru.New(10)
//使能前缀mavl以及MVCC
mavls.enableMavlPrefix = subcfg.EnableMavlPrefix mavls.enableMavlPrefix = subcfg.EnableMavlPrefix
mavls.enableMVCC = subcfg.EnableMVCC mavls.enableMVCC = subcfg.EnableMVCC
mavls.enableMavlPrune = subcfg.EnableMavlPrune mavls.enableMavlPrune = subcfg.EnableMavlPrune
...@@ -91,18 +88,13 @@ func (mavls *Store) Get(datas *types.StoreGet) [][]byte { ...@@ -91,18 +88,13 @@ func (mavls *Store) Get(datas *types.StoreGet) [][]byte {
var err error var err error
values := make([][]byte, len(datas.Keys)) values := make([][]byte, len(datas.Keys))
search := string(datas.StateHash) search := string(datas.StateHash)
if data, ok := mavls.cache.Get(search); ok { if data, ok := mavls.trees.Load(search); ok {
tree = data.(*mavl.Tree) tree = data.(*mavl.Tree)
} else if data, ok := mavls.trees[search]; ok {
tree = data
} else { } else {
tree = mavl.NewTree(mavls.GetDB(), true) tree = mavl.NewTree(mavls.GetDB(), true)
//get接口也应该传入高度 //get接口也应该传入高度
//tree.SetBlockHeight(datas.Height) //tree.SetBlockHeight(datas.Height)
err = tree.Load(datas.StateHash) err = tree.Load(datas.StateHash)
if err == nil {
mavls.cache.Add(search, tree)
}
mlog.Debug("store mavl get tree", "err", err, "StateHash", common.ToHex(datas.StateHash)) mlog.Debug("store mavl get tree", "err", err, "StateHash", common.ToHex(datas.StateHash))
} }
if err == nil { if err == nil {
...@@ -118,9 +110,13 @@ func (mavls *Store) Get(datas *types.StoreGet) [][]byte { ...@@ -118,9 +110,13 @@ func (mavls *Store) Get(datas *types.StoreGet) [][]byte {
// MemSet set keys values to memcory mavl, return root hash and error // MemSet set keys values to memcory mavl, return root hash and error
func (mavls *Store) MemSet(datas *types.StoreSet, sync bool) ([]byte, error) { func (mavls *Store) MemSet(datas *types.StoreSet, sync bool) ([]byte, error) {
beg := types.Now()
defer func() {
mlog.Info("MemSet", "cost", types.Since(beg))
}()
if len(datas.KV) == 0 { if len(datas.KV) == 0 {
mlog.Info("store mavl memset,use preStateHash as stateHash for kvset is null") mlog.Info("store mavl memset,use preStateHash as stateHash for kvset is null")
mavls.trees[string(datas.StateHash)] = nil mavls.trees.Store(string(datas.StateHash), nil)
return datas.StateHash, nil return datas.StateHash, nil
} }
tree := mavl.NewTree(mavls.GetDB(), sync) tree := mavl.NewTree(mavls.GetDB(), sync)
...@@ -133,44 +129,47 @@ func (mavls *Store) MemSet(datas *types.StoreSet, sync bool) ([]byte, error) { ...@@ -133,44 +129,47 @@ func (mavls *Store) MemSet(datas *types.StoreSet, sync bool) ([]byte, error) {
tree.Set(datas.KV[i].Key, datas.KV[i].Value) tree.Set(datas.KV[i].Key, datas.KV[i].Value)
} }
hash := tree.Hash() hash := tree.Hash()
mavls.trees[string(hash)] = tree mavls.trees.Store(string(hash), tree)
if len(mavls.trees) > 1000 {
mlog.Error("too many trees in cache")
}
return hash, nil return hash, nil
} }
// Commit convert memcory mavl to storage db // Commit convert memcory mavl to storage db
func (mavls *Store) Commit(req *types.ReqHash) ([]byte, error) { func (mavls *Store) Commit(req *types.ReqHash) ([]byte, error) {
tree, ok := mavls.trees[string(req.Hash)] beg := types.Now()
defer func() {
mlog.Info("Commit", "cost", types.Since(beg))
}()
tree, ok := mavls.trees.Load(string(req.Hash))
if !ok { if !ok {
mlog.Error("store mavl commit", "err", types.ErrHashNotFound) mlog.Error("store mavl commit", "err", types.ErrHashNotFound)
return nil, types.ErrHashNotFound return nil, types.ErrHashNotFound
} }
if tree == nil { if tree == nil {
mlog.Info("store mavl commit,do nothing for kvset is null") mlog.Info("store mavl commit,do nothing for kvset is null")
delete(mavls.trees, string(req.Hash)) mavls.trees.Delete(string(req.Hash))
return req.Hash, nil return req.Hash, nil
} }
hash := tree.(*mavl.Tree).Save()
hash := tree.Save()
if hash == nil { if hash == nil {
mlog.Error("store mavl commit", "err", types.ErrHashNotFound) mlog.Error("store mavl commit", "err", types.ErrHashNotFound)
return nil, types.ErrDataBaseDamage return nil, types.ErrDataBaseDamage
} }
delete(mavls.trees, string(req.Hash)) mavls.trees.Delete(string(req.Hash))
return req.Hash, nil return req.Hash, nil
} }
// Rollback 回退将缓存的mavl树删除掉 // Rollback 回退将缓存的mavl树删除掉
func (mavls *Store) Rollback(req *types.ReqHash) ([]byte, error) { func (mavls *Store) Rollback(req *types.ReqHash) ([]byte, error) {
_, ok := mavls.trees[string(req.Hash)] beg := types.Now()
defer func() {
mlog.Info("Rollback", "cost", types.Since(beg))
}()
_, ok := mavls.trees.Load(string(req.Hash))
if !ok { if !ok {
mlog.Error("store mavl rollback", "err", types.ErrHashNotFound) mlog.Error("store mavl rollback", "err", types.ErrHashNotFound)
return nil, types.ErrHashNotFound return nil, types.ErrHashNotFound
} }
delete(mavls.trees, string(req.Hash)) mavls.trees.Delete(string(req.Hash))
return req.Hash, nil return req.Hash, nil
} }
......
...@@ -21,6 +21,7 @@ type Config struct { ...@@ -21,6 +21,7 @@ type Config struct {
FixTime bool `protobuf:"varint,13,opt,name=fixTime" json:"fixTime,omitempty"` FixTime bool `protobuf:"varint,13,opt,name=fixTime" json:"fixTime,omitempty"`
Pprof *Pprof `protobuf:"bytes,14,opt,name=pprof" json:"pprof,omitempty"` Pprof *Pprof `protobuf:"bytes,14,opt,name=pprof" json:"pprof,omitempty"`
Fork *ForkList `protobuf:"bytes,15,opt,name=fork" json:"fork,omitempty"` Fork *ForkList `protobuf:"bytes,15,opt,name=fork" json:"fork,omitempty"`
Health *HealthCheck `protobuf:"bytes,16,opt,name=health" json:"health,omitempty"`
} }
// ForkList fork列表配置 // ForkList fork列表配置
...@@ -165,3 +166,10 @@ type Exec struct { ...@@ -165,3 +166,10 @@ type Exec struct {
type Pprof struct { type Pprof struct {
ListenAddr string `protobuf:"bytes,1,opt,name=listenAddr" json:"listenAddr,omitempty"` ListenAddr string `protobuf:"bytes,1,opt,name=listenAddr" json:"listenAddr,omitempty"`
} }
// HealthCheck 配置
type HealthCheck struct {
ListenAddr string `protobuf:"bytes,1,opt,name=listenAddr" json:"listenAddr,omitempty"`
CheckInterval uint32 `protobuf:"varint,2,opt,name=checkInterval" json:"checkInterval,omitempty"`
UnSyncMaxTimes uint32 `protobuf:"varint,3,opt,name=unSyncMaxTimes" json:"unSyncMaxTimes,omitempty"`
}
...@@ -171,6 +171,9 @@ func RunChain33(name string) { ...@@ -171,6 +171,9 @@ func RunChain33(name string) {
log.Info("loading wallet module") log.Info("loading wallet module")
walletm := wallet.New(cfg.Wallet, sub.Wallet) walletm := wallet.New(cfg.Wallet, sub.Wallet)
walletm.SetQueueClient(q.Client()) walletm.SetQueueClient(q.Client())
health := util.NewHealthCheckServer(q.Client())
health.Start(cfg.Health)
defer func() { defer func() {
//close all module,clean some resource //close all module,clean some resource
log.Info("begin close blockchain module") log.Info("begin close blockchain module")
...@@ -189,6 +192,8 @@ func RunChain33(name string) { ...@@ -189,6 +192,8 @@ func RunChain33(name string) {
rpcapi.Close() rpcapi.Close()
log.Info("begin close wallet module") log.Info("begin close wallet module")
walletm.Close() walletm.Close()
log.Info("begin close health module")
health.Close()
log.Info("begin close queue module") log.Info("begin close queue module")
q.Close() q.Close()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment