Commit 59095119 authored by wjx@disanbo.com's avatar wjx@disanbo.com

Merge branch 'master' into fix_exec_dellocal

parents cf3de5a3 5e8dbeec
......@@ -26,6 +26,13 @@ jobs:
script:
- make test
- stage: auto-test
go: "1.9.x"
install: skip
before_script: make build_ci
script:
- make autotest dapp=all
- stage: coverage
if: branch = master
go: "1.9.x"
......@@ -39,6 +46,7 @@ jobs:
- stage: deploy
if: env(DAPP) IS present
sudo: required
services:
- docker
......@@ -55,6 +63,4 @@ jobs:
- sudo mv docker-compose /usr/local/bin
before_script: make build_ci
script:
- make autotest_ci proj=build
# - make docker-compose && make docker-compose-down && make docker-compose DAPP=paracross && make docker-compose-down DAPP=paracross && make docker-compose DAPP=relay && make docker-compose-down DAPP=relay
- make docker-compose DAPP=all && make docker-compose-down DAPP=all && make clean
- make docker-compose DAPP=${DAPP} && make docker-compose-down DAPP=${DAPP} && make clean
......@@ -28,6 +28,24 @@ pipeline {
}
}
stage('deploy') {
steps {
dir("${PROJ_DIR}"){
gitlabCommitStatus(name: 'deploy'){
sh 'make build_ci'
sh "cd build && mkdir ${env.BUILD_NUMBER} && cp ci/* ${env.BUILD_NUMBER} -r && cp chain33* Dockerfile* docker* *.sh ${env.BUILD_NUMBER}/ && cd ${env.BUILD_NUMBER}/ && ./docker-compose-pre.sh run ${env.BUILD_NUMBER} all "
}
}
}
post {
always {
dir("${PROJ_DIR}"){
sh "cd build/${env.BUILD_NUMBER} && ./docker-compose-pre.sh down ${env.BUILD_NUMBER} all && cd .. && rm -rf ${env.BUILD_NUMBER} && cd .. && make clean "
}
}
}
}
}
post {
......
......@@ -70,3 +70,10 @@ make push b=branch_dev_name m="hello world"
```
如果m不设置,那么不会执行 git commit 的命令
#### 测试代码
类似plugin/dapp/relay,在cmd目录下编写自己插件的Makefile和build.sh
在build目录下写testcase和相关的Dockerfile和docker-compose配置文件,
testcase的规则参考plugin/dapp/testcase_compose_rule.md
用户可以在travis自己工程里面设置自己plugin的DAPP变量,如DAPP设置为relay,则travis里面run relay的testcase
......@@ -16,37 +16,44 @@ import (
func init() {
drivers.Reg("pbft", NewPbft)
drivers.QueryData.Register("pbft", &PbftClient{})
drivers.QueryData.Register("pbft", &Client{})
}
type PbftClient struct {
// Client Pbft implementation
type Client struct {
*drivers.BaseClient
replyChan chan *types.ClientReply
requestChan chan *types.Request
isPrimary bool
}
func NewBlockstore(cfg *types.Consensus, replyChan chan *types.ClientReply, requestChan chan *types.Request, isPrimary bool) *PbftClient {
// NewBlockstore create Pbft Client
func NewBlockstore(cfg *types.Consensus, replyChan chan *types.ClientReply, requestChan chan *types.Request, isPrimary bool) *Client {
c := drivers.NewBaseClient(cfg)
client := &PbftClient{BaseClient: c, replyChan: replyChan, requestChan: requestChan, isPrimary: isPrimary}
client := &Client{BaseClient: c, replyChan: replyChan, requestChan: requestChan, isPrimary: isPrimary}
c.SetChild(client)
return client
}
func (client *PbftClient) ProcEvent(msg queue.Message) bool {
// ProcEvent method
func (client *Client) ProcEvent(msg queue.Message) bool {
return false
}
func (client *PbftClient) Propose(block *types.Block) {
op := &types.Operation{block}
// Propose method
func (client *Client) Propose(block *types.Block) {
op := &types.Operation{Value: block}
req := ToRequestClient(op, types.Now().String(), clientAddr)
client.requestChan <- req
}
func (client *PbftClient) CheckBlock(parent *types.Block, current *types.BlockDetail) error {
// CheckBlock method
func (client *Client) CheckBlock(parent *types.Block, current *types.BlockDetail) error {
return nil
}
func (client *PbftClient) SetQueueClient(c queue.Client) {
// SetQueueClient method
func (client *Client) SetQueueClient(c queue.Client) {
plog.Info("Enter SetQueue method of pbft consensus")
client.InitClient(c, func() {
......@@ -57,7 +64,8 @@ func (client *PbftClient) SetQueueClient(c queue.Client) {
go client.CreateBlock()
}
func (client *PbftClient) CreateBlock() {
// CreateBlock method
func (client *Client) CreateBlock() {
issleep := true
if !client.isPrimary {
return
......@@ -95,11 +103,13 @@ func (client *PbftClient) CreateBlock() {
}
}
func (client *PbftClient) GetGenesisBlockTime() int64 {
// GetGenesisBlockTime get genesis blocktime
func (client *Client) GetGenesisBlockTime() int64 {
return genesisBlockTime
}
func (client *PbftClient) CreateGenesisTx() (ret []*types.Transaction) {
// CreateGenesisTx get genesis tx
func (client *Client) CreateGenesisTx() (ret []*types.Transaction) {
var tx types.Transaction
tx.Execer = []byte("coins")
tx.To = genesis
......@@ -112,7 +122,7 @@ func (client *PbftClient) CreateGenesisTx() (ret []*types.Transaction) {
return
}
func (client *PbftClient) readReply() {
func (client *Client) readReply() {
data := <-client.replyChan
if data == nil {
......
......@@ -89,7 +89,7 @@ powLimitBits = "0x1f2fffff"
[consensus.sub.pbft]
genesis="14KEKbYtKKQm4wMthSK9J4La4nAiidGozt"
genesisBlockTime=1514533394
nodeId=1
nodeID=1
peersURL="127.0.0.1:8890"
clientAddr="127.0.0.1:8890"
......
......@@ -22,11 +22,12 @@ var (
type subConfig struct {
Genesis string `json:"genesis"`
GenesisBlockTime int64 `json:"genesisBlockTime"`
NodeId int64 `json:"nodeId"`
NodeID int64 `json:"nodeID"`
PeersURL string `json:"peersURL"`
ClientAddr string `json:"clientAddr"`
}
// NewPbft create pbft cluster
func NewPbft(cfg *pb.Consensus, sub []byte) queue.Module {
plog.Info("start to creat pbft node")
var subcfg subConfig
......@@ -40,14 +41,14 @@ func NewPbft(cfg *pb.Consensus, sub []byte) queue.Module {
if subcfg.GenesisBlockTime > 0 {
genesisBlockTime = subcfg.GenesisBlockTime
}
if int(subcfg.NodeId) == 0 || strings.Compare(subcfg.PeersURL, "") == 0 || strings.Compare(subcfg.ClientAddr, "") == 0 {
if int(subcfg.NodeID) == 0 || strings.Compare(subcfg.PeersURL, "") == 0 || strings.Compare(subcfg.ClientAddr, "") == 0 {
plog.Error("The nodeId, peersURL or clientAddr is empty!")
return nil
}
clientAddr = subcfg.ClientAddr
var c *PbftClient
replyChan, requestChan, isPrimary := NewReplica(uint32(subcfg.NodeId), subcfg.PeersURL, subcfg.ClientAddr)
var c *Client
replyChan, requestChan, isPrimary := NewReplica(uint32(subcfg.NodeID), subcfg.PeersURL, subcfg.ClientAddr)
c = NewBlockstore(cfg, replyChan, requestChan, isPrimary)
return c
}
......@@ -15,8 +15,7 @@ import (
"github.com/golang/protobuf/proto"
)
// Digest
// EQ Digest
func EQ(d1 []byte, d2 []byte) bool {
if len(d1) != len(d2) {
return false
......@@ -29,90 +28,91 @@ func EQ(d1 []byte, d2 []byte) bool {
return true
}
// Checkpoint
// ToCheckpoint method
func ToCheckpoint(sequence uint32, digest []byte) *types.Checkpoint {
return &types.Checkpoint{sequence, digest}
return &types.Checkpoint{Sequence: sequence, Digest: digest}
}
// Entry
// ToEntry method
func ToEntry(sequence uint32, digest []byte, view uint32) *types.Entry {
return &types.Entry{sequence, digest, view}
return &types.Entry{Sequence: sequence, Digest: digest, View: view}
}
// ViewChange
// ToViewChange method
func ToViewChange(viewchanger uint32, digest []byte) *types.ViewChange {
return &types.ViewChange{viewchanger, digest}
return &types.ViewChange{Viewchanger: viewchanger, Digest: digest}
}
// Summary
// ToSummary method
func ToSummary(sequence uint32, digest []byte) *types.Summary {
return &types.Summary{sequence, digest}
return &types.Summary{Sequence: sequence, Digest: digest}
}
// Request
// ToRequestClient method
func ToRequestClient(op *types.Operation, timestamp, client string) *types.Request {
return &types.Request{
Value: &types.Request_Client{
&types.RequestClient{op, timestamp, client}},
Client: &types.RequestClient{Op: op, Timestamp: timestamp, Client: client}},
}
}
// ToRequestPreprepare method
func ToRequestPreprepare(view, sequence uint32, digest []byte, replica uint32) *types.Request {
return &types.Request{
Value: &types.Request_Preprepare{
&types.RequestPrePrepare{view, sequence, digest, replica}},
Preprepare: &types.RequestPrePrepare{View: view, Sequence: sequence, Digest: digest, Replica: replica}},
}
}
// ToRequestPrepare method
func ToRequestPrepare(view, sequence uint32, digest []byte, replica uint32) *types.Request {
return &types.Request{
Value: &types.Request_Prepare{
&types.RequestPrepare{view, sequence, digest, replica}},
Prepare: &types.RequestPrepare{View: view, Sequence: sequence, Digest: digest, Replica: replica}},
}
}
// ToRequestCommit method
func ToRequestCommit(view, sequence, replica uint32) *types.Request {
return &types.Request{
Value: &types.Request_Commit{
&types.RequestCommit{view, sequence, replica}},
Commit: &types.RequestCommit{View: view, Sequence: sequence, Replica: replica}},
}
}
// ToRequestCheckpoint method
func ToRequestCheckpoint(sequence uint32, digest []byte, replica uint32) *types.Request {
return &types.Request{
Value: &types.Request_Checkpoint{
&types.RequestCheckpoint{sequence, digest, replica}},
Checkpoint: &types.RequestCheckpoint{Sequence: sequence, Digest: digest, Replica: replica}},
}
}
// ToRequestViewChange method
func ToRequestViewChange(view, sequence uint32, checkpoints []*types.Checkpoint, preps, prePreps []*types.Entry, replica uint32) *types.Request {
return &types.Request{
Value: &types.Request_Viewchange{
&types.RequestViewChange{view, sequence, checkpoints, preps, prePreps, replica}},
Viewchange: &types.RequestViewChange{View: view, Sequence: sequence, Checkpoints: checkpoints, Preps: preps, Prepreps: prePreps, Replica: replica}},
}
}
// ToRequestAck method
func ToRequestAck(view, replica, viewchanger uint32, digest []byte) *types.Request {
return &types.Request{
Value: &types.Request_Ack{
&types.RequestAck{view, replica, viewchanger, digest}},
Ack: &types.RequestAck{View: view, Replica: replica, Viewchanger: viewchanger, Digest: digest}},
}
}
// ToRequestNewView method
func ToRequestNewView(view uint32, viewChanges []*types.ViewChange, summaries []*types.Summary, replica uint32) *types.Request {
return &types.Request{
Value: &types.Request_Newview{
&types.RequestNewView{view, viewChanges, summaries, replica}},
Newview: &types.RequestNewView{View: view, Viewchanges: viewChanges, Summaries: summaries, Replica: replica}},
}
}
// Request Methods
// ReqDigest method
func ReqDigest(req *types.Request) []byte {
if req == nil {
return nil
......@@ -130,14 +130,12 @@ func ReqDigest(req *types.Request) []byte {
return lwm
}*/
// Reply
// ToReply method
func ToReply(view uint32, timestamp, client string, replica uint32, result *types.Result) *types.ClientReply {
return &types.ClientReply{view, timestamp, client, replica, result}
return &types.ClientReply{View: view, Timestamp: timestamp, Client: client, Replica: replica, Result: result}
}
// Reply Methods
// RepDigest method
func RepDigest(reply fmt.Stringer) []byte {
if reply == nil {
return nil
......@@ -146,8 +144,7 @@ func RepDigest(reply fmt.Stringer) []byte {
return bytes[:]
}
// Write proto message
// WriteMessage write proto message
func WriteMessage(addr string, msg proto.Message) error {
conn, err := net.Dial("tcp", addr)
defer conn.Close()
......@@ -163,8 +160,7 @@ func WriteMessage(addr string, msg proto.Message) error {
return err
}
// Read proto message
// ReadMessage read proto message
func ReadMessage(conn io.Reader, msg proto.Message) error {
var buf bytes.Buffer
n, err := io.Copy(&buf, conn)
......
......@@ -13,11 +13,13 @@ import (
"github.com/golang/protobuf/proto"
)
// constant
const (
CHECKPOINT_PERIOD uint32 = 128
CONSTANT_FACTOR uint32 = 2
CheckPointPeriod uint32 = 128
ConstantFactor uint32 = 2
)
// Replica struct
type Replica struct {
ID uint32
replicas map[uint32]string
......@@ -36,6 +38,7 @@ type Replica struct {
checkpoints []*pb.Checkpoint
}
// NewReplica create Replica instance
func NewReplica(id uint32, PeersURL string, addr string) (chan *pb.ClientReply, chan *pb.Request, bool) {
replyChan := make(chan *pb.ClientReply)
requestChan := make(chan *pb.Request)
......@@ -65,6 +68,7 @@ func NewReplica(id uint32, PeersURL string, addr string) (chan *pb.ClientReply,
}
// Startnode method
func (rep *Replica) Startnode(addr string) {
rep.acceptConnections(addr)
}
......@@ -104,7 +108,7 @@ func (rep *Replica) lowWaterMark() uint32 {
}
func (rep *Replica) highWaterMark() uint32 {
return rep.lowWaterMark() + CHECKPOINT_PERIOD*CONSTANT_FACTOR
return rep.lowWaterMark() + CheckPointPeriod*ConstantFactor
}
func (rep *Replica) sequenceInRange(sequence uint32) bool {
......@@ -133,9 +137,8 @@ func (rep *Replica) theLastReply() *pb.ClientReply {
func (rep *Replica) lastReplyToClient(client string) *pb.ClientReply {
if v, ok := rep.replies[client]; ok {
return v[len(rep.replies[client])-1]
} else {
return nil
}
return nil
}
func (rep *Replica) stateDigest() []byte {
......@@ -143,7 +146,7 @@ func (rep *Replica) stateDigest() []byte {
}
func (rep *Replica) isCheckpoint(sequence uint32) bool {
return sequence%CHECKPOINT_PERIOD == 0
return sequence%CheckPointPeriod == 0
}
func (rep *Replica) addCheckpoint(checkpoint *pb.Checkpoint) {
......@@ -735,7 +738,7 @@ func (rep *Replica) handleRequestCommit(REQ *pb.Request) {
op := req.GetClient().Op
timestamp := req.GetClient().Timestamp
client := req.GetClient().Client
result := &pb.Result{op.Value}
result := &pb.Result{Value: op.Value}
rep.executed = append(rep.executed, sequence)
reply := ToReply(view, timestamp, client, rep.ID, result)
......@@ -1036,7 +1039,7 @@ func (rep *Replica) correctSummaries(requests []*pb.Request, summaries []*pb.Sum
return
}
end := start + CHECKPOINT_PERIOD*CONSTANT_FACTOR
end := start + CheckPointPeriod*ConstantFactor
for seq := start; seq <= end; seq++ {
......@@ -1516,7 +1519,7 @@ FOR_LOOP_1:
summaries = append(summaries, summary)
start = summary.Sequence
end = start + CHECKPOINT_PERIOD*CONSTANT_FACTOR
end = start + CheckPointPeriod*ConstantFactor
// select summaries
// TODO: optimize
......
......@@ -96,7 +96,7 @@ func sendReplyList(q queue.Queue) {
count++
createReplyList("test" + strconv.Itoa(count))
msg.Reply(client.NewMessage("consensus", types.EventReplyTxList,
&types.ReplyTxList{transactions}))
&types.ReplyTxList{Txs: transactions}))
if count == 5 {
time.Sleep(5 * time.Second)
break
......@@ -125,7 +125,7 @@ func createReplyList(account string) {
var result []*types.Transaction
for j := 0; j < txSize; j++ {
//tx := &types.Transaction{}
val := &cty.CoinsAction_Transfer{&types.AssetsTransfer{Amount: 10}}
val := &cty.CoinsAction_Transfer{Transfer: &types.AssetsTransfer{Amount: 10}}
action := &cty.CoinsAction{Value: val, Ty: cty.CoinsActionTransfer}
tx := &types.Transaction{Execer: []byte("coins"), Payload: types.Encode(action), Fee: 0}
tx.To = "14qViLJfdGaP4EeHnDyJbEGQysnCpwn1gZ"
......
......@@ -24,10 +24,11 @@ var (
func init() {
drivers.Reg("raft", NewRaftCluster)
drivers.QueryData.Register("raft", &RaftClient{})
drivers.QueryData.Register("raft", &Client{})
}
type RaftClient struct {
// Client Raft implementation
type Client struct {
*drivers.BaseClient
proposeC chan<- *types.Block
commitC <-chan *types.Block
......@@ -38,18 +39,21 @@ type RaftClient struct {
once sync.Once
}
func NewBlockstore(cfg *types.Consensus, snapshotter *snap.Snapshotter, proposeC chan<- *types.Block, commitC <-chan *types.Block, errorC <-chan error, validatorC <-chan bool, stopC chan<- struct{}) *RaftClient {
// NewBlockstore create Raft Client
func NewBlockstore(cfg *types.Consensus, snapshotter *snap.Snapshotter, proposeC chan<- *types.Block, commitC <-chan *types.Block, errorC <-chan error, validatorC <-chan bool, stopC chan<- struct{}) *Client {
c := drivers.NewBaseClient(cfg)
client := &RaftClient{BaseClient: c, proposeC: proposeC, snapshotter: snapshotter, validatorC: validatorC, commitC: commitC, errorC: errorC, stopC: stopC}
client := &Client{BaseClient: c, proposeC: proposeC, snapshotter: snapshotter, validatorC: validatorC, commitC: commitC, errorC: errorC, stopC: stopC}
c.SetChild(client)
return client
}
func (client *RaftClient) GetGenesisBlockTime() int64 {
// GetGenesisBlockTime get genesis blocktime
func (client *Client) GetGenesisBlockTime() int64 {
return genesisBlockTime
}
func (client *RaftClient) CreateGenesisTx() (ret []*types.Transaction) {
// CreateGenesisTx get genesis tx
func (client *Client) CreateGenesisTx() (ret []*types.Transaction) {
var tx types.Transaction
tx.Execer = []byte(cty.CoinsX)
tx.To = genesis
......@@ -62,20 +66,22 @@ func (client *RaftClient) CreateGenesisTx() (ret []*types.Transaction) {
return
}
func (client *RaftClient) ProcEvent(msg queue.Message) bool {
// ProcEvent method
func (client *Client) ProcEvent(msg queue.Message) bool {
return false
}
func (client *RaftClient) CheckBlock(parent *types.Block, current *types.BlockDetail) error {
// CheckBlock method
func (client *Client) CheckBlock(parent *types.Block, current *types.BlockDetail) error {
return nil
}
func (client *RaftClient) getSnapshot() ([]byte, error) {
func (client *Client) getSnapshot() ([]byte, error) {
//这里可能导致死锁
return proto.Marshal(client.GetCurrentBlock())
}
func (client *RaftClient) recoverFromSnapshot(snapshot []byte) error {
func (client *Client) recoverFromSnapshot(snapshot []byte) error {
var block types.Block
if err := proto.Unmarshal(snapshot, &block); err != nil {
return err
......@@ -84,7 +90,8 @@ func (client *RaftClient) recoverFromSnapshot(snapshot []byte) error {
return nil
}
func (client *RaftClient) SetQueueClient(c queue.Client) {
// SetQueueClient method
func (client *Client) SetQueueClient(c queue.Client) {
rlog.Info("Enter SetQueue method of raft consensus")
client.InitClient(c, func() {
})
......@@ -93,12 +100,14 @@ func (client *RaftClient) SetQueueClient(c queue.Client) {
go client.pollingTask(c)
}
func (client *RaftClient) Close() {
// Close method
func (client *Client) Close() {
client.stopC <- struct{}{}
rlog.Info("consensus raft closed")
}
func (client *RaftClient) CreateBlock() {
// CreateBlock method
func (client *Client) CreateBlock() {
issleep := true
retry := 0
infoflag := 0
......@@ -187,12 +196,12 @@ func (client *RaftClient) CreateBlock() {
}
// 向raft底层发送block
func (client *RaftClient) propose(block *types.Block) {
func (client *Client) propose(block *types.Block) {
client.proposeC <- block
}
// 从receive channel中读leader发来的block
func (client *RaftClient) readCommits(commitC <-chan *types.Block, errorC <-chan error) {
func (client *Client) readCommits(commitC <-chan *types.Block, errorC <-chan error) {
var data *types.Block
var ok bool
for {
......@@ -216,7 +225,7 @@ func (client *RaftClient) readCommits(commitC <-chan *types.Block, errorC <-chan
}
//轮询任务,去检测本机器是否为validator节点,如果是,则执行打包任务
func (client *RaftClient) pollingTask(c queue.Client) {
func (client *Client) pollingTask(c queue.Client) {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
......
......@@ -95,9 +95,9 @@ genesis="14KEKbYtKKQm4wMthSK9J4La4nAiidGozt"
genesisBlockTime=1514533394
# =============== raft共识配置参数 ===========================
# 共识节点ID,raft共识用到,不同的节点设置不同的nodeId(目前只支持1,2,3这种设置)
nodeId=1
nodeID=1
# raft共识用到,通过这个端口进行节点的增加和删除
raftApiPort=9121
raftAPIPort=9121
# raft共识用到,指示这个节点是否新增加节点
isNewJoinNode=false
# raft共识用到,指示raft集群中的服务器IP和端口
......
......@@ -20,17 +20,17 @@ var (
defaultSnapCount uint64 = 1000
snapshotCatchUpEntriesN uint64 = 1000
writeBlockSeconds int64 = 1
heartbeatTick int = 1
isLeader bool = false
heartbeatTick = 1
isLeader = false
confChangeC chan raftpb.ConfChange
)
type subConfig struct {
Genesis string `json:"genesis"`
GenesisBlockTime int64 `json:"genesisBlockTime"`
NodeId int64 `json:"nodeId"`
NodeID int64 `json:"nodeID"`
PeersURL string `json:"peersURL"`
RaftApiPort int64 `json:"raftApiPort"`
RaftAPIPort int64 `json:"raftAPIPort"`
IsNewJoinNode bool `json:"isNewJoinNode"`
ReadOnlyPeersURL string `json:"readOnlyPeersURL"`
AddPeersURL string `json:"addPeersURL"`
......@@ -39,6 +39,7 @@ type subConfig struct {
HeartbeatTick int32 `json:"heartbeatTick"`
}
// NewRaftCluster create raft cluster
func NewRaftCluster(cfg *types.Consensus, sub []byte) queue.Module {
rlog.Info("Start to create raft cluster")
var subcfg subConfig
......@@ -52,7 +53,7 @@ func NewRaftCluster(cfg *types.Consensus, sub []byte) queue.Module {
if subcfg.GenesisBlockTime > 0 {
genesisBlockTime = subcfg.GenesisBlockTime
}
if int(subcfg.NodeId) == 0 || strings.Compare(subcfg.PeersURL, "") == 0 {
if int(subcfg.NodeID) == 0 || strings.Compare(subcfg.PeersURL, "") == 0 {
rlog.Error("Please check whether the configuration of nodeId and peersURL is empty!")
//TODO 当传入的参数异常时,返回给主函数的是个nil,这时候需要做异常处理
return nil
......@@ -74,7 +75,7 @@ func NewRaftCluster(cfg *types.Consensus, sub []byte) queue.Module {
proposeC := make(chan *types.Block)
confChangeC = make(chan raftpb.ConfChange)
var b *RaftClient
var b *Client
getSnapshot := func() ([]byte, error) { return b.getSnapshot() }
// raft集群的建立,1. 初始化两条channel: propose channel用于客户端和raft底层交互, commit channel用于获取commit消息
// 2. raft集群中的节点之间建立http连接
......@@ -90,9 +91,9 @@ func NewRaftCluster(cfg *types.Consensus, sub []byte) queue.Module {
if len(addPeers) == 1 && addPeers[0] == "" {
addPeers = []string{}
}
commitC, errorC, snapshotterReady, validatorC, stopC := NewRaftNode(int(subcfg.NodeId), subcfg.IsNewJoinNode, peers, readOnlyPeers, addPeers, getSnapshot, proposeC, confChangeC)
commitC, errorC, snapshotterReady, validatorC, stopC := NewRaftNode(int(subcfg.NodeID), subcfg.IsNewJoinNode, peers, readOnlyPeers, addPeers, getSnapshot, proposeC, confChangeC)
//启动raft删除节点操作监听
go serveHttpRaftAPI(int(subcfg.RaftApiPort), confChangeC, errorC)
go serveHTTPRaftAPI(int(subcfg.RaftAPIPort), confChangeC, errorC)
// 监听commit channel,取block
b = NewBlockstore(cfg, <-snapshotterReady, proposeC, commitC, errorC, validatorC, stopC)
return b
......
......@@ -30,7 +30,7 @@ func (h *httpRaftAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
nodeId, err := strconv.ParseUint(key[1:], 0, 64)
nodeID, err := strconv.ParseUint(key[1:], 0, 64)
if err != nil {
rlog.Error(fmt.Sprintf("Failed to convert ID for conf change (%v)", err.Error()))
http.Error(w, "Failed on POST", http.StatusBadRequest)
......@@ -39,14 +39,14 @@ func (h *httpRaftAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
cc := raftpb.ConfChange{
Type: raftpb.ConfChangeAddNode,
NodeID: nodeId,
NodeID: nodeID,
Context: url,
}
h.confChangeC <- cc
// As above, optimistic that raft will apply the conf change
w.WriteHeader(http.StatusCreated)
case r.Method == "DELETE":
nodeId, err := strconv.ParseUint(key[1:], 0, 64)
nodeID, err := strconv.ParseUint(key[1:], 0, 64)
if err != nil {
rlog.Error(fmt.Sprintf("Failed to convert ID for conf change (%v)", err.Error()))
http.Error(w, "Failed on DELETE", http.StatusBadRequest)
......@@ -54,7 +54,7 @@ func (h *httpRaftAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
cc := raftpb.ConfChange{
Type: raftpb.ConfChangeRemoveNode,
NodeID: nodeId,
NodeID: nodeID,
}
h.confChangeC <- cc
// As above, optimistic that raft will apply the conf change
......@@ -66,7 +66,7 @@ func (h *httpRaftAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}
func serveHttpRaftAPI(port int, confChangeC chan<- raftpb.ConfChange, errorC <-chan error) {
func serveHTTPRaftAPI(port int, confChangeC chan<- raftpb.ConfChange, errorC <-chan error) {
srv := http.Server{
Addr: "localhost:" + strconv.Itoa(port),
Handler: &httpRaftAPI{
......
......@@ -64,6 +64,7 @@ type raftNode struct {
restartC chan struct{}
}
// NewRaftNode create raft node
func NewRaftNode(id int, join bool, peers []string, readOnlyPeers []string, addPeers []string, getSnapshot func() ([]byte, error), proposeC <-chan *types.Block,
confChangeC <-chan raftpb.ConfChange) (<-chan *types.Block, <-chan error, <-chan *snap.Snapshotter, <-chan bool, chan<- struct{}) {
......@@ -212,7 +213,7 @@ func (rc *raftNode) serveChannels() {
defer ticker.Stop()
go func() {
var confChangeCount uint64 = 0
var confChangeCount uint64
// 通过propose和proposeConfchange方法往RaftNode发通知
for rc.proposeC != nil && rc.confChangeC != nil {
select {
......@@ -231,7 +232,7 @@ func (rc *raftNode) serveChannels() {
if !ok {
rc.confChangeC = nil
} else {
confChangeCount += 1
confChangeCount++
cc.ID = confChangeCount
rc.node.ProposeConfChange(context.TODO(), cc)
}
......
......@@ -34,8 +34,8 @@ import (
var (
random *rand.Rand
txNumber int = 10
loopCount int = 10
txNumber = 10
loopCount = 10
)
func init() {
......@@ -132,7 +132,7 @@ func sendReplyList(q queue.Queue) {
if msg.Ty == types.EventTxList {
count++
msg.Reply(client.NewMessage("consensus", types.EventReplyTxList,
&types.ReplyTxList{getReplyList(txNumber)}))
&types.ReplyTxList{Txs: getReplyList(txNumber)}))
if count >= loopCount {
time.Sleep(4 * time.Second)
break
......@@ -149,7 +149,7 @@ func prepareTxList() *types.Transaction {
key = generateKey(i, 32)
value = generateValue(i, 180)
nput := &pty.NormAction_Nput{&pty.NormPut{Key: key, Value: []byte(value)}}
nput := &pty.NormAction_Nput{Nput: &pty.NormPut{Key: key, Value: []byte(value)}}
action := &pty.NormAction{Value: nput, Ty: pty.NormActionPut}
tx := &types.Transaction{Execer: []byte("norm"), Payload: types.Encode(action), Fee: 0}
tx.To = address.ExecAddress("norm")
......
......@@ -45,7 +45,7 @@ func main() {
if isIndex {
fmt.Printf("Start dumping log entries from index %s.\n", *index)
walsnap.Index, err = strconv.ParseUint(*index, 10, 64)
walsnap.Index, _ = strconv.ParseUint(*index, 10, 64)
} else {
if *snapfile == "" {
ss := raftsnap.New(snapDir(dataDir))
......@@ -133,6 +133,7 @@ func genIDSlice(a []uint64) []types.ID {
return ids
}
// Block struct
type Block struct {
Version int64 `protobuf:"varint,1,opt,name=version" json:"version,omitempty"`
ParentHash []byte `protobuf:"bytes,2,opt,name=parentHash,proto3" json:"parentHash,omitempty"`
......@@ -144,6 +145,9 @@ type Block struct {
//Txs []*Transaction `protobuf:"bytes,7,rep,name=txs" json:"txs,omitempty"`
}
// Reset method
func (m *Block) Reset() { *m = Block{} }
func (m *Block) String() string { return proto.CompactTextString(m) }
func (*Block) ProtoMessage() {}
// ProtoMessage method
func (*Block) ProtoMessage() {}
......@@ -98,6 +98,7 @@ func main() {
}
}
// LoadHelp show available commands
func LoadHelp() {
fmt.Println("Available Commands:")
fmt.Println("[ip] transferperf [from, to, amount, txNum, duration] : 转账性能测试")
......@@ -108,6 +109,7 @@ func LoadHelp() {
fmt.Println("[ip] normreadperf [num, interval, duration] : 常规读数据性能测试")
}
// TransferPerf run transfer performance
func TransferPerf(from string, to string, amount string, txNum string, duration string) {
txNumInt, err := strconv.Atoi(txNum)
if err != nil {
......@@ -139,6 +141,7 @@ func TransferPerf(from string, to string, amount string, txNum string, duration
}
}
// SendToAddress run transfer
func SendToAddress(from string, to string, amount string, note string) {
amountFloat64, err := strconv.ParseFloat(amount, 64)
if err != nil {
......@@ -162,6 +165,7 @@ func SendToAddress(from string, to string, amount string, note string) {
fmt.Println(string(data))
}
// NormPerf run norm performance
func NormPerf(size string, num string, interval string, duration string) {
var key string
var value string
......@@ -197,7 +201,7 @@ func NormPerf(size string, num string, interval string, duration string) {
ch := make(chan struct{}, numThread)
for i := 0; i < numThread; i++ {
go func() {
var result int64 = 0
var result int64
totalCount := 0
txCount := 0
_, priv := genaddress()
......@@ -228,7 +232,7 @@ func NormPerf(size string, num string, interval string, duration string) {
}
}
//zzh
// NormReadPerf run read performance
func NormReadPerf(num string, interval string, duration string) {
var numThread int
numInt, err := strconv.Atoi(num)
......@@ -260,7 +264,6 @@ func NormReadPerf(num string, interval string, duration string) {
f, err := os.Open("normperf.log")
if err != nil {
panic("open file failed.")
return
}
buf := bufio.NewReader(f)
cnt := 0
......@@ -277,7 +280,6 @@ func NormReadPerf(num string, interval string, duration string) {
f, err := os.Open("normperf.log")
if err != nil {
panic("open file failed.")
return
}
buf = bufio.NewReader(f)
}
......@@ -315,6 +317,7 @@ func NormReadPerf(num string, interval string, duration string) {
}
}
// RandStringBytes create random string
func RandStringBytes(n int) string {
b := make([]byte, n)
rand.Seed(types.Now().UnixNano())
......@@ -324,9 +327,10 @@ func RandStringBytes(n int) string {
return string(b)
}
// NormPut run put action
func NormPut(privkey string, key string, value string) {
fmt.Println(key, "=", value)
nput := &pty.NormAction_Nput{&pty.NormPut{Key: key, Value: []byte(value)}}
nput := &pty.NormAction_Nput{Nput: &pty.NormPut{Key: key, Value: []byte(value)}}
action := &pty.NormAction{Value: nput, Ty: pty.NormActionPut}
tx := &types.Transaction{Execer: []byte("norm"), Payload: types.Encode(action), Fee: fee}
tx.To = address.ExecAddress("norm")
......@@ -344,6 +348,7 @@ func NormPut(privkey string, key string, value string) {
}
}
// NormGet run query action
func NormGet(key string) {
in := &pty.NormGetKey{Key: key}
data, err := proto.Marshal(in)
......
......@@ -25,25 +25,28 @@ import (
var configPath = flag.String("f", "servers.toml", "configfile")
// ScpInfo struct
type ScpInfo struct {
UserName string
PassWord string
HostIp string
HostIP string
Port int
LocalFilePath string
RemoteDir string
}
// CmdInfo struct
type CmdInfo struct {
userName string
passWord string
hostIp string
hostIP string
port int
cmd string
remoteDir string
}
type tomlConfig struct {
// TomlConfig struct
type TomlConfig struct {
Title string
Servers map[string]ScpInfo
}
......@@ -117,8 +120,9 @@ func sftpconnect(user, password, host string, port int) (*sftp.Client, error) {
return sftpClient, nil
}
// ScpFileFromLocalToRemote copy local file to remote
func ScpFileFromLocalToRemote(si *ScpInfo) {
sftpClient, err := sftpconnect(si.UserName, si.PassWord, si.HostIp, si.Port)
sftpClient, err := sftpconnect(si.UserName, si.PassWord, si.HostIP, si.Port)
if err != nil {
fmt.Println("sftconnect have a err!")
log.Fatal(err)
......@@ -157,9 +161,10 @@ func ScpFileFromLocalToRemote(si *ScpInfo) {
fmt.Println("copy file to remote server finished!")
}
// RemoteExec run cmd in remote
func RemoteExec(cmdInfo *CmdInfo) error {
//A Session only accepts one call to Run, Start or Shell.
session, err := sshconnect(cmdInfo.userName, cmdInfo.passWord, cmdInfo.hostIp, cmdInfo.port)
session, err := sshconnect(cmdInfo.userName, cmdInfo.passWord, cmdInfo.hostIP, cmdInfo.port)
if err != nil {
return err
}
......@@ -180,8 +185,9 @@ func remoteScp(si *ScpInfo, reqnum chan struct{}) {
}
func InitCfg(path string) *tomlConfig {
var cfg tomlConfig
// InitCfg init config
func InitCfg(path string) *TomlConfig {
var cfg TomlConfig
if _, err := tml.DecodeFile(path, &cfg); err != nil {
fmt.Println(err)
os.Exit(0)
......@@ -243,6 +249,7 @@ func main() {
log.Printf("read common cost time %v\n", timeCommon.Sub(start))
}
// LoadHelp show available commands
func LoadHelp() {
fmt.Println("Available Commands:")
fmt.Println(" start : 启动服务 ")
......@@ -250,14 +257,14 @@ func LoadHelp() {
fmt.Println(" clear : 清空数据")
}
func startAll(conf *tomlConfig) {
func startAll(conf *TomlConfig) {
//fmt.Println(getCurrentDirectory())
arrMap := make(map[string]*CmdInfo)
//多协程启动部署
reqC := make(chan struct{}, len(conf.Servers))
for index, sc := range conf.Servers {
cmdInfo := &CmdInfo{}
cmdInfo.hostIp = sc.HostIp
cmdInfo.hostIP = sc.HostIP
cmdInfo.userName = sc.UserName
cmdInfo.port = sc.Port
cmdInfo.passWord = sc.PassWord
......@@ -276,11 +283,11 @@ func startAll(conf *tomlConfig) {
}
}
func stopAll(conf *tomlConfig) {
func stopAll(conf *TomlConfig) {
//执行速度快,不需要多起多协程工作
for _, sc := range conf.Servers {
cmdInfo := &CmdInfo{}
cmdInfo.hostIp = sc.HostIp
cmdInfo.hostIP = sc.HostIP
cmdInfo.userName = sc.UserName
cmdInfo.port = sc.Port
cmdInfo.passWord = sc.PassWord
......@@ -290,10 +297,10 @@ func stopAll(conf *tomlConfig) {
}
}
func clearAll(conf *tomlConfig) {
func clearAll(conf *TomlConfig) {
for _, sc := range conf.Servers {
cmdInfo := &CmdInfo{}
cmdInfo.hostIp = sc.HostIp
cmdInfo.hostIP = sc.HostIP
cmdInfo.userName = sc.UserName
cmdInfo.port = sc.Port
cmdInfo.passWord = sc.PassWord
......
......@@ -39,12 +39,12 @@ func init() {
// Client export ticket client struct
type Client struct {
*drivers.BaseClient
//ticket list for miner
tlist *ty.ReplyTicketList
privmap map[string]crypto.PrivKey
ticketmu sync.Mutex
done chan struct{}
subcfg *subConfig
//ticket map for miner
ticketsMap map[string]*ty.Ticket
privmap map[string]crypto.PrivKey
ticketmu sync.Mutex
done chan struct{}
subcfg *subConfig
}
type genesisTicket struct {
......@@ -68,7 +68,13 @@ func New(cfg *types.Consensus, sub []byte) queue.Module {
if subcfg.GenesisBlockTime > 0 {
cfg.GenesisBlockTime = subcfg.GenesisBlockTime
}
t := &Client{c, &ty.ReplyTicketList{}, nil, sync.Mutex{}, make(chan struct{}), &subcfg}
t := &Client{
BaseClient: c,
ticketsMap: make(map[string]*ty.Ticket),
privmap: nil,
ticketmu: sync.Mutex{},
done: make(chan struct{}),
subcfg: &subcfg}
c.SetChild(t)
go t.flushTicketBackend()
return t
......@@ -188,16 +194,17 @@ func (client *Client) getTickets() ([]*ty.Ticket, []crypto.PrivKey, error) {
func (client *Client) getTicketCount() int64 {
client.ticketmu.Lock()
defer client.ticketmu.Unlock()
if client.tlist == nil {
return 0
}
return int64(len(client.tlist.Tickets))
return int64(len(client.ticketsMap))
}
func (client *Client) setTicket(tlist *ty.ReplyTicketList, privmap map[string]crypto.PrivKey) {
client.ticketmu.Lock()
defer client.ticketmu.Unlock()
client.tlist = tlist
client.ticketsMap = make(map[string]*ty.Ticket)
for _, ticket := range tlist.Tickets {
client.ticketsMap[ticket.GetTicketId()] = ticket
}
//client.tlist = tlist
client.privmap = privmap
tlog.Debug("setTicket", "n", len(tlist.GetTickets()))
}
......@@ -464,27 +471,32 @@ func printBInt(data *big.Int) string {
return strings.Repeat("0", 64-len(txt)) + txt
}
func (client *Client) searchTargetTicket(parent, block *types.Block) (*ty.Ticket, crypto.PrivKey, *big.Int, []byte, int, error) {
func (client *Client) searchTargetTicket(parent, block *types.Block) (*ty.Ticket, crypto.PrivKey, *big.Int, []byte, string, error) {
bits := parent.Difficulty
diff, modify, err := client.getNextTarget(parent, bits)
if err != nil {
return nil, nil, nil, nil, 0, err
return nil, nil, nil, nil, "", err
}
client.ticketmu.Lock()
defer client.ticketmu.Unlock()
for i := 0; i < len(client.tlist.Tickets); i++ {
ticket := client.tlist.Tickets[i]
for ticketID, ticket := range client.ticketsMap {
if ticket == nil {
tlog.Warn("Client searchTargetTicket ticket is nil", "ticketID", ticketID)
continue
}
//已经到成熟期
if !ticket.GetIsGenesis() && (block.BlockTime-ticket.GetCreateTime() <= types.GetP(block.Height).TicketFrozenTime) {
continue
}
//已经到成熟器
if !ticket.IsGenesis && block.BlockTime-ticket.CreateTime <= types.GetP(block.Height).TicketFrozenTime {
// 查找私钥
priv, ok := client.privmap[ticket.MinerAddress]
if !ok {
tlog.Error("Client searchTargetTicket can't find private key", "MinerAddress", ticket.MinerAddress)
continue
}
//find priv key
priv := client.privmap[ticket.MinerAddress]
privHash, err := genPrivHash(priv, ticket.TicketId)
privHash, err := genPrivHash(priv, ticketID)
if err != nil {
tlog.Error("Client searchTargetTicket genPrivHash ", "error", err)
continue
}
currentdiff := client.getCurrentTarget(block.BlockTime, ticket.TicketId, modify, privHash)
......@@ -493,36 +505,26 @@ func (client *Client) searchTargetTicket(parent, block *types.Block) (*ty.Ticket
}
tlog.Info("currentdiff", "hex", printBInt(currentdiff))
tlog.Info("FindBlock", "height------->", block.Height, "ntx", len(block.Txs))
return ticket, priv, diff, modify, i, nil
return ticket, priv, diff, modify, ticketID, nil
}
return nil, nil, nil, nil, 0, nil
return nil, nil, nil, nil, "", nil
}
func (client *Client) delTicket(ticket *ty.Ticket, index int) {
func (client *Client) delTicket(ticketID string) {
client.ticketmu.Lock()
defer client.ticketmu.Unlock()
//1. 结构体没有被重新调整过
oldticket := client.tlist.Tickets[index]
if oldticket.TicketId == ticket.TicketId {
client.tlist.Tickets[index] = nil
}
//2. 全表search
for i := 0; i < len(client.tlist.Tickets); i++ {
oldticket = client.tlist.Tickets[i]
if oldticket == nil {
continue
}
if oldticket.TicketId == ticket.TicketId {
client.tlist.Tickets[i] = nil
return
}
if client.ticketsMap == nil || len(ticketID) == 0 {
return
}
if _, ok := client.ticketsMap[ticketID]; ok {
delete(client.ticketsMap, ticketID)
}
}
// Miner ticket miner function
func (client *Client) Miner(parent, block *types.Block) bool {
//add miner address
ticket, priv, diff, modify, index, err := client.searchTargetTicket(parent, block)
ticket, priv, diff, modify, ticketID, err := client.searchTargetTicket(parent, block)
if err != nil {
tlog.Error("Miner", "err", err)
newblock, err := client.RequestLastBlock()
......@@ -543,7 +545,7 @@ func (client *Client) Miner(parent, block *types.Block) bool {
if err != nil {
return false
}
client.delTicket(ticket, index)
client.delTicket(ticketID)
return true
}
......
......@@ -7,6 +7,8 @@ package ticket
import (
"testing"
"github.com/33cn/plugin/plugin/dapp/ticket/types"
_ "github.com/33cn/chain33/system"
"github.com/33cn/chain33/util/testnode"
_ "github.com/33cn/plugin/plugin/dapp/init"
......@@ -23,3 +25,20 @@ func TestTicket(t *testing.T) {
err := mock33.WaitHeight(100)
assert.Nil(t, err)
}
func TestTicketMap(t *testing.T) {
c := Client{}
ticketList := &types.ReplyTicketList{}
ticketList.Tickets = []*types.Ticket{
{TicketId: "1111"},
{TicketId: "2222"},
{TicketId: "3333"},
{TicketId: "4444"},
}
assert.Equal(t, c.getTicketCount(), int64(0))
c.setTicket(ticketList, nil)
assert.Equal(t, c.getTicketCount(), int64(4))
c.delTicket("3333")
assert.Equal(t, c.getTicketCount(), int64(3))
}
......@@ -9,9 +9,10 @@ import (
pty "github.com/33cn/plugin/plugin/dapp/norm/types"
)
// Exec_Nput Action
func (n *Norm) Exec_Nput(nput *pty.NormPut, tx *types.Transaction, index int) (*types.Receipt, error) {
receipt := &types.Receipt{types.ExecOk, nil, nil}
normKV := &types.KeyValue{Key(nput.Key), nput.Value}
receipt := &types.Receipt{Ty: types.ExecOk, KV: nil, Logs: nil}
normKV := &types.KeyValue{Key: Key(nput.Key), Value: nput.Value}
receipt.KV = append(receipt.KV, normKV)
return receipt, nil
}
......@@ -9,6 +9,7 @@ import (
pty "github.com/33cn/plugin/plugin/dapp/norm/types"
)
// ExecDelLocal_Nput Action
func (n *Norm) ExecDelLocal_Nput(nput *pty.NormPut, tx *types.Transaction, receipt *types.ReceiptData, index int) (*types.LocalDBSet, error) {
return nil, nil
}
......@@ -9,6 +9,7 @@ import (
pty "github.com/33cn/plugin/plugin/dapp/norm/types"
)
// ExecLocal_Nput Action
func (n *Norm) ExecLocal_Nput(nput *pty.NormPut, tx *types.Transaction, receipt *types.ReceiptData, index int) (*types.LocalDBSet, error) {
return nil, nil
}
......@@ -18,15 +18,18 @@ func init() {
ety.InitFuncList(types.ListMethod(&Norm{}))
}
// Init norm
func Init(name string, sub []byte) {
clog.Debug("register norm execer")
drivers.Register(GetName(), newNorm, types.GetDappFork(driverName, "Enable"))
}
// GetName for norm
func GetName() string {
return newNorm().GetName()
}
// Norm driver
type Norm struct {
drivers.DriverBase
}
......@@ -39,14 +42,17 @@ func newNorm() drivers.Driver {
return n
}
// GetDriverName for norm
func (n *Norm) GetDriverName() string {
return driverName
}
// CheckTx for norm
func (n *Norm) CheckTx(tx *types.Transaction, index int) error {
return nil
}
// Key for norm
func Key(str string) (key []byte) {
key = append(key, []byte("mavl-norm-")...)
key = append(key, str...)
......
......@@ -9,10 +9,11 @@ import (
pty "github.com/33cn/plugin/plugin/dapp/norm/types"
)
// Query_NormGet get value
func (n *Norm) Query_NormGet(in *pty.NormGetKey) (types.Message, error) {
value, err := n.GetStateDB().Get(Key(in.Key))
if err != nil {
return nil, types.ErrNotFound
}
return &types.ReplyString{string(value)}, nil
return &types.ReplyString{Data: string(value)}, nil
}
......@@ -8,6 +8,7 @@ import (
"github.com/33cn/chain33/types"
)
// NormX name
var NormX = "norm"
func init() {
......@@ -16,26 +17,31 @@ func init() {
types.RegisterDappFork(NormX, "Enable", 0)
}
// NormType def
type NormType struct {
types.ExecTypeBase
}
// NewType method
func NewType() *NormType {
c := &NormType{}
c.SetChild(c)
return c
}
// GetPayload method
func (norm *NormType) GetPayload() types.Message {
return &NormAction{}
}
// GetTypeMap method
func (norm *NormType) GetTypeMap() map[string]int32 {
return map[string]int32{
"Nput": NormActionPut,
}
}
func (at *NormType) GetLogMap() map[int64]*types.LogInfo {
// GetLogMap method
func (norm *NormType) GetLogMap() map[int64]*types.LogInfo {
return map[int64]*types.LogInfo{}
}
......@@ -16,10 +16,12 @@ import (
var klog = log.New("module", "kvdb")
// SetLogLevel set log level
func SetLogLevel(level string) {
clog.SetLogLevel(level)
}
// DisableLog disable log output
func DisableLog() {
klog.SetHandler(log.DiscardHandler())
}
......@@ -28,11 +30,13 @@ func init() {
drivers.Reg("kvdb", New)
}
// KVStore implementation
type KVStore struct {
*drivers.BaseStore
cache map[string]map[string]*types.KeyValue
}
// New KVStore module
func New(cfg *types.Store, sub []byte) queue.Module {
bs := drivers.NewBaseStore(cfg)
kvs := &KVStore{bs, make(map[string]map[string]*types.KeyValue)}
......@@ -40,11 +44,13 @@ func New(cfg *types.Store, sub []byte) queue.Module {
return kvs
}
// Close KVStore module
func (kvs *KVStore) Close() {
kvs.BaseStore.Close()
klog.Info("store kvdb closed")
}
// Set kvs with statehash to KVStore
func (kvs *KVStore) Set(datas *types.StoreSet, sync bool) ([]byte, error) {
hash := calcHash(datas)
kvmap := make(map[string]*types.KeyValue)
......@@ -55,6 +61,7 @@ func (kvs *KVStore) Set(datas *types.StoreSet, sync bool) ([]byte, error) {
return hash, nil
}
// Get kvs with statehash from KVStore
func (kvs *KVStore) Get(datas *types.StoreGet) [][]byte {
values := make([][]byte, len(datas.Keys))
if kvmap, ok := kvs.cache[string(datas.StateHash)]; ok {
......@@ -76,6 +83,7 @@ func (kvs *KVStore) Get(datas *types.StoreGet) [][]byte {
return values
}
// MemSet set kvs to the mem of KVStore
func (kvs *KVStore) MemSet(datas *types.StoreSet, sync bool) ([]byte, error) {
if len(datas.KV) == 0 {
klog.Info("store kv memset,use preStateHash as stateHash for kvset is null")
......@@ -96,6 +104,7 @@ func (kvs *KVStore) MemSet(datas *types.StoreSet, sync bool) ([]byte, error) {
return hash, nil
}
// Commit kvs in the mem of KVStore
func (kvs *KVStore) Commit(req *types.ReqHash) ([]byte, error) {
kvmap, ok := kvs.cache[string(req.Hash)]
if !ok {
......@@ -112,6 +121,7 @@ func (kvs *KVStore) Commit(req *types.ReqHash) ([]byte, error) {
return req.Hash, nil
}
// Rollback kvs in the mem of KVStore
func (kvs *KVStore) Rollback(req *types.ReqHash) ([]byte, error) {
_, ok := kvs.cache[string(req.Hash)]
if !ok {
......@@ -122,16 +132,19 @@ func (kvs *KVStore) Rollback(req *types.ReqHash) ([]byte, error) {
return req.Hash, nil
}
// IterateRangeByStateHash method
func (kvs *KVStore) IterateRangeByStateHash(statehash []byte, start []byte, end []byte, ascending bool, fn func(key, value []byte) bool) {
panic("empty")
//TODO:
//kvs.IterateRangeByStateHash(mavls.GetDB(), statehash, start, end, ascending, fn)
}
// ProcEvent handles supported events
func (kvs *KVStore) ProcEvent(msg queue.Message) {
msg.ReplyErr("KVStore", types.ErrActionNotSupport)
}
// Del set kvs to nil with StateHash
func (kvs *KVStore) Del(req *types.StoreDel) ([]byte, error) {
//not support
return nil, nil
......
......@@ -23,8 +23,8 @@ func TestKvdbNewClose(t *testing.T) {
assert.Nil(t, err)
defer os.RemoveAll(dir) // clean up
os.RemoveAll(dir) //删除已存在目录
var store_cfg = newStoreCfg(dir)
store := New(store_cfg, nil)
var storeCfg = newStoreCfg(dir)
store := New(storeCfg, nil)
assert.NotNil(t, store)
store.Close()
......@@ -35,12 +35,12 @@ func TestKvddbSetGet(t *testing.T) {
assert.Nil(t, err)
defer os.RemoveAll(dir) // clean up
os.RemoveAll(dir) //删除已存在目录
var store_cfg = newStoreCfg(dir)
store := New(store_cfg, nil).(*KVStore)
var storeCfg = newStoreCfg(dir)
store := New(storeCfg, nil).(*KVStore)
assert.NotNil(t, store)
keys0 := [][]byte{[]byte("mk1"), []byte("mk2")}
get0 := &types.StoreGet{drivers.EmptyRoot[:], keys0}
get0 := &types.StoreGet{StateHash: drivers.EmptyRoot[:], Keys: keys0}
values0 := store.Get(get0)
klog.Info("info", "info", values0)
// Get exist key, result nil
......@@ -49,16 +49,16 @@ func TestKvddbSetGet(t *testing.T) {
assert.Equal(t, []byte(nil), values0[1])
var kv []*types.KeyValue
kv = append(kv, &types.KeyValue{[]byte("k1"), []byte("v1")})
kv = append(kv, &types.KeyValue{[]byte("k2"), []byte("v2")})
kv = append(kv, &types.KeyValue{Key: []byte("k1"), Value: []byte("v1")})
kv = append(kv, &types.KeyValue{Key: []byte("k2"), Value: []byte("v2")})
datas := &types.StoreSet{
drivers.EmptyRoot[:],
kv,
0}
StateHash: drivers.EmptyRoot[:],
KV: kv,
Height: 0}
hash, err := store.Set(datas, true)
assert.Nil(t, err)
keys := [][]byte{[]byte("k1"), []byte("k2")}
get1 := &types.StoreGet{hash, keys}
get1 := &types.StoreGet{StateHash: hash, Keys: keys}
values := store.Get(get1)
assert.Len(t, values, 2)
......@@ -66,12 +66,12 @@ func TestKvddbSetGet(t *testing.T) {
assert.Equal(t, []byte("v2"), values[1])
keys = [][]byte{[]byte("k1")}
get2 := &types.StoreGet{hash, keys}
get2 := &types.StoreGet{StateHash: hash, Keys: keys}
values2 := store.Get(get2)
assert.Len(t, values2, 1)
assert.Equal(t, []byte("v1"), values2[0])
get3 := &types.StoreGet{drivers.EmptyRoot[:], keys}
get3 := &types.StoreGet{StateHash: drivers.EmptyRoot[:], Keys: keys}
values3 := store.Get(get3)
assert.Len(t, values3, 1)
}
......@@ -81,29 +81,29 @@ func TestKvdbMemSet(t *testing.T) {
assert.Nil(t, err)
defer os.RemoveAll(dir) // clean up
os.RemoveAll(dir) //删除已存在目录
var store_cfg = newStoreCfg(dir)
store := New(store_cfg, nil).(*KVStore)
var storeCfg = newStoreCfg(dir)
store := New(storeCfg, nil).(*KVStore)
assert.NotNil(t, store)
var kv []*types.KeyValue
kv = append(kv, &types.KeyValue{[]byte("mk1"), []byte("v1")})
kv = append(kv, &types.KeyValue{[]byte("mk2"), []byte("v2")})
kv = append(kv, &types.KeyValue{Key: []byte("mk1"), Value: []byte("v1")})
kv = append(kv, &types.KeyValue{Key: []byte("mk2"), Value: []byte("v2")})
datas := &types.StoreSet{
drivers.EmptyRoot[:],
kv,
0}
StateHash: drivers.EmptyRoot[:],
KV: kv,
Height: 0}
hash, err := store.MemSet(datas, true)
assert.Nil(t, err)
keys := [][]byte{[]byte("mk1"), []byte("mk2")}
get1 := &types.StoreGet{hash, keys}
get1 := &types.StoreGet{StateHash: hash, Keys: keys}
values := store.Get(get1)
assert.Len(t, values, 2)
actHash, _ := store.Commit(&types.ReqHash{hash})
actHash, _ := store.Commit(&types.ReqHash{Hash: hash})
assert.Equal(t, hash, actHash)
notExistHash, _ := store.Commit(&types.ReqHash{drivers.EmptyRoot[:]})
notExistHash, _ := store.Commit(&types.ReqHash{Hash: drivers.EmptyRoot[:]})
assert.Nil(t, notExistHash)
}
......@@ -112,27 +112,27 @@ func TestKvdbRollback(t *testing.T) {
assert.Nil(t, err)
defer os.RemoveAll(dir) // clean up
os.RemoveAll(dir) //删除已存在目录
var store_cfg = newStoreCfg(dir)
store := New(store_cfg, nil).(*KVStore)
var storeCfg = newStoreCfg(dir)
store := New(storeCfg, nil).(*KVStore)
assert.NotNil(t, store)
var kv []*types.KeyValue
kv = append(kv, &types.KeyValue{[]byte("mk1"), []byte("v1")})
kv = append(kv, &types.KeyValue{[]byte("mk2"), []byte("v2")})
kv = append(kv, &types.KeyValue{Key: []byte("mk1"), Value: []byte("v1")})
kv = append(kv, &types.KeyValue{Key: []byte("mk2"), Value: []byte("v2")})
datas := &types.StoreSet{
drivers.EmptyRoot[:],
kv,
0}
StateHash: drivers.EmptyRoot[:],
KV: kv,
Height: 0}
hash, err := store.MemSet(datas, true)
assert.Nil(t, err)
keys := [][]byte{[]byte("mk1"), []byte("mk2")}
get1 := &types.StoreGet{hash, keys}
get1 := &types.StoreGet{StateHash: hash, Keys: keys}
values := store.Get(get1)
assert.Len(t, values, 2)
actHash, _ := store.Rollback(&types.ReqHash{hash})
actHash, _ := store.Rollback(&types.ReqHash{Hash: hash})
assert.Equal(t, hash, actHash)
notExistHash, _ := store.Rollback(&types.ReqHash{drivers.EmptyRoot[:]})
notExistHash, _ := store.Rollback(&types.ReqHash{Hash: drivers.EmptyRoot[:]})
assert.Nil(t, notExistHash)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment