Commit 0be50d74 authored by caopingcp's avatar caopingcp Committed by 33cn

update for tendermint performance

parent b375d231
...@@ -768,8 +768,12 @@ func (cs *ConsensusState) createProposalBlock() (block *ttypes.TendermintBlock) ...@@ -768,8 +768,12 @@ func (cs *ConsensusState) createProposalBlock() (block *ttypes.TendermintBlock)
// Mempool validated transactions // Mempool validated transactions
beg := time.Now() beg := time.Now()
pblock := cs.client.BuildBlock() pblock := cs.client.BuildBlock()
if pblock == nil {
tendermintlog.Error("createProposalBlock BuildBlock fail")
return nil
}
tendermintlog.Info(fmt.Sprintf("createProposalBlock BuildBlock. Current: %v/%v/%v", cs.Height, cs.Round, cs.Step), tendermintlog.Info(fmt.Sprintf("createProposalBlock BuildBlock. Current: %v/%v/%v", cs.Height, cs.Round, cs.Step),
"txs-len", len(pblock.Txs), "cost", types.Since(beg)) "height", pblock.Height, "txs-len", len(pblock.Txs), "cost", types.Since(beg))
if pblock.Height != cs.Height { if pblock.Height != cs.Height {
tendermintlog.Error("pblock.Height is not equal to cs.Height") tendermintlog.Error("pblock.Height is not equal to cs.Height")
......
...@@ -25,7 +25,6 @@ const ( ...@@ -25,7 +25,6 @@ const (
tryListenSeconds = 5 tryListenSeconds = 5
handshakeTimeout = 20 // * time.Second, handshakeTimeout = 20 // * time.Second,
maxSendQueueSize = 1024 maxSendQueueSize = 1024
minSendQueueSize = 10
defaultSendTimeout = 60 * time.Second defaultSendTimeout = 60 * time.Second
//MaxMsgPacketPayloadSize define //MaxMsgPacketPayloadSize define
MaxMsgPacketPayloadSize = 10 * 1024 * 1024 MaxMsgPacketPayloadSize = 10 * 1024 * 1024
...@@ -153,7 +152,7 @@ func NewNode(seeds []string, protocol string, lAddr string, privKey crypto.PrivK ...@@ -153,7 +152,7 @@ func NewNode(seeds []string, protocol string, lAddr string, privKey crypto.PrivK
dialing: NewMutexMap(), dialing: NewMutexMap(),
reconnecting: NewMutexMap(), reconnecting: NewMutexMap(),
broadcastChannel: make(chan MsgInfo, maxSendQueueSize), broadcastChannel: make(chan MsgInfo, maxSendQueueSize),
unicastChannel: make(chan MsgInfo, minSendQueueSize), unicastChannel: make(chan MsgInfo, maxSendQueueSize),
state: state, state: state,
localIPs: make(map[string]net.IP), localIPs: make(map[string]net.IP),
} }
......
...@@ -481,7 +481,7 @@ FOR_LOOP: ...@@ -481,7 +481,7 @@ FOR_LOOP:
pc.sendBuffer = append(pc.sendBuffer, bytes...) pc.sendBuffer = append(pc.sendBuffer, bytes...)
if len+5 > MaxMsgPacketPayloadSize { if len+5 > MaxMsgPacketPayloadSize {
pc.sendBuffer = append(pc.sendBuffer, bytes[MaxMsgPacketPayloadSize-5:]...) tendermintlog.Info("packet exceed max size", "len", len+5)
} }
_, err = pc.bufWriter.Write(pc.sendBuffer[:len+5]) _, err = pc.bufWriter.Write(pc.sendBuffer[:len+5])
if err != nil { if err != nil {
...@@ -688,29 +688,23 @@ OUTER_LOOP: ...@@ -688,29 +688,23 @@ OUTER_LOOP:
// If the peer is on a previous height, help catch up. // If the peer is on a previous height, help catch up.
if (0 < prs.Height) && (prs.Height < rs.Height) { if (0 < prs.Height) && (prs.Height < rs.Height) {
if prs.ProposalBlockHash == nil || prs.ProposalBlock { time.Sleep(2 * pc.myState.PeerGossipSleep())
time.Sleep(pc.myState.PeerGossipSleep()) if prs.Height >= rs.Height {
continue OUTER_LOOP continue OUTER_LOOP
} }
tendermintlog.Info("help catch up", "peerip", pc.ip.String(), "selfHeight", rs.Height, "peerHeight", prs.Height)
proposalBlock := pc.myState.client.LoadProposalBlock(prs.Height) proposalBlock := pc.myState.client.LoadProposalBlock(prs.Height)
newBlock := &ttypes.TendermintBlock{TendermintBlock: proposalBlock}
if proposalBlock == nil { if proposalBlock == nil {
tendermintlog.Error("Fail to load propsal block", "selfHeight", rs.Height, tendermintlog.Error("load proposal block fail", "selfHeight", rs.Height,
"blockstoreHeight", pc.myState.client.GetCurrentHeight()) "blockHeight", pc.myState.client.GetCurrentHeight())
time.Sleep(pc.myState.PeerGossipSleep())
continue OUTER_LOOP
} else if !bytes.Equal(newBlock.Hash(), prs.ProposalBlockHash) {
tendermintlog.Error("Peer ProposalBlockHash mismatch", "ProposalBlockHash", fmt.Sprintf("%X", prs.ProposalBlockHash),
"newBlockHash", fmt.Sprintf("%X", newBlock.Hash()))
time.Sleep(pc.myState.PeerGossipSleep())
continue OUTER_LOOP continue OUTER_LOOP
} }
newBlock := &ttypes.TendermintBlock{TendermintBlock: proposalBlock}
msg := MsgInfo{TypeID: ttypes.ProposalBlockID, Msg: proposalBlock, PeerID: pc.id, PeerIP: pc.ip.String()} msg := MsgInfo{TypeID: ttypes.ProposalBlockID, Msg: proposalBlock, PeerID: pc.id, PeerIP: pc.ip.String()}
tendermintlog.Info("Sending block for catchup", "peerip", pc.ip.String(), "block(H/R)", tendermintlog.Info("Sending block for catchup", "peerip", pc.ip.String(),
fmt.Sprintf("%v/%v", proposalBlock.Header.Height, proposalBlock.Header.Round)) "selfHeight", rs.Height, "peerHeight", prs.Height, "block(H/R/hash)",
if pc.Send(msg) { fmt.Sprintf("%v/%v/%X", proposalBlock.Header.Height, proposalBlock.Header.Round, newBlock.Hash()))
prs.SetHasProposalBlock(newBlock) if !pc.Send(msg) {
tendermintlog.Error("send catchup block fail")
} }
continue OUTER_LOOP continue OUTER_LOOP
} }
......
...@@ -473,9 +473,29 @@ func (client *Client) StopC() <-chan struct{} { ...@@ -473,9 +473,29 @@ func (client *Client) StopC() <-chan struct{} {
return client.stopC return client.stopC
} }
// GetMempoolSize get tx num in mempool
func (client *Client) GetMempoolSize() int64 {
msg := client.GetQueueClient().NewMessage("mempool", types.EventGetMempoolSize, nil)
err := client.GetQueueClient().Send(msg, true)
if err != nil {
tendermintlog.Error("GetMempoolSize send", "err", err)
return 0
}
resp, err := client.GetQueueClient().Wait(msg)
if err != nil {
tendermintlog.Error("GetMempoolSize result", "err", err)
return 0
}
return resp.GetData().(*types.MempoolSize).GetSize()
}
// CheckTxsAvailable check whether some new transactions arriving // CheckTxsAvailable check whether some new transactions arriving
func (client *Client) CheckTxsAvailable(height int64) bool { func (client *Client) CheckTxsAvailable(height int64) bool {
txs := client.RequestTx(10, nil) num := client.GetMempoolSize()
if num == 0 {
return false
}
txs := client.RequestTx(int(num), nil)
txs = client.CheckTxDup(txs, height) txs = client.CheckTxDup(txs, height)
return len(txs) != 0 return len(txs) != 0
} }
...@@ -493,7 +513,11 @@ func (client *Client) CheckTxDup(txs []*types.Transaction, height int64) (transa ...@@ -493,7 +513,11 @@ func (client *Client) CheckTxDup(txs []*types.Transaction, height int64) (transa
// BuildBlock build a new block // BuildBlock build a new block
func (client *Client) BuildBlock() *types.Block { func (client *Client) BuildBlock() *types.Block {
lastBlock := client.GetCurrentBlock() lastBlock, err := client.RequestLastBlock()
if err != nil {
tendermintlog.Error("BuildBlock fail", "err", err)
return nil
}
cfg := client.GetAPI().GetConfig() cfg := client.GetAPI().GetConfig()
txs := client.RequestTx(int(cfg.GetP(lastBlock.Height+1).MaxTxNumber)-1, nil) txs := client.RequestTx(int(cfg.GetP(lastBlock.Height+1).MaxTxNumber)-1, nil)
// placeholder // placeholder
......
...@@ -74,7 +74,7 @@ func main() { ...@@ -74,7 +74,7 @@ func main() {
// LoadHelp ... // LoadHelp ...
func LoadHelp() { func LoadHelp() {
fmt.Println("Available Commands:") fmt.Println("Available Commands:")
fmt.Println("perf [ip, size, num, interval, duration] {offset} : 写数据性能测试") fmt.Println("perf [ip, size, num, interval, duration] : 写数据性能测试")
fmt.Println("put [ip, size] : 写数据") fmt.Println("put [ip, size] : 写数据")
fmt.Println("get [ip, hash] : 读数据") fmt.Println("get [ip, hash] : 读数据")
fmt.Println("valnode [ip, pubkey, power] : 增加/删除/修改tendermint节点") fmt.Println("valnode [ip, pubkey, power] : 增加/删除/修改tendermint节点")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment