Commit 6bf4628e authored by harrylee, committed by vipwzw

adjust code for raft and pbft

parent e89dbbfa
......@@ -20,8 +20,6 @@ require (
github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3
github.com/golang/protobuf v1.3.2
github.com/hashicorp/golang-lru v0.5.0
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/pkg/errors v0.8.0
github.com/prometheus/client_golang v1.1.0 // indirect
github.com/robertkrimen/otto v0.0.0-20180617131154-15f95af6e78d
......
This diff is collapsed.
......@@ -150,3 +150,39 @@ func clearTestData() {
}
fmt.Println("test data clear successfully!")
}
// TestReplica starts three replicas on localhost (ports 8891-8893), pushes up
// to 100 client requests at whichever replica accepts them, and logs every
// reply that arrives during a five second window.
//
// Both helper goroutines are told to stop and are joined before the test
// function returns: calling t.Log from a goroutine after the test has
// completed makes the testing package panic, and leaving the goroutines
// running would leak them into subsequent tests.
func TestReplica(t *testing.T) {
	const peers = "127.0.0.1:8891,127.0.0.1:8892,127.0.0.1:8893"

	reply1, request1, isPrimary1 := NewReplica(1, peers, "127.0.0.1:8891")
	reply2, request2, isPrimary2 := NewReplica(2, peers, "127.0.0.1:8892")
	reply3, request3, isPrimary3 := NewReplica(3, peers, "127.0.0.1:8893")

	// done is closed to ask the goroutines to exit; finished is buffered so
	// each goroutine can report its exit without blocking.
	done := make(chan struct{})
	finished := make(chan struct{}, 2)

	// Reply consumer: drains replies from all three replicas until stopped.
	go func() {
		defer func() { finished <- struct{}{} }()
		for {
			select {
			case <-done:
				return
			case <-reply1:
				t.Log("I'm have reply1 message")
			case <-reply2:
				t.Log("I'm have reply2 message")
			case <-reply3:
				t.Log("I'm have reply3 message")
			}
		}
	}()

	t.Log(isPrimary1)
	t.Log(isPrimary2)
	t.Log(isPrimary3)

	// Request producer: offers each request to all three replicas and lets
	// select pick whichever one is ready to receive.
	go func() {
		defer func() { finished <- struct{}{} }()
		for i := 0; i < 100; i++ {
			op := &types.Operation{Value: &types.Block{}}
			select {
			case <-done:
				return
			case request1 <- ToRequestClient(op, types.Now().String(), "127.0.0.1:8891"):
				t.Log("I'm have send request1 message")
			case request2 <- ToRequestClient(op, types.Now().String(), "127.0.0.1:8892"):
				t.Log("I'm have send request2 message")
			case request3 <- ToRequestClient(op, types.Now().String(), "127.0.0.1:8893"):
				t.Log("I'm have send request3 message")
			}
		}
	}()

	// Give the replicas time to exchange messages, then shut down and wait
	// for both goroutines so nothing touches t after the test returns.
	time.Sleep(5 * time.Second)
	close(done)
	<-finished
	<-finished
}
\ No newline at end of file
......@@ -5,6 +5,7 @@
package raft
import (
"context"
"fmt"
"sync"
"time"
......@@ -35,14 +36,15 @@ type Client struct {
errorC <-chan error
snapshotter *snap.Snapshotter
validatorC <-chan bool
stopC chan<- struct{}
ctx context.Context
cancel context.CancelFunc
once sync.Once
}
// NewBlockstore creates the raft consensus Client.
// It builds a drivers.BaseClient from cfg, stores the snapshotter and the
// propose/commit/error/validator channels on the new Client, registers the
// Client as the base client's child and returns it.
// NOTE(review): this is a diff view, so both signatures appear; the first one
// takes a stopC channel, the second takes ctx and a cancel func.
func NewBlockstore(cfg *types.Consensus, snapshotter *snap.Snapshotter, proposeC chan<- *types.Block, commitC <-chan *types.Block, errorC <-chan error, validatorC <-chan bool, stopC chan<- struct{}) *Client {
func NewBlockstore(ctx context.Context, cfg *types.Consensus, snapshotter *snap.Snapshotter, proposeC chan<- *types.Block, commitC <-chan *types.Block, errorC <-chan error, validatorC <-chan bool, cancel context.CancelFunc) *Client {
c := drivers.NewBaseClient(cfg)
client := &Client{BaseClient: c, proposeC: proposeC, snapshotter: snapshotter, validatorC: validatorC, commitC: commitC, errorC: errorC, stopC: stopC}
client := &Client{BaseClient: c, proposeC: proposeC, snapshotter: snapshotter, validatorC: validatorC, commitC: commitC, errorC: errorC, ctx: ctx, cancel: cancel}
c.SetChild(client)
return client
}
......@@ -97,12 +99,12 @@ func (client *Client) SetQueueClient(c queue.Client) {
})
go client.EventLoop()
go client.readCommits(client.commitC, client.errorC)
go client.pollingTask(c)
go client.pollingTask()
}
// Close signals shutdown of the raft consensus client and logs that it closed.
// NOTE(review): this is a diff view, so two shutdown signals appear: a send on
// stopC and a call to the cancel func stored on the Client.
func (client *Client) Close() {
client.stopC <- struct{}{}
client.cancel()
rlog.Info("consensus raft closed")
}
......@@ -125,10 +127,14 @@ func (client *Client) CreateBlock() {
panic("This node encounter problem, exit.")
}
}
ticker := time.NewTicker(50 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-client.ctx.Done():
case <-ticker.C:
//如果leader节点突然挂了,不是打包节点,需要退出
if !isLeader {
if !mux.Load().(bool) {
rlog.Warn("I'm not the validator node anymore, exit.=============================")
break
}
......@@ -193,6 +199,8 @@ func (client *Client) CreateBlock() {
}
time.Sleep(time.Second * time.Duration(writeBlockSeconds))
}
}
}
// 向raft底层发送block
......@@ -219,17 +227,21 @@ func (client *Client) readCommits(commitC <-chan *types.Block, errorC <-chan err
if ok {
panic(err)
}
case <-client.ctx.Done():
return
}
}
}
//轮询任务,去检测本机器是否为validator节点,如果是,则执行打包任务
func (client *Client) pollingTask(c queue.Client) {
func (client *Client) pollingTask() {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-client.ctx.Done():
return
case value, ok := <-client.validatorC:
//各个节点Block只初始化一次
client.once.Do(func() {
......@@ -237,9 +249,14 @@ func (client *Client) pollingTask(c queue.Client) {
})
if ok && !value {
rlog.Debug("================I'm not the validator node!=============")
leader := mux.Load().(bool)
if leader {
isLeader = false
} else if ok && !isLeader && value {
mux.Store(isLeader)
}
} else if ok && !mux.Load().(bool) && value {
isLeader = true
mux.Store(isLeader)
go client.CreateBlock()
} else if !ok {
break
......
......@@ -36,8 +36,8 @@ enableTxQuickIndex=true
[p2p]
seeds=["127.0.0.1:13802"]
enable=true
isSeed=true
enable=false
isSeed=false
serverStart=true
innerSeedEnable=false
useGithub=false
......@@ -107,8 +107,8 @@ peersURL="http://127.0.0.1:9021"
# raft共识用到,指示raft集群中只读节点的IP(只同步日志,不参与raft共识)
readOnlyPeersURL=""
addPeersURL=""
#raft共识用到,默认raft中多少条记录打包一个snapshot
defaultSnapCount=1000
#raft共识用到,默认raft中多少条记录打包一个snapshot(这里为了测试调整小一点)
defaultSnapCount=2
#raft共识用到,默认raft中写区块时间间隔
writeBlockSeconds=1
#raft共识用到,默认raft中leader发送心跳包时间间隔
......
......@@ -5,7 +5,9 @@
package raft
import (
"context"
"strings"
"sync/atomic"
log "github.com/33cn/chain33/common/log/log15"
"github.com/33cn/chain33/queue"
......@@ -22,6 +24,7 @@ var (
writeBlockSeconds int64 = 1
heartbeatTick = 1
isLeader = false
mux atomic.Value
confChangeC chan raftpb.ConfChange
)
......@@ -39,6 +42,10 @@ type subConfig struct {
HeartbeatTick int32 `json:"heartbeatTick"`
}
// init stores the initial isLeader flag in the package-level atomic value mux.
// An atomic.Value returns nil from Load until something is stored, so seeding
// it here keeps later mux.Load().(bool) assertions from panicking.
func init() {
mux.Store(isLeader)
}
// NewRaftCluster create raft cluster
func NewRaftCluster(cfg *types.Consensus, sub []byte) queue.Module {
rlog.Info("Start to create raft cluster")
......@@ -70,10 +77,6 @@ func NewRaftCluster(cfg *types.Consensus, sub []byte) queue.Module {
if subcfg.HeartbeatTick > 0 {
heartbeatTick = int(subcfg.HeartbeatTick)
}
// propose channel
proposeC := make(chan *types.Block)
confChangeC = make(chan raftpb.ConfChange)
var b *Client
getSnapshot := func() ([]byte, error) { return b.getSnapshot() }
// raft集群的建立,1. 初始化两条channel: propose channel用于客户端和raft底层交互, commit channel用于获取commit消息
......@@ -90,10 +93,15 @@ func NewRaftCluster(cfg *types.Consensus, sub []byte) queue.Module {
if len(addPeers) == 1 && addPeers[0] == "" {
addPeers = []string{}
}
commitC, errorC, snapshotterReady, validatorC, stopC := NewRaftNode(int(subcfg.NodeID), subcfg.IsNewJoinNode, peers, readOnlyPeers, addPeers, getSnapshot, proposeC, confChangeC)
//采用context来统一管理所有服务
ctx, stop := context.WithCancel(context.Background())
// propose channel
proposeC := make(chan *types.Block)
confChangeC = make(chan raftpb.ConfChange)
commitC, errorC, snapshotterReady, validatorC := NewRaftNode(ctx, int(subcfg.NodeID), subcfg.IsNewJoinNode, peers, readOnlyPeers, addPeers, getSnapshot, proposeC, confChangeC)
//启动raft删除节点操作监听
go serveHTTPRaftAPI(int(subcfg.RaftAPIPort), confChangeC, errorC)
go serveHTTPRaftAPI(ctx, int(subcfg.RaftAPIPort), confChangeC, errorC)
// 监听commit channel,取block
b = NewBlockstore(cfg, <-snapshotterReady, proposeC, commitC, errorC, validatorC, stopC)
b = NewBlockstore(ctx, cfg, <-snapshotterReady, proposeC, commitC, errorC, validatorC, stop)
return b
}
......@@ -5,6 +5,7 @@
package raft
import (
"context"
"io/ioutil"
"net/http"
"strconv"
......@@ -66,8 +67,8 @@ func (h *httpRaftAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}
func serveHTTPRaftAPI(port int, confChangeC chan<- raftpb.ConfChange, errorC <-chan error) {
srv := http.Server{
func serveHTTPRaftAPI(ctx context.Context, port int, confChangeC chan<- raftpb.ConfChange, errorC <-chan error) {
srv := &http.Server{
Addr: "localhost:" + strconv.Itoa(port),
Handler: &httpRaftAPI{
confChangeC: confChangeC,
......@@ -78,9 +79,11 @@ func serveHTTPRaftAPI(port int, confChangeC chan<- raftpb.ConfChange, errorC <-c
rlog.Error(fmt.Sprintf("ListenAndServe have a err: (%v)", err.Error()))
}
}()
// exit when raft goes down
if err, ok := <-errorC; ok {
select {
case <-ctx.Done():
srv.Close()
case err := <-errorC:
srv.Close()
rlog.Error(fmt.Sprintf("the errorC chan receive a err (%v)\n", err.Error()))
}
}
......@@ -5,6 +5,7 @@
package raft
import (
"context"
"errors"
"net"
"time"
......@@ -13,16 +14,16 @@ import (
// stoppableListener embeds a *net.TCPListener together with a stop signal.
// Translated original note: sets the TCP keep-alive timeout and receives stopc.
// NOTE(review): this is a diff view, so both stop-signal fields appear (the
// stopc channel and ctx).
type stoppableListener struct {
*net.TCPListener
stopc <-chan struct{}
ctx context.Context
}
// newStoppableListener listens for TCP connections on addr and wraps the
// resulting *net.TCPListener, together with the stop signal, in a
// stoppableListener. If net.Listen fails, its error is returned unchanged.
// NOTE(review): this is a diff view, so both signatures and both return lines
// appear (stopc channel variant and ctx variant).
func newStoppableListener(addr string, stopc <-chan struct{}) (*stoppableListener, error) {
func newStoppableListener(addr string, ctx context.Context) (*stoppableListener, error) {
ln, err := net.Listen("tcp", addr)
if err != nil {
return nil, err
}
return &stoppableListener{ln.(*net.TCPListener), stopc}, nil
return &stoppableListener{ln.(*net.TCPListener), ctx}, nil
}
func (ln stoppableListener) Accept() (c net.Conn, err error) {
......@@ -37,7 +38,7 @@ func (ln stoppableListener) Accept() (c net.Conn, err error) {
connc <- tc
}()
select {
case <-ln.stopc:
case <-ln.ctx.Done():
return nil, errors.New("server stopped")
case err := <-errc:
return nil, err
......
......@@ -56,17 +56,18 @@ type raftNode struct {
snapCount uint64
transport *rafthttp.Transport
stopMu sync.RWMutex
stopc chan struct{}
httpstopc chan struct{}
httpdonec chan struct{}
ctx context.Context
//stopc chan struct{}
//httpstopc chan struct{}
//httpdonec chan struct{}
validatorC chan bool
//用于判断该节点是否重启过
restartC chan struct{}
}
// NewRaftNode create raft node
func NewRaftNode(id int, join bool, peers []string, readOnlyPeers []string, addPeers []string, getSnapshot func() ([]byte, error), proposeC <-chan *types.Block,
confChangeC <-chan raftpb.ConfChange) (<-chan *types.Block, <-chan error, <-chan *snap.Snapshotter, <-chan bool, chan<- struct{}) {
func NewRaftNode(ctx context.Context, id int, join bool, peers []string, readOnlyPeers []string, addPeers []string, getSnapshot func() ([]byte, error), proposeC <-chan *types.Block,
confChangeC <-chan raftpb.ConfChange) (<-chan *types.Block, <-chan error, <-chan *snap.Snapshotter, <-chan bool) {
rlog.Info("Enter consensus raft")
// commit channel
......@@ -86,16 +87,14 @@ func NewRaftNode(id int, join bool, peers []string, readOnlyPeers []string, addP
snapdir: fmt.Sprintf("chain33_raft-%d%ssnap", id, string(os.PathSeparator)),
getSnapshot: getSnapshot,
snapCount: defaultSnapCount,
stopc: make(chan struct{}),
httpstopc: make(chan struct{}),
httpdonec: make(chan struct{}),
validatorC: make(chan bool),
snapshotterReady: make(chan *snap.Snapshotter, 1),
restartC: make(chan struct{}, 1),
ctx: ctx,
}
go rc.startRaft()
return commitC, errorC, rc.snapshotterReady, rc.validatorC, rc.stopc
return commitC, errorC, rc.snapshotterReady, rc.validatorC
}
// 启动raft节点
......@@ -184,22 +183,22 @@ func (rc *raftNode) serveRaft() {
panic(err)
}
ln, err := newStoppableListener(nodeURL.Host, rc.httpstopc)
ln, err := newStoppableListener(nodeURL.Host, rc.ctx)
if err != nil {
rlog.Error(fmt.Sprintf("raft: Failed to listen rafthttp (%v)", err.Error()))
panic(err)
}
err = (&http.Server{Handler: rc.transport.Handler()}).Serve(ln)
raftSrv := &http.Server{Handler: rc.transport.Handler()}
err = raftSrv.Serve(ln)
if err != nil {
rlog.Error(fmt.Sprintf("raft: Failed to serve rafthttp (%v)", err.Error()))
}
select {
case <-rc.httpstopc:
case <-rc.ctx.Done():
raftSrv.Close()
default:
rlog.Error(fmt.Sprintf("raft: Failed to serve rafthttp (%v)", err.Error()))
}
close(rc.httpdonec)
}
func (rc *raftNode) serveChannels() {
......@@ -246,9 +245,11 @@ func (rc *raftNode) serveChannels() {
rlog.Error(fmt.Sprintf("rc.node.ProposeConfChange:%v", err.Error()))
}
}
case <-rc.ctx.Done():
rlog.Info("I have a exit message!")
return
}
}
close(rc.stopc)
}()
// 从Ready()中接收数据
for {
......@@ -275,7 +276,7 @@ func (rc *raftNode) serveChannels() {
rc.writeError(err)
return
case <-rc.stopc:
case <-rc.ctx.Done():
rc.stop()
return
}
......@@ -283,9 +284,9 @@ func (rc *raftNode) serveChannels() {
}
func (rc *raftNode) updateValidator() {
//TODO 这块监听后期需要根据场景进行优化?
time.Sleep(5 * time.Second)
//用于标记readOnlyPeers是否已经被添加到集群中了
flag := false
isRestart := false
......@@ -297,8 +298,12 @@ func (rc *raftNode) updateValidator() {
case <-ticker.C:
ticker.Stop()
}
ticker = time.NewTicker(time.Second)
for {
time.Sleep(time.Second)
select {
case <-rc.ctx.Done():
return
case <-ticker.C:
status := rc.Status()
if status.Lead == raft.None {
rlog.Debug(fmt.Sprintf("==============This is %s node!==============", status.RaftState.String()))
......@@ -317,6 +322,8 @@ func (rc *raftNode) updateValidator() {
flag = true
}
}
}
}
func (rc *raftNode) Status() raft.Status {
rc.stopMu.RLock()
......@@ -454,20 +461,18 @@ func (rc *raftNode) stop() {
rc.stopHTTP()
close(rc.commitC)
close(rc.errorC)
close(rc.stopc)
rc.node.Stop()
}
// stopHTTP stops the node's rafthttp transport.
// NOTE(review): this is a diff view; the httpstopc/httpdonec handshake appears
// both as live statements and as commented-out lines.
func (rc *raftNode) stopHTTP() {
rc.transport.Stop()
close(rc.httpstopc)
<-rc.httpdonec
//close(rc.httpstopc)
//<-rc.httpdonec
}
func (rc *raftNode) writeError(err error) {
rc.stopHTTP()
close(rc.commitC)
close(rc.stopc)
rc.errorC <- err
close(rc.errorC)
rc.node.Stop()
......@@ -488,7 +493,7 @@ func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool {
}
select {
case rc.commitC <- block:
case <-rc.stopc:
case <-rc.ctx.Done():
return false
}
......@@ -521,7 +526,7 @@ func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool {
if ents[i].Index == rc.lastIndex {
select {
case rc.commitC <- nil:
case <-rc.stopc:
case <-rc.ctx.Done():
return false
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment