Commit c67c3566 authored by vipwzw's avatar vipwzw Committed by 33cn

update chain33 06/05

parent 350b2a9a
......@@ -58,6 +58,6 @@ matrix:
- sudo mv docker-compose /usr/local/bin
script:
- make build_ci
- make autotest_ci
- make autotest dapp=all
- make docker-compose && make docker-compose-down
......@@ -49,14 +49,6 @@ function config_chain33() {
else
sed -i $sedfix '/^Title/a TestNet=true' ${chain33Config}
fi
#update fee
sed -i $sedfix 's/Fee=.*/Fee=100000/' ${chain33Config}
#update block time
#update wallet store driver
sed -i $sedfix '/^\[wallet\]/,/^\[wallet./ s/^driver.*/driver="leveldb"/' ${chain33Config}
}
function config_autotest() {
......
......@@ -40,12 +40,10 @@ function config_chain33() {
fi
#update fee
sed -i $sedfix 's/Fee=.*/Fee=100000/' ${chain33Config}
#update block time
# sed -i $sedfix 's/Fee=.*/Fee=100000/' ${chain33Config}
#update wallet store driver
sed -i $sedfix '/^\[wallet\]/,/^\[wallet./ s/^driver.*/driver="leveldb"/' ${chain33Config}
# sed -i $sedfix '/^\[wallet\]/,/^\[wallet./ s/^driver.*/driver="leveldb"/' ${chain33Config}
}
autotestConfig="autotest.toml"
......
......@@ -138,13 +138,17 @@ chain33_GetHeaders() {
chain33_GetLastMemPool() {
req='"method":"Chain33.GetLastMemPool", "params":[{}]'
echo "#request: $req"
resp=$(curl -ksd "{$req}" "$1")
# echo "#response: $resp"
ok=$(jq '(.error|not) and (.result.txs|length >= 0)' <<<"$resp")
[ "$ok" == true ]
echo_rst "$FUNCNAME" "$?"
if [ "$IS_PARA" == true ]; then
echo_rst "$FUNCNAME" 2
else
req='"method":"Chain33.GetLastMemPool", "params":[{}]'
echo "#request: $req"
resp=$(curl -ksd "{$req}" "$1")
#echo "#response: $resp"
ok=$(jq '(.error|not) and (.result.txs|length >= 0)' <<<"$resp")
[ "$ok" == true ]
echo_rst "$FUNCNAME" "$?"
fi
}
chain33_GetProperFee() {
......@@ -394,11 +398,16 @@ chain33_GetTxByHashes() {
}
chain33_GetMempool() {
resp=$(curl -ksd '{"jsonrpc":"2.0","id":2,"method":"Chain33.GetMempool","params":[{}]}' -H 'content-type:text/plain;' ${MAIN_HTTP})
ok=$(jq '(.error|not) and (.result.txs|length >= 0)' <<<"$resp")
[ "$ok" == true ]
rst=$?
echo_rst "$FUNCNAME" "$rst"
if [ "$IS_PARA" == true ]; then
echo_rst "$FUNCNAME" 2
else
resp=$(curl -ksd '{"jsonrpc":"2.0","id":2,"method":"Chain33.GetMempool","params":[{}]}' -H 'content-type:text/plain;' ${MAIN_HTTP})
ok=$(jq '(.error|not) and (.result.txs|length >= 0)' <<<"$resp")
[ "$ok" == true ]
rst=$?
echo_rst "$FUNCNAME" "$rst"
fi
}
chain33_GetAccountsV2() {
......
......@@ -54,6 +54,8 @@ dbPath="datadir/addrbook"
dbCache=4
# GRPC请求日志文件
grpcLogFile="grpc33.log"
#waitPid 等待seed导入
waitPid=false
[rpc]
# jrpc绑定地址
......
......@@ -53,6 +53,8 @@ grpcLogFile="grpc33.log"
version=216
verMix=216
verMax=217
#waitPid 等待seed导入
waitPid=false
[rpc]
jrpcBindAddr="localhost:8801"
......@@ -69,6 +71,7 @@ name="timeline"
poolCacheSize=10240
minTxFee=100000
maxTxNumPerAccount=100
isLevelFee=false
[mempool.sub.timeline]
poolCacheSize=10240
......@@ -163,6 +166,7 @@ minerwhitelist=["*"]
[exec]
isFree=false
minExecFee=100000
maxExecFee=1000000000
enableStat=false
enableMVCC=false
......
Title="chain33"
TestNet=true
FixTime=false
# 配置主币的名称: 比特元链bty, 部分未配置的平行链需要配置为 bty, 新的平行链配置 para或其他, 不配置panic
CoinSymbol="bty"
[log]
......@@ -77,6 +78,8 @@ dbPath="datadir/addrbook"
dbCache=4
# GRPC请求日志文件
grpcLogFile="grpc33.log"
#waitPid 等待seed导入
waitPid=false
[rpc]
# jrpc绑定地址
......@@ -105,7 +108,10 @@ poolCacheSize=10240
minTxFee=100000
# 每个账户在mempool中得最大交易数量,默认100
maxTxNumPerAccount=100
# 最大得交易手续费用,这个没有默认值,必填,一般是100000
maxTxFee=1000000000
# 是否开启阶梯手续费
isLevelFee=false
[mempool.sub.timeline]
# mempool缓存容量大小,默认10240
......
......@@ -243,15 +243,7 @@ func (a *AddrBook) genPubkey(privkey string) string {
func (a *AddrBook) loadDb() bool {
a.bookDb = db.NewDB("addrbook", a.cfg.Driver, a.cfg.DbPath, a.cfg.DbCache)
privkey, err := a.bookDb.Get([]byte(privKeyTag))
if len(privkey) == 0 || err != nil {
a.initKey()
privkey, _ := a.GetPrivPubKey()
err := a.bookDb.Set([]byte(privKeyTag), []byte(privkey))
if err != nil {
panic(err)
}
} else {
if len(privkey) != 0 && err == nil {
a.setKey(string(privkey), a.genPubkey(string(privkey)))
}
......@@ -393,10 +385,12 @@ func (a *AddrBook) setKey(privkey, pubkey string) {
//ResetPeerkey reset priv,pub key
func (a *AddrBook) ResetPeerkey(privkey, pubkey string) {
a.keymtx.Lock()
defer a.keymtx.Unlock()
a.privkey = privkey
a.pubkey = pubkey
if privkey == "" || pubkey == "" {
a.initKey()
privkey, pubkey = a.GetPrivPubKey()
}
a.setKey(privkey, pubkey)
err := a.bookDb.Set([]byte(privKeyTag), []byte(privkey))
if err != nil {
panic(err)
......
......@@ -34,12 +34,6 @@ const (
maxSamIPNum = 20
)
var (
// LocalAddr local address
LocalAddr string
//defaultPort = 13802
)
const (
defalutNatPort = 23802
maxOutBoundNum = 25
......
......@@ -147,6 +147,7 @@ func (d *DownloadJob) GetFreePeer(blockHeight int64) *Peer {
var jobNum int32 = 10
var bestPeer *Peer
for _, peer := range d.downloadPeers {
pbpeer, ok := infos[peer.Addr()]
if ok {
if len(peer.GetPeerName()) == 0 {
......@@ -194,6 +195,7 @@ func (d *DownloadJob) DownloadBlock(invs []*pb.Inventory,
REGET:
freePeer := d.GetFreePeer(inv.GetHeight()) //获取当前任务数最少的节点,相当于 下载速度最快的节点
if freePeer == nil {
log.Debug("no free peer")
time.Sleep(time.Millisecond * 100)
goto REGET
}
......
......@@ -123,7 +123,8 @@ func (n *Node) flushNodePort(localport, export uint16) {
n.nodeInfo.SetExternalAddr(exaddr)
n.nodeInfo.addrBook.AddOurAddress(exaddr)
}
if listenAddr, err := NewNetAddressString(fmt.Sprintf("%v:%v", LocalAddr, localport)); err == nil {
if listenAddr, err := NewNetAddressString(fmt.Sprintf("%v:%v", n.nodeInfo.GetListenAddr().IP.String(), localport)); err == nil {
n.nodeInfo.SetListenAddr(listenAddr)
n.nodeInfo.addrBook.AddOurAddress(listenAddr)
}
......@@ -354,9 +355,9 @@ func (n *Node) detectNodeAddr() {
for {
cfg := n.nodeInfo.cfg
laddr := P2pComm.GetLocalAddr()
LocalAddr = laddr
//LocalAddr = laddr
log.Info("DetectNodeAddr", "addr:", laddr)
if len(LocalAddr) == 0 {
if laddr == "" {
log.Error("DetectNodeAddr", "NetWork Disable p2p Disable", "Retry until Network enable")
time.Sleep(time.Second * 5)
continue
......
......@@ -34,9 +34,10 @@ type P2p struct {
txFactory chan struct{}
otherFactory chan struct{}
waitRestart chan struct{}
closed int32
restart int32
cfg *types.P2P
closed int32
restart int32
cfg *types.P2P
}
// New produce a p2p object
......@@ -96,15 +97,15 @@ func (network *P2p) isRestart() bool {
// Close network client
func (network *P2p) Close() {
log.Info("p2p network start shutdown")
atomic.StoreInt32(&network.closed, 1)
log.Debug("close", "network", "ShowTaskCapcity done")
network.node.Close()
log.Debug("close", "node", "done")
if network.client != nil {
if !network.isRestart() {
network.client.Close()
}
}
network.node.pubsub.Shutdown()
......@@ -126,25 +127,74 @@ func (network *P2p) SetQueueClient(cli queue.Client) {
go func(p2p *P2p) {
p2p.node.Start()
if p2p.isRestart() {
//reset
p2p.node.Start()
atomic.StoreInt32(&p2p.closed, 0)
atomic.StoreInt32(&p2p.restart, 0)
network.waitRestart <- struct{}{}
return
}
p2p.subP2pMsg()
key, pub := p2p.node.nodeInfo.addrBook.GetPrivPubKey()
log.Debug("key pub:", pub, "")
if key == "" {
if p2p.cfg.WaitPid { //key为空,则为初始钱包,阻塞模式,一直等到钱包导入助记词,解锁
if p2p.genAirDropKeyFromWallet() != nil {
return
}
} else {
//创建随机Pid,会同时出现node award ,airdropaddr
p2p.node.nodeInfo.addrBook.ResetPeerkey(key, pub)
go p2p.genAirDropKeyFromWallet()
}
} else {
p2p.subP2pMsg()
go p2p.loadP2PPrivKeyToWallet()
//key 有两种可能,老版本的随机key,也有可能是seed的key, 非阻塞模式
go p2p.genAirDropKeyFromWallet()
}
p2p.node.Start()
log.Debug("SetQueueClient gorountine ret")
}(network)
}
func (network *P2p) loadP2PPrivKeyToWallet() error {
var parm types.ReqWalletImportPrivkey
parm.Privkey, _ = network.node.nodeInfo.addrBook.GetPrivPubKey()
parm.Label = "node award"
ReTry:
msg := network.client.NewMessage("wallet", types.EventWalletImportPrivkey, &parm)
err := network.client.SendTimeout(msg, true, time.Minute)
if err != nil {
log.Error("ImportPrivkey", "Error", err.Error())
return err
}
resp, err := network.client.WaitTimeout(msg, time.Minute)
if err != nil {
if err == types.ErrPrivkeyExist {
return nil
}
if err == types.ErrLabelHasUsed {
//切换随机lable
parm.Label = fmt.Sprintf("node award %v", P2pComm.RandStr(3))
time.Sleep(time.Second)
goto ReTry
}
log.Error("loadP2PPrivKeyToWallet", "err", err.Error())
return err
}
log.Debug("loadP2PPrivKeyToWallet", "resp", resp.GetData())
return nil
}
func (network *P2p) showTaskCapcity() {
ticker := time.NewTicker(time.Second * 5)
log.Info("ShowTaskCapcity", "Capcity", atomic.LoadInt32(&network.txCapcity))
......@@ -160,10 +210,11 @@ func (network *P2p) showTaskCapcity() {
}
func (network *P2p) genAirDropKeyFromWallet() error {
_, savePub := network.node.nodeInfo.addrBook.GetPrivPubKey()
for {
if network.isClose() {
return nil
log.Error("genAirDropKeyFromWallet", "p2p closed", "")
return fmt.Errorf("p2p closed")
}
msg := network.client.NewMessage("wallet", types.EventGetWalletStatus, nil)
err := network.client.SendTimeout(msg, true, time.Minute)
......@@ -179,12 +230,20 @@ func (network *P2p) genAirDropKeyFromWallet() error {
continue
}
if resp.GetData().(*types.WalletStatus).GetIsWalletLock() { //上锁
if savePub == "" {
log.Warn("P2P Stuck ! Wallet must be unlock and save with mnemonics")
}
time.Sleep(time.Second)
continue
}
if !resp.GetData().(*types.WalletStatus).GetIsHasSeed() { //无种子
time.Sleep(time.Second)
if savePub == "" {
log.Warn("P2P Stuck ! Wallet must be imported with mnemonics")
}
time.Sleep(time.Second * 5)
continue
}
......@@ -208,7 +267,6 @@ func (network *P2p) genAirDropKeyFromWallet() error {
} else {
hexPrivkey = reply.GetData()
}
log.Info("genAirDropKeyFromWallet", "hexprivkey", hexPrivkey)
if hexPrivkey[:2] == "0x" {
hexPrivkey = hexPrivkey[2:]
}
......@@ -221,19 +279,30 @@ func (network *P2p) genAirDropKeyFromWallet() error {
log.Info("genAirDropKeyFromWallet", "pubkey", hexPubkey)
_, pub := network.node.nodeInfo.addrBook.GetPrivPubKey()
if pub == hexPubkey {
if savePub == hexPubkey {
return nil
}
if savePub != "" {
//priv,pub是随机公私钥对,兼容老版本,先对其进行导入钱包处理
err = network.loadP2PPrivKeyToWallet()
if err != nil {
log.Error("genAirDropKeyFromWallet", "loadP2PPrivKeyToWallet error", err)
panic(err)
}
network.node.nodeInfo.addrBook.ResetPeerkey(hexPrivkey, hexPubkey)
//重启p2p模块
log.Info("genAirDropKeyFromWallet", "p2p will Restart....")
network.ReStart()
return nil
}
//覆盖addrbook 中的公私钥对
network.node.nodeInfo.addrBook.ResetPeerkey(hexPrivkey, hexPubkey)
//重启p2p模块
log.Info("genAirDropKeyFromWallet", "p2p will Restart....")
network.ReStart()
return nil
}
// ReStart p2p
//ReStart p2p
func (network *P2p) ReStart() {
atomic.StoreInt32(&network.restart, 1)
network.Close()
......@@ -246,67 +315,6 @@ func (network *P2p) ReStart() {
network.SetQueueClient(network.client)
}
func (network *P2p) loadP2PPrivKeyToWallet() error {
for {
if network.isClose() {
return nil
}
msg := network.client.NewMessage("wallet", types.EventGetWalletStatus, nil)
err := network.client.SendTimeout(msg, true, time.Minute)
if err != nil {
log.Error("GetWalletStatus", "Error", err.Error())
time.Sleep(time.Second)
continue
}
resp, err := network.client.WaitTimeout(msg, time.Minute)
if err != nil {
time.Sleep(time.Second)
continue
}
if resp.GetData().(*types.WalletStatus).GetIsWalletLock() { //上锁
time.Sleep(time.Second)
continue
}
if !resp.GetData().(*types.WalletStatus).GetIsHasSeed() { //无种子
time.Sleep(time.Second)
continue
}
break
}
var parm types.ReqWalletImportPrivkey
parm.Privkey, _ = network.node.nodeInfo.addrBook.GetPrivPubKey()
parm.Label = "node award"
ReTry:
msg := network.client.NewMessage("wallet", types.EventWalletImportPrivkey, &parm)
err := network.client.SendTimeout(msg, true, time.Minute)
if err != nil {
log.Error("ImportPrivkey", "Error", err.Error())
return err
}
resp, err := network.client.WaitTimeout(msg, time.Minute)
if err != nil {
if err == types.ErrPrivkeyExist {
return nil
}
if err == types.ErrLabelHasUsed {
//切换随机lable
parm.Label = fmt.Sprintf("node award %v", P2pComm.RandStr(3))
time.Sleep(time.Second)
goto ReTry
}
log.Error("loadP2PPrivKeyToWallet", "err", err.Error())
return err
}
log.Debug("loadP2PPrivKeyToWallet", "resp", resp.GetData())
return nil
}
func (network *P2p) subP2pMsg() {
if network.client == nil {
......
......@@ -2,6 +2,7 @@ package p2p
import (
"encoding/hex"
//"fmt"
"net"
"os"
"sort"
......@@ -31,8 +32,6 @@ func init() {
q = queue.New("channel")
go q.Start()
p2pModule = initP2p(33802, dataDir)
p2pModule.Wait()
go func() {
cfg, sub := types.InitCfg("../cmd/chain33/chain33.test.toml")
......@@ -115,6 +114,9 @@ func init() {
}
}
}()
time.Sleep(time.Second)
p2pModule = initP2p(53802, dataDir)
p2pModule.Wait()
}
......@@ -128,9 +130,14 @@ func initP2p(port int32, dbpath string) *P2p {
cfg.Version = 119
cfg.ServerStart = true
cfg.Driver = "leveldb"
p2pcli := New(cfg)
p2pcli.SetQueueClient(q.Client())
p2pcli.node.nodeInfo.addrBook.initKey()
privkey, _ := p2pcli.node.nodeInfo.addrBook.GetPrivPubKey()
p2pcli.node.nodeInfo.addrBook.bookDb.Set([]byte(privKeyTag), []byte(privkey))
p2pcli.node.nodeInfo.SetServiceTy(7)
p2pcli.SetQueueClient(q.Client())
return p2pcli
}
......@@ -162,17 +169,19 @@ func TestNetInfo(t *testing.T) {
p2pModule.node.nodeInfo.SetNatDone()
p2pModule.node.nodeInfo.Get()
p2pModule.node.nodeInfo.Set(p2pModule.node.nodeInfo)
assert.NotNil(t, p2pModule.node.nodeInfo.GetListenAddr())
assert.NotNil(t, p2pModule.node.nodeInfo.GetExternalAddr())
}
//测试Peer
func TestPeer(t *testing.T) {
conn, err := grpc.Dial("localhost:33802", grpc.WithInsecure(),
conn, err := grpc.Dial("localhost:53802", grpc.WithInsecure(),
grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip")))
assert.Nil(t, err)
defer conn.Close()
remote, err := NewNetAddressString("127.0.0.1:33802")
remote, err := NewNetAddressString("127.0.0.1:53802")
assert.Nil(t, err)
localP2P := initP2p(43802, "testdata2")
......@@ -219,7 +228,7 @@ func TestPeer(t *testing.T) {
_, err = p2pcli.SendVersion(peer, localP2P.node.nodeInfo)
assert.Nil(t, err)
t.Log(p2pcli.CheckPeerNatOk("localhost:33802"))
t.Log(p2pcli.CheckPeerNatOk("localhost:53802"))
t.Log("checkself:", p2pcli.CheckSelf("loadhost:43803", localP2P.node.nodeInfo))
_, err = p2pcli.GetAddr(peer)
assert.Nil(t, err)
......@@ -230,7 +239,7 @@ func TestPeer(t *testing.T) {
height, err := p2pcli.GetBlockHeight(localP2P.node.nodeInfo)
assert.Nil(t, err)
assert.Equal(t, int(height), 2019)
assert.Equal(t, false, p2pcli.CheckSelf("localhost:33802", localP2P.node.nodeInfo))
assert.Equal(t, false, p2pcli.CheckSelf("localhost:53802", localP2P.node.nodeInfo))
//测试下载
job := NewDownloadJob(NewP2PCli(localP2P).(*Cli), []*Peer{peer})
......@@ -271,7 +280,7 @@ func TestGrpcConns(t *testing.T) {
var conns []*grpc.ClientConn
for i := 0; i < maxSamIPNum; i++ {
conn, err := grpc.Dial("localhost:33802", grpc.WithInsecure(),
conn, err := grpc.Dial("localhost:53802", grpc.WithInsecure(),
grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip")))
assert.Nil(t, err)
......@@ -282,7 +291,7 @@ func TestGrpcConns(t *testing.T) {
conns = append(conns, conn)
}
conn, err := grpc.Dial("localhost:33802", grpc.WithInsecure(),
conn, err := grpc.Dial("localhost:53802", grpc.WithInsecure(),
grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip")))
assert.Nil(t, err)
cli := types.NewP2PgserviceClient(conn)
......@@ -300,7 +309,7 @@ func TestGrpcConns(t *testing.T) {
//测试grpc 流多连接
func TestGrpcStreamConns(t *testing.T) {
conn, err := grpc.Dial("localhost:33802", grpc.WithInsecure(),
conn, err := grpc.Dial("localhost:53802", grpc.WithInsecure(),
grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip")))
assert.Nil(t, err)
cli := types.NewP2PgserviceClient(conn)
......@@ -329,7 +338,7 @@ func TestGrpcStreamConns(t *testing.T) {
func TestP2pComm(t *testing.T) {
addrs := P2pComm.AddrRouteble([]string{"localhost:33802"})
addrs := P2pComm.AddrRouteble([]string{"localhost:53802"})
t.Log(addrs)
i32 := P2pComm.BytesToInt32([]byte{0xff})
......@@ -398,7 +407,13 @@ func TestAddrBook(t *testing.T) {
assert.Equal(t, addrBook.genPubkey(hex.EncodeToString(prv)), pubstr)
addrBook.Save()
addrBook.GetAddrs()
addrBook.ResetPeerkey("", "")
privkey, _ := addrBook.GetPrivPubKey()
assert.NotEmpty(t, privkey)
addrBook.ResetPeerkey(hex.EncodeToString(prv), pubstr)
resetkey, _ := addrBook.GetPrivPubKey()
assert.NotEqual(t, resetkey, privkey)
}
func TestBytesToInt32(t *testing.T) {
......
......@@ -630,8 +630,8 @@ func (m *Cli) getLocalPeerInfo() (*pb.P2PPeerInfo, error) {
localpeerinfo.Name = pub
localpeerinfo.MempoolSize = int32(meminfo.GetSize())
if m.network.node.nodeInfo.GetExternalAddr().IP == nil {
localpeerinfo.Addr = LocalAddr
localpeerinfo.Port = int32(m.network.node.listenPort)
localpeerinfo.Addr = m.network.node.nodeInfo.GetListenAddr().IP.String()
localpeerinfo.Port = int32(m.network.node.nodeInfo.GetListenAddr().Port)
} else {
localpeerinfo.Addr = m.network.node.nodeInfo.GetExternalAddr().IP.String()
localpeerinfo.Port = int32(m.network.node.nodeInfo.GetExternalAddr().Port)
......
......@@ -574,7 +574,7 @@ func (s *P2pserver) ServerStreamRead(stream pb.P2Pgservice_ServerStreamReadServe
if s.node.Size() > 0 {
if remoteIP != LocalAddr && remoteIP != s.node.nodeInfo.GetExternalAddr().IP.String() {
if remoteIP != s.node.nodeInfo.GetListenAddr().IP.String() && remoteIP != s.node.nodeInfo.GetExternalAddr().IP.String() {
s.node.nodeInfo.SetServiceTy(Service)
}
}
......
......@@ -281,6 +281,35 @@ func (mem *Mempool) RemoveTxsOfBlock(block *types.Block) bool {
return true
}
// GetProperFeeRate 获取合适的手续费率
func (mem *Mempool) GetProperFeeRate() int64 {
baseFeeRate := mem.cache.GetProperFee()
if mem.cfg.IsLevelFee {
return mem.getLevelFeeRate(baseFeeRate)
}
return baseFeeRate
}
// getLevelFeeRate 获取合适的阶梯手续费率
func (mem *Mempool) getLevelFeeRate(baseFeeRate int64) int64 {
var feeRate int64
sumByte := mem.cache.TotalByte()
maxTxNumber := types.GetP(mem.Height()).MaxTxNumber
switch {
case (sumByte > int64(types.MaxBlockSize/100) && sumByte < int64(types.MaxBlockSize/20)) ||
(int64(mem.Size()) >= maxTxNumber/10 && int64(mem.Size()) < maxTxNumber/2):
feeRate = 10 * baseFeeRate
case sumByte >= int64(types.MaxBlockSize/20) || int64(mem.Size()) >= maxTxNumber/2:
feeRate = 100 * baseFeeRate
default:
return baseFeeRate
}
if feeRate > 10000000 {
feeRate = 10000000
}
return feeRate
}
// Mempool.DelBlock将回退的区块内的交易重新加入mempool中
func (mem *Mempool) delBlock(block *types.Block) {
if len(block.Txs) <= 0 {
......
......@@ -6,6 +6,7 @@ package mempool
import (
"github.com/33cn/chain33/types"
"github.com/golang/protobuf/proto"
)
//QueueCache 排队交易处理
......@@ -30,7 +31,9 @@ type Item struct {
type txCache struct {
*AccountTxIndex
*LastTxCache
qcache QueueCache
qcache QueueCache
totalFee int64
totalByte int64
}
//NewTxCache init accountIndex and last cache
......@@ -59,6 +62,8 @@ func (cache *txCache) Remove(hash string) {
}
cache.AccountTxIndex.Remove(tx)
cache.LastTxCache.Remove(tx)
cache.totalFee -= tx.Fee
cache.totalByte -= int64(proto.Size(tx))
}
//Exist 是否存在
......@@ -69,6 +74,14 @@ func (cache *txCache) Exist(hash string) bool {
return cache.qcache.Exist(hash)
}
//GetProperFee 获取合适手续费
func (cache *txCache) GetProperFee() int64 {
if cache.qcache == nil {
return 0
}
return cache.qcache.GetProperFee()
}
//Size cache tx num
func (cache *txCache) Size() int {
if cache.qcache == nil {
......@@ -77,6 +90,16 @@ func (cache *txCache) Size() int {
return cache.qcache.Size()
}
//TotalFee 手续费总和
func (cache *txCache) TotalFee() int64 {
return cache.totalFee
}
//TotalByte 交易字节数总和
func (cache *txCache) TotalByte() int64 {
return cache.totalByte
}
//Walk iter all txs
func (cache *txCache) Walk(count int, cb func(tx *Item) bool) {
if cache.qcache == nil {
......@@ -107,6 +130,8 @@ func (cache *txCache) Push(tx *types.Transaction) error {
return err
}
cache.LastTxCache.Push(tx)
cache.totalFee += tx.Fee
cache.totalByte += int64(proto.Size(tx))
return nil
}
......
......@@ -92,6 +92,13 @@ func (mem *Mempool) checkTxs(msg *queue.Message) *queue.Message {
msg.Data = err
return msg
}
if mem.cfg.IsLevelFee {
err = mem.checkLevelFee(tx)
if err != nil {
msg.Data = err
return msg
}
}
//检查txgroup 中的每个交易
txs, err := tx.GetTxGroup()
if err != nil {
......@@ -114,7 +121,21 @@ func (mem *Mempool) checkTxs(msg *queue.Message) *queue.Message {
return msg
}
//checkTxList 检查账户余额是否足够,并加入到Mempool,成功则传入goodChan,若加入Mempool失败则传入badChan
// checkLevelFee 检查阶梯手续费
func (mem *Mempool) checkLevelFee(tx *types.TransactionCache) error {
//获取mempool里所有交易手续费总和
feeRate := mem.GetProperFeeRate()
totalfee, err := tx.GetTotalFee(feeRate)
if err != nil {
return err
}
if tx.Fee < totalfee {
return types.ErrTxFeeTooLow
}
return nil
}
//checkTxRemote 检查账户余额是否足够,并加入到Mempool,成功则传入goodChan,若加入Mempool失败则传入badChan
func (mem *Mempool) checkTxRemote(msg *queue.Message) *queue.Message {
tx := msg.GetData().(types.TxGroup)
lastheader := mem.GetHeader()
......
......@@ -189,9 +189,9 @@ func (mem *Mempool) eventGetAddrTxs(msg *queue.Message) {
msg.Reply(mem.client.NewMessage("", types.EventReplyAddrTxs, txlist))
}
// eventGetProperFee 获取排队策略中合适的手续费
// eventGetProperFee 获取排队策略中合适的手续费
func (mem *Mempool) eventGetProperFee(msg *queue.Message) {
properFee := mem.cache.qcache.GetProperFee()
properFee := mem.GetProperFeeRate()
msg.Reply(mem.client.NewMessage("rpc", types.EventReplyProperFee,
&types.ReplyProperFee{ProperFee: properFee}))
}
......
......@@ -9,6 +9,8 @@ import (
"math/rand"
"testing"
"github.com/golang/protobuf/proto"
"github.com/33cn/chain33/blockchain"
"github.com/33cn/chain33/common"
"github.com/33cn/chain33/common/address"
......@@ -38,6 +40,7 @@ var (
toAddr = address.PubKeyToAddress(privKey.PubKey().Bytes()).String()
amount = int64(1e8)
v = &cty.CoinsAction_Transfer{Transfer: &types.AssetsTransfer{Amount: amount}}
bigByte = make([]byte, 99510)
transfer = &cty.CoinsAction{Value: v, Ty: cty.CoinsActionTransfer}
tx1 = &types.Transaction{Execer: []byte("coins"), Payload: types.Encode(transfer), Fee: 1000000, Expire: 2, To: toAddr}
tx2 = &types.Transaction{Execer: []byte("coins"), Payload: types.Encode(transfer), Fee: 100000000, Expire: 0, To: toAddr}
......@@ -54,6 +57,21 @@ var (
tx13 = &types.Transaction{Execer: []byte("coins"), Payload: types.Encode(transfer), Fee: 100, Expire: 0, To: toAddr}
tx14 = &types.Transaction{Execer: []byte("coins"), Payload: types.Encode(transfer), Fee: 100000000, Expire: 0, To: "notaddress"}
tx15 = &types.Transaction{Execer: []byte("user.write"), Payload: types.Encode(transfer), Fee: 100000000, Expire: 0, To: toAddr}
tx16 = &types.Transaction{Execer: []byte("coins"), Payload: types.Encode(transfer), Fee: 100000, Expire: 3, To: toAddr}
tx17 = &types.Transaction{Execer: []byte("coins"), Payload: types.Encode(transfer), Fee: 100000, Expire: 4, To: toAddr}
tx18 = &types.Transaction{Execer: []byte("coins"), Payload: types.Encode(transfer), Fee: 4000000, Expire: 4, To: toAddr}
bigTx1 = &types.Transaction{Execer: []byte("user.write"), Payload: bigByte, Fee: 100100000, Expire: 0, To: toAddr}
bigTx2 = &types.Transaction{Execer: []byte("user.write"), Payload: bigByte, Fee: 100100000, Expire: 11, To: toAddr}
bigTx3 = &types.Transaction{Execer: []byte("user.write"), Payload: bigByte, Fee: 1001000000, Expire: 11, To: toAddr}
bigTx4 = &types.Transaction{Execer: []byte("user.write"), Payload: bigByte, Fee: 1001000000, Expire: 12, To: toAddr}
bigTx5 = &types.Transaction{Execer: []byte("user.write"), Payload: bigByte, Fee: 1001000000, Expire: 13, To: toAddr}
bigTx6 = &types.Transaction{Execer: []byte("user.write"), Payload: bigByte, Fee: 1001000000, Expire: 14, To: toAddr}
bigTx7 = &types.Transaction{Execer: []byte("user.write"), Payload: bigByte, Fee: 1001000000, Expire: 15, To: toAddr}
bigTx8 = &types.Transaction{Execer: []byte("user.write"), Payload: bigByte, Fee: 1001000000, Expire: 16, To: toAddr}
bigTx9 = &types.Transaction{Execer: []byte("user.write"), Payload: bigByte, Fee: 1001000000, Expire: 17, To: toAddr}
bigTx10 = &types.Transaction{Execer: []byte("user.write"), Payload: bigByte, Fee: 1001000000, Expire: 18, To: toAddr}
bigTx11 = &types.Transaction{Execer: []byte("user.write"), Payload: bigByte, Fee: 1001000000, Expire: 19, To: toAddr}
)
//var privTo, _ = c.GenKey()
......@@ -92,6 +110,21 @@ func init() {
tx13.Sign(types.SECP256K1, privKey)
tx14.Sign(types.SECP256K1, privKey)
tx15.Sign(types.SECP256K1, privKey)
tx16.Sign(types.SECP256K1, privKey)
tx17.Sign(types.SECP256K1, privKey)
tx18.Sign(types.SECP256K1, privKey)
bigTx1.Sign(types.SECP256K1, privKey)
bigTx2.Sign(types.SECP256K1, privKey)
bigTx3.Sign(types.SECP256K1, privKey)
bigTx4.Sign(types.SECP256K1, privKey)
bigTx5.Sign(types.SECP256K1, privKey)
bigTx6.Sign(types.SECP256K1, privKey)
bigTx7.Sign(types.SECP256K1, privKey)
bigTx8.Sign(types.SECP256K1, privKey)
bigTx9.Sign(types.SECP256K1, privKey)
bigTx10.Sign(types.SECP256K1, privKey)
bigTx11.Sign(types.SECP256K1, privKey)
}
func getprivkey(key string) crypto.PrivKey {
......@@ -169,6 +202,27 @@ func initEnv(size int) (queue.Queue, *Mempool) {
return q, mem
}
func initEnv4(size int) (queue.Queue, *Mempool) {
if size == 0 {
size = 100
}
var q = queue.New("channel")
cfg, _ := types.InitCfg("testdata/chain33.test.toml")
types.Init(cfg.Title, cfg)
blockchainProcess(q)
execProcess(q)
cfg.Mempool.PoolCacheSize = int64(size)
subConfig := SubConfig{cfg.Mempool.PoolCacheSize, cfg.Mempool.MinTxFee}
mem := NewMempool(cfg.Mempool)
mem.SetQueueCache(NewSimpleQueue(subConfig))
mem.SetQueueClient(q.Client())
mem.setSync(true)
mem.SetMinFee(types.GInt("MinFee"))
mem.Wait()
return q, mem
}
func createTx(priv crypto.PrivKey, to string, amount int64) *types.Transaction {
v := &cty.CoinsAction_Transfer{Transfer: &types.AssetsTransfer{Amount: amount}}
transfer := &cty.CoinsAction{Value: v, Ty: cty.CoinsActionTransfer}
......@@ -821,6 +875,211 @@ func TestAddTxGroup(t *testing.T) {
}
}
func TestLevelFeeBigByte(t *testing.T) {
q, mem := initEnv(0)
defer q.Close()
defer mem.Close()
defer func() {
mem.cfg.IsLevelFee = false
}()
mem.cfg.IsLevelFee = true
mem.SetMinFee(100000)
msg0 := mem.client.NewMessage("mempool", types.EventTx, tx1)
mem.client.Send(msg0, true)
resp0, _ := mem.client.Wait(msg0)
if string(resp0.GetData().(*types.Reply).GetMsg()) != "" {
t.Error(string(resp0.GetData().(*types.Reply).GetMsg()))
}
msg00 := mem.client.NewMessage("mempool", types.EventTx, tx17)
mem.client.Send(msg00, true)
resp00, _ := mem.client.Wait(msg00)
if string(resp00.GetData().(*types.Reply).GetMsg()) != "" {
t.Error(string(resp00.GetData().(*types.Reply).GetMsg()))
}
msgBig1 := mem.client.NewMessage("mempool", types.EventTx, bigTx1)
mem.client.Send(msgBig1, true)
respBig1, _ := mem.client.Wait(msgBig1)
if string(respBig1.GetData().(*types.Reply).GetMsg()) != "" {
t.Error(string(respBig1.GetData().(*types.Reply).GetMsg()))
}
msgBig2 := mem.client.NewMessage("mempool", types.EventTx, bigTx2)
mem.client.Send(msgBig2, true)
respBig2, _ := mem.client.Wait(msgBig2)
if string(respBig2.GetData().(*types.Reply).GetMsg()) != "" {
t.Error(string(respBig2.GetData().(*types.Reply).GetMsg()))
}
msgBig3 := mem.client.NewMessage("mempool", types.EventTx, bigTx3)
mem.client.Send(msgBig3, true)
respBig3, _ := mem.client.Wait(msgBig3)
if string(respBig3.GetData().(*types.Reply).GetMsg()) != "" {
t.Error(string(respBig3.GetData().(*types.Reply).GetMsg()))
}
//test low fee , feeRate = 10 * minfee
msg2 := mem.client.NewMessage("mempool", types.EventTx, tx16)
mem.client.Send(msg2, true)
resp2, _ := mem.client.Wait(msg2)
if string(resp2.GetData().(*types.Reply).GetMsg()) != types.ErrTxFeeTooLow.Error() {
t.Error(string(resp2.GetData().(*types.Reply).GetMsg()))
}
//test high fee , feeRate = 10 * minfee
msg3 := mem.client.NewMessage("mempool", types.EventTx, tx6)
mem.client.Send(msg3, true)
resp3, _ := mem.client.Wait(msg3)
if string(resp3.GetData().(*types.Reply).GetMsg()) != "" {
t.Error(string(resp3.GetData().(*types.Reply).GetMsg()))
}
//test group high fee , feeRate = 10 * minfee
txGroup, err := types.CreateTxGroup([]*types.Transaction{bigTx4, bigTx5, bigTx6, bigTx7, bigTx8, bigTx9, bigTx10, bigTx11})
if err != nil {
t.Error("CreateTxGroup err ", err.Error())
}
for i := range txGroup.Txs {
err := txGroup.SignN(i, types.SECP256K1, mainPriv)
if err != nil {
t.Error("TestAddTxGroup SignNfailed ", err.Error())
}
}
bigtxGroup := txGroup.Tx()
msgBigG := mem.client.NewMessage("mempool", types.EventTx, bigtxGroup)
mem.client.Send(msgBigG, true)
respBigG, _ := mem.client.Wait(msgBigG)
if string(respBigG.GetData().(*types.Reply).GetMsg()) != "" {
t.Error(string(respBigG.GetData().(*types.Reply).GetMsg()))
}
//test low fee , feeRate = 100 * minfee
msg4 := mem.client.NewMessage("mempool", types.EventTx, tx18)
mem.client.Send(msg4, true)
resp4, _ := mem.client.Wait(msg4)
if string(resp4.GetData().(*types.Reply).GetMsg()) != types.ErrTxFeeTooLow.Error() {
t.Error(string(resp4.GetData().(*types.Reply).GetMsg()))
}
//test high fee , feeRate = 100 * minfee
msg5 := mem.client.NewMessage("mempool", types.EventTx, tx8)
mem.client.Send(msg5, true)
resp5, _ := mem.client.Wait(msg5)
if string(resp5.GetData().(*types.Reply).GetMsg()) != "" {
t.Error(string(resp5.GetData().(*types.Reply).GetMsg()))
}
}
func TestLevelFeeTxNum(t *testing.T) {
q, mem := initEnv4(0)
defer q.Close()
defer mem.Close()
defer func() {
mem.cfg.IsLevelFee = false
}()
mem.cfg.IsLevelFee = true
mem.SetMinFee(100000)
//test low fee , feeRate = 10 * minfee
msg1 := mem.client.NewMessage("mempool", types.EventTx, tx16)
mem.client.Send(msg1, true)
resp1, _ := mem.client.Wait(msg1)
if string(resp1.GetData().(*types.Reply).GetMsg()) != types.ErrTxFeeTooLow.Error() {
t.Error(string(resp1.GetData().(*types.Reply).GetMsg()))
}
//test high fee , feeRate = 10 * minfee
msg2 := mem.client.NewMessage("mempool", types.EventTx, tx6)
mem.client.Send(msg2, true)
resp2, _ := mem.client.Wait(msg2)
if string(resp2.GetData().(*types.Reply).GetMsg()) != "" {
t.Error(string(resp2.GetData().(*types.Reply).GetMsg()))
}
//test high fee , feeRate = 10 * minfee
msg3 := mem.client.NewMessage("mempool", types.EventTx, tx7)
mem.client.Send(msg3, true)
resp3, _ := mem.client.Wait(msg3)
if string(resp3.GetData().(*types.Reply).GetMsg()) != "" {
t.Error(string(resp3.GetData().(*types.Reply).GetMsg()))
}
//test low fee , feeRate = 100 * minfee
msg4 := mem.client.NewMessage("mempool", types.EventTx, tx18)
mem.client.Send(msg4, true)
resp4, _ := mem.client.Wait(msg4)
if string(resp4.GetData().(*types.Reply).GetMsg()) != types.ErrTxFeeTooLow.Error() {
t.Error(string(resp4.GetData().(*types.Reply).GetMsg()))
}
//test high fee , feeRate = 100 * minfee
msg5 := mem.client.NewMessage("mempool", types.EventTx, tx8)
mem.client.Send(msg5, true)
resp5, _ := mem.client.Wait(msg5)
if string(resp5.GetData().(*types.Reply).GetMsg()) != "" {
t.Error(string(resp5.GetData().(*types.Reply).GetMsg()))
}
}
func TestSimpleQueue_TotalFee(t *testing.T) {
q, mem := initEnv(0)
defer q.Close()
defer mem.Close()
txa := &types.Transaction{Payload: []byte("123"), Fee: 100000}
mem.cache.Push(txa)
txb := &types.Transaction{Payload: []byte("1234"), Fee: 100000}
mem.cache.Push(txb)
var sumFee int64
mem.cache.Walk(mem.cache.Size(), func(it *Item) bool {
sumFee += it.Value.Fee
return true
})
assert.Equal(t, sumFee, mem.cache.TotalFee())
assert.Equal(t, sumFee, int64(200000))
mem.cache.Remove(string(txb.Hash()))
var sumFee2 int64
mem.cache.Walk(mem.cache.Size(), func(it *Item) bool {
sumFee2 += it.Value.Fee
return true
})
assert.Equal(t, sumFee2, mem.cache.TotalFee())
assert.Equal(t, sumFee2, int64(100000))
}
func TestSimpleQueue_TotalByte(t *testing.T) {
q, mem := initEnv(0)
defer q.Close()
defer mem.Close()
txa := &types.Transaction{Payload: []byte("123"), Fee: 100000}
mem.cache.Push(txa)
txb := &types.Transaction{Payload: []byte("1234"), Fee: 100000}
mem.cache.Push(txb)
var sumByte int64
mem.cache.Walk(mem.cache.Size(), func(it *Item) bool {
sumByte += int64(proto.Size(it.Value))
return true
})
assert.Equal(t, sumByte, mem.cache.TotalByte())
assert.Equal(t, sumByte, int64(19))
mem.cache.Remove(string(txb.Hash()))
var sumByte2 int64
mem.cache.Walk(mem.cache.Size(), func(it *Item) bool {
sumByte2 += int64(proto.Size(it.Value))
return true
})
assert.Equal(t, sumByte2, mem.cache.TotalByte())
assert.Equal(t, sumByte2, int64(9))
}
func BenchmarkMempool(b *testing.B) {
q, mem := initEnv(10240)
defer q.Close()
......
......@@ -58,6 +58,10 @@ func (cache *SimpleQueue) Push(tx *Item) error {
// Remove 删除数据
func (cache *SimpleQueue) Remove(hash string) error {
_, err := cache.GetItem(hash)
if err != nil {
return err
}
cache.txList.Remove(hash)
return nil
}
......
......@@ -68,6 +68,7 @@ type Mempool struct {
// 每个账户在mempool中得最大交易数量,默认100
MaxTxNumPerAccount int64 `protobuf:"varint,5,opt,name=maxTxNumPerAccount" json:"maxTxNumPerAccount,omitempty"`
MaxTxLast int64 `protobuf:"varint,6,opt,name=maxTxLast" json:"maxTxLast,omitempty"`
IsLevelFee bool `protobuf:"varint,7,opt,name=isLevelFee" json:"isLevelFee,omitempty"`
}
// Consensus 配置
......@@ -176,6 +177,8 @@ type P2P struct {
InnerBounds int32 `protobuf:"varint,15,opt,name=innerBounds" json:"innerBounds,omitempty"`
// 是否使用Github获取种子节点
UseGithub bool `protobuf:"varint,16,opt,name=useGithub" json:"useGithub,omitempty"`
//是否等待Pid
WaitPid bool `protobuf:"varint,17,opt,name=waitPid" json:"waitPid,omitempty"`
}
// RPC 配置
......
......@@ -286,6 +286,12 @@ func Init(t string, cfg *Config) {
panic("config CoinSymbol must without '-'")
}
coinSymbol = cfg.CoinSymbol
} else {
if isPara() {
panic("must config CoinSymbol in para chain")
} else {
coinSymbol = DefaultCoinsSymbol
}
}
}
//local 只用于单元测试
......
......@@ -33,6 +33,11 @@ const (
NoneX = "none"
)
//DefaultCoinsSymbol 默认的主币名称
const (
DefaultCoinsSymbol = "bty"
)
//UserKeyX 用户自定义执行器前缀byte类型
var (
UserKey = []byte(UserKeyX)
......
......@@ -2,6 +2,7 @@ Title="user.p.guodun."
TestNet=false
FixTime=false
EnableParaFork=true
CoinSymbol="gd"
[log]
# 日志级别,支持debug(dbug)/info/warn/error(eror)/crit
......
......@@ -2,6 +2,7 @@ Title="user.p.guodun2."
TestNet=false
FixTime=false
EnableParaFork=false
CoinSymbol="gd2"
[log]
# 日志级别,支持debug(dbug)/info/warn/error(eror)/crit
......
......@@ -348,6 +348,28 @@ func (tx *TransactionCache) Check(height, minfee, maxFee int64) error {
return tx.checkok
}
//GetTotalFee 获取交易真实费用
func (tx *TransactionCache) GetTotalFee(minFee int64) (int64, error) {
txgroup, err := tx.GetTxGroup()
if err != nil {
tx.checkok = err
return 0, err
}
var totalfee int64
if txgroup == nil {
return tx.GetRealFee(minFee)
}
txs := txgroup.Txs
for i := 0; i < len(txs); i++ {
fee, err := txs[i].GetRealFee(minFee)
if err != nil {
return 0, err
}
totalfee += fee
}
return totalfee, nil
}
//GetTxGroup 获取交易组
func (tx *TransactionCache) GetTxGroup() (*Transactions, error) {
var err error
......
......@@ -49,6 +49,7 @@ var (
datadir = flag.String("datadir", "", "data dir of chain33, include logs and datas")
versionCmd = flag.Bool("v", false, "version")
fixtime = flag.Bool("fixtime", false, "fix time")
waitPid = flag.Bool("waitpid", false, "p2p stuck until seed save info wallet & wallet unlock")
)
//RunChain33 : run Chain33
......@@ -91,6 +92,9 @@ func RunChain33(name string) {
if *fixtime {
cfg.FixTime = *fixtime
}
if *waitPid {
cfg.P2P.WaitPid = *waitPid
}
//set test net flag
types.Init(cfg.Title, cfg)
if cfg.FixTime {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment