Commit 08854e25 authored by caoping's avatar caoping Committed by vipwzw

fix linter warning in raft/pbft/norm/kvdb

parent 59f852a0
...@@ -16,37 +16,44 @@ import ( ...@@ -16,37 +16,44 @@ import (
func init() { func init() {
drivers.Reg("pbft", NewPbft) drivers.Reg("pbft", NewPbft)
drivers.QueryData.Register("pbft", &PbftClient{}) drivers.QueryData.Register("pbft", &Client{})
} }
type PbftClient struct { // Client Pbft implementation
type Client struct {
*drivers.BaseClient *drivers.BaseClient
replyChan chan *types.ClientReply replyChan chan *types.ClientReply
requestChan chan *types.Request requestChan chan *types.Request
isPrimary bool isPrimary bool
} }
func NewBlockstore(cfg *types.Consensus, replyChan chan *types.ClientReply, requestChan chan *types.Request, isPrimary bool) *PbftClient { // NewBlockstore create Pbft Client
func NewBlockstore(cfg *types.Consensus, replyChan chan *types.ClientReply, requestChan chan *types.Request, isPrimary bool) *Client {
c := drivers.NewBaseClient(cfg) c := drivers.NewBaseClient(cfg)
client := &PbftClient{BaseClient: c, replyChan: replyChan, requestChan: requestChan, isPrimary: isPrimary} client := &Client{BaseClient: c, replyChan: replyChan, requestChan: requestChan, isPrimary: isPrimary}
c.SetChild(client) c.SetChild(client)
return client return client
} }
func (client *PbftClient) ProcEvent(msg queue.Message) bool {
// ProcEvent method
func (client *Client) ProcEvent(msg queue.Message) bool {
return false return false
} }
func (client *PbftClient) Propose(block *types.Block) { // Propose method
op := &types.Operation{block} func (client *Client) Propose(block *types.Block) {
op := &types.Operation{Value: block}
req := ToRequestClient(op, types.Now().String(), clientAddr) req := ToRequestClient(op, types.Now().String(), clientAddr)
client.requestChan <- req client.requestChan <- req
} }
func (client *PbftClient) CheckBlock(parent *types.Block, current *types.BlockDetail) error { // CheckBlock method
func (client *Client) CheckBlock(parent *types.Block, current *types.BlockDetail) error {
return nil return nil
} }
func (client *PbftClient) SetQueueClient(c queue.Client) { // SetQueueClient method
func (client *Client) SetQueueClient(c queue.Client) {
plog.Info("Enter SetQueue method of pbft consensus") plog.Info("Enter SetQueue method of pbft consensus")
client.InitClient(c, func() { client.InitClient(c, func() {
...@@ -57,7 +64,8 @@ func (client *PbftClient) SetQueueClient(c queue.Client) { ...@@ -57,7 +64,8 @@ func (client *PbftClient) SetQueueClient(c queue.Client) {
go client.CreateBlock() go client.CreateBlock()
} }
func (client *PbftClient) CreateBlock() { // CreateBlock method
func (client *Client) CreateBlock() {
issleep := true issleep := true
if !client.isPrimary { if !client.isPrimary {
return return
...@@ -95,11 +103,13 @@ func (client *PbftClient) CreateBlock() { ...@@ -95,11 +103,13 @@ func (client *PbftClient) CreateBlock() {
} }
} }
func (client *PbftClient) GetGenesisBlockTime() int64 { // GetGenesisBlockTime get genesis blocktime
func (client *Client) GetGenesisBlockTime() int64 {
return genesisBlockTime return genesisBlockTime
} }
func (client *PbftClient) CreateGenesisTx() (ret []*types.Transaction) { // CreateGenesisTx get genesis tx
func (client *Client) CreateGenesisTx() (ret []*types.Transaction) {
var tx types.Transaction var tx types.Transaction
tx.Execer = []byte("coins") tx.Execer = []byte("coins")
tx.To = genesis tx.To = genesis
...@@ -112,7 +122,7 @@ func (client *PbftClient) CreateGenesisTx() (ret []*types.Transaction) { ...@@ -112,7 +122,7 @@ func (client *PbftClient) CreateGenesisTx() (ret []*types.Transaction) {
return return
} }
func (client *PbftClient) readReply() { func (client *Client) readReply() {
data := <-client.replyChan data := <-client.replyChan
if data == nil { if data == nil {
......
...@@ -89,7 +89,7 @@ powLimitBits = "0x1f2fffff" ...@@ -89,7 +89,7 @@ powLimitBits = "0x1f2fffff"
[consensus.sub.pbft] [consensus.sub.pbft]
genesis="14KEKbYtKKQm4wMthSK9J4La4nAiidGozt" genesis="14KEKbYtKKQm4wMthSK9J4La4nAiidGozt"
genesisBlockTime=1514533394 genesisBlockTime=1514533394
nodeId=1 nodeID=1
peersURL="127.0.0.1:8890" peersURL="127.0.0.1:8890"
clientAddr="127.0.0.1:8890" clientAddr="127.0.0.1:8890"
......
...@@ -22,11 +22,12 @@ var ( ...@@ -22,11 +22,12 @@ var (
type subConfig struct { type subConfig struct {
Genesis string `json:"genesis"` Genesis string `json:"genesis"`
GenesisBlockTime int64 `json:"genesisBlockTime"` GenesisBlockTime int64 `json:"genesisBlockTime"`
NodeId int64 `json:"nodeId"` NodeID int64 `json:"nodeID"`
PeersURL string `json:"peersURL"` PeersURL string `json:"peersURL"`
ClientAddr string `json:"clientAddr"` ClientAddr string `json:"clientAddr"`
} }
// NewPbft create pbft cluster
func NewPbft(cfg *pb.Consensus, sub []byte) queue.Module { func NewPbft(cfg *pb.Consensus, sub []byte) queue.Module {
plog.Info("start to creat pbft node") plog.Info("start to creat pbft node")
var subcfg subConfig var subcfg subConfig
...@@ -40,14 +41,14 @@ func NewPbft(cfg *pb.Consensus, sub []byte) queue.Module { ...@@ -40,14 +41,14 @@ func NewPbft(cfg *pb.Consensus, sub []byte) queue.Module {
if subcfg.GenesisBlockTime > 0 { if subcfg.GenesisBlockTime > 0 {
genesisBlockTime = subcfg.GenesisBlockTime genesisBlockTime = subcfg.GenesisBlockTime
} }
if int(subcfg.NodeId) == 0 || strings.Compare(subcfg.PeersURL, "") == 0 || strings.Compare(subcfg.ClientAddr, "") == 0 { if int(subcfg.NodeID) == 0 || strings.Compare(subcfg.PeersURL, "") == 0 || strings.Compare(subcfg.ClientAddr, "") == 0 {
plog.Error("The nodeId, peersURL or clientAddr is empty!") plog.Error("The nodeId, peersURL or clientAddr is empty!")
return nil return nil
} }
clientAddr = subcfg.ClientAddr clientAddr = subcfg.ClientAddr
var c *PbftClient var c *Client
replyChan, requestChan, isPrimary := NewReplica(uint32(subcfg.NodeId), subcfg.PeersURL, subcfg.ClientAddr) replyChan, requestChan, isPrimary := NewReplica(uint32(subcfg.NodeID), subcfg.PeersURL, subcfg.ClientAddr)
c = NewBlockstore(cfg, replyChan, requestChan, isPrimary) c = NewBlockstore(cfg, replyChan, requestChan, isPrimary)
return c return c
} }
...@@ -15,8 +15,7 @@ import ( ...@@ -15,8 +15,7 @@ import (
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
) )
// Digest // EQ Digest
func EQ(d1 []byte, d2 []byte) bool { func EQ(d1 []byte, d2 []byte) bool {
if len(d1) != len(d2) { if len(d1) != len(d2) {
return false return false
...@@ -29,90 +28,91 @@ func EQ(d1 []byte, d2 []byte) bool { ...@@ -29,90 +28,91 @@ func EQ(d1 []byte, d2 []byte) bool {
return true return true
} }
// Checkpoint // ToCheckpoint method
func ToCheckpoint(sequence uint32, digest []byte) *types.Checkpoint { func ToCheckpoint(sequence uint32, digest []byte) *types.Checkpoint {
return &types.Checkpoint{sequence, digest} return &types.Checkpoint{Sequence: sequence, Digest: digest}
} }
// Entry // ToEntry method
func ToEntry(sequence uint32, digest []byte, view uint32) *types.Entry { func ToEntry(sequence uint32, digest []byte, view uint32) *types.Entry {
return &types.Entry{sequence, digest, view} return &types.Entry{Sequence: sequence, Digest: digest, View: view}
} }
// ViewChange // ToViewChange method
func ToViewChange(viewchanger uint32, digest []byte) *types.ViewChange { func ToViewChange(viewchanger uint32, digest []byte) *types.ViewChange {
return &types.ViewChange{viewchanger, digest} return &types.ViewChange{Viewchanger: viewchanger, Digest: digest}
} }
// Summary // ToSummary method
func ToSummary(sequence uint32, digest []byte) *types.Summary { func ToSummary(sequence uint32, digest []byte) *types.Summary {
return &types.Summary{sequence, digest} return &types.Summary{Sequence: sequence, Digest: digest}
} }
// Request // ToRequestClient method
func ToRequestClient(op *types.Operation, timestamp, client string) *types.Request { func ToRequestClient(op *types.Operation, timestamp, client string) *types.Request {
return &types.Request{ return &types.Request{
Value: &types.Request_Client{ Value: &types.Request_Client{
&types.RequestClient{op, timestamp, client}}, Client: &types.RequestClient{Op: op, Timestamp: timestamp, Client: client}},
} }
} }
// ToRequestPreprepare method
func ToRequestPreprepare(view, sequence uint32, digest []byte, replica uint32) *types.Request { func ToRequestPreprepare(view, sequence uint32, digest []byte, replica uint32) *types.Request {
return &types.Request{ return &types.Request{
Value: &types.Request_Preprepare{ Value: &types.Request_Preprepare{
&types.RequestPrePrepare{view, sequence, digest, replica}}, Preprepare: &types.RequestPrePrepare{View: view, Sequence: sequence, Digest: digest, Replica: replica}},
} }
} }
// ToRequestPrepare method
func ToRequestPrepare(view, sequence uint32, digest []byte, replica uint32) *types.Request { func ToRequestPrepare(view, sequence uint32, digest []byte, replica uint32) *types.Request {
return &types.Request{ return &types.Request{
Value: &types.Request_Prepare{ Value: &types.Request_Prepare{
&types.RequestPrepare{view, sequence, digest, replica}}, Prepare: &types.RequestPrepare{View: view, Sequence: sequence, Digest: digest, Replica: replica}},
} }
} }
// ToRequestCommit method
func ToRequestCommit(view, sequence, replica uint32) *types.Request { func ToRequestCommit(view, sequence, replica uint32) *types.Request {
return &types.Request{ return &types.Request{
Value: &types.Request_Commit{ Value: &types.Request_Commit{
&types.RequestCommit{view, sequence, replica}}, Commit: &types.RequestCommit{View: view, Sequence: sequence, Replica: replica}},
} }
} }
// ToRequestCheckpoint method
func ToRequestCheckpoint(sequence uint32, digest []byte, replica uint32) *types.Request { func ToRequestCheckpoint(sequence uint32, digest []byte, replica uint32) *types.Request {
return &types.Request{ return &types.Request{
Value: &types.Request_Checkpoint{ Value: &types.Request_Checkpoint{
&types.RequestCheckpoint{sequence, digest, replica}}, Checkpoint: &types.RequestCheckpoint{Sequence: sequence, Digest: digest, Replica: replica}},
} }
} }
// ToRequestViewChange method
func ToRequestViewChange(view, sequence uint32, checkpoints []*types.Checkpoint, preps, prePreps []*types.Entry, replica uint32) *types.Request { func ToRequestViewChange(view, sequence uint32, checkpoints []*types.Checkpoint, preps, prePreps []*types.Entry, replica uint32) *types.Request {
return &types.Request{ return &types.Request{
Value: &types.Request_Viewchange{ Value: &types.Request_Viewchange{
&types.RequestViewChange{view, sequence, checkpoints, preps, prePreps, replica}}, Viewchange: &types.RequestViewChange{View: view, Sequence: sequence, Checkpoints: checkpoints, Preps: preps, Prepreps: prePreps, Replica: replica}},
} }
} }
// ToRequestAck method
func ToRequestAck(view, replica, viewchanger uint32, digest []byte) *types.Request { func ToRequestAck(view, replica, viewchanger uint32, digest []byte) *types.Request {
return &types.Request{ return &types.Request{
Value: &types.Request_Ack{ Value: &types.Request_Ack{
&types.RequestAck{view, replica, viewchanger, digest}}, Ack: &types.RequestAck{View: view, Replica: replica, Viewchanger: viewchanger, Digest: digest}},
} }
} }
// ToRequestNewView method
func ToRequestNewView(view uint32, viewChanges []*types.ViewChange, summaries []*types.Summary, replica uint32) *types.Request { func ToRequestNewView(view uint32, viewChanges []*types.ViewChange, summaries []*types.Summary, replica uint32) *types.Request {
return &types.Request{ return &types.Request{
Value: &types.Request_Newview{ Value: &types.Request_Newview{
&types.RequestNewView{view, viewChanges, summaries, replica}}, Newview: &types.RequestNewView{View: view, Viewchanges: viewChanges, Summaries: summaries, Replica: replica}},
} }
} }
// Request Methods // ReqDigest method
func ReqDigest(req *types.Request) []byte { func ReqDigest(req *types.Request) []byte {
if req == nil { if req == nil {
return nil return nil
...@@ -130,14 +130,12 @@ func ReqDigest(req *types.Request) []byte { ...@@ -130,14 +130,12 @@ func ReqDigest(req *types.Request) []byte {
return lwm return lwm
}*/ }*/
// Reply // ToReply method
func ToReply(view uint32, timestamp, client string, replica uint32, result *types.Result) *types.ClientReply { func ToReply(view uint32, timestamp, client string, replica uint32, result *types.Result) *types.ClientReply {
return &types.ClientReply{view, timestamp, client, replica, result} return &types.ClientReply{View: view, Timestamp: timestamp, Client: client, Replica: replica, Result: result}
} }
// Reply Methods // RepDigest method
func RepDigest(reply fmt.Stringer) []byte { func RepDigest(reply fmt.Stringer) []byte {
if reply == nil { if reply == nil {
return nil return nil
...@@ -146,8 +144,7 @@ func RepDigest(reply fmt.Stringer) []byte { ...@@ -146,8 +144,7 @@ func RepDigest(reply fmt.Stringer) []byte {
return bytes[:] return bytes[:]
} }
// Write proto message // WriteMessage write proto message
func WriteMessage(addr string, msg proto.Message) error { func WriteMessage(addr string, msg proto.Message) error {
conn, err := net.Dial("tcp", addr) conn, err := net.Dial("tcp", addr)
defer conn.Close() defer conn.Close()
...@@ -163,8 +160,7 @@ func WriteMessage(addr string, msg proto.Message) error { ...@@ -163,8 +160,7 @@ func WriteMessage(addr string, msg proto.Message) error {
return err return err
} }
// Read proto message // ReadMessage read proto message
func ReadMessage(conn io.Reader, msg proto.Message) error { func ReadMessage(conn io.Reader, msg proto.Message) error {
var buf bytes.Buffer var buf bytes.Buffer
n, err := io.Copy(&buf, conn) n, err := io.Copy(&buf, conn)
......
...@@ -13,11 +13,13 @@ import ( ...@@ -13,11 +13,13 @@ import (
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
) )
// constant
const ( const (
CHECKPOINT_PERIOD uint32 = 128 CheckPointPeriod uint32 = 128
CONSTANT_FACTOR uint32 = 2 ConstantFactor uint32 = 2
) )
// Replica struct
type Replica struct { type Replica struct {
ID uint32 ID uint32
replicas map[uint32]string replicas map[uint32]string
...@@ -36,6 +38,7 @@ type Replica struct { ...@@ -36,6 +38,7 @@ type Replica struct {
checkpoints []*pb.Checkpoint checkpoints []*pb.Checkpoint
} }
// NewReplica create Replica instance
func NewReplica(id uint32, PeersURL string, addr string) (chan *pb.ClientReply, chan *pb.Request, bool) { func NewReplica(id uint32, PeersURL string, addr string) (chan *pb.ClientReply, chan *pb.Request, bool) {
replyChan := make(chan *pb.ClientReply) replyChan := make(chan *pb.ClientReply)
requestChan := make(chan *pb.Request) requestChan := make(chan *pb.Request)
...@@ -65,6 +68,7 @@ func NewReplica(id uint32, PeersURL string, addr string) (chan *pb.ClientReply, ...@@ -65,6 +68,7 @@ func NewReplica(id uint32, PeersURL string, addr string) (chan *pb.ClientReply,
} }
// Startnode method
func (rep *Replica) Startnode(addr string) { func (rep *Replica) Startnode(addr string) {
rep.acceptConnections(addr) rep.acceptConnections(addr)
} }
...@@ -104,7 +108,7 @@ func (rep *Replica) lowWaterMark() uint32 { ...@@ -104,7 +108,7 @@ func (rep *Replica) lowWaterMark() uint32 {
} }
func (rep *Replica) highWaterMark() uint32 { func (rep *Replica) highWaterMark() uint32 {
return rep.lowWaterMark() + CHECKPOINT_PERIOD*CONSTANT_FACTOR return rep.lowWaterMark() + CheckPointPeriod*ConstantFactor
} }
func (rep *Replica) sequenceInRange(sequence uint32) bool { func (rep *Replica) sequenceInRange(sequence uint32) bool {
...@@ -133,9 +137,8 @@ func (rep *Replica) theLastReply() *pb.ClientReply { ...@@ -133,9 +137,8 @@ func (rep *Replica) theLastReply() *pb.ClientReply {
func (rep *Replica) lastReplyToClient(client string) *pb.ClientReply { func (rep *Replica) lastReplyToClient(client string) *pb.ClientReply {
if v, ok := rep.replies[client]; ok { if v, ok := rep.replies[client]; ok {
return v[len(rep.replies[client])-1] return v[len(rep.replies[client])-1]
} else {
return nil
} }
return nil
} }
func (rep *Replica) stateDigest() []byte { func (rep *Replica) stateDigest() []byte {
...@@ -143,7 +146,7 @@ func (rep *Replica) stateDigest() []byte { ...@@ -143,7 +146,7 @@ func (rep *Replica) stateDigest() []byte {
} }
func (rep *Replica) isCheckpoint(sequence uint32) bool { func (rep *Replica) isCheckpoint(sequence uint32) bool {
return sequence%CHECKPOINT_PERIOD == 0 return sequence%CheckPointPeriod == 0
} }
func (rep *Replica) addCheckpoint(checkpoint *pb.Checkpoint) { func (rep *Replica) addCheckpoint(checkpoint *pb.Checkpoint) {
...@@ -735,7 +738,7 @@ func (rep *Replica) handleRequestCommit(REQ *pb.Request) { ...@@ -735,7 +738,7 @@ func (rep *Replica) handleRequestCommit(REQ *pb.Request) {
op := req.GetClient().Op op := req.GetClient().Op
timestamp := req.GetClient().Timestamp timestamp := req.GetClient().Timestamp
client := req.GetClient().Client client := req.GetClient().Client
result := &pb.Result{op.Value} result := &pb.Result{Value: op.Value}
rep.executed = append(rep.executed, sequence) rep.executed = append(rep.executed, sequence)
reply := ToReply(view, timestamp, client, rep.ID, result) reply := ToReply(view, timestamp, client, rep.ID, result)
...@@ -1036,7 +1039,7 @@ func (rep *Replica) correctSummaries(requests []*pb.Request, summaries []*pb.Sum ...@@ -1036,7 +1039,7 @@ func (rep *Replica) correctSummaries(requests []*pb.Request, summaries []*pb.Sum
return return
} }
end := start + CHECKPOINT_PERIOD*CONSTANT_FACTOR end := start + CheckPointPeriod*ConstantFactor
for seq := start; seq <= end; seq++ { for seq := start; seq <= end; seq++ {
...@@ -1516,7 +1519,7 @@ FOR_LOOP_1: ...@@ -1516,7 +1519,7 @@ FOR_LOOP_1:
summaries = append(summaries, summary) summaries = append(summaries, summary)
start = summary.Sequence start = summary.Sequence
end = start + CHECKPOINT_PERIOD*CONSTANT_FACTOR end = start + CheckPointPeriod*ConstantFactor
// select summaries // select summaries
// TODO: optimize // TODO: optimize
......
...@@ -96,7 +96,7 @@ func sendReplyList(q queue.Queue) { ...@@ -96,7 +96,7 @@ func sendReplyList(q queue.Queue) {
count++ count++
createReplyList("test" + strconv.Itoa(count)) createReplyList("test" + strconv.Itoa(count))
msg.Reply(client.NewMessage("consensus", types.EventReplyTxList, msg.Reply(client.NewMessage("consensus", types.EventReplyTxList,
&types.ReplyTxList{transactions})) &types.ReplyTxList{Txs: transactions}))
if count == 5 { if count == 5 {
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
break break
...@@ -125,7 +125,7 @@ func createReplyList(account string) { ...@@ -125,7 +125,7 @@ func createReplyList(account string) {
var result []*types.Transaction var result []*types.Transaction
for j := 0; j < txSize; j++ { for j := 0; j < txSize; j++ {
//tx := &types.Transaction{} //tx := &types.Transaction{}
val := &cty.CoinsAction_Transfer{&types.AssetsTransfer{Amount: 10}} val := &cty.CoinsAction_Transfer{Transfer: &types.AssetsTransfer{Amount: 10}}
action := &cty.CoinsAction{Value: val, Ty: cty.CoinsActionTransfer} action := &cty.CoinsAction{Value: val, Ty: cty.CoinsActionTransfer}
tx := &types.Transaction{Execer: []byte("coins"), Payload: types.Encode(action), Fee: 0} tx := &types.Transaction{Execer: []byte("coins"), Payload: types.Encode(action), Fee: 0}
tx.To = "14qViLJfdGaP4EeHnDyJbEGQysnCpwn1gZ" tx.To = "14qViLJfdGaP4EeHnDyJbEGQysnCpwn1gZ"
......
...@@ -24,10 +24,11 @@ var ( ...@@ -24,10 +24,11 @@ var (
func init() { func init() {
drivers.Reg("raft", NewRaftCluster) drivers.Reg("raft", NewRaftCluster)
drivers.QueryData.Register("raft", &RaftClient{}) drivers.QueryData.Register("raft", &Client{})
} }
type RaftClient struct { // Client Raft implementation
type Client struct {
*drivers.BaseClient *drivers.BaseClient
proposeC chan<- *types.Block proposeC chan<- *types.Block
commitC <-chan *types.Block commitC <-chan *types.Block
...@@ -38,18 +39,21 @@ type RaftClient struct { ...@@ -38,18 +39,21 @@ type RaftClient struct {
once sync.Once once sync.Once
} }
func NewBlockstore(cfg *types.Consensus, snapshotter *snap.Snapshotter, proposeC chan<- *types.Block, commitC <-chan *types.Block, errorC <-chan error, validatorC <-chan bool, stopC chan<- struct{}) *RaftClient { // NewBlockstore create Raft Client
func NewBlockstore(cfg *types.Consensus, snapshotter *snap.Snapshotter, proposeC chan<- *types.Block, commitC <-chan *types.Block, errorC <-chan error, validatorC <-chan bool, stopC chan<- struct{}) *Client {
c := drivers.NewBaseClient(cfg) c := drivers.NewBaseClient(cfg)
client := &RaftClient{BaseClient: c, proposeC: proposeC, snapshotter: snapshotter, validatorC: validatorC, commitC: commitC, errorC: errorC, stopC: stopC} client := &Client{BaseClient: c, proposeC: proposeC, snapshotter: snapshotter, validatorC: validatorC, commitC: commitC, errorC: errorC, stopC: stopC}
c.SetChild(client) c.SetChild(client)
return client return client
} }
func (client *RaftClient) GetGenesisBlockTime() int64 { // GetGenesisBlockTime get genesis blocktime
func (client *Client) GetGenesisBlockTime() int64 {
return genesisBlockTime return genesisBlockTime
} }
func (client *RaftClient) CreateGenesisTx() (ret []*types.Transaction) { // CreateGenesisTx get genesis tx
func (client *Client) CreateGenesisTx() (ret []*types.Transaction) {
var tx types.Transaction var tx types.Transaction
tx.Execer = []byte(cty.CoinsX) tx.Execer = []byte(cty.CoinsX)
tx.To = genesis tx.To = genesis
...@@ -62,20 +66,22 @@ func (client *RaftClient) CreateGenesisTx() (ret []*types.Transaction) { ...@@ -62,20 +66,22 @@ func (client *RaftClient) CreateGenesisTx() (ret []*types.Transaction) {
return return
} }
func (client *RaftClient) ProcEvent(msg queue.Message) bool { // ProcEvent method
func (client *Client) ProcEvent(msg queue.Message) bool {
return false return false
} }
func (client *RaftClient) CheckBlock(parent *types.Block, current *types.BlockDetail) error { // CheckBlock method
func (client *Client) CheckBlock(parent *types.Block, current *types.BlockDetail) error {
return nil return nil
} }
func (client *RaftClient) getSnapshot() ([]byte, error) { func (client *Client) getSnapshot() ([]byte, error) {
//这里可能导致死锁 //这里可能导致死锁
return proto.Marshal(client.GetCurrentBlock()) return proto.Marshal(client.GetCurrentBlock())
} }
func (client *RaftClient) recoverFromSnapshot(snapshot []byte) error { func (client *Client) recoverFromSnapshot(snapshot []byte) error {
var block types.Block var block types.Block
if err := proto.Unmarshal(snapshot, &block); err != nil { if err := proto.Unmarshal(snapshot, &block); err != nil {
return err return err
...@@ -84,7 +90,8 @@ func (client *RaftClient) recoverFromSnapshot(snapshot []byte) error { ...@@ -84,7 +90,8 @@ func (client *RaftClient) recoverFromSnapshot(snapshot []byte) error {
return nil return nil
} }
func (client *RaftClient) SetQueueClient(c queue.Client) { // SetQueueClient method
func (client *Client) SetQueueClient(c queue.Client) {
rlog.Info("Enter SetQueue method of raft consensus") rlog.Info("Enter SetQueue method of raft consensus")
client.InitClient(c, func() { client.InitClient(c, func() {
}) })
...@@ -93,12 +100,14 @@ func (client *RaftClient) SetQueueClient(c queue.Client) { ...@@ -93,12 +100,14 @@ func (client *RaftClient) SetQueueClient(c queue.Client) {
go client.pollingTask(c) go client.pollingTask(c)
} }
func (client *RaftClient) Close() { // Close method
func (client *Client) Close() {
client.stopC <- struct{}{} client.stopC <- struct{}{}
rlog.Info("consensus raft closed") rlog.Info("consensus raft closed")
} }
func (client *RaftClient) CreateBlock() { // CreateBlock method
func (client *Client) CreateBlock() {
issleep := true issleep := true
retry := 0 retry := 0
infoflag := 0 infoflag := 0
...@@ -187,12 +196,12 @@ func (client *RaftClient) CreateBlock() { ...@@ -187,12 +196,12 @@ func (client *RaftClient) CreateBlock() {
} }
// 向raft底层发送block // 向raft底层发送block
func (client *RaftClient) propose(block *types.Block) { func (client *Client) propose(block *types.Block) {
client.proposeC <- block client.proposeC <- block
} }
// 从receive channel中读leader发来的block // 从receive channel中读leader发来的block
func (client *RaftClient) readCommits(commitC <-chan *types.Block, errorC <-chan error) { func (client *Client) readCommits(commitC <-chan *types.Block, errorC <-chan error) {
var data *types.Block var data *types.Block
var ok bool var ok bool
for { for {
...@@ -216,7 +225,7 @@ func (client *RaftClient) readCommits(commitC <-chan *types.Block, errorC <-chan ...@@ -216,7 +225,7 @@ func (client *RaftClient) readCommits(commitC <-chan *types.Block, errorC <-chan
} }
//轮询任务,去检测本机器是否为validator节点,如果是,则执行打包任务 //轮询任务,去检测本机器是否为validator节点,如果是,则执行打包任务
func (client *RaftClient) pollingTask(c queue.Client) { func (client *Client) pollingTask(c queue.Client) {
ticker := time.NewTicker(100 * time.Millisecond) ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop() defer ticker.Stop()
for { for {
......
...@@ -95,9 +95,9 @@ genesis="14KEKbYtKKQm4wMthSK9J4La4nAiidGozt" ...@@ -95,9 +95,9 @@ genesis="14KEKbYtKKQm4wMthSK9J4La4nAiidGozt"
genesisBlockTime=1514533394 genesisBlockTime=1514533394
# =============== raft共识配置参数 =========================== # =============== raft共识配置参数 ===========================
# 共识节点ID,raft共识用到,不同的节点设置不同的nodeId(目前只支持1,2,3这种设置) # 共识节点ID,raft共识用到,不同的节点设置不同的nodeId(目前只支持1,2,3这种设置)
nodeId=1 nodeID=1
# raft共识用到,通过这个端口进行节点的增加和删除 # raft共识用到,通过这个端口进行节点的增加和删除
raftApiPort=9121 raftAPIPort=9121
# raft共识用到,指示这个节点是否新增加节点 # raft共识用到,指示这个节点是否新增加节点
isNewJoinNode=false isNewJoinNode=false
# raft共识用到,指示raft集群中的服务器IP和端口 # raft共识用到,指示raft集群中的服务器IP和端口
......
...@@ -20,17 +20,17 @@ var ( ...@@ -20,17 +20,17 @@ var (
defaultSnapCount uint64 = 1000 defaultSnapCount uint64 = 1000
snapshotCatchUpEntriesN uint64 = 1000 snapshotCatchUpEntriesN uint64 = 1000
writeBlockSeconds int64 = 1 writeBlockSeconds int64 = 1
heartbeatTick int = 1 heartbeatTick = 1
isLeader bool = false isLeader = false
confChangeC chan raftpb.ConfChange confChangeC chan raftpb.ConfChange
) )
type subConfig struct { type subConfig struct {
Genesis string `json:"genesis"` Genesis string `json:"genesis"`
GenesisBlockTime int64 `json:"genesisBlockTime"` GenesisBlockTime int64 `json:"genesisBlockTime"`
NodeId int64 `json:"nodeId"` NodeID int64 `json:"nodeID"`
PeersURL string `json:"peersURL"` PeersURL string `json:"peersURL"`
RaftApiPort int64 `json:"raftApiPort"` RaftAPIPort int64 `json:"raftAPIPort"`
IsNewJoinNode bool `json:"isNewJoinNode"` IsNewJoinNode bool `json:"isNewJoinNode"`
ReadOnlyPeersURL string `json:"readOnlyPeersURL"` ReadOnlyPeersURL string `json:"readOnlyPeersURL"`
AddPeersURL string `json:"addPeersURL"` AddPeersURL string `json:"addPeersURL"`
...@@ -39,6 +39,7 @@ type subConfig struct { ...@@ -39,6 +39,7 @@ type subConfig struct {
HeartbeatTick int32 `json:"heartbeatTick"` HeartbeatTick int32 `json:"heartbeatTick"`
} }
// NewRaftCluster create raft cluster
func NewRaftCluster(cfg *types.Consensus, sub []byte) queue.Module { func NewRaftCluster(cfg *types.Consensus, sub []byte) queue.Module {
rlog.Info("Start to create raft cluster") rlog.Info("Start to create raft cluster")
var subcfg subConfig var subcfg subConfig
...@@ -52,7 +53,7 @@ func NewRaftCluster(cfg *types.Consensus, sub []byte) queue.Module { ...@@ -52,7 +53,7 @@ func NewRaftCluster(cfg *types.Consensus, sub []byte) queue.Module {
if subcfg.GenesisBlockTime > 0 { if subcfg.GenesisBlockTime > 0 {
genesisBlockTime = subcfg.GenesisBlockTime genesisBlockTime = subcfg.GenesisBlockTime
} }
if int(subcfg.NodeId) == 0 || strings.Compare(subcfg.PeersURL, "") == 0 { if int(subcfg.NodeID) == 0 || strings.Compare(subcfg.PeersURL, "") == 0 {
rlog.Error("Please check whether the configuration of nodeId and peersURL is empty!") rlog.Error("Please check whether the configuration of nodeId and peersURL is empty!")
//TODO 当传入的参数异常时,返回给主函数的是个nil,这时候需要做异常处理 //TODO 当传入的参数异常时,返回给主函数的是个nil,这时候需要做异常处理
return nil return nil
...@@ -74,7 +75,7 @@ func NewRaftCluster(cfg *types.Consensus, sub []byte) queue.Module { ...@@ -74,7 +75,7 @@ func NewRaftCluster(cfg *types.Consensus, sub []byte) queue.Module {
proposeC := make(chan *types.Block) proposeC := make(chan *types.Block)
confChangeC = make(chan raftpb.ConfChange) confChangeC = make(chan raftpb.ConfChange)
var b *RaftClient var b *Client
getSnapshot := func() ([]byte, error) { return b.getSnapshot() } getSnapshot := func() ([]byte, error) { return b.getSnapshot() }
// raft集群的建立,1. 初始化两条channel: propose channel用于客户端和raft底层交互, commit channel用于获取commit消息 // raft集群的建立,1. 初始化两条channel: propose channel用于客户端和raft底层交互, commit channel用于获取commit消息
// 2. raft集群中的节点之间建立http连接 // 2. raft集群中的节点之间建立http连接
...@@ -90,9 +91,9 @@ func NewRaftCluster(cfg *types.Consensus, sub []byte) queue.Module { ...@@ -90,9 +91,9 @@ func NewRaftCluster(cfg *types.Consensus, sub []byte) queue.Module {
if len(addPeers) == 1 && addPeers[0] == "" { if len(addPeers) == 1 && addPeers[0] == "" {
addPeers = []string{} addPeers = []string{}
} }
commitC, errorC, snapshotterReady, validatorC, stopC := NewRaftNode(int(subcfg.NodeId), subcfg.IsNewJoinNode, peers, readOnlyPeers, addPeers, getSnapshot, proposeC, confChangeC) commitC, errorC, snapshotterReady, validatorC, stopC := NewRaftNode(int(subcfg.NodeID), subcfg.IsNewJoinNode, peers, readOnlyPeers, addPeers, getSnapshot, proposeC, confChangeC)
//启动raft删除节点操作监听 //启动raft删除节点操作监听
go serveHttpRaftAPI(int(subcfg.RaftApiPort), confChangeC, errorC) go serveHTTPRaftAPI(int(subcfg.RaftAPIPort), confChangeC, errorC)
// 监听commit channel,取block // 监听commit channel,取block
b = NewBlockstore(cfg, <-snapshotterReady, proposeC, commitC, errorC, validatorC, stopC) b = NewBlockstore(cfg, <-snapshotterReady, proposeC, commitC, errorC, validatorC, stopC)
return b return b
......
...@@ -30,7 +30,7 @@ func (h *httpRaftAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) { ...@@ -30,7 +30,7 @@ func (h *httpRaftAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return return
} }
nodeId, err := strconv.ParseUint(key[1:], 0, 64) nodeID, err := strconv.ParseUint(key[1:], 0, 64)
if err != nil { if err != nil {
rlog.Error(fmt.Sprintf("Failed to convert ID for conf change (%v)", err.Error())) rlog.Error(fmt.Sprintf("Failed to convert ID for conf change (%v)", err.Error()))
http.Error(w, "Failed on POST", http.StatusBadRequest) http.Error(w, "Failed on POST", http.StatusBadRequest)
...@@ -39,14 +39,14 @@ func (h *httpRaftAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) { ...@@ -39,14 +39,14 @@ func (h *httpRaftAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
cc := raftpb.ConfChange{ cc := raftpb.ConfChange{
Type: raftpb.ConfChangeAddNode, Type: raftpb.ConfChangeAddNode,
NodeID: nodeId, NodeID: nodeID,
Context: url, Context: url,
} }
h.confChangeC <- cc h.confChangeC <- cc
// As above, optimistic that raft will apply the conf change // As above, optimistic that raft will apply the conf change
w.WriteHeader(http.StatusCreated) w.WriteHeader(http.StatusCreated)
case r.Method == "DELETE": case r.Method == "DELETE":
nodeId, err := strconv.ParseUint(key[1:], 0, 64) nodeID, err := strconv.ParseUint(key[1:], 0, 64)
if err != nil { if err != nil {
rlog.Error(fmt.Sprintf("Failed to convert ID for conf change (%v)", err.Error())) rlog.Error(fmt.Sprintf("Failed to convert ID for conf change (%v)", err.Error()))
http.Error(w, "Failed on DELETE", http.StatusBadRequest) http.Error(w, "Failed on DELETE", http.StatusBadRequest)
...@@ -54,7 +54,7 @@ func (h *httpRaftAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) { ...@@ -54,7 +54,7 @@ func (h *httpRaftAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
} }
cc := raftpb.ConfChange{ cc := raftpb.ConfChange{
Type: raftpb.ConfChangeRemoveNode, Type: raftpb.ConfChangeRemoveNode,
NodeID: nodeId, NodeID: nodeID,
} }
h.confChangeC <- cc h.confChangeC <- cc
// As above, optimistic that raft will apply the conf change // As above, optimistic that raft will apply the conf change
...@@ -66,7 +66,7 @@ func (h *httpRaftAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) { ...@@ -66,7 +66,7 @@ func (h *httpRaftAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
} }
} }
func serveHttpRaftAPI(port int, confChangeC chan<- raftpb.ConfChange, errorC <-chan error) { func serveHTTPRaftAPI(port int, confChangeC chan<- raftpb.ConfChange, errorC <-chan error) {
srv := http.Server{ srv := http.Server{
Addr: "localhost:" + strconv.Itoa(port), Addr: "localhost:" + strconv.Itoa(port),
Handler: &httpRaftAPI{ Handler: &httpRaftAPI{
......
...@@ -64,6 +64,7 @@ type raftNode struct { ...@@ -64,6 +64,7 @@ type raftNode struct {
restartC chan struct{} restartC chan struct{}
} }
// NewRaftNode create raft node
func NewRaftNode(id int, join bool, peers []string, readOnlyPeers []string, addPeers []string, getSnapshot func() ([]byte, error), proposeC <-chan *types.Block, func NewRaftNode(id int, join bool, peers []string, readOnlyPeers []string, addPeers []string, getSnapshot func() ([]byte, error), proposeC <-chan *types.Block,
confChangeC <-chan raftpb.ConfChange) (<-chan *types.Block, <-chan error, <-chan *snap.Snapshotter, <-chan bool, chan<- struct{}) { confChangeC <-chan raftpb.ConfChange) (<-chan *types.Block, <-chan error, <-chan *snap.Snapshotter, <-chan bool, chan<- struct{}) {
...@@ -212,7 +213,7 @@ func (rc *raftNode) serveChannels() { ...@@ -212,7 +213,7 @@ func (rc *raftNode) serveChannels() {
defer ticker.Stop() defer ticker.Stop()
go func() { go func() {
var confChangeCount uint64 = 0 var confChangeCount uint64
// 通过propose和proposeConfchange方法往RaftNode发通知 // 通过propose和proposeConfchange方法往RaftNode发通知
for rc.proposeC != nil && rc.confChangeC != nil { for rc.proposeC != nil && rc.confChangeC != nil {
select { select {
...@@ -231,7 +232,7 @@ func (rc *raftNode) serveChannels() { ...@@ -231,7 +232,7 @@ func (rc *raftNode) serveChannels() {
if !ok { if !ok {
rc.confChangeC = nil rc.confChangeC = nil
} else { } else {
confChangeCount += 1 confChangeCount++
cc.ID = confChangeCount cc.ID = confChangeCount
rc.node.ProposeConfChange(context.TODO(), cc) rc.node.ProposeConfChange(context.TODO(), cc)
} }
......
...@@ -34,8 +34,8 @@ import ( ...@@ -34,8 +34,8 @@ import (
var ( var (
random *rand.Rand random *rand.Rand
txNumber int = 10 txNumber = 10
loopCount int = 10 loopCount = 10
) )
func init() { func init() {
...@@ -132,7 +132,7 @@ func sendReplyList(q queue.Queue) { ...@@ -132,7 +132,7 @@ func sendReplyList(q queue.Queue) {
if msg.Ty == types.EventTxList { if msg.Ty == types.EventTxList {
count++ count++
msg.Reply(client.NewMessage("consensus", types.EventReplyTxList, msg.Reply(client.NewMessage("consensus", types.EventReplyTxList,
&types.ReplyTxList{getReplyList(txNumber)})) &types.ReplyTxList{Txs: getReplyList(txNumber)}))
if count >= loopCount { if count >= loopCount {
time.Sleep(4 * time.Second) time.Sleep(4 * time.Second)
break break
...@@ -149,7 +149,7 @@ func prepareTxList() *types.Transaction { ...@@ -149,7 +149,7 @@ func prepareTxList() *types.Transaction {
key = generateKey(i, 32) key = generateKey(i, 32)
value = generateValue(i, 180) value = generateValue(i, 180)
nput := &pty.NormAction_Nput{&pty.NormPut{Key: key, Value: []byte(value)}} nput := &pty.NormAction_Nput{Nput: &pty.NormPut{Key: key, Value: []byte(value)}}
action := &pty.NormAction{Value: nput, Ty: pty.NormActionPut} action := &pty.NormAction{Value: nput, Ty: pty.NormActionPut}
tx := &types.Transaction{Execer: []byte("norm"), Payload: types.Encode(action), Fee: 0} tx := &types.Transaction{Execer: []byte("norm"), Payload: types.Encode(action), Fee: 0}
tx.To = address.ExecAddress("norm") tx.To = address.ExecAddress("norm")
......
...@@ -45,7 +45,7 @@ func main() { ...@@ -45,7 +45,7 @@ func main() {
if isIndex { if isIndex {
fmt.Printf("Start dumping log entries from index %s.\n", *index) fmt.Printf("Start dumping log entries from index %s.\n", *index)
walsnap.Index, err = strconv.ParseUint(*index, 10, 64) walsnap.Index, _ = strconv.ParseUint(*index, 10, 64)
} else { } else {
if *snapfile == "" { if *snapfile == "" {
ss := raftsnap.New(snapDir(dataDir)) ss := raftsnap.New(snapDir(dataDir))
...@@ -133,6 +133,7 @@ func genIDSlice(a []uint64) []types.ID { ...@@ -133,6 +133,7 @@ func genIDSlice(a []uint64) []types.ID {
return ids return ids
} }
// Block struct
type Block struct { type Block struct {
Version int64 `protobuf:"varint,1,opt,name=version" json:"version,omitempty"` Version int64 `protobuf:"varint,1,opt,name=version" json:"version,omitempty"`
ParentHash []byte `protobuf:"bytes,2,opt,name=parentHash,proto3" json:"parentHash,omitempty"` ParentHash []byte `protobuf:"bytes,2,opt,name=parentHash,proto3" json:"parentHash,omitempty"`
...@@ -144,6 +145,9 @@ type Block struct { ...@@ -144,6 +145,9 @@ type Block struct {
//Txs []*Transaction `protobuf:"bytes,7,rep,name=txs" json:"txs,omitempty"` //Txs []*Transaction `protobuf:"bytes,7,rep,name=txs" json:"txs,omitempty"`
} }
// Reset method
func (m *Block) Reset() { *m = Block{} } func (m *Block) Reset() { *m = Block{} }
func (m *Block) String() string { return proto.CompactTextString(m) } func (m *Block) String() string { return proto.CompactTextString(m) }
// ProtoMessage method
func (*Block) ProtoMessage() {} func (*Block) ProtoMessage() {}
...@@ -98,6 +98,7 @@ func main() { ...@@ -98,6 +98,7 @@ func main() {
} }
} }
// LoadHelp show available commands
func LoadHelp() { func LoadHelp() {
fmt.Println("Available Commands:") fmt.Println("Available Commands:")
fmt.Println("[ip] transferperf [from, to, amount, txNum, duration] : 转账性能测试") fmt.Println("[ip] transferperf [from, to, amount, txNum, duration] : 转账性能测试")
...@@ -108,6 +109,7 @@ func LoadHelp() { ...@@ -108,6 +109,7 @@ func LoadHelp() {
fmt.Println("[ip] normreadperf [num, interval, duration] : 常规读数据性能测试") fmt.Println("[ip] normreadperf [num, interval, duration] : 常规读数据性能测试")
} }
// TransferPerf run transfer performance
func TransferPerf(from string, to string, amount string, txNum string, duration string) { func TransferPerf(from string, to string, amount string, txNum string, duration string) {
txNumInt, err := strconv.Atoi(txNum) txNumInt, err := strconv.Atoi(txNum)
if err != nil { if err != nil {
...@@ -139,6 +141,7 @@ func TransferPerf(from string, to string, amount string, txNum string, duration ...@@ -139,6 +141,7 @@ func TransferPerf(from string, to string, amount string, txNum string, duration
} }
} }
// SendToAddress run transfer
func SendToAddress(from string, to string, amount string, note string) { func SendToAddress(from string, to string, amount string, note string) {
amountFloat64, err := strconv.ParseFloat(amount, 64) amountFloat64, err := strconv.ParseFloat(amount, 64)
if err != nil { if err != nil {
...@@ -162,6 +165,7 @@ func SendToAddress(from string, to string, amount string, note string) { ...@@ -162,6 +165,7 @@ func SendToAddress(from string, to string, amount string, note string) {
fmt.Println(string(data)) fmt.Println(string(data))
} }
// NormPerf run norm performance
func NormPerf(size string, num string, interval string, duration string) { func NormPerf(size string, num string, interval string, duration string) {
var key string var key string
var value string var value string
...@@ -197,7 +201,7 @@ func NormPerf(size string, num string, interval string, duration string) { ...@@ -197,7 +201,7 @@ func NormPerf(size string, num string, interval string, duration string) {
ch := make(chan struct{}, numThread) ch := make(chan struct{}, numThread)
for i := 0; i < numThread; i++ { for i := 0; i < numThread; i++ {
go func() { go func() {
var result int64 = 0 var result int64
totalCount := 0 totalCount := 0
txCount := 0 txCount := 0
_, priv := genaddress() _, priv := genaddress()
...@@ -228,7 +232,7 @@ func NormPerf(size string, num string, interval string, duration string) { ...@@ -228,7 +232,7 @@ func NormPerf(size string, num string, interval string, duration string) {
} }
} }
//zzh // NormReadPerf run read performance
func NormReadPerf(num string, interval string, duration string) { func NormReadPerf(num string, interval string, duration string) {
var numThread int var numThread int
numInt, err := strconv.Atoi(num) numInt, err := strconv.Atoi(num)
...@@ -260,7 +264,6 @@ func NormReadPerf(num string, interval string, duration string) { ...@@ -260,7 +264,6 @@ func NormReadPerf(num string, interval string, duration string) {
f, err := os.Open("normperf.log") f, err := os.Open("normperf.log")
if err != nil { if err != nil {
panic("open file failed.") panic("open file failed.")
return
} }
buf := bufio.NewReader(f) buf := bufio.NewReader(f)
cnt := 0 cnt := 0
...@@ -277,7 +280,6 @@ func NormReadPerf(num string, interval string, duration string) { ...@@ -277,7 +280,6 @@ func NormReadPerf(num string, interval string, duration string) {
f, err := os.Open("normperf.log") f, err := os.Open("normperf.log")
if err != nil { if err != nil {
panic("open file failed.") panic("open file failed.")
return
} }
buf = bufio.NewReader(f) buf = bufio.NewReader(f)
} }
...@@ -315,6 +317,7 @@ func NormReadPerf(num string, interval string, duration string) { ...@@ -315,6 +317,7 @@ func NormReadPerf(num string, interval string, duration string) {
} }
} }
// RandStringBytes create random string
func RandStringBytes(n int) string { func RandStringBytes(n int) string {
b := make([]byte, n) b := make([]byte, n)
rand.Seed(types.Now().UnixNano()) rand.Seed(types.Now().UnixNano())
...@@ -324,9 +327,10 @@ func RandStringBytes(n int) string { ...@@ -324,9 +327,10 @@ func RandStringBytes(n int) string {
return string(b) return string(b)
} }
// NormPut run put action
func NormPut(privkey string, key string, value string) { func NormPut(privkey string, key string, value string) {
fmt.Println(key, "=", value) fmt.Println(key, "=", value)
nput := &pty.NormAction_Nput{&pty.NormPut{Key: key, Value: []byte(value)}} nput := &pty.NormAction_Nput{Nput: &pty.NormPut{Key: key, Value: []byte(value)}}
action := &pty.NormAction{Value: nput, Ty: pty.NormActionPut} action := &pty.NormAction{Value: nput, Ty: pty.NormActionPut}
tx := &types.Transaction{Execer: []byte("norm"), Payload: types.Encode(action), Fee: fee} tx := &types.Transaction{Execer: []byte("norm"), Payload: types.Encode(action), Fee: fee}
tx.To = address.ExecAddress("norm") tx.To = address.ExecAddress("norm")
...@@ -344,6 +348,7 @@ func NormPut(privkey string, key string, value string) { ...@@ -344,6 +348,7 @@ func NormPut(privkey string, key string, value string) {
} }
} }
// NormGet run query action
func NormGet(key string) { func NormGet(key string) {
in := &pty.NormGetKey{Key: key} in := &pty.NormGetKey{Key: key}
data, err := proto.Marshal(in) data, err := proto.Marshal(in)
......
...@@ -25,25 +25,28 @@ import ( ...@@ -25,25 +25,28 @@ import (
var configPath = flag.String("f", "servers.toml", "configfile") var configPath = flag.String("f", "servers.toml", "configfile")
// ScpInfo struct
type ScpInfo struct { type ScpInfo struct {
UserName string UserName string
PassWord string PassWord string
HostIp string HostIP string
Port int Port int
LocalFilePath string LocalFilePath string
RemoteDir string RemoteDir string
} }
// CmdInfo struct
type CmdInfo struct { type CmdInfo struct {
userName string userName string
passWord string passWord string
hostIp string hostIP string
port int port int
cmd string cmd string
remoteDir string remoteDir string
} }
type tomlConfig struct { // TomlConfig struct
type TomlConfig struct {
Title string Title string
Servers map[string]ScpInfo Servers map[string]ScpInfo
} }
...@@ -117,8 +120,9 @@ func sftpconnect(user, password, host string, port int) (*sftp.Client, error) { ...@@ -117,8 +120,9 @@ func sftpconnect(user, password, host string, port int) (*sftp.Client, error) {
return sftpClient, nil return sftpClient, nil
} }
// ScpFileFromLocalToRemote copy local file to remote
func ScpFileFromLocalToRemote(si *ScpInfo) { func ScpFileFromLocalToRemote(si *ScpInfo) {
sftpClient, err := sftpconnect(si.UserName, si.PassWord, si.HostIp, si.Port) sftpClient, err := sftpconnect(si.UserName, si.PassWord, si.HostIP, si.Port)
if err != nil { if err != nil {
fmt.Println("sftconnect have a err!") fmt.Println("sftconnect have a err!")
log.Fatal(err) log.Fatal(err)
...@@ -157,9 +161,10 @@ func ScpFileFromLocalToRemote(si *ScpInfo) { ...@@ -157,9 +161,10 @@ func ScpFileFromLocalToRemote(si *ScpInfo) {
fmt.Println("copy file to remote server finished!") fmt.Println("copy file to remote server finished!")
} }
// RemoteExec run cmd in remote
func RemoteExec(cmdInfo *CmdInfo) error { func RemoteExec(cmdInfo *CmdInfo) error {
//A Session only accepts one call to Run, Start or Shell. //A Session only accepts one call to Run, Start or Shell.
session, err := sshconnect(cmdInfo.userName, cmdInfo.passWord, cmdInfo.hostIp, cmdInfo.port) session, err := sshconnect(cmdInfo.userName, cmdInfo.passWord, cmdInfo.hostIP, cmdInfo.port)
if err != nil { if err != nil {
return err return err
} }
...@@ -180,8 +185,9 @@ func remoteScp(si *ScpInfo, reqnum chan struct{}) { ...@@ -180,8 +185,9 @@ func remoteScp(si *ScpInfo, reqnum chan struct{}) {
} }
func InitCfg(path string) *tomlConfig { // InitCfg init config
var cfg tomlConfig func InitCfg(path string) *TomlConfig {
var cfg TomlConfig
if _, err := tml.DecodeFile(path, &cfg); err != nil { if _, err := tml.DecodeFile(path, &cfg); err != nil {
fmt.Println(err) fmt.Println(err)
os.Exit(0) os.Exit(0)
...@@ -243,6 +249,7 @@ func main() { ...@@ -243,6 +249,7 @@ func main() {
log.Printf("read common cost time %v\n", timeCommon.Sub(start)) log.Printf("read common cost time %v\n", timeCommon.Sub(start))
} }
// LoadHelp show available commands
func LoadHelp() { func LoadHelp() {
fmt.Println("Available Commands:") fmt.Println("Available Commands:")
fmt.Println(" start : 启动服务 ") fmt.Println(" start : 启动服务 ")
...@@ -250,14 +257,14 @@ func LoadHelp() { ...@@ -250,14 +257,14 @@ func LoadHelp() {
fmt.Println(" clear : 清空数据") fmt.Println(" clear : 清空数据")
} }
func startAll(conf *tomlConfig) { func startAll(conf *TomlConfig) {
//fmt.Println(getCurrentDirectory()) //fmt.Println(getCurrentDirectory())
arrMap := make(map[string]*CmdInfo) arrMap := make(map[string]*CmdInfo)
//多协程启动部署 //多协程启动部署
reqC := make(chan struct{}, len(conf.Servers)) reqC := make(chan struct{}, len(conf.Servers))
for index, sc := range conf.Servers { for index, sc := range conf.Servers {
cmdInfo := &CmdInfo{} cmdInfo := &CmdInfo{}
cmdInfo.hostIp = sc.HostIp cmdInfo.hostIP = sc.HostIP
cmdInfo.userName = sc.UserName cmdInfo.userName = sc.UserName
cmdInfo.port = sc.Port cmdInfo.port = sc.Port
cmdInfo.passWord = sc.PassWord cmdInfo.passWord = sc.PassWord
...@@ -276,11 +283,11 @@ func startAll(conf *tomlConfig) { ...@@ -276,11 +283,11 @@ func startAll(conf *tomlConfig) {
} }
} }
func stopAll(conf *tomlConfig) { func stopAll(conf *TomlConfig) {
//执行速度快,不需要多起多协程工作 //执行速度快,不需要多起多协程工作
for _, sc := range conf.Servers { for _, sc := range conf.Servers {
cmdInfo := &CmdInfo{} cmdInfo := &CmdInfo{}
cmdInfo.hostIp = sc.HostIp cmdInfo.hostIP = sc.HostIP
cmdInfo.userName = sc.UserName cmdInfo.userName = sc.UserName
cmdInfo.port = sc.Port cmdInfo.port = sc.Port
cmdInfo.passWord = sc.PassWord cmdInfo.passWord = sc.PassWord
...@@ -290,10 +297,10 @@ func stopAll(conf *tomlConfig) { ...@@ -290,10 +297,10 @@ func stopAll(conf *tomlConfig) {
} }
} }
func clearAll(conf *tomlConfig) { func clearAll(conf *TomlConfig) {
for _, sc := range conf.Servers { for _, sc := range conf.Servers {
cmdInfo := &CmdInfo{} cmdInfo := &CmdInfo{}
cmdInfo.hostIp = sc.HostIp cmdInfo.hostIP = sc.HostIP
cmdInfo.userName = sc.UserName cmdInfo.userName = sc.UserName
cmdInfo.port = sc.Port cmdInfo.port = sc.Port
cmdInfo.passWord = sc.PassWord cmdInfo.passWord = sc.PassWord
......
...@@ -9,9 +9,10 @@ import ( ...@@ -9,9 +9,10 @@ import (
pty "github.com/33cn/plugin/plugin/dapp/norm/types" pty "github.com/33cn/plugin/plugin/dapp/norm/types"
) )
// Exec_Nput Action
func (n *Norm) Exec_Nput(nput *pty.NormPut, tx *types.Transaction, index int) (*types.Receipt, error) { func (n *Norm) Exec_Nput(nput *pty.NormPut, tx *types.Transaction, index int) (*types.Receipt, error) {
receipt := &types.Receipt{types.ExecOk, nil, nil} receipt := &types.Receipt{Ty: types.ExecOk, KV: nil, Logs: nil}
normKV := &types.KeyValue{Key(nput.Key), nput.Value} normKV := &types.KeyValue{Key: Key(nput.Key), Value: nput.Value}
receipt.KV = append(receipt.KV, normKV) receipt.KV = append(receipt.KV, normKV)
return receipt, nil return receipt, nil
} }
...@@ -9,6 +9,7 @@ import ( ...@@ -9,6 +9,7 @@ import (
pty "github.com/33cn/plugin/plugin/dapp/norm/types" pty "github.com/33cn/plugin/plugin/dapp/norm/types"
) )
// ExecDelLocal_Nput Action
func (n *Norm) ExecDelLocal_Nput(nput *pty.NormPut, tx *types.Transaction, receipt *types.ReceiptData, index int) (*types.LocalDBSet, error) { func (n *Norm) ExecDelLocal_Nput(nput *pty.NormPut, tx *types.Transaction, receipt *types.ReceiptData, index int) (*types.LocalDBSet, error) {
return nil, nil return nil, nil
} }
...@@ -9,6 +9,7 @@ import ( ...@@ -9,6 +9,7 @@ import (
pty "github.com/33cn/plugin/plugin/dapp/norm/types" pty "github.com/33cn/plugin/plugin/dapp/norm/types"
) )
// ExecLocal_Nput Action
func (n *Norm) ExecLocal_Nput(nput *pty.NormPut, tx *types.Transaction, receipt *types.ReceiptData, index int) (*types.LocalDBSet, error) { func (n *Norm) ExecLocal_Nput(nput *pty.NormPut, tx *types.Transaction, receipt *types.ReceiptData, index int) (*types.LocalDBSet, error) {
return nil, nil return nil, nil
} }
...@@ -18,15 +18,18 @@ func init() { ...@@ -18,15 +18,18 @@ func init() {
ety.InitFuncList(types.ListMethod(&Norm{})) ety.InitFuncList(types.ListMethod(&Norm{}))
} }
// Init norm
func Init(name string, sub []byte) { func Init(name string, sub []byte) {
clog.Debug("register norm execer") clog.Debug("register norm execer")
drivers.Register(GetName(), newNorm, types.GetDappFork(driverName, "Enable")) drivers.Register(GetName(), newNorm, types.GetDappFork(driverName, "Enable"))
} }
// GetName for norm
func GetName() string { func GetName() string {
return newNorm().GetName() return newNorm().GetName()
} }
// Norm driver
type Norm struct { type Norm struct {
drivers.DriverBase drivers.DriverBase
} }
...@@ -39,14 +42,17 @@ func newNorm() drivers.Driver { ...@@ -39,14 +42,17 @@ func newNorm() drivers.Driver {
return n return n
} }
// GetDriverName for norm
func (n *Norm) GetDriverName() string { func (n *Norm) GetDriverName() string {
return driverName return driverName
} }
// CheckTx for norm
func (n *Norm) CheckTx(tx *types.Transaction, index int) error { func (n *Norm) CheckTx(tx *types.Transaction, index int) error {
return nil return nil
} }
// Key for norm
func Key(str string) (key []byte) { func Key(str string) (key []byte) {
key = append(key, []byte("mavl-norm-")...) key = append(key, []byte("mavl-norm-")...)
key = append(key, str...) key = append(key, str...)
......
...@@ -9,10 +9,11 @@ import ( ...@@ -9,10 +9,11 @@ import (
pty "github.com/33cn/plugin/plugin/dapp/norm/types" pty "github.com/33cn/plugin/plugin/dapp/norm/types"
) )
// Query_NormGet get value
func (n *Norm) Query_NormGet(in *pty.NormGetKey) (types.Message, error) { func (n *Norm) Query_NormGet(in *pty.NormGetKey) (types.Message, error) {
value, err := n.GetStateDB().Get(Key(in.Key)) value, err := n.GetStateDB().Get(Key(in.Key))
if err != nil { if err != nil {
return nil, types.ErrNotFound return nil, types.ErrNotFound
} }
return &types.ReplyString{string(value)}, nil return &types.ReplyString{Data: string(value)}, nil
} }
...@@ -8,6 +8,7 @@ import ( ...@@ -8,6 +8,7 @@ import (
"github.com/33cn/chain33/types" "github.com/33cn/chain33/types"
) )
// NormX name
var NormX = "norm" var NormX = "norm"
func init() { func init() {
...@@ -16,26 +17,31 @@ func init() { ...@@ -16,26 +17,31 @@ func init() {
types.RegisterDappFork(NormX, "Enable", 0) types.RegisterDappFork(NormX, "Enable", 0)
} }
// NormType def
type NormType struct { type NormType struct {
types.ExecTypeBase types.ExecTypeBase
} }
// NewType method
func NewType() *NormType { func NewType() *NormType {
c := &NormType{} c := &NormType{}
c.SetChild(c) c.SetChild(c)
return c return c
} }
// GetPayload method
func (norm *NormType) GetPayload() types.Message { func (norm *NormType) GetPayload() types.Message {
return &NormAction{} return &NormAction{}
} }
// GetTypeMap method
func (norm *NormType) GetTypeMap() map[string]int32 { func (norm *NormType) GetTypeMap() map[string]int32 {
return map[string]int32{ return map[string]int32{
"Nput": NormActionPut, "Nput": NormActionPut,
} }
} }
func (at *NormType) GetLogMap() map[int64]*types.LogInfo { // GetLogMap method
func (norm *NormType) GetLogMap() map[int64]*types.LogInfo {
return map[int64]*types.LogInfo{} return map[int64]*types.LogInfo{}
} }
...@@ -16,10 +16,12 @@ import ( ...@@ -16,10 +16,12 @@ import (
var klog = log.New("module", "kvdb") var klog = log.New("module", "kvdb")
// SetLogLevel set log level
func SetLogLevel(level string) { func SetLogLevel(level string) {
clog.SetLogLevel(level) clog.SetLogLevel(level)
} }
// DisableLog disable log output
func DisableLog() { func DisableLog() {
klog.SetHandler(log.DiscardHandler()) klog.SetHandler(log.DiscardHandler())
} }
...@@ -28,11 +30,13 @@ func init() { ...@@ -28,11 +30,13 @@ func init() {
drivers.Reg("kvdb", New) drivers.Reg("kvdb", New)
} }
// KVStore implementation
type KVStore struct { type KVStore struct {
*drivers.BaseStore *drivers.BaseStore
cache map[string]map[string]*types.KeyValue cache map[string]map[string]*types.KeyValue
} }
// New KVStore module
func New(cfg *types.Store, sub []byte) queue.Module { func New(cfg *types.Store, sub []byte) queue.Module {
bs := drivers.NewBaseStore(cfg) bs := drivers.NewBaseStore(cfg)
kvs := &KVStore{bs, make(map[string]map[string]*types.KeyValue)} kvs := &KVStore{bs, make(map[string]map[string]*types.KeyValue)}
...@@ -40,11 +44,13 @@ func New(cfg *types.Store, sub []byte) queue.Module { ...@@ -40,11 +44,13 @@ func New(cfg *types.Store, sub []byte) queue.Module {
return kvs return kvs
} }
// Close KVStore module
func (kvs *KVStore) Close() { func (kvs *KVStore) Close() {
kvs.BaseStore.Close() kvs.BaseStore.Close()
klog.Info("store kvdb closed") klog.Info("store kvdb closed")
} }
// Set kvs with statehash to KVStore
func (kvs *KVStore) Set(datas *types.StoreSet, sync bool) ([]byte, error) { func (kvs *KVStore) Set(datas *types.StoreSet, sync bool) ([]byte, error) {
hash := calcHash(datas) hash := calcHash(datas)
kvmap := make(map[string]*types.KeyValue) kvmap := make(map[string]*types.KeyValue)
...@@ -55,6 +61,7 @@ func (kvs *KVStore) Set(datas *types.StoreSet, sync bool) ([]byte, error) { ...@@ -55,6 +61,7 @@ func (kvs *KVStore) Set(datas *types.StoreSet, sync bool) ([]byte, error) {
return hash, nil return hash, nil
} }
// Get kvs with statehash from KVStore
func (kvs *KVStore) Get(datas *types.StoreGet) [][]byte { func (kvs *KVStore) Get(datas *types.StoreGet) [][]byte {
values := make([][]byte, len(datas.Keys)) values := make([][]byte, len(datas.Keys))
if kvmap, ok := kvs.cache[string(datas.StateHash)]; ok { if kvmap, ok := kvs.cache[string(datas.StateHash)]; ok {
...@@ -76,6 +83,7 @@ func (kvs *KVStore) Get(datas *types.StoreGet) [][]byte { ...@@ -76,6 +83,7 @@ func (kvs *KVStore) Get(datas *types.StoreGet) [][]byte {
return values return values
} }
// MemSet set kvs to the mem of KVStore
func (kvs *KVStore) MemSet(datas *types.StoreSet, sync bool) ([]byte, error) { func (kvs *KVStore) MemSet(datas *types.StoreSet, sync bool) ([]byte, error) {
if len(datas.KV) == 0 { if len(datas.KV) == 0 {
klog.Info("store kv memset,use preStateHash as stateHash for kvset is null") klog.Info("store kv memset,use preStateHash as stateHash for kvset is null")
...@@ -96,6 +104,7 @@ func (kvs *KVStore) MemSet(datas *types.StoreSet, sync bool) ([]byte, error) { ...@@ -96,6 +104,7 @@ func (kvs *KVStore) MemSet(datas *types.StoreSet, sync bool) ([]byte, error) {
return hash, nil return hash, nil
} }
// Commit kvs in the mem of KVStore
func (kvs *KVStore) Commit(req *types.ReqHash) ([]byte, error) { func (kvs *KVStore) Commit(req *types.ReqHash) ([]byte, error) {
kvmap, ok := kvs.cache[string(req.Hash)] kvmap, ok := kvs.cache[string(req.Hash)]
if !ok { if !ok {
...@@ -112,6 +121,7 @@ func (kvs *KVStore) Commit(req *types.ReqHash) ([]byte, error) { ...@@ -112,6 +121,7 @@ func (kvs *KVStore) Commit(req *types.ReqHash) ([]byte, error) {
return req.Hash, nil return req.Hash, nil
} }
// Rollback kvs in the mem of KVStore
func (kvs *KVStore) Rollback(req *types.ReqHash) ([]byte, error) { func (kvs *KVStore) Rollback(req *types.ReqHash) ([]byte, error) {
_, ok := kvs.cache[string(req.Hash)] _, ok := kvs.cache[string(req.Hash)]
if !ok { if !ok {
...@@ -122,16 +132,19 @@ func (kvs *KVStore) Rollback(req *types.ReqHash) ([]byte, error) { ...@@ -122,16 +132,19 @@ func (kvs *KVStore) Rollback(req *types.ReqHash) ([]byte, error) {
return req.Hash, nil return req.Hash, nil
} }
// IterateRangeByStateHash method
func (kvs *KVStore) IterateRangeByStateHash(statehash []byte, start []byte, end []byte, ascending bool, fn func(key, value []byte) bool) { func (kvs *KVStore) IterateRangeByStateHash(statehash []byte, start []byte, end []byte, ascending bool, fn func(key, value []byte) bool) {
panic("empty") panic("empty")
//TODO: //TODO:
//kvs.IterateRangeByStateHash(mavls.GetDB(), statehash, start, end, ascending, fn) //kvs.IterateRangeByStateHash(mavls.GetDB(), statehash, start, end, ascending, fn)
} }
// ProcEvent handles supported events
func (kvs *KVStore) ProcEvent(msg queue.Message) { func (kvs *KVStore) ProcEvent(msg queue.Message) {
msg.ReplyErr("KVStore", types.ErrActionNotSupport) msg.ReplyErr("KVStore", types.ErrActionNotSupport)
} }
// Del set kvs to nil with StateHash
func (kvs *KVStore) Del(req *types.StoreDel) ([]byte, error) { func (kvs *KVStore) Del(req *types.StoreDel) ([]byte, error) {
//not support //not support
return nil, nil return nil, nil
......
...@@ -23,8 +23,8 @@ func TestKvdbNewClose(t *testing.T) { ...@@ -23,8 +23,8 @@ func TestKvdbNewClose(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
defer os.RemoveAll(dir) // clean up defer os.RemoveAll(dir) // clean up
os.RemoveAll(dir) //删除已存在目录 os.RemoveAll(dir) //删除已存在目录
var store_cfg = newStoreCfg(dir) var storeCfg = newStoreCfg(dir)
store := New(store_cfg, nil) store := New(storeCfg, nil)
assert.NotNil(t, store) assert.NotNil(t, store)
store.Close() store.Close()
...@@ -35,12 +35,12 @@ func TestKvddbSetGet(t *testing.T) { ...@@ -35,12 +35,12 @@ func TestKvddbSetGet(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
defer os.RemoveAll(dir) // clean up defer os.RemoveAll(dir) // clean up
os.RemoveAll(dir) //删除已存在目录 os.RemoveAll(dir) //删除已存在目录
var store_cfg = newStoreCfg(dir) var storeCfg = newStoreCfg(dir)
store := New(store_cfg, nil).(*KVStore) store := New(storeCfg, nil).(*KVStore)
assert.NotNil(t, store) assert.NotNil(t, store)
keys0 := [][]byte{[]byte("mk1"), []byte("mk2")} keys0 := [][]byte{[]byte("mk1"), []byte("mk2")}
get0 := &types.StoreGet{drivers.EmptyRoot[:], keys0} get0 := &types.StoreGet{StateHash: drivers.EmptyRoot[:], Keys: keys0}
values0 := store.Get(get0) values0 := store.Get(get0)
klog.Info("info", "info", values0) klog.Info("info", "info", values0)
// Get exist key, result nil // Get exist key, result nil
...@@ -49,16 +49,16 @@ func TestKvddbSetGet(t *testing.T) { ...@@ -49,16 +49,16 @@ func TestKvddbSetGet(t *testing.T) {
assert.Equal(t, []byte(nil), values0[1]) assert.Equal(t, []byte(nil), values0[1])
var kv []*types.KeyValue var kv []*types.KeyValue
kv = append(kv, &types.KeyValue{[]byte("k1"), []byte("v1")}) kv = append(kv, &types.KeyValue{Key: []byte("k1"), Value: []byte("v1")})
kv = append(kv, &types.KeyValue{[]byte("k2"), []byte("v2")}) kv = append(kv, &types.KeyValue{Key: []byte("k2"), Value: []byte("v2")})
datas := &types.StoreSet{ datas := &types.StoreSet{
drivers.EmptyRoot[:], StateHash: drivers.EmptyRoot[:],
kv, KV: kv,
0} Height: 0}
hash, err := store.Set(datas, true) hash, err := store.Set(datas, true)
assert.Nil(t, err) assert.Nil(t, err)
keys := [][]byte{[]byte("k1"), []byte("k2")} keys := [][]byte{[]byte("k1"), []byte("k2")}
get1 := &types.StoreGet{hash, keys} get1 := &types.StoreGet{StateHash: hash, Keys: keys}
values := store.Get(get1) values := store.Get(get1)
assert.Len(t, values, 2) assert.Len(t, values, 2)
...@@ -66,12 +66,12 @@ func TestKvddbSetGet(t *testing.T) { ...@@ -66,12 +66,12 @@ func TestKvddbSetGet(t *testing.T) {
assert.Equal(t, []byte("v2"), values[1]) assert.Equal(t, []byte("v2"), values[1])
keys = [][]byte{[]byte("k1")} keys = [][]byte{[]byte("k1")}
get2 := &types.StoreGet{hash, keys} get2 := &types.StoreGet{StateHash: hash, Keys: keys}
values2 := store.Get(get2) values2 := store.Get(get2)
assert.Len(t, values2, 1) assert.Len(t, values2, 1)
assert.Equal(t, []byte("v1"), values2[0]) assert.Equal(t, []byte("v1"), values2[0])
get3 := &types.StoreGet{drivers.EmptyRoot[:], keys} get3 := &types.StoreGet{StateHash: drivers.EmptyRoot[:], Keys: keys}
values3 := store.Get(get3) values3 := store.Get(get3)
assert.Len(t, values3, 1) assert.Len(t, values3, 1)
} }
...@@ -81,29 +81,29 @@ func TestKvdbMemSet(t *testing.T) { ...@@ -81,29 +81,29 @@ func TestKvdbMemSet(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
defer os.RemoveAll(dir) // clean up defer os.RemoveAll(dir) // clean up
os.RemoveAll(dir) //删除已存在目录 os.RemoveAll(dir) //删除已存在目录
var store_cfg = newStoreCfg(dir) var storeCfg = newStoreCfg(dir)
store := New(store_cfg, nil).(*KVStore) store := New(storeCfg, nil).(*KVStore)
assert.NotNil(t, store) assert.NotNil(t, store)
var kv []*types.KeyValue var kv []*types.KeyValue
kv = append(kv, &types.KeyValue{[]byte("mk1"), []byte("v1")}) kv = append(kv, &types.KeyValue{Key: []byte("mk1"), Value: []byte("v1")})
kv = append(kv, &types.KeyValue{[]byte("mk2"), []byte("v2")}) kv = append(kv, &types.KeyValue{Key: []byte("mk2"), Value: []byte("v2")})
datas := &types.StoreSet{ datas := &types.StoreSet{
drivers.EmptyRoot[:], StateHash: drivers.EmptyRoot[:],
kv, KV: kv,
0} Height: 0}
hash, err := store.MemSet(datas, true) hash, err := store.MemSet(datas, true)
assert.Nil(t, err) assert.Nil(t, err)
keys := [][]byte{[]byte("mk1"), []byte("mk2")} keys := [][]byte{[]byte("mk1"), []byte("mk2")}
get1 := &types.StoreGet{hash, keys} get1 := &types.StoreGet{StateHash: hash, Keys: keys}
values := store.Get(get1) values := store.Get(get1)
assert.Len(t, values, 2) assert.Len(t, values, 2)
actHash, _ := store.Commit(&types.ReqHash{hash}) actHash, _ := store.Commit(&types.ReqHash{Hash: hash})
assert.Equal(t, hash, actHash) assert.Equal(t, hash, actHash)
notExistHash, _ := store.Commit(&types.ReqHash{drivers.EmptyRoot[:]}) notExistHash, _ := store.Commit(&types.ReqHash{Hash: drivers.EmptyRoot[:]})
assert.Nil(t, notExistHash) assert.Nil(t, notExistHash)
} }
...@@ -112,27 +112,27 @@ func TestKvdbRollback(t *testing.T) { ...@@ -112,27 +112,27 @@ func TestKvdbRollback(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
defer os.RemoveAll(dir) // clean up defer os.RemoveAll(dir) // clean up
os.RemoveAll(dir) //删除已存在目录 os.RemoveAll(dir) //删除已存在目录
var store_cfg = newStoreCfg(dir) var storeCfg = newStoreCfg(dir)
store := New(store_cfg, nil).(*KVStore) store := New(storeCfg, nil).(*KVStore)
assert.NotNil(t, store) assert.NotNil(t, store)
var kv []*types.KeyValue var kv []*types.KeyValue
kv = append(kv, &types.KeyValue{[]byte("mk1"), []byte("v1")}) kv = append(kv, &types.KeyValue{Key: []byte("mk1"), Value: []byte("v1")})
kv = append(kv, &types.KeyValue{[]byte("mk2"), []byte("v2")}) kv = append(kv, &types.KeyValue{Key: []byte("mk2"), Value: []byte("v2")})
datas := &types.StoreSet{ datas := &types.StoreSet{
drivers.EmptyRoot[:], StateHash: drivers.EmptyRoot[:],
kv, KV: kv,
0} Height: 0}
hash, err := store.MemSet(datas, true) hash, err := store.MemSet(datas, true)
assert.Nil(t, err) assert.Nil(t, err)
keys := [][]byte{[]byte("mk1"), []byte("mk2")} keys := [][]byte{[]byte("mk1"), []byte("mk2")}
get1 := &types.StoreGet{hash, keys} get1 := &types.StoreGet{StateHash: hash, Keys: keys}
values := store.Get(get1) values := store.Get(get1)
assert.Len(t, values, 2) assert.Len(t, values, 2)
actHash, _ := store.Rollback(&types.ReqHash{hash}) actHash, _ := store.Rollback(&types.ReqHash{Hash: hash})
assert.Equal(t, hash, actHash) assert.Equal(t, hash, actHash)
notExistHash, _ := store.Rollback(&types.ReqHash{drivers.EmptyRoot[:]}) notExistHash, _ := store.Rollback(&types.ReqHash{Hash: drivers.EmptyRoot[:]})
assert.Nil(t, notExistHash) assert.Nil(t, notExistHash)
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment