Commit 1310ccf2 authored by caopingcp's avatar caopingcp Committed by vipwzw

tendermint check new tx

parent 7d55e0d5
......@@ -442,11 +442,13 @@ func (client *Client) CreateBlock() {
continue
}
if !client.CheckTxsAvailable(height) {
time.Sleep(500 * time.Millisecond)
time.Sleep(1000 * time.Millisecond)
continue
}
client.txsAvailable <- height + 1
if height+1 == client.csState.GetRoundState().Height {
client.txsAvailable <- height + 1
}
time.Sleep(time.Duration(timeoutTxAvail) * time.Millisecond)
}
}
......@@ -469,29 +471,9 @@ func (client *Client) StopC() <-chan struct{} {
return client.stopC
}
// GetMempoolSize get tx num in mempool
func (client *Client) GetMempoolSize() int64 {
msg := client.GetQueueClient().NewMessage("mempool", types.EventGetMempoolSize, nil)
err := client.GetQueueClient().Send(msg, true)
if err != nil {
tendermintlog.Error("GetMempoolSize send", "err", err)
return 0
}
resp, err := client.GetQueueClient().Wait(msg)
if err != nil {
tendermintlog.Error("GetMempoolSize result", "err", err)
return 0
}
return resp.GetData().(*types.MempoolSize).GetSize()
}
// CheckTxsAvailable check whether some new transactions arriving
func (client *Client) CheckTxsAvailable(height int64) bool {
num := client.GetMempoolSize()
if num == 0 {
return false
}
txs := client.RequestTx(int(num), nil)
txs := client.RequestTx(1, nil)
txs = client.CheckTxDup(txs, height)
return len(txs) != 0
}
......
......@@ -6,6 +6,7 @@ package main
import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"errors"
......@@ -14,19 +15,26 @@ import (
"math/rand"
"net/http"
"os"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/33cn/chain33/common"
"github.com/33cn/chain33/common/address"
"github.com/33cn/chain33/common/crypto"
"github.com/33cn/chain33/common/log/log15"
rpctypes "github.com/33cn/chain33/rpc/types"
"github.com/33cn/chain33/types"
ty "github.com/33cn/plugin/plugin/dapp/valnode/types"
"google.golang.org/grpc"
_ "google.golang.org/grpc/encoding/gzip"
)
const fee = 1e6
const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ123456789-=_+=/<>!@#$%^&"
var r *rand.Rand
......@@ -68,20 +76,186 @@ func main() {
return
}
ValNode(argsWithoutProg[1], argsWithoutProg[2], argsWithoutProg[3])
case "perfOld":
if len(argsWithoutProg) != 6 {
fmt.Print(errors.New("参数错误").Error())
return
}
PerfOld(argsWithoutProg[1], argsWithoutProg[2], argsWithoutProg[3], argsWithoutProg[4], argsWithoutProg[5])
}
}
// LoadHelp ...
func LoadHelp() {
fmt.Println("Available Commands:")
fmt.Println("perf [ip, size, num, interval, duration] : 写数据性能测试")
fmt.Println("perf [host, size, num, interval, duration] : 写数据性能测试,interval单位为100毫秒,host形式为ip:port")
fmt.Println("put [ip, size] : 写数据")
fmt.Println("get [ip, hash] : 读数据")
fmt.Println("valnode [ip, pubkey, power] : 增加/删除/修改tendermint节点")
fmt.Println("perfOld [ip, size, num, interval, duration] : 不推荐使用,写数据性能测试,interval单位为100毫秒")
}
// Perf 性能测试
func Perf(host, txsize, num, sleepinterval, totalduration string) {
var numThread int
numInt, err := strconv.Atoi(num)
if err != nil {
fmt.Fprintln(os.Stderr, err)
return
}
sleep, err := strconv.Atoi(sleepinterval)
if err != nil {
fmt.Fprintln(os.Stderr, err)
return
}
durInt, err := strconv.Atoi(totalduration)
if err != nil {
fmt.Fprintln(os.Stderr, err)
return
}
sizeInt, _ := strconv.Atoi(txsize)
if numInt < 10 {
numThread = 1
} else if numInt > 100 {
numThread = 10
} else {
numThread = numInt / 10
}
numThread = runtime.NumCPU()
ch := make(chan struct{}, numThread)
chSend := make(chan struct{}, numThread*2)
txChan := make(chan *types.Transaction, numInt)
//payload := RandStringBytes(sizeInt)
var blockHeight int64
total := int64(0)
success := int64(0)
go func() {
ch <- struct{}{}
conn := newGrpcConn(host)
defer conn.Close()
gcli := types.NewChain33Client(conn)
for {
height, err := getHeight(gcli)
if err != nil {
//conn.Close()
log.Error("getHeight", "err", err)
//conn = newGrpcConn(ip)
//gcli = types.NewChain33Client(conn)
time.Sleep(time.Second)
} else {
atomic.StoreInt64(&blockHeight, height)
}
time.Sleep(time.Millisecond * 500)
}
}()
<-ch
for i := 0; i < numThread; i++ {
go func() {
_, priv := genaddress()
for sec := 0; durInt == 0 || sec < durInt; sec++ {
height := atomic.LoadInt64(&blockHeight)
for txs := 0; txs < numInt/numThread; txs++ {
//构造存证交易
tx := txPool.Get().(*types.Transaction)
tx.To = execAddr
tx.Fee = rand.Int63()
tx.Nonce = time.Now().UnixNano()
tx.Expire = height + types.TxHeightFlag + types.LowAllowPackHeight
tx.Payload = RandStringBytes(sizeInt)
//交易签名
tx.Sign(types.SECP256K1, priv)
txChan <- tx
}
if sleep > 0 {
time.Sleep(100 * time.Millisecond * time.Duration(sleep))
}
}
ch <- struct{}{}
}()
}
for i := 0; i < numThread*2; i++ {
go func() {
conn := newGrpcConn(host)
defer conn.Close()
gcli := types.NewChain33Client(conn)
for tx := range txChan {
//发送交易
_, err := gcli.SendTransaction(context.Background(), tx, grpc.UseCompressor("gzip"))
txPool.Put(tx)
atomic.AddInt64(&total, 1)
if err != nil {
if strings.Contains(err.Error(), "ErrTxExpire") {
continue
}
if strings.Contains(err.Error(), "ErrMemFull") {
time.Sleep(time.Second)
continue
}
log.Error("sendtx", "err", err)
time.Sleep(time.Second)
//conn.Close()
//conn = newGrpcConn(ip)
//gcli = types.NewChain33Client(conn)
} else {
atomic.AddInt64(&success, 1)
}
}
chSend <- struct{}{}
}()
}
for j := 0; j < numThread; j++ {
<-ch
}
close(txChan)
for k := 0; k < numThread*2; k++ {
<-chSend
}
//打印发送的交易总数
log.Info("sendtx total tx", "total", total)
//打印成功发送的交易总数
log.Info("sendtx success tx", "success", success)
}
var (
log = log15.New()
execAddr = address.ExecAddress("user.write")
)
func getHeight(gcli types.Chain33Client) (int64, error) {
header, err := gcli.GetLastHeader(context.Background(), &types.ReqNil{})
if err != nil {
log.Error("getHeight", "err", err)
return 0, err
}
return header.Height, nil
}
var txPool = sync.Pool{
New: func() interface{} {
tx := &types.Transaction{Execer: []byte("user.write")}
return tx
},
}
func newGrpcConn(host string) *grpc.ClientConn {
conn, err := grpc.Dial(host, grpc.WithInsecure())
for err != nil {
log.Error("grpc dial", "err", err)
time.Sleep(time.Millisecond * 100)
conn, err = grpc.Dial(host, grpc.WithInsecure())
}
return conn
}
// Perf ...
func Perf(ip, size, num, interval, duration string) {
// PerfOld ...
func PerfOld(ip, size, num, interval, duration string) {
var numThread int
numInt, err := strconv.Atoi(num)
if err != nil {
......@@ -121,7 +295,7 @@ func Perf(ip, size, num, interval, duration string) {
Put(ip, size, common.ToHex(priv.Bytes()))
txCount++
}
time.Sleep(time.Second * time.Duration(intervalInt))
time.Sleep(100 * time.Millisecond * time.Duration(intervalInt))
sec += intervalInt
}
ch <- struct{}{}
......@@ -145,9 +319,9 @@ func Put(ip string, size string, privkey string) {
privkey = common.ToHex(priv.Bytes())
}
payload := RandStringBytes(sizeInt)
//fmt.Println("payload:", common.ToHex([]byte(payload)))
//fmt.Println("payload:", common.ToHex(payload))
tx := &types.Transaction{Execer: []byte("user.write"), Payload: []byte(payload), Fee: 1e6}
tx := &types.Transaction{Execer: []byte("user.write"), Payload: payload, Fee: 1e6}
tx.To = address.ExecAddress("user.write")
tx.Expire = TxHeightOffset + types.TxHeightFlag
tx.Sign(types.SECP256K1, getprivkey(privkey))
......@@ -253,13 +427,13 @@ func genaddress() (string, crypto.PrivKey) {
}
// RandStringBytes ...
func RandStringBytes(n int) string {
func RandStringBytes(n int) []byte {
b := make([]byte, n)
rand.Seed(time.Now().UnixNano())
for i := range b {
b[i] = letterBytes[rand.Intn(len(letterBytes))]
}
return string(b)
return b
}
// ValNode ...
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment