Commit c34694de authored by caopingcp, committed by vipwzw

qbft add DetachExecution option

parent e44ba57b
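This commit adds a DetachExecution switch to the qbft consensus. When it is off (the default), a node executes a block only after consensus commits it; when it is on, the proposal block is handed to the blockchain module as soon as the proposal is accepted, so execution overlaps with the prevote/precommit rounds, and finalizeCommit only waits for the stored result. The option is read from the consensus sub-config; a sketch of enabling it, assuming the usual chain33 TOML layout (the section name is an assumption, the key matches the new detachExecution json tag below):

[consensus.sub.qbft]
# run block execution in parallel with voting; defaults to false
detachExecution=true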
@@ -834,7 +834,7 @@ func (cs *ConsensusState) createProposalBlock() (block *ttypes.QbftBlock) {
 	// Mempool validated transactions
 	beg := time.Now()
-	pblock := cs.client.BuildBlock()
+	pblock := cs.client.BuildBlock(cs.Height - 1)
 	if pblock == nil {
 		qbftlog.Error("createProposalBlock BuildBlock fail")
 		return nil
@@ -1252,7 +1252,9 @@ func (cs *ConsensusState) finalizeCommit(height int64) {
 	// commit block
 	commitBlock := cs.ProposalBlock.Data
-	cs.client.CommitBlock(commitBlock)
+	if !DetachExec() {
+		cs.client.CommitBlock(commitBlock.Clone())
+	}
 	if bytes.Equal(cs.privValidator.GetAddress(), block.QbftBlock.Header.ProposerAddr) {
 		qbftlog.Info(fmt.Sprintf("Proposer reach consensus. Current: %v/%v/%v", cs.Height, cs.Round, cs.Step),
 			"CommitRound", cs.CommitRound, "tx-len", len(commitBlock.Txs), "cost", types.Since(cs.begCons),
@@ -1264,10 +1266,7 @@ func (cs *ConsensusState) finalizeCommit(height int64) {
 			"proposer-addr", fmt.Sprintf("%X", ttypes.Fingerprint(block.QbftBlock.Header.ProposerAddr)),
 			"seq", block.Header.Sequence)
 	}
-	reqblock, err := cs.client.RequestBlock(height)
-	if err != nil {
-		panic(fmt.Sprintf("finalizeCommit RequestBlock fail: %v", err))
-	}
+	reqblock := cs.client.WaitBlock(height)
 	stateCopy.LastResultsHash = reqblock.Hash(cs.client.GetAPI().GetConfig())
 	//check whether need update validator nodes
@@ -1428,6 +1427,11 @@ func (cs *ConsensusState) addProposalBlock(proposalBlock *tmtypes.QbftBlock) (er
 		qbftlog.Info(fmt.Sprintf("Consensus set proposal block. Current: %v/%v/%v", cs.Height, cs.Round, cs.Step),
 			"ProposalBlockHash", fmt.Sprintf("%X", cs.ProposalBlockHash), "cost", types.Since(cs.begCons))
+		if DetachExec() {
+			qbftlog.Info("write proposal block in advance")
+			go cs.client.CommitBlock(cs.ProposalBlock.Data.Clone())
+		}
 		// Update Valid* if we can.
 		prevotes := cs.Votes.Prevotes(cs.Round)
 		blockID, hasTwoThirds := prevotes.TwoThirdsMajority()
......
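Read together, the three consensus_state.go hunks above are the core of the feature: with DetachExec on, the block goes to the blockchain module asynchronously at addProposalBlock time, and finalizeCommit skips its own CommitBlock. Two details carry the correctness. First, both CommitBlock call sites now pass a Clone(), presumably because the block object is handed to a concurrent writer while consensus keeps reading it, so each path must send its own copy. Second, the old RequestBlock-or-panic is replaced by WaitBlock: under detached execution the block may still be executing when finalizeCommit runs, so the node must wait for it to land on-chain (LastResultsHash is derived from the stored block) rather than treat a miss as fatal.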
@@ -215,7 +215,7 @@ func validateBlock(stateDB *CSStateDB, s State, b *ttypes.QbftBlock) error {
 	// validate prev block info
 	if !bytes.Equal(b.Header.LastBlockID.Hash, s.LastBlockID.Hash) {
-		return fmt.Errorf("Wrong Block.Header.LastBlockID. Expected %v, got %v", s.LastBlockID, b.Header.LastBlockID)
+		return fmt.Errorf("Wrong Block.Header.LastBlockID. Expected %X, got %X", s.LastBlockID.Hash, b.Header.LastBlockID.Hash)
 	}
 	newTxs := b.Header.NumTxs
@@ -225,16 +225,16 @@ func validateBlock(stateDB *CSStateDB, s State, b *ttypes.QbftBlock) error {
 	// validate app info
 	if !bytes.Equal(b.Header.AppHash, s.AppHash) {
-		return fmt.Errorf("Wrong Block.Header.AppHash. Expected %X, got %v", s.AppHash, b.Header.AppHash)
+		return fmt.Errorf("Wrong Block.Header.AppHash. Expected %X, got %X", s.AppHash, b.Header.AppHash)
 	}
 	if !bytes.Equal(b.Header.ConsensusHash, s.ConsensusParams.Hash()) {
-		return fmt.Errorf("Wrong Block.Header.ConsensusHash. Expected %X, got %v", s.ConsensusParams.Hash(), b.Header.ConsensusHash)
+		return fmt.Errorf("Wrong Block.Header.ConsensusHash. Expected %X, got %X", s.ConsensusParams.Hash(), b.Header.ConsensusHash)
 	}
 	if !bytes.Equal(b.Header.LastResultsHash, s.LastResultsHash) {
-		return fmt.Errorf("Wrong Block.Header.LastResultsHash. Expected %X, got %v", s.LastResultsHash, b.Header.LastResultsHash)
+		return fmt.Errorf("Wrong Block.Header.LastResultsHash. Expected %X, got %X", s.LastResultsHash, b.Header.LastResultsHash)
 	}
 	if !bytes.Equal(b.Header.ValidatorsHash, s.Validators.Hash()) {
-		return fmt.Errorf("Wrong Block.Header.ValidatorsHash. Expected %X, got %v", s.Validators.Hash(), b.Header.ValidatorsHash)
+		return fmt.Errorf("Wrong Block.Header.ValidatorsHash. Expected %X, got %X", s.Validators.Hash(), b.Header.ValidatorsHash)
 	}
 	// Validate block LastCommit.
......
@@ -482,37 +482,47 @@ func (pc *peerConn) stopForError(r interface{}) {
 	}
 }
 
-// compress data before sending; internal buffers are reused
-func encodeMsg(msg types.Message, pbuf *[]byte, typeID byte) []byte {
-	buf := *pbuf
-	buf = buf[:cap(buf)]
+// compress data
+func encodeMsg(msg types.Message, typeID byte) []byte {
 	raw := types.Encode(msg)
-	buf = snappy.Encode(buf, raw)
+	cmp := byte(0)
 	if len(raw) > MaxMsgPacketPayloadSize {
-		qbftlog.Info("packet exceed max size", "old", len(raw), "new", len(buf))
+		buf := make([]byte, 0)
+		buf = snappy.Encode(buf, raw)
+		cmp = byte(1)
+		qbftlog.Info("compress large message", "old", len(raw), "new", len(buf))
+		raw = buf
 	}
-	*pbuf = buf
-	// reuse the raw slice for the compressed result; check whether its capacity is large enough
-	if cap(raw) >= len(buf)+5 {
-		raw = raw[:len(buf)+5]
-	} else {
-		raw = make([]byte, len(buf)+5)
-	}
-	raw[0] = typeID
-	bytelen := make([]byte, 4)
-	binary.BigEndian.PutUint32(bytelen, uint32(len(buf)))
-	copy(raw[1:5], bytelen)
-	copy(raw[5:], buf)
-	return raw
+	ebuf := make([]byte, len(raw)+6)
+	ebuf[0] = typeID
+	ebuf[1] = cmp
+	bytelen := make([]byte, 4)
+	binary.BigEndian.PutUint32(bytelen, uint32(len(raw)))
+	copy(ebuf[2:6], bytelen)
+	copy(ebuf[6:], raw)
+	return ebuf
+}
+
+// decompress data
+func decodeMsg(msg []byte, cmp byte) ([]byte, error) {
+	if cmp == byte(0) {
+		return msg, nil
+	}
+	buf := make([]byte, 0)
+	buf, err := snappy.Decode(buf, msg)
+	if err != nil {
+		return nil, err
+	}
+	qbftlog.Info("uncompress large message", "old", len(msg), "new", len(buf))
+	return buf, nil
 }
 
 func (pc *peerConn) sendRoutine() {
-	buf := make([]byte, 0)
 FOR_LOOP:
 	for {
 		select {
 		case msg := <-pc.sendQueue:
-			raw := encodeMsg(msg.Msg, &buf, msg.TypeID)
+			raw := encodeMsg(msg.Msg, msg.TypeID)
 			_, err := pc.bufWriter.Write(raw)
 			if err != nil {
 				qbftlog.Error("peerConn sendroutine write data failed", "error", err)
@@ -533,40 +543,41 @@ FOR_LOOP:
 func (pc *peerConn) recvRoutine() {
 FOR_LOOP:
 	for {
-		//typeID+msgLen+msg
-		var buf [5]byte
+		//typeID+cmp+msgLen+msg
+		var buf [6]byte
 		_, err := io.ReadFull(pc.bufReader, buf[:])
 		if err != nil {
-			qbftlog.Error("Connection failed @ recvRoutine (reading byte)", "conn", pc, "err", err)
+			qbftlog.Error("recvRoutine read byte fail", "conn", pc, "err", err)
 			pc.stopForError(err)
 			break FOR_LOOP
 		}
 		pkt := msgPacket{}
 		pkt.TypeID = buf[0]
-		len := binary.BigEndian.Uint32(buf[1:])
-		if len > 0 {
-			buf2 := make([]byte, len)
-			_, err = io.ReadFull(pc.bufReader, buf2)
-			if err != nil {
-				qbftlog.Error("recvRoutine read data fail", "conn", pc, "err", err)
-				pc.stopForError(err)
-			}
-			buf3 := make([]byte, len)
-			buf3, err = snappy.Decode(buf3, buf2)
-			if err != nil {
-				qbftlog.Error("recvRoutine snappy decode fail", "conn", pc, "err", err)
-				pc.stopForError(err)
-			}
-			pkt.Bytes = buf3
+		cmp := buf[1]
+		msgLen := binary.BigEndian.Uint32(buf[2:6])
+		if msgLen <= 0 {
+			qbftlog.Error("recvRoutine read invalid data", "msgLen", msgLen, "cmp", cmp, "peerIP", pc.ip.String())
+			continue
 		}
+		buf2 := make([]byte, msgLen)
+		_, err = io.ReadFull(pc.bufReader, buf2)
+		if err != nil {
+			qbftlog.Error("recvRoutine read data fail", "err", err, "peerIP", pc.ip.String())
+			continue
+		}
+		buf3, err := decodeMsg(buf2, cmp)
+		if err != nil {
+			qbftlog.Error("recvRoutine decode msg fail", "err", err, "peerIP", pc.ip.String())
+			continue
+		}
+		pkt.Bytes = buf3
 		if v, ok := ttypes.MsgMap[pkt.TypeID]; ok {
 			realMsg := reflect.New(v).Interface()
 			err := proto.Unmarshal(pkt.Bytes, realMsg.(proto.Message))
 			if err != nil {
-				qbftlog.Error("peerConn recvRoutine Unmarshal data failed", "err", err)
+				qbftlog.Error("recvRoutine Unmarshal data fail", "msgTy", pkt.TypeID, "msgLen", len(pkt.Bytes), "err", err, "peerIP", pc.ip.String())
 				continue
 			}
 			if pc.transferChannel != nil && (pkt.TypeID == ttypes.ProposalID || pkt.TypeID == ttypes.VoteID ||
@@ -595,12 +606,8 @@ FOR_LOOP:
 				pc.updateStateQueue <- MsgInfo{pkt.TypeID, realMsg.(proto.Message), pc.ID(), pc.ip.String()}
 			}
 		} else {
-			err := fmt.Errorf("Unknown message type %v", pkt.TypeID)
-			qbftlog.Error("Connection failed @ recvRoutine", "conn", pc, "err", err)
-			pc.stopForError(err)
-			break FOR_LOOP
+			qbftlog.Error("receive unknown message type", "type", pkt.TypeID, "peerIP", pc.ip.String())
 		}
 	}
 	pc.quitUpdate <- struct{}{}
 	pc.quitBeat <- struct{}{}
......
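Error handling in recvRoutine also loosens: a short read of the packet body, a decode failure, or an unknown type ID now logs and skips the packet instead of calling stopForError and tearing down the connection. Only a failed read of the 6-byte header still stops the connection. Note that framing depends entirely on the length prefix being accurate; a corrupted length would desynchronize the stream until the next header read fails.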
@@ -59,6 +59,7 @@ var (
 	useAggSig   atomic.Value // false
 	multiBlocks atomic.Value // 1
 	gossipVotes atomic.Value
+	detachExec  atomic.Value // false
 	zeroHash [32]byte
 	random   *rand.Rand
......@@ -112,6 +113,7 @@ type subConfig struct {
UseAggregateSignature bool `json:"useAggregateSignature"`
MultiBlocks int64 `json:"multiBlocks"`
MessageInterval int32 `json:"messageInterval"`
DetachExecution bool `json:"detachExecution"`
}
func applyConfig(cfg *types.Consensus, sub []byte) {
@@ -166,6 +168,7 @@ func applyConfig(cfg *types.Consensus, sub []byte) {
 	if subcfg.MessageInterval > 0 {
 		peerGossipSleepDuration.Store(subcfg.MessageInterval)
 	}
+	detachExec.Store(subcfg.DetachExecution)
 	gossipVotes.Store(true)
 }
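Note that detachExec.Store runs unconditionally, not just when the option is set: atomic.Value.Load returns nil before the first Store, so the detachExec.Load().(bool) assertion in DetachExec() below would panic on nodes whose config omits the key if the Store were guarded. The same pattern already protects gossipVotes.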
@@ -175,6 +178,11 @@ func UseAggSig() bool {
 	return useAggSig.Load().(bool)
 }
 
+// DetachExec returns whether execution is detached from consensus
+func DetachExec() bool {
+	return detachExec.Load().(bool)
+}
+
 // DefaultDBProvider returns a database
 func DefaultDBProvider(name string) dbm.DB {
 	return dbm.NewDB(name, "leveldb", dbPath, 0)
@@ -230,7 +238,7 @@ func New(cfg *types.Consensus, sub []byte) queue.Module {
 	}
 	qbftlog.Info("show qbft info", "version", qbftVersion, "sign", ttypes.CryptoName, "useAggSig", UseAggSig(),
-		"genesisFile", genesisFile, "privFile", privFile)
+		"detachExec", DetachExec(), "genesisFile", genesisFile, "privFile", privFile)
 	ttypes.InitMessageMap()
@@ -526,12 +534,8 @@ func (client *Client) CheckTxDup(txs []*types.Transaction, height int64) (transa
 }
 
 // BuildBlock build a new block
-func (client *Client) BuildBlock() *types.Block {
-	lastBlock, err := client.RequestLastBlock()
-	if err != nil {
-		qbftlog.Error("BuildBlock fail", "err", err)
-		return nil
-	}
+func (client *Client) BuildBlock(height int64) *types.Block {
+	lastBlock := client.WaitBlock(height)
 	cfg := client.GetAPI().GetConfig()
 	txs := client.RequestTx(int(cfg.GetP(lastBlock.Height+1).MaxTxNumber)-1, nil)
 	// placeholder
@@ -561,7 +565,7 @@ func (client *Client) CommitBlock(block *types.Block) {
 }
 
 // WaitBlock by height
-func (client *Client) WaitBlock(height int64) bool {
+func (client *Client) WaitBlock(height int64) *types.Block {
 	ticker := time.NewTicker(10 * time.Second)
 	defer ticker.Stop()
@@ -570,13 +574,16 @@
 		select {
 		case <-client.ctx.Done():
 			qbftlog.Info("WaitBlock quit")
-			return false
+			return nil
 		case <-ticker.C:
 			qbftlog.Info("Still waiting block......", "height", height, "cost", time.Since(beg))
 		default:
 			newHeight, err := client.getLastHeight()
 			if err == nil && newHeight >= height {
-				return true
+				block, err := client.RequestBlock(height)
+				if err == nil {
+					return block
+				}
 			}
 			time.Sleep(50 * time.Millisecond)
 		}
......
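WaitBlock now returns the block itself (nil only on shutdown) instead of a bool, folding the old RequestBlock call into the polling loop. BuildBlock correspondingly takes the height to build on and blocks until that block exists on-chain, which is what makes proposing height H safe while H-1 may still be executing under DetachExecution. A hypothetical caller sketch under the new signature:

// lastBlockOrNil illustrates the new contract; a nil result means the
// client context was cancelled and consensus is shutting down.
func lastBlockOrNil(client *Client, height int64) *types.Block {
	blk := client.WaitBlock(height) // blocks until the height is on-chain
	if blk == nil {
		return nil
	}
	return blk
}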
@@ -60,11 +60,11 @@ func main() {
 		}
 		Perf(argsWithoutProg[1], argsWithoutProg[2], argsWithoutProg[3], argsWithoutProg[4], argsWithoutProg[5])
 	case "perfV2":
-		if len(argsWithoutProg) != 4 {
+		if len(argsWithoutProg) != 5 {
 			fmt.Print(errors.New("invalid arguments").Error())
 			return
 		}
-		PerfV2(argsWithoutProg[1], argsWithoutProg[2], argsWithoutProg[3])
+		PerfV2(argsWithoutProg[1], argsWithoutProg[2], argsWithoutProg[3], argsWithoutProg[4])
 	case "put":
 		if len(argsWithoutProg) != 3 {
 			fmt.Print(errors.New("invalid arguments").Error())
@@ -96,7 +96,7 @@ func main() {
 func LoadHelp() {
 	fmt.Println("Available Commands:")
 	fmt.Println("perf [host, size, num, interval, duration] : write performance test; interval is in units of 100 ms; host takes the form ip:port")
-	fmt.Println("perfV2 [host, size, duration] : write performance test; host takes the form ip:port")
+	fmt.Println("perfV2 [host, size, interval, duration] : write performance test; interval is in seconds; host takes the form ip:port")
 	fmt.Println("put [ip, size] : write data")
 	fmt.Println("get [ip, hash] : read data")
 	fmt.Println("valnode [ip, pubkey, power] : add/delete/modify a tendermint node")
@@ -232,16 +232,17 @@ func Perf(host, txsize, num, sleepinterval, totalduration string) {
 }
 
 // PerfV2
-func PerfV2(host, txsize, duration string) {
+func PerfV2(host, txsize, sleepinterval, duration string) {
 	durInt, _ := strconv.Atoi(duration)
 	sizeInt, _ := strconv.Atoi(txsize)
+	sleep, _ := strconv.Atoi(sleepinterval)
 	numCPU := runtime.NumCPU()
-	numThread := numCPU * 2
-	numSend := numCPU * 3
+	numThread := numCPU
+	numSend := numCPU * 2
 	ch := make(chan struct{}, numThread)
 	chSend := make(chan struct{}, numSend)
 	numInt := 10000
-	batchNum := 200
+	batchNum := 100
 	txChan := make(chan *types.Transaction, numInt)
 	var blockHeight int64
 	total := int64(0)
@@ -307,24 +308,42 @@ func PerfV2(host, txsize, duration string) {
 			defer conn.Close()
 			gcli := types.NewChain33Client(conn)
 			txs := &types.Transactions{Txs: make([]*types.Transaction, 0, batchNum)}
+			retryTxs := make([]*types.Transaction, 0, batchNum*2)
 			for tx := range txChan {
 				txs.Txs = append(txs.Txs, tx)
-				if len(txs.Txs) == batchNum {
-					_, err := gcli.SendTransactions(context.Background(), txs)
-					atomic.AddInt64(&total, int64(batchNum))
-					txs.Txs = txs.Txs[:0]
+				if len(retryTxs) > 0 {
+					txs.Txs = append(txs.Txs, retryTxs...)
+					retryTxs = retryTxs[:0]
+				}
+				if len(txs.Txs) >= batchNum {
+					reps, err := gcli.SendTransactions(context.Background(), txs)
 					if err != nil {
 						if strings.Contains(err.Error(), "ErrChannelClosed") {
+							log.Error("sendtxs", "err", err)
 							return
 						}
-						log.Error("sendtx", "err", err.Error())
-						time.Sleep(time.Second)
-						continue
 					}
-					atomic.AddInt64(&success, int64(batchNum))
+					atomic.AddInt64(&total, int64(len(txs.Txs)))
+					// retry failed txs
+					for index, reply := range reps.GetReplyList() {
+						if reply.IsOk {
+							continue
+						}
+						if string(reply.GetMsg()) == types.ErrChannelClosed.Error() {
+							return
+						}
+						if string(reply.GetMsg()) == types.ErrMemFull.Error() ||
+							string(reply.GetMsg()) == types.ErrManyTx.Error() {
+							retryTxs = append(retryTxs, txs.Txs[index])
+						}
+					}
+					atomic.AddInt64(&success, int64(len(txs.Txs)-len(retryTxs)))
+					if len(retryTxs) > 0 {
+						time.Sleep(time.Second * time.Duration(sleep))
+					}
+					txs.Txs = txs.Txs[:0]
 				}
 			}
 			chSend <- struct{}{}
 		}()
......
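The PerfV2 rework moves failure handling from batch level to per-reply: every sent transaction counts toward total, only replies with IsOk count toward success, and transactions bounced with ErrMemFull or ErrManyTx are parked in retryTxs and prepended to the next batch. The new interval argument throttles resubmission whenever retries are pending, so a saturated mempool slows the sender instead of silently dropping load.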
@@ -111,7 +111,7 @@ func (val *QbftNode) Query_GetPerfStat(in *pty.ReqQbftPerfStat) (types.Message,
 	startHeader := startInfo.Block.Header
 	endHeader := endInfo.Block.Header
-	totalTx := endHeader.TotalTxs - startHeader.TotalTxs
+	totalTx := endHeader.TotalTxs - startHeader.TotalTxs + startHeader.NumTxs
 	totalBlock := endHeader.Height - startHeader.Height + 1
 	totalSecond := endHeader.Time - startHeader.Time + 1
 	return &pty.QbftPerfStat{
......
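The totalTx correction accounts for TotalTxs being cumulative through each block: endHeader.TotalTxs - startHeader.TotalTxs counts only the blocks after the start block, while totalBlock counts the inclusive range. For example, with a start block at height 100 (NumTxs=5, TotalTxs=505) and an end block at height 110 (TotalTxs=605), the window [100,110] spans 11 blocks and contains 605-505+5 = 105 transactions; the old formula reported 100, understating throughput by the start block's own transactions.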