Unverified Commit 742b96b5 authored by 33cn's avatar 33cn Committed by GitHub

Merge pull request #204 from vipwzw/update_chain33_0105

update chain33
parents 6d72a861 8f474e38
......@@ -2,4 +2,6 @@
while :; do
./chain33-cli net time
#nc -vz localhost 8805
sleep 1
done
......@@ -189,4 +189,9 @@ superManager=[
"1Bsg9j6gW83sShoee1fZAt9TkUjcrCgA9S",
"12qyocayNF7Lv6C9qW4avxs2E7U41fKSfv",
"1Q8hGLfoGe63efeWa8fJ4Pnukhkngt6poK"
]
\ No newline at end of file
]
[health]
listenAddr="localhost:8805"
checkInterval=1
unSyncMaxTimes=2
\ No newline at end of file
......@@ -90,64 +90,82 @@ func (store *BaseStore) Wait() {}
func (store *BaseStore) processMessage(msg queue.Message) {
client := store.qclient
if msg.Ty == types.EventStoreSet {
datas := msg.GetData().(*types.StoreSetWithSync)
hash, err := store.child.Set(datas.Storeset, datas.Sync)
if err != nil {
msg.Reply(client.NewMessage("", types.EventStoreSetReply, err))
return
}
msg.Reply(client.NewMessage("", types.EventStoreSetReply, &types.ReplyHash{Hash: hash}))
go func() {
datas := msg.GetData().(*types.StoreSetWithSync)
hash, err := store.child.Set(datas.Storeset, datas.Sync)
if err != nil {
msg.Reply(client.NewMessage("", types.EventStoreSetReply, err))
return
}
msg.Reply(client.NewMessage("", types.EventStoreSetReply, &types.ReplyHash{Hash: hash}))
}()
} else if msg.Ty == types.EventStoreGet {
datas := msg.GetData().(*types.StoreGet)
values := store.child.Get(datas)
msg.Reply(client.NewMessage("", types.EventStoreGetReply, &types.StoreReplyValue{Values: values}))
go func() {
datas := msg.GetData().(*types.StoreGet)
values := store.child.Get(datas)
msg.Reply(client.NewMessage("", types.EventStoreGetReply, &types.StoreReplyValue{Values: values}))
}()
} else if msg.Ty == types.EventStoreMemSet { //只是在内存中set 一下,并不改变状态
datas := msg.GetData().(*types.StoreSetWithSync)
hash, err := store.child.MemSet(datas.Storeset, datas.Sync)
if err != nil {
msg.Reply(client.NewMessage("", types.EventStoreSetReply, err))
return
}
msg.Reply(client.NewMessage("", types.EventStoreSetReply, &types.ReplyHash{Hash: hash}))
go func() {
datas := msg.GetData().(*types.StoreSetWithSync)
hash, err := store.child.MemSet(datas.Storeset, datas.Sync)
println("EventStoreMemSet", string(hash))
if err != nil {
msg.Reply(client.NewMessage("", types.EventStoreSetReply, err))
return
}
msg.Reply(client.NewMessage("", types.EventStoreSetReply, &types.ReplyHash{Hash: hash}))
}()
} else if msg.Ty == types.EventStoreCommit { //把内存中set 的交易 commit
req := msg.GetData().(*types.ReqHash)
hash, err := store.child.Commit(req)
if hash == nil {
msg.Reply(client.NewMessage("", types.EventStoreCommit, types.ErrHashNotFound))
if err == types.ErrDataBaseDamage { //如果是数据库写失败,需要上报给用户
go util.ReportErrEventToFront(slog, client, "store", "wallet", err)
go func() {
req := msg.GetData().(*types.ReqHash)
println("EventStoreCommit", string(req.Hash))
hash, err := store.child.Commit(req)
if hash == nil {
msg.Reply(client.NewMessage("", types.EventStoreCommit, types.ErrHashNotFound))
if err == types.ErrDataBaseDamage { //如果是数据库写失败,需要上报给用户
go util.ReportErrEventToFront(slog, client, "store", "wallet", err)
}
} else {
msg.Reply(client.NewMessage("", types.EventStoreCommit, &types.ReplyHash{Hash: hash}))
}
} else {
msg.Reply(client.NewMessage("", types.EventStoreCommit, &types.ReplyHash{Hash: hash}))
}
}()
} else if msg.Ty == types.EventStoreRollback {
req := msg.GetData().(*types.ReqHash)
hash, err := store.child.Rollback(req)
if err != nil {
msg.Reply(client.NewMessage("", types.EventStoreRollback, types.ErrHashNotFound))
} else {
msg.Reply(client.NewMessage("", types.EventStoreRollback, &types.ReplyHash{Hash: hash}))
}
go func() {
req := msg.GetData().(*types.ReqHash)
hash, err := store.child.Rollback(req)
if err != nil {
msg.Reply(client.NewMessage("", types.EventStoreRollback, types.ErrHashNotFound))
} else {
msg.Reply(client.NewMessage("", types.EventStoreRollback, &types.ReplyHash{Hash: hash}))
}
}()
} else if msg.Ty == types.EventStoreGetTotalCoins {
req := msg.GetData().(*types.IterateRangeByStateHash)
resp := &types.ReplyGetTotalCoins{}
resp.Count = req.Count
store.child.IterateRangeByStateHash(req.StateHash, req.Start, req.End, true, resp.IterateRangeByStateHash)
msg.Reply(client.NewMessage("", types.EventGetTotalCoinsReply, resp))
go func() {
req := msg.GetData().(*types.IterateRangeByStateHash)
resp := &types.ReplyGetTotalCoins{}
resp.Count = req.Count
store.child.IterateRangeByStateHash(req.StateHash, req.Start, req.End, true, resp.IterateRangeByStateHash)
msg.Reply(client.NewMessage("", types.EventGetTotalCoinsReply, resp))
}()
} else if msg.Ty == types.EventStoreDel {
req := msg.GetData().(*types.StoreDel)
hash, err := store.child.Del(req)
if err != nil {
msg.Reply(client.NewMessage("", types.EventStoreDel, types.ErrHashNotFound))
} else {
msg.Reply(client.NewMessage("", types.EventStoreDel, &types.ReplyHash{Hash: hash}))
}
go func() {
req := msg.GetData().(*types.StoreDel)
hash, err := store.child.Del(req)
if err != nil {
msg.Reply(client.NewMessage("", types.EventStoreDel, types.ErrHashNotFound))
} else {
msg.Reply(client.NewMessage("", types.EventStoreDel, &types.ReplyHash{Hash: hash}))
}
}()
} else if msg.Ty == types.EventStoreList {
req := msg.GetData().(*types.StoreList)
query := NewStoreListQuery(store.child, req)
msg.Reply(client.NewMessage("", types.EventStoreListReply, query.Run()))
go func() {
req := msg.GetData().(*types.StoreList)
query := NewStoreListQuery(store.child, req)
msg.Reply(client.NewMessage("", types.EventStoreListReply, query.Run()))
}()
} else {
store.child.ProcEvent(msg)
go store.child.ProcEvent(msg)
}
}
......
......@@ -45,9 +45,8 @@ func EnableMVCC(enable bool) {
// Tree merkle avl tree
type Tree struct {
root *Node
ndb *nodeDB
//batch *nodeBatch
root *Node
ndb *nodeDB
blockHeight int64
}
......@@ -135,7 +134,9 @@ func (t *Tree) Save() []byte {
if t.ndb != nil {
saveNodeNo := t.root.save(t)
treelog.Debug("Tree.Save", "saveNodeNo", saveNodeNo, "tree height", t.blockHeight)
beg := types.Now()
err := t.ndb.Commit()
treelog.Info("tree.commit", "cost", types.Since(beg))
if err != nil {
return nil
}
......@@ -268,10 +269,6 @@ type nodeDB struct {
orphans map[string]struct{}
}
type nodeBatch struct {
batch dbm.Batch
}
func newNodeDB(db dbm.DB, sync bool) *nodeDB {
ndb := &nodeDB{
cache: db.GetCache(),
......@@ -312,11 +309,6 @@ func (ndb *nodeDB) GetNode(t *Tree, hash []byte) (*Node, error) {
return node, nil
}
// GetBatch get db batch handle
func (ndb *nodeDB) GetBatch(sync bool) *nodeBatch {
return &nodeBatch{ndb.db.NewBatch(sync)}
}
// 获取叶子节点的所有父节点
func getHashNode(node *Node) (hashs [][]byte) {
for {
......
......@@ -6,6 +6,8 @@
package mavl
import (
"sync"
"github.com/33cn/chain33/common"
clog "github.com/33cn/chain33/common/log"
log "github.com/33cn/chain33/common/log/log15"
......@@ -13,7 +15,6 @@ import (
drivers "github.com/33cn/chain33/system/store"
mavl "github.com/33cn/chain33/system/store/mavl/db"
"github.com/33cn/chain33/types"
lru "github.com/hashicorp/golang-lru"
)
var mlog = log.New("module", "mavl")
......@@ -31,8 +32,7 @@ func DisableLog() {
// Store mavl store struct
type Store struct {
*drivers.BaseStore
trees map[string]*mavl.Tree
cache *lru.Cache
trees *sync.Map
enableMavlPrefix bool
enableMVCC bool
enableMavlPrune bool
......@@ -57,10 +57,7 @@ func New(cfg *types.Store, sub []byte) queue.Module {
if sub != nil {
types.MustDecode(sub, &subcfg)
}
mavls := &Store{bs, make(map[string]*mavl.Tree), nil, subcfg.EnableMavlPrefix, subcfg.EnableMVCC, subcfg.EnableMavlPrune, subcfg.PruneHeight}
mavls.cache, _ = lru.New(10)
//使能前缀mavl以及MVCC
mavls := &Store{bs, &sync.Map{}, subcfg.EnableMavlPrefix, subcfg.EnableMVCC, subcfg.EnableMavlPrune, subcfg.PruneHeight}
mavls.enableMavlPrefix = subcfg.EnableMavlPrefix
mavls.enableMVCC = subcfg.EnableMVCC
mavls.enableMavlPrune = subcfg.EnableMavlPrune
......@@ -91,18 +88,13 @@ func (mavls *Store) Get(datas *types.StoreGet) [][]byte {
var err error
values := make([][]byte, len(datas.Keys))
search := string(datas.StateHash)
if data, ok := mavls.cache.Get(search); ok {
if data, ok := mavls.trees.Load(search); ok {
tree = data.(*mavl.Tree)
} else if data, ok := mavls.trees[search]; ok {
tree = data
} else {
tree = mavl.NewTree(mavls.GetDB(), true)
//get接口也应该传入高度
//tree.SetBlockHeight(datas.Height)
err = tree.Load(datas.StateHash)
if err == nil {
mavls.cache.Add(search, tree)
}
mlog.Debug("store mavl get tree", "err", err, "StateHash", common.ToHex(datas.StateHash))
}
if err == nil {
......@@ -118,9 +110,13 @@ func (mavls *Store) Get(datas *types.StoreGet) [][]byte {
// MemSet set keys values to memcory mavl, return root hash and error
func (mavls *Store) MemSet(datas *types.StoreSet, sync bool) ([]byte, error) {
beg := types.Now()
defer func() {
mlog.Info("MemSet", "cost", types.Since(beg))
}()
if len(datas.KV) == 0 {
mlog.Info("store mavl memset,use preStateHash as stateHash for kvset is null")
mavls.trees[string(datas.StateHash)] = nil
mavls.trees.Store(string(datas.StateHash), nil)
return datas.StateHash, nil
}
tree := mavl.NewTree(mavls.GetDB(), sync)
......@@ -133,44 +129,47 @@ func (mavls *Store) MemSet(datas *types.StoreSet, sync bool) ([]byte, error) {
tree.Set(datas.KV[i].Key, datas.KV[i].Value)
}
hash := tree.Hash()
mavls.trees[string(hash)] = tree
if len(mavls.trees) > 1000 {
mlog.Error("too many trees in cache")
}
mavls.trees.Store(string(hash), tree)
return hash, nil
}
// Commit convert memcory mavl to storage db
func (mavls *Store) Commit(req *types.ReqHash) ([]byte, error) {
tree, ok := mavls.trees[string(req.Hash)]
beg := types.Now()
defer func() {
mlog.Info("Commit", "cost", types.Since(beg))
}()
tree, ok := mavls.trees.Load(string(req.Hash))
if !ok {
mlog.Error("store mavl commit", "err", types.ErrHashNotFound)
return nil, types.ErrHashNotFound
}
if tree == nil {
mlog.Info("store mavl commit,do nothing for kvset is null")
delete(mavls.trees, string(req.Hash))
mavls.trees.Delete(string(req.Hash))
return req.Hash, nil
}
hash := tree.Save()
hash := tree.(*mavl.Tree).Save()
if hash == nil {
mlog.Error("store mavl commit", "err", types.ErrHashNotFound)
return nil, types.ErrDataBaseDamage
}
delete(mavls.trees, string(req.Hash))
mavls.trees.Delete(string(req.Hash))
return req.Hash, nil
}
// Rollback 回退将缓存的mavl树删除掉
func (mavls *Store) Rollback(req *types.ReqHash) ([]byte, error) {
_, ok := mavls.trees[string(req.Hash)]
beg := types.Now()
defer func() {
mlog.Info("Rollback", "cost", types.Since(beg))
}()
_, ok := mavls.trees.Load(string(req.Hash))
if !ok {
mlog.Error("store mavl rollback", "err", types.ErrHashNotFound)
return nil, types.ErrHashNotFound
}
delete(mavls.trees, string(req.Hash))
mavls.trees.Delete(string(req.Hash))
return req.Hash, nil
}
......
......@@ -6,21 +6,22 @@ package types
// Config 配置信息
type Config struct {
Title string `protobuf:"bytes,1,opt,name=title" json:"title,omitempty"`
Version string `protobuf:"bytes,1,opt,name=version" json:"version,omitempty"`
Log *Log `protobuf:"bytes,2,opt,name=log" json:"log,omitempty"`
Store *Store `protobuf:"bytes,3,opt,name=store" json:"store,omitempty"`
Consensus *Consensus `protobuf:"bytes,5,opt,name=consensus" json:"consensus,omitempty"`
Mempool *Mempool `protobuf:"bytes,6,opt,name=mempool" json:"memPool,omitempty"`
BlockChain *BlockChain `protobuf:"bytes,7,opt,name=blockChain" json:"blockChain,omitempty"`
Wallet *Wallet `protobuf:"bytes,8,opt,name=wallet" json:"wallet,omitempty"`
P2P *P2P `protobuf:"bytes,9,opt,name=p2p" json:"p2p,omitempty"`
RPC *RPC `protobuf:"bytes,10,opt,name=rpc" json:"rpc,omitempty"`
Exec *Exec `protobuf:"bytes,11,opt,name=exec" json:"exec,omitempty"`
TestNet bool `protobuf:"varint,12,opt,name=testNet" json:"testNet,omitempty"`
FixTime bool `protobuf:"varint,13,opt,name=fixTime" json:"fixTime,omitempty"`
Pprof *Pprof `protobuf:"bytes,14,opt,name=pprof" json:"pprof,omitempty"`
Fork *ForkList `protobuf:"bytes,15,opt,name=fork" json:"fork,omitempty"`
Title string `protobuf:"bytes,1,opt,name=title" json:"title,omitempty"`
Version string `protobuf:"bytes,1,opt,name=version" json:"version,omitempty"`
Log *Log `protobuf:"bytes,2,opt,name=log" json:"log,omitempty"`
Store *Store `protobuf:"bytes,3,opt,name=store" json:"store,omitempty"`
Consensus *Consensus `protobuf:"bytes,5,opt,name=consensus" json:"consensus,omitempty"`
Mempool *Mempool `protobuf:"bytes,6,opt,name=mempool" json:"memPool,omitempty"`
BlockChain *BlockChain `protobuf:"bytes,7,opt,name=blockChain" json:"blockChain,omitempty"`
Wallet *Wallet `protobuf:"bytes,8,opt,name=wallet" json:"wallet,omitempty"`
P2P *P2P `protobuf:"bytes,9,opt,name=p2p" json:"p2p,omitempty"`
RPC *RPC `protobuf:"bytes,10,opt,name=rpc" json:"rpc,omitempty"`
Exec *Exec `protobuf:"bytes,11,opt,name=exec" json:"exec,omitempty"`
TestNet bool `protobuf:"varint,12,opt,name=testNet" json:"testNet,omitempty"`
FixTime bool `protobuf:"varint,13,opt,name=fixTime" json:"fixTime,omitempty"`
Pprof *Pprof `protobuf:"bytes,14,opt,name=pprof" json:"pprof,omitempty"`
Fork *ForkList `protobuf:"bytes,15,opt,name=fork" json:"fork,omitempty"`
Health *HealthCheck `protobuf:"bytes,16,opt,name=health" json:"health,omitempty"`
}
// ForkList fork列表配置
......@@ -165,3 +166,10 @@ type Exec struct {
type Pprof struct {
ListenAddr string `protobuf:"bytes,1,opt,name=listenAddr" json:"listenAddr,omitempty"`
}
// HealthCheck 配置
type HealthCheck struct {
ListenAddr string `protobuf:"bytes,1,opt,name=listenAddr" json:"listenAddr,omitempty"`
CheckInterval uint32 `protobuf:"varint,2,opt,name=checkInterval" json:"checkInterval,omitempty"`
UnSyncMaxTimes uint32 `protobuf:"varint,3,opt,name=unSyncMaxTimes" json:"unSyncMaxTimes,omitempty"`
}
......@@ -171,6 +171,9 @@ func RunChain33(name string) {
log.Info("loading wallet module")
walletm := wallet.New(cfg.Wallet, sub.Wallet)
walletm.SetQueueClient(q.Client())
health := util.NewHealthCheckServer(q.Client())
health.Start(cfg.Health)
defer func() {
//close all module,clean some resource
log.Info("begin close blockchain module")
......@@ -189,6 +192,8 @@ func RunChain33(name string) {
rpcapi.Close()
log.Info("begin close wallet module")
walletm.Close()
log.Info("begin close health module")
health.Close()
log.Info("begin close queue module")
q.Close()
......
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package util
import (
"net"
"time"
"sync"
"github.com/33cn/chain33/client"
log "github.com/33cn/chain33/common/log/log15"
"github.com/33cn/chain33/queue"
"github.com/33cn/chain33/types"
)
var (
listenAddr = "localhost:8805"
unSyncMaxTimes uint32 = 6 //max 6 times
checkInterval uint32 = 5 // 5s
)
// HealthCheckServer a node's health check server
type HealthCheckServer struct {
api client.QueueProtocolAPI
l net.Listener
quit chan struct{}
wg sync.WaitGroup
}
// Close NewHealthCheckServer close
func (s *HealthCheckServer) Close() {
close(s.quit)
s.wg.Wait()
log.Info("healthCheck quit")
}
// NewHealthCheckServer new json rpcserver object
func NewHealthCheckServer(c queue.Client) *HealthCheckServer {
h := &HealthCheckServer{}
h.api, _ = client.New(c, nil)
h.quit = make(chan struct{})
return h
}
// Start HealthCheckServer start
func (s *HealthCheckServer) Start(cfg *types.HealthCheck) {
if cfg != nil {
if cfg.ListenAddr != "" {
listenAddr = cfg.ListenAddr
}
if cfg.CheckInterval != 0 {
checkInterval = cfg.CheckInterval
}
if cfg.UnSyncMaxTimes != 0 {
unSyncMaxTimes = cfg.UnSyncMaxTimes
}
}
log.Info("healthCheck start ", "addr", listenAddr, "inter", checkInterval, "times", unSyncMaxTimes)
s.wg.Add(1)
go s.healthCheck()
}
func (s *HealthCheckServer) listen(on bool) error {
if on {
listener, err := net.Listen("tcp", listenAddr)
if err != nil {
return err
}
s.l = listener
log.Info("healthCheck listen open")
return nil
}
if s.l != nil {
err := s.l.Close()
if err != nil {
return err
}
log.Info("healthCheck listen close")
s.l = nil
}
return nil
}
func (s *HealthCheckServer) getHealth(sync bool) (bool, error) {
reply, err := s.api.IsSync()
if err != nil {
return false, err
}
peerList, err := s.api.PeerInfo()
if err != nil {
return false, err
}
log.Info("healthCheck tick", "peers", len(peerList.Peers), "isSync", reply.IsOk, "sync", sync)
return len(peerList.Peers) > 1 && reply.IsOk, nil
}
func (s *HealthCheckServer) healthCheck() {
ticker := time.NewTicker(time.Second * time.Duration(checkInterval))
defer ticker.Stop()
defer s.wg.Done()
var sync bool
var unSyncTimes uint32
for {
select {
case <-s.quit:
if s.l != nil {
s.l.Close()
}
if s.api != nil {
s.api.Close()
}
return
case <-ticker.C:
health, err := s.getHealth(sync)
if err != nil {
continue
}
//sync
if health {
if !sync {
err = s.listen(true)
if err != nil {
log.Error("healthCheck ", "listen open err", err.Error())
continue
}
sync = true
}
unSyncTimes = 0
} else {
if sync {
if unSyncTimes >= unSyncMaxTimes {
err = s.listen(false)
if err != nil {
log.Error("healthCheck ", "listen close err", err.Error())
continue
}
sync = false
}
unSyncTimes++
}
}
}
}
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package util
import (
"testing"
"time"
"github.com/33cn/chain33/client/mocks"
"github.com/33cn/chain33/queue"
"github.com/33cn/chain33/types"
"github.com/stretchr/testify/assert"
)
func TestStart(t *testing.T) {
q := queue.New("channel")
health := NewHealthCheckServer(q.Client())
api := new(mocks.QueueProtocolAPI)
reply := &types.Reply{IsOk: true}
api.On("IsSync").Return(reply, nil)
peer1 := &types.Peer{Addr: "addr1"}
peer2 := &types.Peer{Addr: "addr2"}
peers := &types.PeerList{Peers: []*types.Peer{peer1, peer2}}
api.On("PeerInfo").Return(peers, nil)
api.On("Close").Return()
health.api = api
cfg, _ := types.InitCfg("../cmd/chain33/chain33.test.toml")
health.Start(cfg.Health)
time.Sleep(time.Second * 3)
health.Close()
time.Sleep(time.Second * 1)
}
func TestGetHealth(t *testing.T) {
api := new(mocks.QueueProtocolAPI)
reply := &types.Reply{IsOk: true}
api.On("IsSync").Return(reply, nil).Once()
peer2 := &types.Peer{Addr: "addr2"}
peerlist := &types.PeerList{Peers: []*types.Peer{peer2}}
api.On("PeerInfo").Return(peerlist, nil).Once()
q := queue.New("channel")
health := NewHealthCheckServer(q.Client())
health.api = api
ret, err := health.getHealth(true)
assert.Nil(t, err)
assert.Equal(t, false, ret)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment