Commit 1a47e08d authored by caopingcp's avatar caopingcp Committed by vipwzw

raft prune wal

parent 90d2f148
......@@ -6,21 +6,18 @@ package raft
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
"github.com/33cn/chain33/common"
"github.com/33cn/chain33/common/merkle"
"github.com/33cn/chain33/queue"
drivers "github.com/33cn/chain33/system/consensus"
cty "github.com/33cn/chain33/system/dapp/coins/types"
"github.com/33cn/chain33/types"
"github.com/coreos/etcd/snap"
"github.com/golang/protobuf/proto"
)
var (
zeroHash [32]byte
)
func init() {
......@@ -31,18 +28,20 @@ func init() {
// Client Raft implementation
type Client struct {
*drivers.BaseClient
proposeC chan<- *types.Block
commitC <-chan *types.Block
proposeC chan<- BlockInfo
commitC <-chan *BlockInfo
errorC <-chan error
snapshotter *snap.Snapshotter
validatorC <-chan bool
ctx context.Context
cancel context.CancelFunc
once sync.Once
blockInfo *BlockInfo
mtx sync.Mutex
}
// NewBlockstore create Raft Client
func NewBlockstore(ctx context.Context, cfg *types.Consensus, snapshotter *snap.Snapshotter, proposeC chan<- *types.Block, commitC <-chan *types.Block, errorC <-chan error, validatorC <-chan bool, cancel context.CancelFunc) *Client {
func NewBlockstore(ctx context.Context, cfg *types.Consensus, snapshotter *snap.Snapshotter, proposeC chan<- BlockInfo, commitC <-chan *BlockInfo, errorC <-chan error, validatorC <-chan bool, cancel context.CancelFunc) *Client {
c := drivers.NewBaseClient(cfg)
client := &Client{BaseClient: c, proposeC: proposeC, snapshotter: snapshotter, validatorC: validatorC, commitC: commitC, errorC: errorC, ctx: ctx, cancel: cancel}
c.SetChild(client)
......@@ -75,20 +74,23 @@ func (client *Client) ProcEvent(msg *queue.Message) bool {
// CheckBlock method
func (client *Client) CheckBlock(parent *types.Block, current *types.BlockDetail) error {
cfg := client.GetAPI().GetConfig()
if current.Block.Difficulty != cfg.GetP(0).PowLimitBits {
return types.ErrBlockHeaderDifficulty
}
return nil
}
func (client *Client) getSnapshot() ([]byte, error) {
//这里可能导致死锁
return proto.Marshal(client.GetCurrentBlock())
return json.Marshal(client.GetCurrentInfo())
}
func (client *Client) recoverFromSnapshot(snapshot []byte) error {
var block types.Block
if err := proto.Unmarshal(snapshot, &block); err != nil {
var info *BlockInfo
if err := json.Unmarshal(snapshot, info); err != nil {
return err
}
client.SetCurrentBlock(&block)
client.SetCurrentInfo(info)
return nil
}
......@@ -110,10 +112,8 @@ func (client *Client) Close() {
// CreateBlock method
func (client *Client) CreateBlock() {
issleep := true
retry := 0
infoflag := 0
count := 0
count := int64(0)
cfg := client.GetAPI().GetConfig()
//打包区块前先同步到最大高度
for {
......@@ -124,62 +124,59 @@ func (client *Client) CreateBlock() {
time.Sleep(time.Second)
retry++
if retry >= 600 {
panic("This node encounter problem, exit.")
panic("Leader encounter problem, exit.")
}
}
curBlock, err := client.RequestLastBlock()
if err != nil {
rlog.Error("Leader RequestLastBlock fail", "err", err)
panic(err)
}
curInfo := &BlockInfo{
Height: curBlock.Height,
Hash: common.ToHex(curBlock.Hash(cfg)),
}
ticker := time.NewTicker(50 * time.Millisecond)
client.SetCurrentInfo(curInfo)
ticker := time.NewTicker(time.Duration(writeBlockSeconds) * time.Second)
hint := time.NewTicker(30 * time.Second)
defer ticker.Stop()
defer hint.Stop()
for {
select {
case <-client.ctx.Done():
return
case <-hint.C:
rlog.Info("==================This is Leader node=====================")
case <-ticker.C:
//如果leader节点突然挂了,不是打包节点,需要退出
if !mux.Load().(bool) {
rlog.Warn("I'm not the validator node anymore, exit.=============================")
break
}
infoflag++
if infoflag >= 3 {
rlog.Info("==================This is Leader node=====================")
infoflag = 0
}
if issleep {
time.Sleep(10 * time.Second)
count++
rlog.Warn("Not the Leader node anymore")
return
}
if count >= 12 {
rlog.Info("Create an empty block")
block := client.GetCurrentBlock()
emptyBlock := &types.Block{}
emptyBlock.StateHash = block.StateHash
emptyBlock.ParentHash = block.Hash(cfg)
emptyBlock.Height = block.Height + 1
emptyBlock.Txs = nil
emptyBlock.TxHash = zeroHash[:]
emptyBlock.BlockTime = types.Now().Unix()
entry := emptyBlock
client.propose(entry)
er := client.WriteBlock(block.StateHash, emptyBlock)
if er != nil {
rlog.Error(fmt.Sprintf("********************err:%v", er.Error()))
continue
lastBlock, err := client.RequestLastBlock()
if err != nil {
rlog.Error("Leader RequestLastBlock fail", "err", err)
break
}
client.SetCurrentBlock(emptyBlock)
count = 0
if client.GetCurrentInfoHeight() != lastBlock.Height {
rlog.Info("Leader wait commit blockInfo", "infoHeight", client.GetCurrentInfoHeight(),
"blockHeight", lastBlock.Height)
break
}
lastBlock := client.GetCurrentBlock()
txs := client.RequestTx(int(cfg.GetP(lastBlock.Height+1).MaxTxNumber), nil)
if len(txs) == 0 {
issleep = true
continue
count++
//not create empty block when emptyBlockInterval is 0
if emptyBlockInterval == 0 || count < emptyBlockInterval/writeBlockSeconds {
break
}
issleep = false
count = 0
rlog.Debug("==================start create new block!=====================")
//create empty block every no tx in emptyBlockInterval seconds
rlog.Info("Leader create empty block")
}
var newblock types.Block
newblock.ParentHash = lastBlock.Hash(cfg)
newblock.Height = lastBlock.Height + 1
......@@ -189,32 +186,37 @@ func (client *Client) CreateBlock() {
newblock.Txs = types.TransactionSort(newblock.Txs)
}
newblock.TxHash = merkle.CalcMerkleRoot(cfg, newblock.Height, newblock.Txs)
//固定难度
newblock.Difficulty = cfg.GetP(0).PowLimitBits
newblock.BlockTime = types.Now().Unix()
if lastBlock.BlockTime >= newblock.BlockTime {
newblock.BlockTime = lastBlock.BlockTime + 1
}
blockEntry := newblock
client.propose(&blockEntry)
err := client.WriteBlock(lastBlock.StateHash, &newblock)
err = client.WriteBlock(lastBlock.StateHash, &newblock)
if err != nil {
issleep = true
rlog.Error(fmt.Sprintf("********************err:%v", err.Error()))
continue
rlog.Error("Leader WriteBlock fail", "err", err)
break
}
info := BlockInfo{
Height: newblock.Height,
Hash: common.ToHex(newblock.Hash(cfg)),
}
time.Sleep(time.Second * time.Duration(writeBlockSeconds))
client.propose(info)
count = 0
}
}
}
// 向raft底层发送block
func (client *Client) propose(block *types.Block) {
client.proposeC <- block.Clone()
// 向raft底层发送BlockInfo
func (client *Client) propose(info BlockInfo) {
client.proposeC <- info
}
// 从receive channel中读leader发来的block
func (client *Client) readCommits(commitC <-chan *types.Block, errorC <-chan error) {
var data *types.Block
func (client *Client) readCommits(commitC <-chan *BlockInfo, errorC <-chan error) {
var data *BlockInfo
var ok bool
for {
select {
......@@ -222,10 +224,8 @@ func (client *Client) readCommits(commitC <-chan *types.Block, errorC <-chan err
if !ok || data == nil {
continue
}
rlog.Debug("===============Get block from commit channel===========")
// 在程序刚开始启动的时候有可能存在丢失数据的问题
//区块高度统一由base中的相关代码进行变更,防止错误区块出现
//client.SetCurrentBlock(data)
rlog.Info("Commit blockInfo", "height", data.Height, "blockhash", data.Hash)
client.SetCurrentInfo(data)
case err, ok := <-errorC:
if ok {
......@@ -239,9 +239,6 @@ func (client *Client) readCommits(commitC <-chan *types.Block, errorC <-chan err
//轮询任务,去检测本机器是否为validator节点,如果是,则执行打包任务
func (client *Client) pollingTask() {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-client.ctx.Done():
......@@ -252,7 +249,7 @@ func (client *Client) pollingTask() {
client.InitBlock()
})
if ok && !value {
rlog.Debug("================I'm not the validator node!=============")
rlog.Debug("================I'm not the validator node=============")
leader := mux.Load().(bool)
if leader {
isLeader = false
......@@ -265,8 +262,6 @@ func (client *Client) pollingTask() {
} else if !ok {
break
}
case <-ticker.C:
rlog.Debug("Gets the leader node information timeout and triggers the ticker.")
}
}
}
......@@ -275,3 +270,60 @@ func (client *Client) pollingTask() {
func (client *Client) CmpBestBlock(newBlock *types.Block, cmpBlock *types.Block) bool {
return false
}
// BlockInfo struct
type BlockInfo struct {
Height int64 `json:"height"`
Hash string `json:"hash"`
}
// SetCurrentInfo ...
func (client *Client) SetCurrentInfo(info *BlockInfo) {
client.mtx.Lock()
defer client.mtx.Unlock()
client.blockInfo = info
}
// GetCurrentInfo ...
func (client *Client) GetCurrentInfo() *BlockInfo {
client.mtx.Lock()
defer client.mtx.Unlock()
return client.blockInfo
}
// GetCurrentInfoHeight ...
func (client *Client) GetCurrentInfoHeight() int64 {
client.mtx.Lock()
defer client.mtx.Unlock()
return client.blockInfo.Height
}
// CheckBlockInfo check corresponding block
func (client *Client) CheckBlockInfo(info *BlockInfo) bool {
retry := 0
factor := 1
for {
lastBlock, err := client.RequestLastBlock()
if err == nil && lastBlock.Height >= info.Height {
break
}
retry++
time.Sleep(500 * time.Millisecond)
if retry >= 30*factor {
rlog.Info(fmt.Sprintf("CheckBlockInfo wait %d seconds", retry/2), "height", info.Height)
factor = factor * 2
}
}
block, err := client.RequestBlock(info.Height)
if err != nil {
rlog.Error("CheckBlockInfo RequestBlock fail", "err", err)
return false
}
cfg := client.GetAPI().GetConfig()
if common.ToHex(block.Hash(cfg)) != info.Hash {
rlog.Error("CheckBlockInfo hash not equal", "blockHash", common.ToHex(block.Hash(cfg)),
"infoHash", info.Hash)
return false
}
return true
}
......@@ -38,7 +38,7 @@ enableTxQuickIndex=true
seeds=["127.0.0.1:13802"]
enable=false
isSeed=false
serverStart=true
serverStart=false
innerSeedEnable=false
useGithub=false
innerBounds=300
......@@ -108,12 +108,14 @@ peersURL="http://127.0.0.1:9021"
# raft共识用到,指示raft集群中只读节点的IP(只同步日志,不参与raft共识)
readOnlyPeersURL=""
addPeersURL=""
#raft共识用到,默认raft中多少条记录打包一个snapshot(这里为了测试调整小一点)
#raft中多少条记录打包一个snapshot,默认为10000(这里为了测试调整小一点)
defaultSnapCount=2
#raft共识用到,默认raft中写区块时间间隔
#raft中写区块时间间隔,默认为1秒
writeBlockSeconds=1
#raft共识用到,默认raft中leader发送心跳包时间间隔
#raft中leader发送心跳包时间间隔,默认为1秒
heartbeatTick=1
#raft中leader打包空区块的时间间隔,默认为0,表示不打包空区块
emptyBlockInterval=120
# =============== raft共识配置参数 ===========================
[store]
......@@ -134,7 +136,7 @@ dbCache=16
signType="secp256k1"
[wallet.sub.ticket]
minerdisable=false
minerdisable=true
minerwhitelist=["*"]
minerWaitTime="1s"
......
......@@ -19,10 +19,11 @@ var (
rlog = log.New("module", "raft")
genesis string
genesisBlockTime int64
defaultSnapCount uint64 = 1000
snapshotCatchUpEntriesN uint64 = 1000
defaultSnapCount uint64 = 10000
snapshotCatchUpEntriesN uint64 = 10000
writeBlockSeconds int64 = 1
heartbeatTick = 1
emptyBlockInterval int64
isLeader = false
mux atomic.Value
confChangeC chan raftpb.ConfChange
......@@ -40,6 +41,7 @@ type subConfig struct {
DefaultSnapCount int64 `json:"defaultSnapCount"`
WriteBlockSeconds int64 `json:"writeBlockSeconds"`
HeartbeatTick int32 `json:"heartbeatTick"`
EmptyBlockInterval int64 `json:"emptyBlockInterval"`
}
func init() {
......@@ -64,7 +66,7 @@ func NewRaftCluster(cfg *types.Consensus, sub []byte) queue.Module {
//TODO 当传入的参数异常时,返回给主函数的是个nil,这时候需要做异常处理
return nil
}
// 默认1000个Entry打一个snapshot
// 默认10000个Entry打一个snapshot
if subcfg.DefaultSnapCount > 0 {
defaultSnapCount = uint64(subcfg.DefaultSnapCount)
snapshotCatchUpEntriesN = uint64(subcfg.DefaultSnapCount)
......@@ -77,6 +79,11 @@ func NewRaftCluster(cfg *types.Consensus, sub []byte) queue.Module {
if subcfg.HeartbeatTick > 0 {
heartbeatTick = int(subcfg.HeartbeatTick)
}
// write empty block interval in second
if subcfg.EmptyBlockInterval > 0 {
emptyBlockInterval = subcfg.EmptyBlockInterval
}
var b *Client
getSnapshot := func() ([]byte, error) { return b.getSnapshot() }
// raft集群的建立,1. 初始化两条channel: propose channel用于客户端和raft底层交互, commit channel用于获取commit消息
......@@ -96,12 +103,13 @@ func NewRaftCluster(cfg *types.Consensus, sub []byte) queue.Module {
//采用context来统一管理所有服务
ctx, stop := context.WithCancel(context.Background())
// propose channel
proposeC := make(chan *types.Block)
proposeC := make(chan BlockInfo)
confChangeC = make(chan raftpb.ConfChange)
commitC, errorC, snapshotterReady, validatorC := NewRaftNode(ctx, int(subcfg.NodeID), subcfg.IsNewJoinNode, peers, readOnlyPeers, addPeers, getSnapshot, proposeC, confChangeC)
node, commitC, errorC, snapshotterReady, validatorC := NewRaftNode(ctx, int(subcfg.NodeID), subcfg.IsNewJoinNode, peers, readOnlyPeers, addPeers, getSnapshot, proposeC, confChangeC)
//启动raft删除节点操作监听
go serveHTTPRaftAPI(ctx, int(subcfg.RaftAPIPort), confChangeC, errorC)
// 监听commit channel,取block
b = NewBlockstore(ctx, cfg, <-snapshotterReady, proposeC, commitC, errorC, validatorC, stop)
node.SetClient(b)
return b
}
......@@ -6,6 +6,7 @@ package raft
import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
......@@ -14,7 +15,6 @@ import (
"sync"
"time"
"github.com/33cn/chain33/types"
"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/pkg/fileutil"
typec "github.com/coreos/etcd/pkg/types"
......@@ -24,7 +24,6 @@ import (
"github.com/coreos/etcd/snap"
"github.com/coreos/etcd/wal"
"github.com/coreos/etcd/wal/walpb"
"github.com/golang/protobuf/proto"
)
var (
......@@ -32,9 +31,10 @@ var (
)
type raftNode struct {
proposeC <-chan *types.Block
client *Client
proposeC <-chan BlockInfo
confChangeC <-chan raftpb.ConfChange
commitC chan<- *types.Block
commitC chan<- *BlockInfo
errorC chan<- error
id int
bootstrapPeers []string
......@@ -65,13 +65,21 @@ type raftNode struct {
restartC chan struct{}
}
type Node struct {
*raftNode
}
func (node *Node) SetClient(client *Client) {
node.client = client
}
// NewRaftNode create raft node
func NewRaftNode(ctx context.Context, id int, join bool, peers []string, readOnlyPeers []string, addPeers []string, getSnapshot func() ([]byte, error), proposeC <-chan *types.Block,
confChangeC <-chan raftpb.ConfChange) (<-chan *types.Block, <-chan error, <-chan *snap.Snapshotter, <-chan bool) {
func NewRaftNode(ctx context.Context, id int, join bool, peers []string, readOnlyPeers []string, addPeers []string, getSnapshot func() ([]byte, error), proposeC <-chan BlockInfo,
confChangeC <-chan raftpb.ConfChange) (*Node, <-chan *BlockInfo, <-chan error, <-chan *snap.Snapshotter, <-chan bool) {
rlog.Info("Enter consensus raft")
// commit channel
commitC := make(chan *types.Block)
commitC := make(chan *BlockInfo)
errorC := make(chan error)
rc := &raftNode{
proposeC: proposeC,
......@@ -94,7 +102,7 @@ func NewRaftNode(ctx context.Context, id int, join bool, peers []string, readOnl
}
go rc.startRaft()
return commitC, errorC, rc.snapshotterReady, rc.validatorC
return &Node{rc}, commitC, errorC, rc.snapshotterReady, rc.validatorC
}
// 启动raft节点
......@@ -224,7 +232,7 @@ func (rc *raftNode) serveChannels() {
if !ok {
rc.proposeC = nil
} else {
out, err := proto.Marshal(prop)
out, err := json.Marshal(prop)
if err != nil {
rlog.Error(fmt.Sprintf("failed to marshal block:%v ", err.Error()))
}
......@@ -257,6 +265,10 @@ func (rc *raftNode) serveChannels() {
case <-ticker.C:
rc.node.Tick()
case rd := <-rc.node.Ready():
if !rc.checkEntries(rd.Entries) || !rc.checkEntries(rd.CommittedEntries) {
rc.stop()
return
}
rc.wal.Save(rd.HardState, rd.Entries)
if !raft.IsEmptySnap(rd.Snapshot) {
rc.saveSnap(rd.Snapshot)
......@@ -283,6 +295,25 @@ func (rc *raftNode) serveChannels() {
}
}
func (rc *raftNode) checkEntries(ents []raftpb.Entry) bool {
for i := range ents {
if ents[i].Type == raftpb.EntryNormal && len(ents[i].Data) != 0 {
info := &BlockInfo{}
if err := json.Unmarshal(ents[i].Data, info); err != nil {
rlog.Error("checkEntries Unmarshal BlockInfo fail", "err", err)
return false
}
if rc.client != nil {
if !rc.client.CheckBlockInfo(info) {
rlog.Error("checkEntries CheckBlockInfo fail")
return false
}
}
}
}
return true
}
func (rc *raftNode) updateValidator() {
//TODO 这块监听后期需要根据场景进行优化?
......@@ -325,6 +356,7 @@ func (rc *raftNode) updateValidator() {
}
}
func (rc *raftNode) Status() raft.Status {
rc.stopMu.RLock()
defer rc.stopMu.RUnlock()
......@@ -383,15 +415,13 @@ func (rc *raftNode) maybeTriggerSnapshot() {
return
}
appliedIndex := rc.appliedIndex
snapshotIndex := rc.snapshotIndex
confState := rc.confState
rlog.Info(fmt.Sprintf("start snapshot [applied index: %d | last snapshot index: %d]", appliedIndex, snapshotIndex))
ents, err := rc.raftStorage.Entries(appliedIndex, appliedIndex+1, 2)
rlog.Info(fmt.Sprintf("start snapshot [applied index: %d | last snapshot index: %d]", rc.appliedIndex, rc.snapshotIndex))
data, err := rc.getSnapshot()
if err != nil {
rlog.Error(fmt.Sprintf("Err happened when get snapshot:%v", err.Error()))
rlog.Error("getSnapshot fail", "err", err)
panic(err)
}
snapShot, err := rc.raftStorage.CreateSnapshot(appliedIndex, &confState, ents[0].Data)
snapShot, err := rc.raftStorage.CreateSnapshot(rc.appliedIndex, &rc.confState, data)
if err != nil {
panic(err)
}
......@@ -400,15 +430,15 @@ func (rc *raftNode) maybeTriggerSnapshot() {
}
compactIndex := uint64(1)
if appliedIndex > snapshotCatchUpEntriesN {
compactIndex = appliedIndex - snapshotCatchUpEntriesN
if rc.appliedIndex > snapshotCatchUpEntriesN {
compactIndex = rc.appliedIndex - snapshotCatchUpEntriesN
}
if err := rc.raftStorage.Compact(compactIndex); err != nil {
panic(err)
}
rlog.Info(fmt.Sprintf("compacted log at index %d", compactIndex))
rc.snapshotIndex = appliedIndex
rc.snapshotIndex = rc.appliedIndex
}
func (rc *raftNode) publishSnapshot(snapshotToSave raftpb.Snapshot) {
......@@ -487,12 +517,13 @@ func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool {
break
}
// 解码
block := &types.Block{}
if err := proto.Unmarshal(ents[i].Data, block); err != nil {
rlog.Error(fmt.Sprintf("failed to unmarshal: %v", err.Error()))
info := &BlockInfo{}
if err := json.Unmarshal(ents[i].Data, info); err != nil {
rlog.Error("Unmarshal BlockInfo fail", "err", err)
break
}
select {
case rc.commitC <- block:
case rc.commitC <- info:
case <-rc.ctx.Done():
return false
}
......
......@@ -5,6 +5,7 @@
package main
import (
"encoding/json"
"flag"
"fmt"
"log"
......@@ -18,7 +19,6 @@ import (
raftsnap "github.com/coreos/etcd/snap"
"github.com/coreos/etcd/wal"
"github.com/coreos/etcd/wal/walpb"
"github.com/golang/protobuf/proto"
)
func main() {
......@@ -94,12 +94,12 @@ func main() {
break
}
// 解码
block := &Block{}
if err := proto.Unmarshal(e.Data, block); err != nil {
info := &BlockInfo{}
if err := json.Unmarshal(e.Data, info); err != nil {
log.Printf("failed to unmarshal: %v", err)
break
}
msg = fmt.Sprintf("%s\t BlockHeight:%d", msg, block.Height)
msg = fmt.Sprintf("%s\tHeight=%d\tHash=%s", msg, info.Height, info.Hash)
case raftpb.EntryConfChange:
msg = fmt.Sprintf("%s\tconf", msg)
var r raftpb.ConfChange
......@@ -133,21 +133,7 @@ func genIDSlice(a []uint64) []types.ID {
return ids
}
// Block struct
type Block struct {
Version int64 `protobuf:"varint,1,opt,name=version" json:"version,omitempty"`
ParentHash []byte `protobuf:"bytes,2,opt,name=parentHash,proto3" json:"parentHash,omitempty"`
TxHash []byte `protobuf:"bytes,3,opt,name=txHash,proto3" json:"txHash,omitempty"`
StateHash []byte `protobuf:"bytes,4,opt,name=stateHash,proto3" json:"stateHash,omitempty"`
Height int64 `protobuf:"varint,5,opt,name=height" json:"height,omitempty"`
BlockTime int64 `protobuf:"varint,6,opt,name=blockTime" json:"blockTime,omitempty"`
//Signature *Signature `protobuf:"bytes,8,opt,name=signature" json:"signature,omitempty"`
//Txs []*Transaction `protobuf:"bytes,7,rep,name=txs" json:"txs,omitempty"`
type BlockInfo struct {
Height int64 `json:"height"`
Hash string `json:"hash"`
}
// Reset method
func (m *Block) Reset() { *m = Block{} }
func (m *Block) String() string { return proto.CompactTextString(m) }
// ProtoMessage method
func (*Block) ProtoMessage() {}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment