Commit 7498c131 authored by caopingcp, committed by vipwzw

raft avoid rollback

parent cf216df2
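The diff below replaces the `BlockInfo` (height + hash) payload with the full `*types.Block`: the leader now proposes the complete block through raft and every node, the leader included, writes it only after it arrives on the commit channel, so there is no locally written block for raft to roll back. A toy sketch of that ordering, using stand-in types rather than the chain33 API:

```go
// Toy sketch only (stand-in types, not the chain33 API): the proposer hands
// the full block to the raft layer and writes it only once it is committed.
package main

import "fmt"

type block struct{ Height int64 }

func main() {
	proposeC := make(chan *block, 1)
	commitC := make(chan *block, 1)

	// stand-in for the raft layer: whatever is proposed eventually comes back committed
	go func() {
		for b := range proposeC {
			commitC <- b
		}
	}()

	lastHeight := int64(9)
	proposeC <- &block{Height: lastHeight + 1} // propose, but do not write yet

	b := <-commitC
	if b.Height > lastHeight { // write only blocks that raft has committed
		fmt.Println("write block at height", b.Height)
	}
}
```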
......@@ -6,9 +6,6 @@ package raft
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
"github.com/33cn/chain33/common"
......@@ -18,6 +15,7 @@ import (
cty "github.com/33cn/chain33/system/dapp/coins/types"
"github.com/33cn/chain33/types"
"github.com/coreos/etcd/snap"
"github.com/golang/protobuf/proto"
)
func init() {
......@@ -28,19 +26,17 @@ func init() {
// Client Raft implementation
type Client struct {
*drivers.BaseClient
proposeC chan<- BlockInfo
commitC <-chan *BlockInfo
proposeC chan<- *types.Block
commitC <-chan *types.Block
errorC <-chan error
snapshotter *snap.Snapshotter
validatorC <-chan bool
ctx context.Context
cancel context.CancelFunc
blockInfo *BlockInfo
mtx sync.Mutex
}
// NewBlockstore create Raft Client
func NewBlockstore(ctx context.Context, cfg *types.Consensus, snapshotter *snap.Snapshotter, proposeC chan<- BlockInfo, commitC <-chan *BlockInfo, errorC <-chan error, validatorC <-chan bool, cancel context.CancelFunc) *Client {
func NewBlockstore(ctx context.Context, cfg *types.Consensus, snapshotter *snap.Snapshotter, proposeC chan<- *types.Block, commitC <-chan *types.Block, errorC <-chan error, validatorC <-chan bool, cancel context.CancelFunc) *Client {
c := drivers.NewBaseClient(cfg)
client := &Client{BaseClient: c, proposeC: proposeC, snapshotter: snapshotter, validatorC: validatorC, commitC: commitC, errorC: errorC, ctx: ctx, cancel: cancel}
c.SetChild(client)
......@@ -81,15 +77,14 @@ func (client *Client) CheckBlock(parent *types.Block, current *types.BlockDetail
}
func (client *Client) getSnapshot() ([]byte, error) {
return json.Marshal(client.GetCurrentInfo())
return proto.Marshal(client.GetCurrentBlock())
}
func (client *Client) recoverFromSnapshot(snapshot []byte) error {
var info *BlockInfo
if err := json.Unmarshal(snapshot, info); err != nil {
block := &types.Block{}
if err := proto.Unmarshal(snapshot, block); err != nil {
return err
}
client.SetCurrentInfo(info)
return nil
}
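With the payload now a protobuf message, the snapshot path switches from `json.Marshal` on a `BlockInfo` to `proto.Marshal`/`proto.Unmarshal` on the current block. A minimal round-trip sketch, assuming the `github.com/33cn/chain33/types` and `github.com/golang/protobuf/proto` modules from the diff's import list are available:

```go
// Minimal sketch of the new snapshot encoding: a types.Block round-trips
// through protobuf instead of a JSON-encoded BlockInfo.
package main

import (
	"fmt"

	"github.com/33cn/chain33/types"
	"github.com/golang/protobuf/proto"
)

func main() {
	src := &types.Block{Height: 100, BlockTime: 1600000000}

	data, err := proto.Marshal(src) // getSnapshot side
	if err != nil {
		panic(err)
	}

	restored := &types.Block{}
	if err := proto.Unmarshal(data, restored); err != nil { // recoverFromSnapshot side
		panic(err)
	}
	fmt.Println("restored height:", restored.Height)
}
```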
......@@ -112,34 +107,28 @@ func (client *Client) Close() {
// CreateBlock method
func (client *Client) CreateBlock() {
retry := 0
count := int64(0)
cfg := client.GetAPI().GetConfig()
//sync to the maximum height before packing a block
tocker := time.NewTicker(30 * time.Second)
beg := time.Now()
OuterLoop:
for {
select {
case <-tocker.C:
rlog.Info("Still catching up max height......", "Height", client.GetCurrentHeight(), "cost", time.Since(beg))
default:
if client.IsCaughtUp() {
rlog.Info("Leader has caught up the max height")
break
rlog.Info("Leader has caught up max height")
break OuterLoop
}
time.Sleep(time.Second)
retry++
if retry >= 600 {
panic("Leader encounter problem, exit.")
}
}
curBlock, err := client.RequestLastBlock()
if err != nil {
rlog.Error("Leader RequestLastBlock fail", "err", err)
panic(err)
}
curInfo := &BlockInfo{
Height: curBlock.Height,
Hash: common.ToHex(curBlock.Hash(cfg)),
}
client.SetCurrentInfo(curInfo)
tocker.Stop()
ticker := time.NewTicker(time.Duration(writeBlockSeconds) * time.Second)
count := int64(0)
cfg := client.GetAPI().GetConfig()
hint := time.NewTicker(30 * time.Second)
ticker := time.NewTicker(time.Duration(writeBlockSeconds) * time.Second)
defer ticker.Stop()
defer hint.Stop()
for {
......@@ -151,7 +140,7 @@ func (client *Client) CreateBlock() {
case <-ticker.C:
//if the leader suddenly goes down and this node is no longer the block producer, exit
if !mux.Load().(bool) {
rlog.Warn("Not the Leader node anymore")
rlog.Warn("Not Leader node anymore")
return
}
......@@ -160,11 +149,6 @@ func (client *Client) CreateBlock() {
rlog.Error("Leader RequestLastBlock fail", "err", err)
break
}
if client.GetCurrentInfoHeight() != lastBlock.Height {
rlog.Info("Leader wait commit blockInfo", "infoHeight", client.GetCurrentInfoHeight(),
"blockHeight", lastBlock.Height)
break
}
txs := client.RequestTx(int(cfg.GetP(lastBlock.Height+1).MaxTxNumber), nil)
if len(txs) == 0 {
......@@ -192,17 +176,10 @@ func (client *Client) CreateBlock() {
if lastBlock.BlockTime >= newblock.BlockTime {
newblock.BlockTime = lastBlock.BlockTime + 1
}
err = client.WriteBlock(lastBlock.StateHash, &newblock)
if err != nil {
rlog.Error("Leader WriteBlock fail", "err", err)
break
}
info := BlockInfo{
Height: newblock.Height,
Hash: common.ToHex(newblock.Hash(cfg)),
}
client.propose(info)
pblock := newblock.Clone()
client.propose(pblock)
rlog.Info("Leader propose block", "height", pblock.Height, "blockhash", common.ToHex(pblock.Hash(cfg)),
"txhash", common.ToHex(pblock.TxHash))
count = 0
}
......@@ -210,23 +187,37 @@ func (client *Client) CreateBlock() {
}
// send the proposed block to the underlying raft layer
func (client *Client) propose(info BlockInfo) {
client.proposeC <- info
func (client *Client) propose(block *types.Block) {
client.proposeC <- block
}
// read blocks sent by the leader from the commit channel
func (client *Client) readCommits(commitC <-chan *BlockInfo, errorC <-chan error) {
var data *BlockInfo
func (client *Client) readCommits(commitC <-chan *types.Block, errorC <-chan error) {
var data *types.Block
var ok bool
cfg := client.GetAPI().GetConfig()
for {
select {
case data, ok = <-commitC:
if !ok || data == nil {
continue
break
}
lastBlock, err := client.RequestLastBlock()
if err != nil {
rlog.Error("RequestLastBlock fail", "err", err)
break
}
if lastBlock.Height >= data.Height {
rlog.Info("already has block", "height", data.Height)
break
}
rlog.Info("Write block", "height", data.Height, "blockhash", common.ToHex(data.Hash(cfg)),
"txhash", common.ToHex(data.TxHash))
err = client.WriteBlock(nil, data)
if err != nil {
rlog.Error("WriteBlock fail", "err", err)
break
}
rlog.Info("Commit blockInfo", "height", data.Height, "blockhash", data.Hash)
client.SetCurrentInfo(data)
case err, ok := <-errorC:
if ok {
panic(err)
......@@ -245,9 +236,9 @@ func (client *Client) pollingTask() {
return
case value, ok := <-client.validatorC:
if ok && !value {
rlog.Debug("================I'm not the validator node=============")
leader := mux.Load().(bool)
if leader {
rlog.Info("================Change to follower node=============")
isLeader = false
mux.Store(isLeader)
}
......@@ -266,60 +257,3 @@ func (client *Client) pollingTask() {
func (client *Client) CmpBestBlock(newBlock *types.Block, cmpBlock *types.Block) bool {
return false
}
// BlockInfo struct
type BlockInfo struct {
Height int64 `json:"height"`
Hash string `json:"hash"`
}
// SetCurrentInfo ...
func (client *Client) SetCurrentInfo(info *BlockInfo) {
client.mtx.Lock()
defer client.mtx.Unlock()
client.blockInfo = info
}
// GetCurrentInfo ...
func (client *Client) GetCurrentInfo() *BlockInfo {
client.mtx.Lock()
defer client.mtx.Unlock()
return client.blockInfo
}
// GetCurrentInfoHeight ...
func (client *Client) GetCurrentInfoHeight() int64 {
client.mtx.Lock()
defer client.mtx.Unlock()
return client.blockInfo.Height
}
// CheckBlockInfo check corresponding block
func (client *Client) CheckBlockInfo(info *BlockInfo) bool {
retry := 0
factor := 1
for {
lastBlock, err := client.RequestLastBlock()
if err == nil && lastBlock.Height >= info.Height {
break
}
retry++
time.Sleep(500 * time.Millisecond)
if retry >= 30*factor {
rlog.Info(fmt.Sprintf("CheckBlockInfo wait %d seconds", retry/2), "height", info.Height)
factor = factor * 2
}
}
block, err := client.RequestBlock(info.Height)
if err != nil {
rlog.Error("CheckBlockInfo RequestBlock fail", "err", err)
return false
}
cfg := client.GetAPI().GetConfig()
if common.ToHex(block.Hash(cfg)) != info.Hash {
rlog.Error("CheckBlockInfo hash not equal", "blockHash", common.ToHex(block.Hash(cfg)),
"infoHash", info.Hash)
return false
}
return true
}
......@@ -35,7 +35,7 @@ batchsync=false
enableTxQuickIndex=true
[p2p]
types=[]
types=["dht"]
enable=false
msgCacheSize=10240
driver="leveldb"
......@@ -160,3 +160,9 @@ superManager=[
[exec.sub.autonomy]
total="16htvcBNSEA7fZhAdLJphDwQRQJaHpyHTp"
useBalance=false
[metrics]
#whether to enable sending metrics data
enableMetrics=false
#data storage mode
dataEmitMode="influxdb"
\ No newline at end of file
......@@ -103,13 +103,12 @@ func NewRaftCluster(cfg *types.Consensus, sub []byte) queue.Module {
//use a context to manage all services uniformly
ctx, stop := context.WithCancel(context.Background())
// propose channel
proposeC := make(chan BlockInfo)
proposeC := make(chan *types.Block)
confChangeC = make(chan raftpb.ConfChange)
node, commitC, errorC, snapshotterReady, validatorC := NewRaftNode(ctx, int(subcfg.NodeID), subcfg.IsNewJoinNode, peers, readOnlyPeers, addPeers, getSnapshot, proposeC, confChangeC)
commitC, errorC, snapshotterReady, validatorC := NewRaftNode(ctx, int(subcfg.NodeID), subcfg.IsNewJoinNode, peers, readOnlyPeers, addPeers, getSnapshot, proposeC, confChangeC)
//start listening for raft node-removal requests
go serveHTTPRaftAPI(ctx, int(subcfg.RaftAPIPort), confChangeC, errorC)
// listen on the commit channel and fetch blocks
b = NewBlockstore(ctx, cfg, <-snapshotterReady, proposeC, commitC, errorC, validatorC, stop)
node.SetClient(b)
return b
}
......@@ -6,15 +6,18 @@ package raft
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"net/url"
"os"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/33cn/chain33/types"
"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/pkg/fileutil"
typec "github.com/coreos/etcd/pkg/types"
......@@ -24,6 +27,7 @@ import (
"github.com/coreos/etcd/snap"
"github.com/coreos/etcd/wal"
"github.com/coreos/etcd/wal/walpb"
"github.com/golang/protobuf/proto"
)
var (
......@@ -31,10 +35,9 @@ var (
)
type raftNode struct {
client *Client
proposeC <-chan BlockInfo
proposeC <-chan *types.Block
confChangeC <-chan raftpb.ConfChange
commitC chan<- *BlockInfo
commitC chan<- *types.Block
errorC chan<- error
id int
bootstrapPeers []string
......@@ -69,17 +72,13 @@ type Node struct {
*raftNode
}
func (node *Node) SetClient(client *Client) {
node.client = client
}
// NewRaftNode create raft node
func NewRaftNode(ctx context.Context, id int, join bool, peers []string, readOnlyPeers []string, addPeers []string, getSnapshot func() ([]byte, error), proposeC <-chan BlockInfo,
confChangeC <-chan raftpb.ConfChange) (*Node, <-chan *BlockInfo, <-chan error, <-chan *snap.Snapshotter, <-chan bool) {
func NewRaftNode(ctx context.Context, id int, join bool, peers []string, readOnlyPeers []string, addPeers []string, getSnapshot func() ([]byte, error), proposeC <-chan *types.Block,
confChangeC <-chan raftpb.ConfChange) (<-chan *types.Block, <-chan error, <-chan *snap.Snapshotter, <-chan bool) {
rlog.Info("Enter consensus raft")
// commit channel
commitC := make(chan *BlockInfo)
commitC := make(chan *types.Block)
errorC := make(chan error)
rc := &raftNode{
proposeC: proposeC,
......@@ -102,7 +101,7 @@ func NewRaftNode(ctx context.Context, id int, join bool, peers []string, readOnl
}
go rc.startRaft()
return &Node{rc}, commitC, errorC, rc.snapshotterReady, rc.validatorC
return commitC, errorC, rc.snapshotterReady, rc.validatorC
}
// start the raft node
......@@ -176,6 +175,9 @@ func (rc *raftNode) startRaft() {
//periodically watch whether the leader status has changed and update the validator
go rc.updateValidator()
//periodically clean up the WAL logs
go rc.cleanupWal()
}
// network listener
......@@ -232,7 +234,7 @@ func (rc *raftNode) serveChannels() {
if !ok {
rc.proposeC = nil
} else {
out, err := json.Marshal(prop)
out, err := proto.Marshal(prop)
if err != nil {
rlog.Error(fmt.Sprintf("failed to marshal block:%v ", err.Error()))
}
......@@ -265,10 +267,6 @@ func (rc *raftNode) serveChannels() {
case <-ticker.C:
rc.node.Tick()
case rd := <-rc.node.Ready():
if !rc.checkEntries(rd.Entries) || !rc.checkEntries(rd.CommittedEntries) {
rc.stop()
return
}
rc.wal.Save(rd.HardState, rd.Entries)
if !raft.IsEmptySnap(rd.Snapshot) {
rc.saveSnap(rd.Snapshot)
......@@ -295,25 +293,6 @@ func (rc *raftNode) serveChannels() {
}
}
func (rc *raftNode) checkEntries(ents []raftpb.Entry) bool {
for i := range ents {
if ents[i].Type == raftpb.EntryNormal && len(ents[i].Data) != 0 {
info := &BlockInfo{}
if err := json.Unmarshal(ents[i].Data, info); err != nil {
rlog.Error("checkEntries Unmarshal BlockInfo fail", "err", err)
return false
}
if rc.client != nil {
if !rc.client.CheckBlockInfo(info) {
rlog.Error("checkEntries CheckBlockInfo fail")
return false
}
}
}
}
return true
}
func (rc *raftNode) updateValidator() {
//TODO: this watch may need to be optimized later depending on the scenario
......@@ -447,7 +426,7 @@ func (rc *raftNode) publishSnapshot(snapshotToSave raftpb.Snapshot) {
}
rlog.Info(fmt.Sprintf("publishing snapshot at index %d", rc.snapshotIndex))
defer rlog.Info("finished publishing snapshot at index %d", rc.snapshotIndex)
defer rlog.Info(fmt.Sprintf("finished publishing snapshot at index %d", rc.snapshotIndex))
if snapshotToSave.Metadata.Index <= rc.appliedIndex {
rlog.Error(fmt.Sprintf("snapshot index [%d] should > progress.appliedIndex [%d] + 1", snapshotToSave.Metadata.Index, rc.appliedIndex))
......@@ -517,13 +496,13 @@ func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool {
break
}
// decode
info := &BlockInfo{}
if err := json.Unmarshal(ents[i].Data, info); err != nil {
rlog.Error("Unmarshal BlockInfo fail", "err", err)
block := &types.Block{}
if err := proto.Unmarshal(ents[i].Data, block); err != nil {
rlog.Error("Unmarshal block fail", "err", err)
break
}
select {
case rc.commitC <- info:
case rc.commitC <- block:
case <-rc.ctx.Done():
return false
}
......@@ -610,3 +589,63 @@ func (rc *raftNode) addReadOnlyPeers() {
}
}
}
func (rc *raftNode) cleanupWal() {
walcount := 10
ticker := time.NewTicker(600 * time.Second)
for {
select {
case <-rc.ctx.Done():
return
case <-ticker.C:
names, _ := readWalNames(rc.waldir)
if len(names) <= walcount*2 {
continue
}
wnames := sort.StringSlice(names)
_, lastWalIndex, _ := parseWalName(wnames[walcount-1])
lastEntryIndex := lastWalIndex - 1
compactIndex := rc.snapshotIndex - snapshotCatchUpEntriesN
if compactIndex > lastEntryIndex {
beg := time.Now()
rlog.Info(fmt.Sprintf("clean up wal [compacted index: %d | last wal index: %d]", compactIndex, lastEntryIndex))
removeWal(wnames[:walcount], rc.waldir)
rlog.Info(fmt.Sprintf("clean up %d wal cost %s", walcount, time.Since(beg)))
}
}
}
}
func readWalNames(dirpath string) ([]string, error) {
wnames := make([]string, 0)
names, err := fileutil.ReadDir(dirpath)
if err != nil {
rlog.Error(fmt.Sprintf("chain33_raft: error read wal names (%v)", err.Error()))
return nil, err
}
for _, name := range names {
if _, _, err := parseWalName(name); err == nil {
wnames = append(wnames, name)
}
}
return wnames, nil
}
func parseWalName(str string) (seq, index uint64, err error) {
if !strings.HasSuffix(str, ".wal") {
return 0, 0, errors.New("bad wal name")
}
_, err = fmt.Sscanf(str, "%016x-%016x.wal", &seq, &index)
return seq, index, err
}
func removeWal(names []string, waldir string) error {
for _, name := range names {
srcPath := fmt.Sprintf("%s%s%s", waldir, string(os.PathSeparator), name)
if err := os.Remove(srcPath); err != nil {
rlog.Error(fmt.Sprintf("chain33_raft: error remove wal (%v)", err.Error()))
return err
}
}
return nil
}
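The new `cleanupWal` goroutine relies on etcd's WAL naming scheme: each file is named `<seq>-<index>.wal` with both fields as zero-padded 16-digit hex, which is what `parseWalName` decodes before deciding whether the oldest files are already behind the compacted raft index. A standalone sketch of that parsing, using the same format string as the diff; the example filename is made up:

```go
// Standalone sketch of the WAL-name parsing used by cleanupWal.
package main

import (
	"errors"
	"fmt"
	"strings"
)

func parseWalName(str string) (seq, index uint64, err error) {
	if !strings.HasSuffix(str, ".wal") {
		return 0, 0, errors.New("bad wal name")
	}
	_, err = fmt.Sscanf(str, "%016x-%016x.wal", &seq, &index)
	return seq, index, err
}

func main() {
	seq, index, err := parseWalName("000000000000000a-00000000000186a0.wal")
	fmt.Println(seq, index, err) // 10 100000 <nil>
}
```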
raftPerf
scripts/chain33
scripts/chain33_raft-1/
......@@ -5,13 +5,13 @@
package main
import (
"encoding/json"
"flag"
"fmt"
"log"
"path/filepath"
"strconv"
ttypes "github.com/33cn/chain33/types"
"github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/pkg/pbutil"
"github.com/coreos/etcd/pkg/types"
......@@ -19,6 +19,7 @@ import (
raftsnap "github.com/coreos/etcd/snap"
"github.com/coreos/etcd/wal"
"github.com/coreos/etcd/wal/walpb"
"github.com/golang/protobuf/proto"
)
func main() {
......@@ -94,12 +95,12 @@ func main() {
break
}
// decode
info := &BlockInfo{}
if err := json.Unmarshal(e.Data, info); err != nil {
block := &ttypes.Block{}
if err := proto.Unmarshal(e.Data, block); err != nil {
log.Printf("failed to unmarshal: %v", err)
break
}
msg = fmt.Sprintf("%s\tHeight=%d\tHash=%s", msg, info.Height, info.Hash)
msg = fmt.Sprintf("%s\tHeight=%d\tTxHash=%X", msg, block.Height, block.TxHash)
case raftpb.EntryConfChange:
msg = fmt.Sprintf("%s\tconf", msg)
var r raftpb.ConfChange
......@@ -132,8 +133,3 @@ func genIDSlice(a []uint64) []types.ID {
}
return ids
}
type BlockInfo struct {
Height int64 `json:"height"`
Hash string `json:"hash"`
}
......@@ -277,7 +277,6 @@ func CheckState(t *testing.T, client *Client) {
assert.Equal(t, client.csState.IsProposer(), true)
assert.Nil(t, client.csState.GetPrevotesState(state.LastBlockHeight, 0, nil))
assert.Nil(t, client.csState.GetPrecommitsState(state.LastBlockHeight, 0, nil))
assert.NotEmpty(t, client.csState.GetPrivValidator())
assert.Len(t, client.GenesisDoc().Validators, 1)
msg1, err := client.Query_IsHealthy(&types.ReqNil{})
......