Commit cfbee546 authored by caopingcp's avatar caopingcp Committed by vipwzw

improve qbft performance

parent 341a606f
...@@ -1862,7 +1862,7 @@ func (cs *ConsensusState) EmptyBlockInterval() time.Duration { ...@@ -1862,7 +1862,7 @@ func (cs *ConsensusState) EmptyBlockInterval() time.Duration {
// PeerGossipSleep returns the amount of time to sleep if there is nothing to send from the ConsensusReactor // PeerGossipSleep returns the amount of time to sleep if there is nothing to send from the ConsensusReactor
func (cs *ConsensusState) PeerGossipSleep() time.Duration { func (cs *ConsensusState) PeerGossipSleep() time.Duration {
return time.Duration(peerGossipSleepDuration) * time.Millisecond return time.Duration(peerGossipSleepDuration.Load().(int32)) * time.Millisecond
} }
// PeerQueryMaj23Sleep returns the amount of time to sleep after each VoteSetMaj23Message is sent in the ConsensusReactor // PeerQueryMaj23Sleep returns the amount of time to sleep after each VoteSetMaj23Message is sent in the ConsensusReactor
......
...@@ -27,7 +27,7 @@ const ( ...@@ -27,7 +27,7 @@ const (
maxSendQueueSize = 1024 maxSendQueueSize = 1024
defaultSendTimeout = 60 * time.Second defaultSendTimeout = 60 * time.Second
//MaxMsgPacketPayloadSize define //MaxMsgPacketPayloadSize define
MaxMsgPacketPayloadSize = 10 * 1024 * 1024 MaxMsgPacketPayloadSize = 2 * 1024 * 1024
defaultDialTimeout = 3 * time.Second defaultDialTimeout = 3 * time.Second
dialRandomizerIntervalMilliseconds = 3000 dialRandomizerIntervalMilliseconds = 3000
// repeatedly try to reconnect for a few minutes // repeatedly try to reconnect for a few minutes
......
...@@ -10,6 +10,7 @@ import ( ...@@ -10,6 +10,7 @@ import (
"encoding/binary" "encoding/binary"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/33cn/chain33/types"
"io" "io"
"net" "net"
"reflect" "reflect"
...@@ -20,6 +21,7 @@ import ( ...@@ -20,6 +21,7 @@ import (
ttypes "github.com/33cn/plugin/plugin/consensus/qbft/types" ttypes "github.com/33cn/plugin/plugin/consensus/qbft/types"
tmtypes "github.com/33cn/plugin/plugin/dapp/qbftNode/types" tmtypes "github.com/33cn/plugin/plugin/dapp/qbftNode/types"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"github.com/golang/snappy"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
...@@ -273,7 +275,7 @@ func (pc *peerConn) SetTransferChannel(transferChannel chan MsgInfo) { ...@@ -273,7 +275,7 @@ func (pc *peerConn) SetTransferChannel(transferChannel chan MsgInfo) {
func (pc *peerConn) String() string { func (pc *peerConn) String() string {
return fmt.Sprintf("PeerConn{outbound:%v persistent:%v ip:%s id:%s started:%v stopped:%v}", return fmt.Sprintf("PeerConn{outbound:%v persistent:%v ip:%s id:%s started:%v stopped:%v}",
pc.outbound, pc.persistent, pc.ip.String(), pc.id, pc.started, pc.stopped) pc.outbound, pc.persistent, pc.ip.String(), pc.id, atomic.LoadUint32(&pc.started), atomic.LoadUint32(&pc.stopped))
} }
func (pc *peerConn) CloseConn() { func (pc *peerConn) CloseConn() {
...@@ -480,30 +482,38 @@ func (pc *peerConn) stopForError(r interface{}) { ...@@ -480,30 +482,38 @@ func (pc *peerConn) stopForError(r interface{}) {
} }
} }
// 数据压缩后发送, 内部对相关数组进行重复利用
func encodeMsg(msg types.Message, pbuf *[]byte, typeID byte) []byte {
buf := *pbuf
buf = buf[:cap(buf)]
raw := types.Encode(msg)
buf = snappy.Encode(buf, raw)
if len(raw) > MaxMsgPacketPayloadSize {
qbftlog.Info("packet exceed max size", "old", len(raw), "new", len(buf))
}
*pbuf = buf
// 复用raw数组作为压缩数据返回, 需要比较容量是否够大
if cap(raw) >= len(buf)+5 {
raw = raw[:len(buf)+5]
} else {
raw = make([]byte, len(buf)+5)
}
raw[0] = typeID
bytelen := make([]byte, 4)
binary.BigEndian.PutUint32(bytelen, uint32(len(buf)))
copy(raw[1:5], bytelen)
copy(raw[5:], buf)
return raw
}
func (pc *peerConn) sendRoutine() { func (pc *peerConn) sendRoutine() {
buf := make([]byte, 0)
FOR_LOOP: FOR_LOOP:
for { for {
select { select {
case msg := <-pc.sendQueue: case msg := <-pc.sendQueue:
bytes, err := proto.Marshal(msg.Msg) raw := encodeMsg(msg.Msg, &buf, msg.TypeID)
if err != nil { _, err := pc.bufWriter.Write(raw)
qbftlog.Error("peerConn sendroutine marshal data failed", "error", err)
pc.stopForError(err)
break FOR_LOOP
}
len := len(bytes)
bytelen := make([]byte, 4)
binary.BigEndian.PutUint32(bytelen, uint32(len))
pc.sendBuffer = pc.sendBuffer[:0]
pc.sendBuffer = append(pc.sendBuffer, msg.TypeID)
pc.sendBuffer = append(pc.sendBuffer, bytelen...)
pc.sendBuffer = append(pc.sendBuffer, bytes...)
if len+5 > MaxMsgPacketPayloadSize {
qbftlog.Info("packet exceed max size", "len", len+5)
}
_, err = pc.bufWriter.Write(pc.sendBuffer[:len+5])
if err != nil { if err != nil {
qbftlog.Error("peerConn sendroutine write data failed", "error", err) qbftlog.Error("peerConn sendroutine write data failed", "error", err)
pc.stopForError(err) pc.stopForError(err)
...@@ -531,17 +541,25 @@ FOR_LOOP: ...@@ -531,17 +541,25 @@ FOR_LOOP:
pc.stopForError(err) pc.stopForError(err)
break FOR_LOOP break FOR_LOOP
} }
pkt := msgPacket{} pkt := msgPacket{}
pkt.TypeID = buf[0] pkt.TypeID = buf[0]
len := binary.BigEndian.Uint32(buf[1:]) len := binary.BigEndian.Uint32(buf[1:])
if len > 0 { if len > 0 {
buf2 := make([]byte, len) buf2 := make([]byte, len)
_, err = io.ReadFull(pc.bufReader, buf2) _, err = io.ReadFull(pc.bufReader, buf2)
if err != nil { if err != nil {
qbftlog.Error("Connection failed @ recvRoutine", "conn", pc, "err", err) qbftlog.Error("recvRoutine read data fail", "conn", pc, "err", err)
pc.stopForError(err)
}
buf3 := make([]byte, len)
buf3, err = snappy.Decode(buf3, buf2)
if err != nil {
qbftlog.Error("recvRoutine snappy decode fail", "conn", pc, "err", err)
pc.stopForError(err) pc.stopForError(err)
} }
pkt.Bytes = buf2 pkt.Bytes = buf3
} }
if v, ok := ttypes.MsgMap[pkt.TypeID]; ok { if v, ok := ttypes.MsgMap[pkt.TypeID]; ok {
......
...@@ -62,7 +62,7 @@ var ( ...@@ -62,7 +62,7 @@ var (
zeroHash [32]byte zeroHash [32]byte
random *rand.Rand random *rand.Rand
peerGossipSleepDuration int32 = 100 peerGossipSleepDuration atomic.Value
peerQueryMaj23SleepDuration int32 = 2000 peerQueryMaj23SleepDuration int32 = 2000
) )
...@@ -111,6 +111,7 @@ type subConfig struct { ...@@ -111,6 +111,7 @@ type subConfig struct {
SignName string `json:"signName"` SignName string `json:"signName"`
UseAggregateSignature bool `json:"useAggregateSignature"` UseAggregateSignature bool `json:"useAggregateSignature"`
MultiBlocks int64 `json:"multiBlocks"` MultiBlocks int64 `json:"multiBlocks"`
MessageInterval int32 `json:"messageInterval"`
} }
func applyConfig(sub []byte) { func applyConfig(sub []byte) {
...@@ -158,6 +159,10 @@ func applyConfig(sub []byte) { ...@@ -158,6 +159,10 @@ func applyConfig(sub []byte) {
if subcfg.MultiBlocks > 0 { if subcfg.MultiBlocks > 0 {
multiBlocks.Store(subcfg.MultiBlocks) multiBlocks.Store(subcfg.MultiBlocks)
} }
peerGossipSleepDuration.Store(int32(100))
if subcfg.MessageInterval > 0 {
peerGossipSleepDuration.Store(subcfg.MessageInterval)
}
gossipVotes.Store(true) gossipVotes.Store(true)
} }
...@@ -221,7 +226,7 @@ func New(cfg *types.Consensus, sub []byte) queue.Module { ...@@ -221,7 +226,7 @@ func New(cfg *types.Consensus, sub []byte) queue.Module {
return nil return nil
} }
qbftlog.Info("show qbft info", "sign", ttypes.CryptoName, "useAggSig", useAggSig, "genesisFile", genesisFile, qbftlog.Info("show qbft info", "sign", ttypes.CryptoName, "useAggSig", UseAggSig(), "genesisFile", genesisFile,
"privFile", privFile) "privFile", privFile)
ttypes.InitMessageMap() ttypes.InitMessageMap()
...@@ -570,7 +575,7 @@ func (client *Client) WaitBlock(height int64) bool { ...@@ -570,7 +575,7 @@ func (client *Client) WaitBlock(height int64) bool {
if err == nil && newHeight >= height { if err == nil && newHeight >= height {
return true return true
} }
time.Sleep(100 * time.Millisecond) time.Sleep(50 * time.Millisecond)
} }
} }
} }
...@@ -624,11 +629,13 @@ func (client *Client) LoadBlockCommit(height int64) *tmtypes.QbftCommit { ...@@ -624,11 +629,13 @@ func (client *Client) LoadBlockCommit(height int64) *tmtypes.QbftCommit {
qbftlog.Error("LoadBlockCommit GetBlockInfo fail", "err", err) qbftlog.Error("LoadBlockCommit GetBlockInfo fail", "err", err)
return nil return nil
} }
seq, voteType := blockInfo.State.LastSequence, blockInfo.Block.LastCommit.VoteType if height > 1 {
if (seq == 0 && voteType != uint32(ttypes.VoteTypePrecommit)) || seq, voteType := blockInfo.State.LastSequence, blockInfo.Block.LastCommit.VoteType
(seq > 0 && voteType != uint32(ttypes.VoteTypePrevote)) { if (seq == 0 && voteType != uint32(ttypes.VoteTypePrecommit)) ||
qbftlog.Error("LoadBlockCommit wrong VoteType", "seq", seq, "voteType", voteType) (seq > 0 && voteType != uint32(ttypes.VoteTypePrevote)) {
return nil qbftlog.Error("LoadBlockCommit wrong VoteType", "seq", seq, "voteType", voteType)
return nil
}
} }
return blockInfo.GetBlock().GetLastCommit() return blockInfo.GetBlock().GetLastCommit()
} }
...@@ -666,11 +673,17 @@ func (client *Client) Query_IsHealthy(req *types.ReqNil) (types.Message, error) ...@@ -666,11 +673,17 @@ func (client *Client) Query_IsHealthy(req *types.ReqNil) (types.Message, error)
// Query_CurrentState query current consensus state // Query_CurrentState query current consensus state
func (client *Client) Query_CurrentState(req *types.ReqNil) (types.Message, error) { func (client *Client) Query_CurrentState(req *types.ReqNil) (types.Message, error) {
if client.csState == nil {
return nil, ttypes.ErrConsensusState
}
return SaveState(client.csState.GetState()), nil return SaveState(client.csState.GetState()), nil
} }
// Query_NodeInfo query validator node info // Query_NodeInfo query validator node info
func (client *Client) Query_NodeInfo(req *types.ReqNil) (types.Message, error) { func (client *Client) Query_NodeInfo(req *types.ReqNil) (types.Message, error) {
if client.csState == nil {
return nil, ttypes.ErrConsensusState
}
vals := client.csState.GetRoundState().Validators.Validators vals := client.csState.GetRoundState().Validators.Validators
nodes := make([]*tmtypes.QbftNodeInfo, 0) nodes := make([]*tmtypes.QbftNodeInfo, 0)
for _, val := range vals { for _, val := range vals {
......
...@@ -65,6 +65,9 @@ type State struct { ...@@ -65,6 +65,9 @@ type State struct {
// Copy makes a copy of the QbftState for mutating. // Copy makes a copy of the QbftState for mutating.
func (s State) Copy() State { func (s State) Copy() State {
if &s == nil {
return State{}
}
return State{ return State{
ChainID: s.ChainID, ChainID: s.ChainID,
......
...@@ -28,6 +28,8 @@ var ( ...@@ -28,6 +28,8 @@ var (
ErrBaseExecErr = errors.New("ErrBaseExecErr") ErrBaseExecErr = errors.New("ErrBaseExecErr")
// ErrLastBlockID error type // ErrLastBlockID error type
ErrLastBlockID = errors.New("ErrLastBlockID") ErrLastBlockID = errors.New("ErrLastBlockID")
// ErrConsensusState error type
ErrConsensusState = errors.New("ErrConsensusState")
) )
var ( var (
......
...@@ -144,6 +144,9 @@ func (valSet *ValidatorSet) IncrementAccum(times int) { ...@@ -144,6 +144,9 @@ func (valSet *ValidatorSet) IncrementAccum(times int) {
// Copy ... // Copy ...
func (valSet *ValidatorSet) Copy() *ValidatorSet { func (valSet *ValidatorSet) Copy() *ValidatorSet {
if valSet == nil {
return nil
}
validators := make([]*Validator, len(valSet.Validators)) validators := make([]*Validator, len(valSet.Validators))
for i, val := range valSet.Validators { for i, val := range valSet.Validators {
// NOTE: must copy, since IncrementAccum updates in place. // NOTE: must copy, since IncrementAccum updates in place.
......
...@@ -27,6 +27,7 @@ import ( ...@@ -27,6 +27,7 @@ import (
"github.com/33cn/chain33/common/crypto" "github.com/33cn/chain33/common/crypto"
"github.com/33cn/chain33/common/log/log15" "github.com/33cn/chain33/common/log/log15"
rpctypes "github.com/33cn/chain33/rpc/types" rpctypes "github.com/33cn/chain33/rpc/types"
"github.com/33cn/chain33/system/crypto/none"
"github.com/33cn/chain33/types" "github.com/33cn/chain33/types"
ty "github.com/33cn/plugin/plugin/dapp/valnode/types" ty "github.com/33cn/plugin/plugin/dapp/valnode/types"
"google.golang.org/grpc" "google.golang.org/grpc"
...@@ -58,6 +59,12 @@ func main() { ...@@ -58,6 +59,12 @@ func main() {
return return
} }
Perf(argsWithoutProg[1], argsWithoutProg[2], argsWithoutProg[3], argsWithoutProg[4], argsWithoutProg[5]) Perf(argsWithoutProg[1], argsWithoutProg[2], argsWithoutProg[3], argsWithoutProg[4], argsWithoutProg[5])
case "perfV2":
if len(argsWithoutProg) != 4 {
fmt.Print(errors.New("参数错误").Error())
return
}
PerfV2(argsWithoutProg[1], argsWithoutProg[2], argsWithoutProg[3])
case "put": case "put":
if len(argsWithoutProg) != 3 { if len(argsWithoutProg) != 3 {
fmt.Print(errors.New("参数错误").Error()) fmt.Print(errors.New("参数错误").Error())
...@@ -88,9 +95,10 @@ func main() { ...@@ -88,9 +95,10 @@ func main() {
// LoadHelp ... // LoadHelp ...
func LoadHelp() { func LoadHelp() {
fmt.Println("Available Commands:") fmt.Println("Available Commands:")
fmt.Println("perf [host, size, num, interval, duration] : 写数据性能测试,interval单位为100毫秒,host形式为ip:port") fmt.Println("perf [host, size, num, interval, duration] : 写数据性能测试,interval单位为100毫秒,host形式为ip:port")
fmt.Println("put [ip, size] : 写数据") fmt.Println("perfV2 [host, size, duration] : 写数据性能测试,host形式为ip:port")
fmt.Println("get [ip, hash] : 读数据") fmt.Println("put [ip, size] : 写数据")
fmt.Println("get [ip, hash] : 读数据")
fmt.Println("valnode [ip, pubkey, power] : 增加/删除/修改tendermint节点") fmt.Println("valnode [ip, pubkey, power] : 增加/删除/修改tendermint节点")
fmt.Println("perfOld [ip, size, num, interval, duration] : 不推荐使用,写数据性能测试,interval单位为100毫秒") fmt.Println("perfOld [ip, size, num, interval, duration] : 不推荐使用,写数据性能测试,interval单位为100毫秒")
} }
...@@ -223,6 +231,119 @@ func Perf(host, txsize, num, sleepinterval, totalduration string) { ...@@ -223,6 +231,119 @@ func Perf(host, txsize, num, sleepinterval, totalduration string) {
log.Info("sendtx success tx", "success", success) log.Info("sendtx success tx", "success", success)
} }
// PerfV2
func PerfV2(host, txsize, duration string) {
durInt, _ := strconv.Atoi(duration)
sizeInt, _ := strconv.Atoi(txsize)
numCPU := runtime.NumCPU()
numThread := numCPU * 2
numSend := numCPU * 3
ch := make(chan struct{}, numThread)
chSend := make(chan struct{}, numSend)
numInt := 10000
batchNum := 200
txChan := make(chan *types.Transaction, numInt)
var blockHeight int64
total := int64(0)
success := int64(0)
start := time.Now()
go func() {
ch <- struct{}{}
conn := newGrpcConn(host)
defer conn.Close()
gcli := types.NewChain33Client(conn)
for {
height, err := getHeight(gcli)
if err != nil {
log.Error("getHeight", "err", err)
time.Sleep(time.Second)
} else {
atomic.StoreInt64(&blockHeight, height)
}
time.Sleep(time.Millisecond * 500)
}
}()
<-ch
for i := 0; i < numThread; i++ {
go func() {
ticker := time.NewTicker(time.Duration(durInt) * time.Second)
defer ticker.Stop()
_, priv := genaddress()
beg := time.Now()
OuterLoop:
for {
select {
case <-ticker.C:
log.Info("thread duration", "cost", time.Since(beg))
break OuterLoop
default:
//txHeight := atomic.LoadInt64(&blockHeight) + types.LowAllowPackHeight
for txs := 0; txs < batchNum; txs++ {
//构造存证交易
tx := &types.Transaction{Execer: []byte("user.write")}
tx.To = execAddr
tx.Fee = 1e6
tx.Nonce = time.Now().UnixNano()
//tx.Expire = types.TxHeightFlag + txHeight
tx.Expire = 0
tx.Payload = RandStringBytes(sizeInt)
//交易签名
//tx.Sign(types.SECP256K1, priv)
tx.Signature = &types.Signature{Ty: none.ID, Pubkey: priv.PubKey().Bytes()}
txChan <- tx
}
}
}
ch <- struct{}{}
}()
}
for i := 0; i < numSend; i++ {
go func() {
conn := newGrpcConn(host)
defer conn.Close()
gcli := types.NewChain33Client(conn)
txs := &types.Transactions{Txs: make([]*types.Transaction, 0, batchNum)}
for tx := range txChan {
txs.Txs = append(txs.Txs, tx)
if len(txs.Txs) == batchNum {
_, err := gcli.SendTransactions(context.Background(), txs)
atomic.AddInt64(&total, int64(batchNum))
txs.Txs = txs.Txs[:0]
if err != nil {
if strings.Contains(err.Error(), "ErrChannelClosed") {
return
}
log.Error("sendtx", "err", err.Error())
time.Sleep(time.Second)
continue
}
atomic.AddInt64(&success, int64(batchNum))
}
}
chSend <- struct{}{}
}()
}
for j := 0; j < numThread; j++ {
<-ch
}
close(txChan)
for k := 0; k < numSend; k++ {
<-chSend
}
log.Info("sendtx duration", "cost", time.Since(start))
//打印发送的交易总数
log.Info("sendtx total tx", "total", total)
//打印成功发送的交易总数
log.Info("sendtx success tx", "success", success)
}
var ( var (
log = log15.New() log = log15.New()
execAddr = address.ExecAddress("user.write") execAddr = address.ExecAddress("user.write")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment