Commit e2d4fe05 authored by vipwzw's avatar vipwzw

update_chain33_1212

parent cc8b634f
...@@ -84,7 +84,7 @@ git merge upstream/master ...@@ -84,7 +84,7 @@ git merge upstream/master
git fetch upstream git fetch upstream
git checkout master git checkout master
git merge upstream/master git merge upstream/master
git branch -a "fixbug_ci" git branch -b "fixbug_ci"
``` ```
* 开发完成后, push 到 `vipwzw/chain33` * 开发完成后, push 到 `vipwzw/chain33`
......
...@@ -207,6 +207,9 @@ func (chain *BlockChain) SetQueueClient(client queue.Client) { ...@@ -207,6 +207,9 @@ func (chain *BlockChain) SetQueueClient(client queue.Client) {
go chain.ProcRecvMsg() go chain.ProcRecvMsg()
} }
//Wait for ready
func (chain *BlockChain) Wait() {}
//GetStore only used for test //GetStore only used for test
func (chain *BlockChain) GetStore() *BlockStore { func (chain *BlockChain) GetStore() *BlockStore {
return chain.blockStore return chain.blockStore
......
...@@ -8,7 +8,7 @@ set -o pipefail ...@@ -8,7 +8,7 @@ set -o pipefail
# os: ubuntu16.04 x64 # os: ubuntu16.04 x64
#chain33 dapp autotest root directory #chain33 dapp autotest root directory
declare -a Chain33AutoTestDirs=("system" "plugin" "vendor/github.com/33cn/chain33/system") declare -a Chain33AutoTestDirs=("system" "plugin" "vendor/github.com/33cn/chain33/system" "vendor/github.com/33cn/plugin/plugin")
#copy auto test to specific directory #copy auto test to specific directory
# check args # check args
......
...@@ -64,10 +64,24 @@ certFile="cert.pem" ...@@ -64,10 +64,24 @@ certFile="cert.pem"
keyFile="key.pem" keyFile="key.pem"
[mempool] [mempool]
name="timeline"
poolCacheSize=10240 poolCacheSize=10240
minTxFee=1000000 minTxFee=100000
maxTxNumPerAccount=10000 maxTxNumPerAccount=10000
[mempool.sub.timeline]
poolCacheSize=10240
minTxFee=100000
maxTxNumPerAccount=10000
[mempool.sub.trade]
poolCacheSize=10240
minTxFee=100000
maxTxNumPerAccount=10000
timeParam=1 #时间占价格比例
priceConstant=1 #一个合适的常量
pricePower=0 #手续费占常量比例
[consensus] [consensus]
name="solo" name="solo"
minerstart=true minerstart=true
......
...@@ -64,10 +64,24 @@ certFile="cert.pem" ...@@ -64,10 +64,24 @@ certFile="cert.pem"
keyFile="key.pem" keyFile="key.pem"
[mempool] [mempool]
name="timeline"
poolCacheSize=10240 poolCacheSize=10240
minTxFee=100000 minTxFee=100000
maxTxNumPerAccount=10000 maxTxNumPerAccount=10000
[mempool.sub.timeline]
poolCacheSize=10240
minTxFee=100000
maxTxNumPerAccount=10000
[mempool.sub.trade]
poolCacheSize=10240
minTxFee=100000
maxTxNumPerAccount=10000
timeParam=1 #时间占价格比例
priceConstant=1 #一个合适的常量
pricePower=0 #手续费占常量比例
[consensus] [consensus]
name="solo" name="solo"
minerstart=true minerstart=true
......
...@@ -18,13 +18,24 @@ import ( ...@@ -18,13 +18,24 @@ import (
var addrSeed = []byte("address seed bytes for public key") var addrSeed = []byte("address seed bytes for public key")
var addressCache *lru.Cache var addressCache *lru.Cache
var checkAddressCache *lru.Cache var checkAddressCache *lru.Cache
var multisignCache *lru.Cache
var multiCheckAddressCache *lru.Cache
var errVersion = errors.New("check version error")
//MaxExecNameLength 执行器名最大长度 //MaxExecNameLength 执行器名最大长度
const MaxExecNameLength = 100 const MaxExecNameLength = 100
//NormalVer 普通地址的版本号
const NormalVer byte = 0
//MultiSignVer 多重签名地址的版本号
const MultiSignVer byte = 5
func init() { func init() {
multisignCache, _ = lru.New(10240)
addressCache, _ = lru.New(10240) addressCache, _ = lru.New(10240)
checkAddressCache, _ = lru.New(10240) checkAddressCache, _ = lru.New(10240)
multiCheckAddressCache, _ = lru.New(10240)
} }
//ExecPubKey 计算公钥 //ExecPubKey 计算公钥
...@@ -44,12 +55,23 @@ func ExecAddress(name string) string { ...@@ -44,12 +55,23 @@ func ExecAddress(name string) string {
if value, ok := addressCache.Get(name); ok { if value, ok := addressCache.Get(name); ok {
return value.(string) return value.(string)
} }
addr := PubKeyToAddress(ExecPubkey(name)) addr := GetExecAddress(name)
addrstr := addr.String() addrstr := addr.String()
addressCache.Add(name, addrstr) addressCache.Add(name, addrstr)
return addrstr return addrstr
} }
//MultiSignAddress create a multi sign address
func MultiSignAddress(pubkey []byte) string {
if value, ok := multisignCache.Get(string(pubkey)); ok {
return value.(string)
}
addr := HashToAddress(MultiSignVer, pubkey)
addrstr := addr.String()
multisignCache.Add(string(pubkey), addrstr)
return addrstr
}
//ExecPubkey 计算公钥 //ExecPubkey 计算公钥
func ExecPubkey(name string) []byte { func ExecPubkey(name string) []byte {
if len(name) > MaxExecNameLength { if len(name) > MaxExecNameLength {
...@@ -64,35 +86,27 @@ func ExecPubkey(name string) []byte { ...@@ -64,35 +86,27 @@ func ExecPubkey(name string) []byte {
//GetExecAddress 获取地址 //GetExecAddress 获取地址
func GetExecAddress(name string) *Address { func GetExecAddress(name string) *Address {
if len(name) > MaxExecNameLength { hash := ExecPubkey(name)
panic("name too long")
}
var bname [200]byte
buf := append(bname[:0], addrSeed...)
buf = append(buf, []byte(name)...)
hash := common.Sha2Sum(buf)
addr := PubKeyToAddress(hash[:]) addr := PubKeyToAddress(hash[:])
return addr return addr
} }
//PubKeyToAddress 公钥转为地址 //PubKeyToAddress 公钥转为地址
func PubKeyToAddress(in []byte) *Address { func PubKeyToAddress(in []byte) *Address {
return HashToAddress(NormalVer, in)
}
//HashToAddress hash32 to address
func HashToAddress(version byte, in []byte) *Address {
a := new(Address) a := new(Address)
a.Pubkey = make([]byte, len(in)) a.Pubkey = make([]byte, len(in))
copy(a.Pubkey[:], in[:]) copy(a.Pubkey[:], in[:])
a.Version = 0 a.Version = version
a.Hash160 = common.Rimp160AfterSha256(in) a.Hash160 = common.Rimp160AfterSha256(in)
return a return a
} }
//CheckAddress 检查地址 func checkAddress(ver byte, addr string) (e error) {
func CheckAddress(addr string) (e error) {
if value, ok := checkAddressCache.Get(addr); ok {
if value == nil {
return nil
}
return value.(error)
}
dec := base58.Decode(addr) dec := base58.Decode(addr)
if dec == nil { if dec == nil {
e = errors.New("Cannot decode b58 string '" + addr + "'") e = errors.New("Cannot decode b58 string '" + addr + "'")
...@@ -110,6 +124,34 @@ func CheckAddress(addr string) (e error) { ...@@ -110,6 +124,34 @@ func CheckAddress(addr string) (e error) {
e = errors.New("Address Checksum error") e = errors.New("Address Checksum error")
} }
} }
if dec[0] != ver {
e = errVersion
}
return e
}
//CheckMultiSignAddress 检查多重签名地址的有效性
func CheckMultiSignAddress(addr string) (e error) {
if value, ok := multiCheckAddressCache.Get(addr); ok {
if value == nil {
return nil
}
return value.(error)
}
e = checkAddress(MultiSignVer, addr)
multiCheckAddressCache.Add(addr, e)
return
}
//CheckAddress 检查地址
func CheckAddress(addr string) (e error) {
if value, ok := checkAddressCache.Get(addr); ok {
if value == nil {
return nil
}
return value.(error)
}
e = checkAddress(NormalVer, addr)
checkAddressCache.Add(addr, e) checkAddressCache.Add(addr, e)
return return
} }
......
...@@ -12,27 +12,42 @@ import ( ...@@ -12,27 +12,42 @@ import (
"time" "time"
"github.com/33cn/chain33/common/crypto" "github.com/33cn/chain33/common/crypto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
_ "github.com/33cn/chain33/system/crypto/init" _ "github.com/33cn/chain33/system/crypto/init"
) )
func TestAddress(t *testing.T) { func genkey() crypto.PrivKey {
c, err := crypto.New("secp256k1") c, err := crypto.New("secp256k1")
if err != nil { if err != nil {
t.Error(err) panic(err)
return
} }
key, err := c.GenKey() key, err := c.GenKey()
if err != nil { if err != nil {
t.Error(err) panic(err)
return
} }
return key
}
func TestAddress(t *testing.T) {
key := genkey()
t.Logf("%X", key.Bytes()) t.Logf("%X", key.Bytes())
addr := PubKeyToAddress(key.PubKey().Bytes()) addr := PubKeyToAddress(key.PubKey().Bytes())
t.Log(addr) t.Log(addr)
} }
func TestMultiSignAddress(t *testing.T) {
key := genkey()
addr1 := MultiSignAddress(key.PubKey().Bytes())
addr := MultiSignAddress(key.PubKey().Bytes())
assert.Equal(t, addr1, addr)
err := CheckAddress(addr)
assert.Equal(t, errVersion, err)
err = CheckMultiSignAddress(addr)
assert.Nil(t, err)
t.Log(addr)
}
func TestPubkeyToAddress(t *testing.T) { func TestPubkeyToAddress(t *testing.T) {
pubkey := "024a17b0c6eb3143839482faa7e917c9b90a8cfe5008dff748789b8cea1a3d08d5" pubkey := "024a17b0c6eb3143839482faa7e917c9b90a8cfe5008dff748789b8cea1a3d08d5"
b, err := hex.DecodeString(pubkey) b, err := hex.DecodeString(pubkey)
...@@ -61,6 +76,14 @@ func TestCheckAddress(t *testing.T) { ...@@ -61,6 +76,14 @@ func TestCheckAddress(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
} }
func TestExecAddress(t *testing.T) {
assert.Equal(t, "16htvcBNSEA7fZhAdLJphDwQRQJaHpyHTp", ExecAddress("ticket"))
assert.Equal(t, "16htvcBNSEA7fZhAdLJphDwQRQJaHpyHTp", ExecAddress("ticket"))
addr, err := NewAddrFromString(ExecAddress("ticket"))
assert.Nil(t, err)
assert.Equal(t, addr.Version, NormalVer)
}
func BenchmarkExecAddress(b *testing.B) { func BenchmarkExecAddress(b *testing.B) {
start := time.Now().UnixNano() / 1000000 start := time.Now().UnixNano() / 1000000
fmt.Println(start) fmt.Println(start)
......
package listmap
import (
"container/list"
"github.com/33cn/chain33/types"
)
//ListMap list 和 map 组合的一个数据结构体
type ListMap struct {
m map[string]*list.Element
l *list.List
}
//New 创建一个新的数据结构
func New() *ListMap {
return &ListMap{
m: make(map[string]*list.Element),
l: list.New(),
}
}
//Size 结构中的item 数目
func (lm *ListMap) Size() int {
return len(lm.m)
}
//Exist 是否存在这个元素
func (lm *ListMap) Exist(key string) bool {
_, ok := lm.m[key]
return ok
}
//GetItem 通过key 获取这个 item
func (lm *ListMap) GetItem(key string) (interface{}, error) {
item, ok := lm.m[key]
if !ok {
return nil, types.ErrNotFound
}
return item.Value, nil
}
//Push 在队伍尾部插入
func (lm *ListMap) Push(key string, value interface{}) {
if elm, ok := lm.m[key]; ok {
elm.Value = value
return
}
elm := lm.l.PushBack(value)
lm.m[key] = elm
}
//GetTop 获取队列头部的数据
func (lm *ListMap) GetTop() interface{} {
elm := lm.l.Front()
if elm == nil {
return nil
}
return elm.Value
}
//Remove 删除某个key
func (lm *ListMap) Remove(key string) interface{} {
if elm, ok := lm.m[key]; ok {
value := lm.l.Remove(elm)
delete(lm.m, key)
return value
}
return nil
}
//Walk 遍历整个结构,如果cb 返回false 那么停止遍历
func (lm *ListMap) Walk(cb func(value interface{}) bool) {
for e := lm.l.Front(); e != nil; e = e.Next() {
if cb == nil {
return
}
if !cb(e.Value) {
return
}
}
}
package listmap
import (
"testing"
"github.com/33cn/chain33/types"
"github.com/stretchr/testify/assert"
)
func TestListMap(t *testing.T) {
l := New()
l.Push("1", "1")
assert.Equal(t, 1, l.Size())
l.Push("1", "11")
assert.Equal(t, 1, l.Size())
value, err := l.GetItem("1")
assert.Equal(t, err, nil)
assert.Equal(t, "11", value.(string))
l.Remove("1")
assert.Equal(t, 0, l.Size())
assert.Equal(t, false, l.Exist("1"))
v := l.GetTop()
assert.Equal(t, nil, v)
_, err = l.GetItem("11")
assert.Equal(t, types.ErrNotFound, err)
l.Push("1", "11")
assert.Equal(t, true, l.Exist("1"))
l.Push("2", "2")
assert.Equal(t, "11", l.GetTop().(string))
var data [2]string
i := 0
l.Walk(func(value interface{}) bool {
data[i] = value.(string)
i++
return true
})
assert.Equal(t, data[0], "11")
assert.Equal(t, data[1], "2")
var data2 [2]string
i = 0
l.Walk(func(value interface{}) bool {
data2[i] = value.(string)
i++
return false
})
assert.Equal(t, data2[0], "11")
assert.Equal(t, data2[1], "")
l.Walk(nil)
l.Remove("xxxx")
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package mempool
import (
"container/list"
"github.com/33cn/chain33/types"
)
//--------------------------------------------------------------------------------
// Module txCache
type txCache struct {
size int
txMap map[string]*list.Element
txList *list.List
txFrontTen []*types.Transaction
accMap map[string][]*types.Transaction
}
// Item 为Mempool中包装交易的数据结构
type Item struct {
value *types.Transaction
priority int64
enterTime int64
}
// NewTxCache初始化txCache
func newTxCache(cacheSize int64) *txCache {
return &txCache{
size: int(cacheSize),
txMap: make(map[string]*list.Element, cacheSize),
txList: list.New(),
txFrontTen: make([]*types.Transaction, 0),
accMap: make(map[string][]*types.Transaction),
}
}
// txCache.TxNumOfAccount返回账户在Mempool中交易数量
func (cache *txCache) TxNumOfAccount(addr string) int64 {
return int64(len(cache.accMap[addr]))
}
// txCache.Exists判断txCache中是否存在给定tx
func (cache *txCache) Exists(hash []byte) bool {
_, exists := cache.txMap[string(hash)]
return exists
}
// txCache.Push把给定tx添加到txCache;如果tx已经存在txCache中或Mempool已满则返回对应error
func (cache *txCache) Push(tx *types.Transaction) error {
hash := tx.Hash()
if cache.Exists(hash) {
addedItem, ok := cache.txMap[string(hash)].Value.(*Item)
if !ok {
mlog.Error("mempoolItemError", "item", cache.txMap[string(hash)].Value)
return types.ErrTxExist
}
addedTime := addedItem.enterTime
if types.Now().Unix()-addedTime < mempoolDupResendInterval {
return types.ErrTxExist
}
// 超过2分钟之后的重发交易返回nil,再次发送给P2P,但是不再次加入mempool
// 并修改其enterTime,以避免该交易一直在节点间被重发
newEnterTime := types.Now().Unix()
resendItem := &Item{value: tx, priority: tx.Fee, enterTime: newEnterTime}
newItem := cache.txList.InsertAfter(resendItem, cache.txMap[string(hash)])
cache.txList.Remove(cache.txMap[string(hash)])
cache.txMap[string(hash)] = newItem
// ------------------
return nil
}
if cache.txList.Len() >= cache.size {
return types.ErrMemFull
}
it := &Item{value: tx, priority: tx.Fee, enterTime: types.Now().Unix()}
txElement := cache.txList.PushBack(it)
cache.txMap[string(hash)] = txElement
// 账户交易数量
accountAddr := tx.From()
cache.accMap[accountAddr] = append(cache.accMap[accountAddr], tx)
if len(cache.txFrontTen) >= 10 {
cache.txFrontTen = cache.txFrontTen[len(cache.txFrontTen)-9:]
}
cache.txFrontTen = append(cache.txFrontTen, tx)
return nil
}
// txCache.GetLatestTx返回最新十条加入到txCache的交易
func (cache *txCache) GetLatestTx() []*types.Transaction {
return cache.txFrontTen
}
// txCache.Remove移除txCache中给定tx
func (cache *txCache) Remove(hash []byte) {
value := cache.txList.Remove(cache.txMap[string(hash)])
delete(cache.txMap, string(hash))
// 账户交易数量减1
if value == nil {
return
}
tx := value.(*Item).value
addr := tx.From()
if cache.TxNumOfAccount(addr) > 0 {
cache.AccountTxNumDecrease(addr, hash)
}
}
// txCache.Size返回txCache中已存tx数目
func (cache *txCache) Size() int {
return cache.txList.Len()
}
// txCache.SetSize用来设置Mempool容量
func (cache *txCache) SetSize(newSize int) {
if cache.txList.Len() > 0 {
panic("only can set a empty size")
}
cache.size = newSize
}
// txCache.GetAccTxs用来获取对应账户地址(列表)中的全部交易详细信息
func (cache *txCache) GetAccTxs(addrs *types.ReqAddrs) *types.TransactionDetails {
res := &types.TransactionDetails{}
for _, addr := range addrs.Addrs {
if value, ok := cache.accMap[addr]; ok {
for _, v := range value {
txAmount, err := v.Amount()
if err != nil {
// continue
txAmount = 0
}
res.Txs = append(res.Txs,
&types.TransactionDetail{
Tx: v,
Amount: txAmount,
Fromaddr: addr,
ActionName: v.ActionName(),
})
}
}
}
return res
}
// txCache.AccountTxNumDecrease根据交易哈希删除对应账户的对应交易
func (cache *txCache) AccountTxNumDecrease(addr string, hash []byte) {
if value, ok := cache.accMap[addr]; ok {
for i, t := range value {
if string(t.Hash()) == string(hash) {
cache.accMap[addr] = append(cache.accMap[addr][:i], cache.accMap[addr][i+1:]...)
if len(cache.accMap[addr]) == 0 {
delete(cache.accMap, addr)
}
return
}
}
}
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package mempool package mempool
//mempool 模块的功能:实现交易暂存的功能。主要是解决共识模块可能比rpc模块速度慢的问题。 //交易打包排序相关的模块
//模块的接口的设计: //模块功能:模块主要的功能是实现共识排序的功能,包括完整的共识的实现。
//接口设计:
...@@ -76,6 +76,9 @@ func New(cfg *types.P2P) *P2p { ...@@ -76,6 +76,9 @@ func New(cfg *types.P2P) *P2p {
return p2p return p2p
} }
//Wait wait for ready
func (network *P2p) Wait() {}
func (network *P2p) isClose() bool { func (network *P2p) isClose() bool {
return atomic.LoadInt32(&network.closed) == 1 return atomic.LoadInt32(&network.closed) == 1
} }
......
...@@ -43,6 +43,8 @@ type Client interface { ...@@ -43,6 +43,8 @@ type Client interface {
// Module be used for module interface // Module be used for module interface
type Module interface { type Module interface {
SetQueueClient(client Client) SetQueueClient(client Client)
//wait for ready
Wait()
Close() Close()
} }
......
...@@ -47,6 +47,9 @@ func (c *channelClient) CreateRawTransaction(param *types.CreateTx) ([]byte, err ...@@ -47,6 +47,9 @@ func (c *channelClient) CreateRawTransaction(param *types.CreateTx) ([]byte, err
if param.IsToken { if param.IsToken {
execer = types.ExecName("token") execer = types.ExecName("token")
} }
if param.Execer != "" {
execer = param.Execer
}
return types.CallCreateTx(execer, "", param) return types.CallCreateTx(execer, "", param)
} }
......
...@@ -88,23 +88,6 @@ func testCreateRawTransactionCoinTransfer(t *testing.T) { ...@@ -88,23 +88,6 @@ func testCreateRawTransactionCoinTransfer(t *testing.T) {
Note: []byte("note"), Note: []byte("note"),
} }
//v := &cty.CoinsAction_Transfer{
// Transfer:&cty.CoinsTransfer{
// Amount:ctx.Amount,
// Note:ctx.To,
// },
//}
//transfer := &cty.CoinsAction{
// Value:v,
// Ty:cty.CoinsActionTransfer,
//}
//
//tx := &types.Transaction{
// Execer:[]byte("coins"),
// Payload:types.Encode(transfer),
// To:ctx.To,
//}
client := newTestChannelClient() client := newTestChannelClient()
txHex, err := client.CreateRawTransaction(&ctx) txHex, err := client.CreateRawTransaction(&ctx)
assert.Nil(t, err) assert.Nil(t, err)
......
...@@ -38,7 +38,8 @@ func (g *Grpc) CreateRawTransaction(ctx context.Context, in *pb.CreateTx) (*pb.U ...@@ -38,7 +38,8 @@ func (g *Grpc) CreateRawTransaction(ctx context.Context, in *pb.CreateTx) (*pb.U
// CreateTransaction create transaction of grpc // CreateTransaction create transaction of grpc
func (g *Grpc) CreateTransaction(ctx context.Context, in *pb.CreateTxIn) (*pb.UnsignTx, error) { func (g *Grpc) CreateTransaction(ctx context.Context, in *pb.CreateTxIn) (*pb.UnsignTx, error) {
exec := pb.LoadExecutorType(string(in.Execer)) execer := pb.ExecName(string(in.Execer))
exec := pb.LoadExecutorType(execer)
if exec == nil { if exec == nil {
log.Error("callExecNewTx", "Error", "exec not found") log.Error("callExecNewTx", "Error", "exec not found")
return nil, pb.ErrNotSupport return nil, pb.ErrNotSupport
...@@ -52,7 +53,7 @@ func (g *Grpc) CreateTransaction(ctx context.Context, in *pb.CreateTxIn) (*pb.Un ...@@ -52,7 +53,7 @@ func (g *Grpc) CreateTransaction(ctx context.Context, in *pb.CreateTxIn) (*pb.Un
if err != nil { if err != nil {
return nil, err return nil, err
} }
reply, err := pb.CallCreateTx(string(in.Execer), in.ActionName, msg) reply, err := pb.CallCreateTx(execer, in.ActionName, msg)
if err != nil { if err != nil {
return nil, err return nil, err
} }
......
...@@ -20,12 +20,26 @@ import ( ...@@ -20,12 +20,26 @@ import (
) )
// CreateRawTransaction create rawtransaction by jrpc // CreateRawTransaction create rawtransaction by jrpc
func (c *Chain33) CreateRawTransaction(in *types.CreateTx, result *interface{}) error { func (c *Chain33) CreateRawTransaction(in *rpctypes.CreateTx, result *interface{}) error {
reply, err := c.cli.CreateRawTransaction(in) if in == nil {
log.Error("CreateRawTransaction", "Error", types.ErrInvalidParam)
return types.ErrInvalidParam
}
inpb := &types.CreateTx{
To: in.To,
Amount: in.Amount,
Fee: in.Fee,
Note: []byte(in.Note),
IsWithdraw: in.IsWithdraw,
IsToken: in.IsToken,
TokenSymbol: in.TokenSymbol,
ExecName: in.ExecName,
Execer: in.Execer,
}
reply, err := c.cli.CreateRawTransaction(inpb)
if err != nil { if err != nil {
return err return err
} }
*result = hex.EncodeToString(reply) *result = hex.EncodeToString(reply)
return nil return nil
} }
...@@ -404,50 +418,6 @@ func (c *Chain33) ImportPrivkey(in types.ReqWalletImportPrivkey, result *interfa ...@@ -404,50 +418,6 @@ func (c *Chain33) ImportPrivkey(in types.ReqWalletImportPrivkey, result *interfa
// SendToAddress send to address of coins // SendToAddress send to address of coins
func (c *Chain33) SendToAddress(in types.ReqWalletSendToAddress, result *interface{}) error { func (c *Chain33) SendToAddress(in types.ReqWalletSendToAddress, result *interface{}) error {
log.Debug("Rpc SendToAddress", "Tx", in)
if types.IsPara() {
createTx := &types.CreateTx{
To: in.GetTo(),
Amount: in.GetAmount(),
Fee: 1e5,
Note: in.GetNote(),
IsWithdraw: false,
IsToken: true,
TokenSymbol: in.GetTokenSymbol(),
ExecName: types.ExecName("token"),
}
tx, err := c.cli.CreateRawTransaction(createTx)
if err != nil {
log.Debug("ParaChain CreateRawTransaction", "Error", err.Error())
return err
}
//不需要自己去导出私钥,signRawTx 里面只需带入公钥地址,也回优先去查出相应的私钥,前提是私钥已经导入
reqSignRawTx := &types.ReqSignRawTx{
Addr: in.From,
Privkey: "",
TxHex: hex.EncodeToString(tx),
Expire: "300s",
Index: 0,
Token: "",
}
replySignRawTx, err := c.cli.SignRawTx(reqSignRawTx)
if err != nil {
log.Debug("ParaChain SignRawTx", "Error", err.Error())
return err
}
rawParm := rpctypes.RawParm{
Token: "",
Data: replySignRawTx.GetTxHex(),
}
var txHash interface{}
err = forwardTranToMainNet(rawParm, &txHash)
if err != nil {
log.Debug("ParaChain forwardTranToMainNet", "Error", err.Error())
return err
}
*result = &rpctypes.ReplyHash{Hash: txHash.(string)}
return nil
}
reply, err := c.cli.WalletSendToAddress(&in) reply, err := c.cli.WalletSendToAddress(&in)
if err != nil { if err != nil {
log.Debug("SendToAddress", "Error", err.Error()) log.Debug("SendToAddress", "Error", err.Error())
...@@ -1120,7 +1090,7 @@ func (c *Chain33) CreateTransaction(in *rpctypes.CreateTxIn, result *interface{} ...@@ -1120,7 +1090,7 @@ func (c *Chain33) CreateTransaction(in *rpctypes.CreateTxIn, result *interface{}
if in == nil { if in == nil {
return types.ErrInvalidParam return types.ErrInvalidParam
} }
btx, err := types.CallCreateTxJSON(in.Execer, in.ActionName, in.Payload) btx, err := types.CallCreateTxJSON(types.ExecName(in.Execer), in.ActionName, in.Payload)
if err != nil { if err != nil {
return err return err
} }
......
...@@ -387,11 +387,11 @@ func TestChain33_CreateRawTransaction(t *testing.T) { ...@@ -387,11 +387,11 @@ func TestChain33_CreateRawTransaction(t *testing.T) {
assert.Nil(t, testResult) assert.Nil(t, testResult)
assert.NotNil(t, err) assert.NotNil(t, err)
tx := &types.CreateTx{ tx := &rpctypes.CreateTx{
To: "qew", To: "184wj4nsgVxKyz2NhM3Yb5RK5Ap6AFRFq2",
Amount: 10, Amount: 10,
Fee: 1, Fee: 1,
Note: []byte("12312"), Note: "12312",
IsWithdraw: false, IsWithdraw: false,
IsToken: false, IsToken: false,
TokenSymbol: "", TokenSymbol: "",
......
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
package rpc package rpc
import ( import (
"encoding/hex"
"errors" "errors"
"testing" "testing"
"time" "time"
...@@ -114,10 +115,33 @@ func TestJSONClient_Call(t *testing.T) { ...@@ -114,10 +115,33 @@ func TestJSONClient_Call(t *testing.T) {
err = jsonClient.Call("Chain33.IsNtpClockSync", &types.ReqNil{}, &retNtp) err = jsonClient.Call("Chain33.IsNtpClockSync", &types.ReqNil{}, &retNtp)
assert.Nil(t, err) assert.Nil(t, err)
assert.True(t, retNtp) assert.True(t, retNtp)
testCreateTxCoins(t, jsonClient)
server.Close() server.Close()
mock.AssertExpectationsForObjects(t, api) mock.AssertExpectationsForObjects(t, api)
} }
func testCreateTxCoins(t *testing.T, jsonClient *jsonclient.JSONClient) {
req := &rpctypes.CreateTx{
To: "184wj4nsgVxKyz2NhM3Yb5RK5Ap6AFRFq2",
Amount: 10,
Fee: 1,
Note: "12312",
IsWithdraw: false,
IsToken: false,
TokenSymbol: "",
ExecName: types.ExecName("coins"),
}
var res string
err := jsonClient.Call("Chain33.CreateRawTransaction", req, &res)
assert.Nil(t, err)
txbytes, err := hex.DecodeString(res)
assert.Nil(t, err)
var tx types.Transaction
err = types.Decode(txbytes, &tx)
assert.Nil(t, err)
assert.Equal(t, "184wj4nsgVxKyz2NhM3Yb5RK5Ap6AFRFq2", tx.To)
}
func TestGrpc_Call(t *testing.T) { func TestGrpc_Call(t *testing.T) {
rpcCfg = new(types.RPC) rpcCfg = new(types.RPC)
rpcCfg.GrpcBindAddr = "127.0.0.1:8101" rpcCfg.GrpcBindAddr = "127.0.0.1:8101"
......
...@@ -360,3 +360,16 @@ type ExecAccount struct { ...@@ -360,3 +360,16 @@ type ExecAccount struct {
type ExecNameParm struct { type ExecNameParm struct {
ExecName string `json:"execname"` ExecName string `json:"execname"`
} }
//CreateTx 为了简化Note 的创建过程,在json rpc 中,note 采用string 格式
type CreateTx struct {
To string `json:"to,omitempty"`
Amount int64 `json:"amount,omitempty"`
Fee int64 `json:"fee,omitempty"`
Note string `json:"note,omitempty"`
IsWithdraw bool `json:"isWithdraw,omitempty"`
IsToken bool `json:"isToken,omitempty"`
TokenSymbol string `json:"tokenSymbol,omitempty"`
ExecName string `json:"execName,omitempty"` //TransferToExec and Withdraw 的执行器
Execer string `json:"execer,omitempty"` //执行器名称
}
...@@ -105,6 +105,9 @@ func (bc *BaseClient) InitMiner() { ...@@ -105,6 +105,9 @@ func (bc *BaseClient) InitMiner() {
bc.once.Do(bc.minerstartCB) bc.once.Do(bc.minerstartCB)
} }
//Wait wait for ready
func (bc *BaseClient) Wait() {}
//SetQueueClient 设置客户端队列 //SetQueueClient 设置客户端队列
func (bc *BaseClient) SetQueueClient(c queue.Client) { func (bc *BaseClient) SetQueueClient(c queue.Client) {
bc.InitClient(c, func() { bc.InitClient(c, func() {
...@@ -334,6 +337,10 @@ func buildHashList(deltx []*types.Transaction) *types.TxHashList { ...@@ -334,6 +337,10 @@ func buildHashList(deltx []*types.Transaction) *types.TxHashList {
//WriteBlock 向blockchain写区块 //WriteBlock 向blockchain写区块
func (bc *BaseClient) WriteBlock(prev []byte, block *types.Block) error { func (bc *BaseClient) WriteBlock(prev []byte, block *types.Block) error {
//保存block的原始信息用于删除mempool中的错误交易
rawtxs := make([]*types.Transaction, len(block.Txs))
copy(rawtxs, block.Txs)
blockdetail := &types.BlockDetail{Block: block} blockdetail := &types.BlockDetail{Block: block}
msg := bc.client.NewMessage("blockchain", types.EventAddBlockDetail, blockdetail) msg := bc.client.NewMessage("blockchain", types.EventAddBlockDetail, blockdetail)
bc.client.Send(msg, true) bc.client.Send(msg, true)
...@@ -343,7 +350,7 @@ func (bc *BaseClient) WriteBlock(prev []byte, block *types.Block) error { ...@@ -343,7 +350,7 @@ func (bc *BaseClient) WriteBlock(prev []byte, block *types.Block) error {
} }
blockdetail = resp.GetData().(*types.BlockDetail) blockdetail = resp.GetData().(*types.BlockDetail)
//从mempool 中删除错误的交易 //从mempool 中删除错误的交易
deltx := diffTx(block.Txs, blockdetail.Block.Txs) deltx := diffTx(rawtxs, blockdetail.Block.Txs)
if len(deltx) > 0 { if len(deltx) > 0 {
bc.delMempoolTx(deltx) bc.delMempoolTx(deltx)
} }
......
...@@ -12,6 +12,7 @@ import ( ...@@ -12,6 +12,7 @@ import (
//加载系统内置store, 不要依赖plugin //加载系统内置store, 不要依赖plugin
_ "github.com/33cn/chain33/system/dapp/init" _ "github.com/33cn/chain33/system/dapp/init"
_ "github.com/33cn/chain33/system/mempool/init"
_ "github.com/33cn/chain33/system/store/init" _ "github.com/33cn/chain33/system/store/init"
) )
......
...@@ -293,7 +293,7 @@ func getblockbyhashs(cmd *cobra.Command, args []string) { ...@@ -293,7 +293,7 @@ func getblockbyhashs(cmd *cobra.Command, args []string) {
Hashes: hashesArr, Hashes: hashesArr,
} }
var res types.BlockDetails var res rpctypes.BlockDetails
ctx := jsonclient.NewRPCCtx(rpcLaddr, "Chain33.GetBlockByHashes", params, &res) ctx := jsonclient.NewRPCCtx(rpcLaddr, "Chain33.GetBlockByHashes", params, &res)
//ctx.SetResultCb(parseQueryTxsByHashesRes) //ctx.SetResultCb(parseQueryTxsByHashesRes)
ctx.Run() ctx.Run()
......
...@@ -9,5 +9,6 @@ import ( ...@@ -9,5 +9,6 @@ import (
_ "github.com/33cn/chain33/system/consensus/init" //register consensus init package _ "github.com/33cn/chain33/system/consensus/init" //register consensus init package
_ "github.com/33cn/chain33/system/crypto/init" _ "github.com/33cn/chain33/system/crypto/init"
_ "github.com/33cn/chain33/system/dapp/init" _ "github.com/33cn/chain33/system/dapp/init"
_ "github.com/33cn/chain33/system/mempool/init"
_ "github.com/33cn/chain33/system/store/init" _ "github.com/33cn/chain33/system/store/init"
) )
package mempool
import (
"github.com/33cn/chain33/common/listmap"
"github.com/33cn/chain33/types"
)
//AccountTxIndex 账户和交易索引
type AccountTxIndex struct {
maxperaccount int
accMap map[string]*listmap.ListMap
}
//NewAccountTxIndex 创建一个新的索引
func NewAccountTxIndex(maxperaccount int) *AccountTxIndex {
return &AccountTxIndex{
maxperaccount: maxperaccount,
accMap: make(map[string]*listmap.ListMap),
}
}
// TxNumOfAccount 返回账户在Mempool中交易数量
func (cache *AccountTxIndex) TxNumOfAccount(addr string) int {
if _, ok := cache.accMap[addr]; ok {
return cache.accMap[addr].Size()
}
return 0
}
// GetAccTxs 用来获取对应账户地址(列表)中的全部交易详细信息
func (cache *AccountTxIndex) GetAccTxs(addrs *types.ReqAddrs) *types.TransactionDetails {
res := &types.TransactionDetails{}
for _, addr := range addrs.Addrs {
if value, ok := cache.accMap[addr]; ok {
value.Walk(func(val interface{}) bool {
v := val.(*types.Transaction)
txAmount, err := v.Amount()
if err != nil {
txAmount = 0
}
res.Txs = append(res.Txs,
&types.TransactionDetail{
Tx: v,
Amount: txAmount,
Fromaddr: addr,
ActionName: v.ActionName(),
})
return true
})
}
}
return res
}
//Remove 根据交易哈希删除对应账户的对应交易
func (cache *AccountTxIndex) Remove(tx *types.Transaction) {
addr := tx.From()
if lm, ok := cache.accMap[addr]; ok {
lm.Remove(string(tx.Hash()))
if lm.Size() == 0 {
delete(cache.accMap, addr)
}
}
}
// Push push transaction to AccountTxIndex
func (cache *AccountTxIndex) Push(tx *types.Transaction) error {
addr := tx.From()
_, ok := cache.accMap[addr]
if !ok {
cache.accMap[addr] = listmap.New()
}
if cache.accMap[addr].Size() >= cache.maxperaccount {
return types.ErrManyTx
}
cache.accMap[addr].Push(string(tx.Hash()), tx)
return nil
}
//CanPush 是否可以push 进 account index
func (cache *AccountTxIndex) CanPush(tx *types.Transaction) bool {
if item, ok := cache.accMap[tx.From()]; ok {
return item.Size() < cache.maxperaccount
}
return true
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package mempool
import (
"sync"
"sync/atomic"
"time"
"github.com/33cn/chain33/common"
log "github.com/33cn/chain33/common/log/log15"
"github.com/33cn/chain33/queue"
"github.com/33cn/chain33/types"
)
var mlog = log.New("module", "mempool.base")
//Mempool mempool 基础类
type Mempool struct {
proxyMtx sync.Mutex
in chan queue.Message
out <-chan queue.Message
client queue.Client
header *types.Header
sync bool
cfg *types.Mempool
poolHeader chan struct{}
isclose int32
wg sync.WaitGroup
done chan struct{}
removeBlockTicket *time.Ticker
cache *txCache
}
//GetSync 判断是否mempool 同步
func (mem *Mempool) getSync() bool {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
return mem.sync
}
//NewMempool 新建mempool 实例
func NewMempool(cfg *types.Mempool) *Mempool {
pool := &Mempool{}
if cfg.MaxTxNumPerAccount == 0 {
cfg.MaxTxNumPerAccount = maxTxNumPerAccount
}
if cfg.MaxTxLast == 0 {
cfg.MaxTxLast = maxTxLast
}
if cfg.PoolCacheSize == 0 {
cfg.PoolCacheSize = poolCacheSize
}
pool.in = make(chan queue.Message)
pool.out = make(<-chan queue.Message)
pool.done = make(chan struct{})
pool.cfg = cfg
pool.poolHeader = make(chan struct{}, 2)
pool.removeBlockTicket = time.NewTicker(time.Minute)
pool.cache = newCache(cfg.MaxTxNumPerAccount, cfg.MaxTxLast)
return pool
}
//Close 关闭mempool
func (mem *Mempool) Close() {
if mem.isClose() {
return
}
atomic.StoreInt32(&mem.isclose, 1)
close(mem.done)
if mem.client != nil {
mem.client.Close()
}
mem.removeBlockTicket.Stop()
mlog.Info("mempool module closing")
mem.wg.Wait()
mlog.Info("mempool module closed")
}
//SetQueueClient 初始化mempool模块
func (mem *Mempool) SetQueueClient(client queue.Client) {
mem.client = client
mem.client.Sub("mempool")
mem.wg.Add(1)
go mem.pollLastHeader()
mem.wg.Add(1)
go mem.checkSync()
mem.wg.Add(1)
go mem.removeBlockedTxs()
mem.wg.Add(1)
go mem.eventProcess()
}
// Size 返回mempool中txCache大小
func (mem *Mempool) Size() int {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
return mem.cache.Size()
}
// SetMinFee 设置最小交易费用
func (mem *Mempool) SetMinFee(fee int64) {
mem.proxyMtx.Lock()
mem.cfg.MinTxFee = fee
mem.proxyMtx.Unlock()
}
//SetQueueCache 设置排队策略
func (mem *Mempool) SetQueueCache(qcache QueueCache) {
mem.cache.SetQueueCache(qcache)
}
// GetTxList 从txCache中返回给定数目的tx
func (mem *Mempool) getTxList(filterList *types.TxHashList) (txs []*types.Transaction) {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
count := filterList.GetCount()
dupMap := make(map[string]bool)
for i := 0; i < len(filterList.GetHashes()); i++ {
dupMap[string(filterList.GetHashes()[i])] = true
}
return mem.filterTxList(count, dupMap)
}
func (mem *Mempool) filterTxList(count int64, dupMap map[string]bool) (txs []*types.Transaction) {
height := mem.header.GetHeight()
blocktime := mem.header.GetBlockTime()
mem.cache.Walk(int(count), func(tx *Item) bool {
if dupMap != nil {
if _, ok := dupMap[string(tx.Value.Hash())]; ok {
return true
}
}
if isExpired(tx, height, blocktime) {
return true
}
txs = append(txs, tx.Value)
return true
})
return txs
}
// RemoveTxs 从mempool中删除给定Hash的txs
func (mem *Mempool) RemoveTxs(hashList *types.TxHashList) error {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
for _, hash := range hashList.Hashes {
exist := mem.cache.Exist(string(hash))
if exist {
mem.cache.Remove(string(hash))
}
}
return nil
}
// PushTx 将交易推入mempool,并返回结果(error)
func (mem *Mempool) PushTx(tx *types.Transaction) error {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
err := mem.cache.Push(tx)
return err
}
// setHeader设置mempool.header
func (mem *Mempool) setHeader(h *types.Header) {
mem.proxyMtx.Lock()
mem.header = h
mem.proxyMtx.Unlock()
}
// GetHeader 获取Mempool.header
func (mem *Mempool) GetHeader() *types.Header {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
return mem.header
}
//IsClose 判断是否mempool 关闭
func (mem *Mempool) isClose() bool {
return atomic.LoadInt32(&mem.isclose) == 1
}
// GetLastHeader 获取LastHeader的height和blockTime
func (mem *Mempool) GetLastHeader() (interface{}, error) {
if mem.client == nil {
panic("client not bind message queue.")
}
msg := mem.client.NewMessage("blockchain", types.EventGetLastHeader, nil)
err := mem.client.Send(msg, true)
if err != nil {
mlog.Error("blockchain closed", "err", err.Error())
return nil, err
}
return mem.client.Wait(msg)
}
// GetAccTxs 用来获取对应账户地址(列表)中的全部交易详细信息
func (mem *Mempool) GetAccTxs(addrs *types.ReqAddrs) *types.TransactionDetails {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
return mem.cache.GetAccTxs(addrs)
}
// TxNumOfAccount 返回账户在mempool中交易数量
func (mem *Mempool) TxNumOfAccount(addr string) int64 {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
return int64(mem.cache.TxNumOfAccount(addr))
}
// GetLatestTx 返回最新十条加入到mempool的交易
func (mem *Mempool) GetLatestTx() []*types.Transaction {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
return mem.cache.GetLatestTx()
}
// pollLastHeader在初始化后循环获取LastHeader,直到获取成功后,返回
func (mem *Mempool) pollLastHeader() {
defer mem.wg.Done()
defer func() {
mlog.Info("pollLastHeader quit")
mem.poolHeader <- struct{}{}
}()
for {
if mem.isClose() {
return
}
lastHeader, err := mem.GetLastHeader()
if err != nil {
mlog.Error(err.Error())
time.Sleep(time.Second)
continue
}
h := lastHeader.(queue.Message).Data.(*types.Header)
mem.setHeader(h)
return
}
}
func (mem *Mempool) removeExpired() {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
mem.cache.removeExpiredTx(mem.header.GetHeight(), mem.header.GetBlockTime())
}
// removeBlockedTxs 每隔1分钟清理一次已打包的交易
func (mem *Mempool) removeBlockedTxs() {
defer mem.wg.Done()
defer mlog.Info("RemoveBlockedTxs quit")
if mem.client == nil {
panic("client not bind message queue.")
}
for {
select {
case <-mem.removeBlockTicket.C:
if mem.isClose() {
return
}
mem.removeExpired()
case <-mem.done:
return
}
}
}
// RemoveTxsOfBlock 移除mempool中已被Blockchain打包的tx
func (mem *Mempool) RemoveTxsOfBlock(block *types.Block) bool {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
for _, tx := range block.Txs {
hash := tx.Hash()
exist := mem.cache.Exist(string(hash))
if exist {
mem.cache.Remove(string(hash))
}
}
return true
}
// Mempool.DelBlock将回退的区块内的交易重新加入mempool中
func (mem *Mempool) delBlock(block *types.Block) {
if len(block.Txs) <= 0 {
return
}
blkTxs := block.Txs
header := mem.GetHeader()
for i := 0; i < len(blkTxs); i++ {
tx := blkTxs[i]
//当前包括ticket和平行链的第一笔挖矿交易,统一actionName为miner
if i == 0 && tx.ActionName() == types.MinerAction {
continue
}
groupCount := int(tx.GetGroupCount())
if groupCount > 1 && i+groupCount <= len(blkTxs) {
group := types.Transactions{Txs: blkTxs[i : i+groupCount]}
tx = group.Tx()
i = i + groupCount - 1
}
err := tx.Check(header.GetHeight(), mem.cfg.MinTxFee)
if err != nil {
continue
}
if !mem.checkExpireValid(tx) {
continue
}
mem.PushTx(tx)
}
}
// Height 获取区块高度
func (mem *Mempool) Height() int64 {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
if mem.header == nil {
return -1
}
return mem.header.GetHeight()
}
// Wait wait mempool ready
func (mem *Mempool) Wait() {
<-mem.poolHeader
//wait sync
<-mem.poolHeader
}
// SendTxToP2P 向"p2p"发送消息
func (mem *Mempool) sendTxToP2P(tx *types.Transaction) {
if mem.client == nil {
panic("client not bind message queue.")
}
msg := mem.client.NewMessage("p2p", types.EventTxBroadcast, tx)
mem.client.Send(msg, false)
mlog.Debug("tx sent to p2p", "tx.Hash", common.ToHex(tx.Hash()))
}
// Mempool.checkSync检查并获取mempool同步状态
func (mem *Mempool) checkSync() {
defer func() {
mlog.Info("getsync quit")
mem.poolHeader <- struct{}{}
}()
defer mem.wg.Done()
if mem.getSync() {
return
}
if mem.cfg.ForceAccept {
mem.setSync(true)
}
for {
if mem.isClose() {
return
}
if mem.client == nil {
panic("client not bind message queue.")
}
msg := mem.client.NewMessage("blockchain", types.EventIsSync, nil)
err := mem.client.Send(msg, true)
if err != nil {
time.Sleep(time.Second)
continue
}
resp, err := mem.client.Wait(msg)
if err != nil {
time.Sleep(time.Second)
continue
}
if resp.GetData().(*types.IsCaughtUp).GetIscaughtup() {
mem.setSync(true)
return
}
time.Sleep(time.Second)
continue
}
}
func (mem *Mempool) setSync(status bool) {
mem.proxyMtx.Lock()
mem.sync = status
mem.proxyMtx.Unlock()
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found In the LICENSE file.
package mempool
import (
"github.com/33cn/chain33/types"
)
//QueueCache 排队交易处理
type QueueCache interface {
Exist(hash string) bool
GetItem(hash string) (*Item, error)
Push(tx *Item) error
Remove(hash string) error
Size() int
Walk(count int, cb func(tx *Item) bool)
}
// Item 为Mempool中包装交易的数据结构
type Item struct {
Value *types.Transaction
Priority int64
EnterTime int64
}
//TxCache 管理交易cache 包括账户索引,最后的交易,排队策略缓存
type txCache struct {
*AccountTxIndex
*LastTxCache
qcache QueueCache
}
//NewTxCache init accountIndex and last cache
func newCache(maxTxPerAccount int64, sizeLast int64) *txCache {
return &txCache{
AccountTxIndex: NewAccountTxIndex(int(maxTxPerAccount)),
LastTxCache: NewLastTxCache(int(sizeLast)),
}
}
//SetQueueCache set queue cache , 这个接口可以扩展
func (cache *txCache) SetQueueCache(qcache QueueCache) {
cache.qcache = qcache
}
//Remove 移除txCache中给定tx
func (cache *txCache) Remove(hash string) {
item, err := cache.qcache.GetItem(hash)
if err != nil {
return
}
tx := item.Value
cache.qcache.Remove(hash)
cache.AccountTxIndex.Remove(tx)
cache.LastTxCache.Remove(tx)
}
//Exist 是否存在
func (cache *txCache) Exist(hash string) bool {
if cache.qcache == nil {
return false
}
return cache.qcache.Exist(hash)
}
//Size cache tx num
func (cache *txCache) Size() int {
if cache.qcache == nil {
return 0
}
return cache.qcache.Size()
}
//Walk iter all txs
func (cache *txCache) Walk(count int, cb func(tx *Item) bool) {
if cache.qcache == nil {
return
}
cache.qcache.Walk(count, cb)
}
//RemoveTxs 删除一组交易
func (cache *txCache) RemoveTxs(txs []string) {
for _, t := range txs {
cache.Remove(t)
}
}
//Push 存入交易到cache 中
func (cache *txCache) Push(tx *types.Transaction) error {
if !cache.AccountTxIndex.CanPush(tx) {
return types.ErrManyTx
}
item := &Item{Value: tx, Priority: tx.Fee, EnterTime: types.Now().Unix()}
err := cache.qcache.Push(item)
if err != nil {
return err
}
cache.AccountTxIndex.Push(tx)
cache.LastTxCache.Push(tx)
return nil
}
func (cache *txCache) removeExpiredTx(height, blocktime int64) {
var txs []string
cache.qcache.Walk(0, func(tx *Item) bool {
if isExpired(tx, height, blocktime) {
txs = append(txs, string(tx.Value.Hash()))
}
return true
})
cache.RemoveTxs(txs)
}
//判断交易是否过期
func isExpired(item *Item, height, blockTime int64) bool {
if types.Now().Unix()-item.EnterTime >= mempoolExpiredInterval {
return true
}
if item.Value.IsExpire(height, blockTime) {
return true
}
return false
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package mempool package mempool
import ( import (
"errors" "errors"
"time"
"github.com/33cn/chain33/common/address" "github.com/33cn/chain33/common/address"
"github.com/33cn/chain33/queue" "github.com/33cn/chain33/queue"
"github.com/33cn/chain33/types" "github.com/33cn/chain33/types"
"github.com/33cn/chain33/util"
) )
// Mempool.CheckTxList初步检查并筛选交易消息 // CheckExpireValid 检查交易过期有效性,过期返回false,未过期返回true
func (mem *Mempool) CheckExpireValid(msg queue.Message) (bool, error) {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
if mem.header == nil {
return false, types.ErrHeaderNotSet
}
tx := msg.GetData().(types.TxGroup).Tx()
ok := mem.checkExpireValid(tx)
if !ok {
return ok, types.ErrTxExpire
}
return ok, nil
}
// checkTxListRemote 发送消息给执行模块检查交易
func (mem *Mempool) checkTxListRemote(txlist *types.ExecTxList) (*types.ReceiptCheckTxList, error) {
if mem.client == nil {
panic("client not bind message queue.")
}
msg := mem.client.NewMessage("execs", types.EventCheckTx, txlist)
err := mem.client.Send(msg, true)
if err != nil {
mlog.Error("execs closed", "err", err.Error())
return nil, err
}
msg, err = mem.client.Wait(msg)
if err != nil {
return nil, err
}
return msg.GetData().(*types.ReceiptCheckTxList), nil
}
func (mem *Mempool) checkExpireValid(tx *types.Transaction) bool {
if tx.IsExpire(mem.header.GetHeight(), mem.header.GetBlockTime()) {
return false
}
if tx.Expire > 1000000000 && tx.Expire < types.Now().Unix()+int64(time.Minute/time.Second) {
return false
}
return true
}
// CheckTx 初步检查并筛选交易消息
func (mem *Mempool) checkTx(msg queue.Message) queue.Message { func (mem *Mempool) checkTx(msg queue.Message) queue.Message {
tx := msg.GetData().(types.TxGroup).Tx() tx := msg.GetData().(types.TxGroup).Tx()
// 检查接收地址是否合法 // 检查接收地址是否合法
...@@ -20,13 +61,7 @@ func (mem *Mempool) checkTx(msg queue.Message) queue.Message { ...@@ -20,13 +61,7 @@ func (mem *Mempool) checkTx(msg queue.Message) queue.Message {
msg.Data = types.ErrInvalidAddress msg.Data = types.ErrInvalidAddress
return msg return msg
} }
// 检查交易是否为重复交易 // 检查交易账户在mempool中是否存在过多交易
if mem.addedTxs.Contains(string(tx.Hash())) {
msg.Data = types.ErrDupTx
return msg
}
// 检查交易账户在Mempool中是否存在过多交易
from := tx.From() from := tx.From()
if mem.TxNumOfAccount(from) >= maxTxNumPerAccount { if mem.TxNumOfAccount(from) >= maxTxNumPerAccount {
msg.Data = types.ErrManyTx msg.Data = types.ErrManyTx
...@@ -42,7 +77,7 @@ func (mem *Mempool) checkTx(msg queue.Message) queue.Message { ...@@ -42,7 +77,7 @@ func (mem *Mempool) checkTx(msg queue.Message) queue.Message {
} }
// CheckTxs 初步检查并筛选交易消息 // CheckTxs 初步检查并筛选交易消息
func (mem *Mempool) CheckTxs(msg queue.Message) queue.Message { func (mem *Mempool) checkTxs(msg queue.Message) queue.Message {
// 判断消息是否含有nil交易 // 判断消息是否含有nil交易
if msg.GetData() == nil { if msg.GetData() == nil {
msg.Data = types.ErrEmptyTx msg.Data = types.ErrEmptyTx
...@@ -52,7 +87,7 @@ func (mem *Mempool) CheckTxs(msg queue.Message) queue.Message { ...@@ -52,7 +87,7 @@ func (mem *Mempool) CheckTxs(msg queue.Message) queue.Message {
txmsg := msg.GetData().(*types.Transaction) txmsg := msg.GetData().(*types.Transaction)
//普通的交易 //普通的交易
tx := types.NewTransactionCache(txmsg) tx := types.NewTransactionCache(txmsg)
err := tx.Check(header.GetHeight(), mem.minFee) err := tx.Check(header.GetHeight(), mem.cfg.MinTxFee)
if err != nil { if err != nil {
msg.Data = err msg.Data = err
return msg return msg
...@@ -79,12 +114,12 @@ func (mem *Mempool) CheckTxs(msg queue.Message) queue.Message { ...@@ -79,12 +114,12 @@ func (mem *Mempool) CheckTxs(msg queue.Message) queue.Message {
return msg return msg
} }
// Mempool.checkTxList检查账户余额是否足够,并加入到Mempool,成功则传入goodChan,若加入Mempool失败则传入badChan //checkTxList 检查账户余额是否足够,并加入到Mempool,成功则传入goodChan,若加入Mempool失败则传入badChan
func (mem *Mempool) checkTxRemote(msg queue.Message) queue.Message { func (mem *Mempool) checkTxRemote(msg queue.Message) queue.Message {
tx := msg.GetData().(types.TxGroup) tx := msg.GetData().(types.TxGroup)
txlist := &types.ExecTxList{} txlist := &types.ExecTxList{}
txlist.Txs = append(txlist.Txs, tx.Tx()) txlist.Txs = append(txlist.Txs, tx.Tx())
//检查是否重复
lastheader := mem.GetHeader() lastheader := mem.GetHeader()
txlist.BlockTime = lastheader.BlockTime txlist.BlockTime = lastheader.BlockTime
txlist.Height = lastheader.Height txlist.Height = lastheader.Height
...@@ -92,6 +127,16 @@ func (mem *Mempool) checkTxRemote(msg queue.Message) queue.Message { ...@@ -92,6 +127,16 @@ func (mem *Mempool) checkTxRemote(msg queue.Message) queue.Message {
// 增加这个属性,在执行器中会使用到 // 增加这个属性,在执行器中会使用到
txlist.Difficulty = uint64(lastheader.Difficulty) txlist.Difficulty = uint64(lastheader.Difficulty)
txlist.IsMempool = true txlist.IsMempool = true
//add check dup tx
newtxs, err := util.CheckDupTx(mem.client, txlist.Txs, txlist.Height)
if err != nil {
msg.Data = err
return msg
}
if len(newtxs) != len(txlist.Txs) {
msg.Data = types.ErrDupTx
return msg
}
result, err := mem.checkTxListRemote(txlist) result, err := mem.checkTxListRemote(txlist)
if err != nil { if err != nil {
msg.Data = err msg.Data = err
......
...@@ -6,25 +6,18 @@ package mempool ...@@ -6,25 +6,18 @@ package mempool
import ( import (
"runtime" "runtime"
//log "github.com/33cn/chain33/common/log/log15"
log "github.com/33cn/chain33/common/log/log15"
) )
var ( var (
mlog = log.New("module", "mempool") poolCacheSize int64 = 10240 // mempool容量
poolCacheSize int64 = 10240 // mempool容量 mempoolExpiredInterval int64 = 600 // mempool内交易过期时间,10分钟
mempoolExpiredInterval int64 = 600 // mempool内交易过期时间,10分钟 maxTxNumPerAccount int64 = 100 // TODO 每个账户在mempool中最大交易数量,10
mempoolReSendInterval int64 = 60 // mempool内交易重发时间,1分钟 maxTxLast int64 = 10
mempoolDupResendInterval int64 = 120 // mempool重复交易可再次发送间隔,120秒 processNum int
mempoolAddedTxSize = 102400 // 已添加过的交易缓存大小
maxTxNumPerAccount int64 = 100 // TODO 每个账户在mempool中最大交易数量,10
processNum int
) )
// TODO // TODO
func init() { func init() {
processNum = runtime.NumCPU() processNum = runtime.NumCPU()
//if processNum >= 2 {
//processNum -= 1
//}
} }
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package mempool
//mempool 模块的功能:实现交易暂存的功能。主要是解决共识模块可能比rpc模块速度慢的问题。
//模块的接口的设计:
package mempool
import (
"github.com/33cn/chain33/queue"
"github.com/33cn/chain33/types"
)
func (mem *Mempool) reply() {
defer mlog.Info("piple line quit")
defer mem.wg.Done()
for m := range mem.out {
if m.Err() != nil {
m.Reply(mem.client.NewMessage("rpc", types.EventReply,
&types.Reply{IsOk: false, Msg: []byte(m.Err().Error())}))
} else {
mem.sendTxToP2P(m.GetData().(types.TxGroup).Tx())
m.Reply(mem.client.NewMessage("rpc", types.EventReply, &types.Reply{IsOk: true, Msg: nil}))
}
}
}
func (mem *Mempool) pipeLine() <-chan queue.Message {
//check sign
step1 := func(data queue.Message) queue.Message {
if data.Err() != nil {
return data
}
return mem.checkSign(data)
}
chs := make([]<-chan queue.Message, processNum)
for i := 0; i < processNum; i++ {
chs[i] = step(mem.done, mem.in, step1)
}
out1 := merge(mem.done, chs)
//checktx remote
step2 := func(data queue.Message) queue.Message {
if data.Err() != nil {
return data
}
return mem.checkTxRemote(data)
}
chs2 := make([]<-chan queue.Message, processNum)
for i := 0; i < processNum; i++ {
chs2[i] = step(mem.done, out1, step2)
}
return merge(mem.done, chs2)
}
// 处理其他模块的消息
func (mem *Mempool) eventProcess() {
defer mem.wg.Done()
defer close(mem.in)
//event process
mem.out = mem.pipeLine()
mlog.Info("mempool piple line start")
mem.wg.Add(1)
go mem.reply()
for msg := range mem.client.Recv() {
mlog.Debug("mempool recv", "msgid", msg.ID, "msg", types.GetEventName(int(msg.Ty)))
beg := types.Now()
switch msg.Ty {
case types.EventTx:
mem.eventTx(msg)
case types.EventGetMempool:
// 消息类型EventGetMempool:获取mempool内所有交易
mem.eventGetMempool(msg)
case types.EventTxList:
// 消息类型EventTxList:获取mempool中一定数量交易
mem.eventTxList(msg)
case types.EventDelTxList:
// 消息类型EventDelTxList:获取mempool中一定数量交易,并把这些交易从mempool中删除
mem.eventDelTxList(msg)
case types.EventAddBlock:
// 消息类型EventAddBlock:将添加到区块内的交易从mempool中删除
mem.eventAddBlock(msg)
case types.EventGetMempoolSize:
// 消息类型EventGetMempoolSize:获取mempool大小
mem.eventGetMempoolSize(msg)
case types.EventGetLastMempool:
// 消息类型EventGetLastMempool:获取最新十条加入到mempool的交易
mem.eventGetLastMempool(msg)
case types.EventDelBlock:
// 回滚区块,把该区块内交易重新加回mempool
mem.eventDelBlock(msg)
case types.EventGetAddrTxs:
// 获取mempool中对应账户(组)所有交易
mem.eventGetAddrTxs(msg)
default:
}
mlog.Debug("mempool", "cost", types.Since(beg), "msg", types.GetEventName(int(msg.Ty)))
}
}
//EventTx 初步筛选后存入mempool
func (mem *Mempool) eventTx(msg queue.Message) {
if !mem.getSync() {
msg.Reply(mem.client.NewMessage("", types.EventReply, &types.Reply{Msg: []byte(types.ErrNotSync.Error())}))
mlog.Error("wrong tx", "err", types.ErrNotSync.Error())
} else {
checkedMsg := mem.checkTxs(msg)
select {
case mem.in <- checkedMsg:
case <-mem.done:
}
}
}
// EventGetMempool 获取Mempool内所有交易
func (mem *Mempool) eventGetMempool(msg queue.Message) {
msg.Reply(mem.client.NewMessage("rpc", types.EventReplyTxList,
&types.ReplyTxList{Txs: mem.filterTxList(0, nil)}))
}
// EventDelTxList 获取Mempool中一定数量交易,并把这些交易从Mempool中删除
func (mem *Mempool) eventDelTxList(msg queue.Message) {
hashList := msg.GetData().(*types.TxHashList)
if len(hashList.GetHashes()) == 0 {
msg.ReplyErr("EventDelTxList", types.ErrSize)
} else {
err := mem.RemoveTxs(hashList)
msg.ReplyErr("EventDelTxList", err)
}
}
// EventTxList 获取mempool中一定数量交易
func (mem *Mempool) eventTxList(msg queue.Message) {
hashList := msg.GetData().(*types.TxHashList)
if hashList.Count <= 0 {
msg.Reply(mem.client.NewMessage("", types.EventReplyTxList, types.ErrSize))
mlog.Error("not an valid size", "msg", msg)
} else {
txList := mem.getTxList(hashList)
msg.Reply(mem.client.NewMessage("", types.EventReplyTxList, &types.ReplyTxList{Txs: txList}))
}
}
// EventAddBlock 将添加到区块内的交易从mempool中删除
func (mem *Mempool) eventAddBlock(msg queue.Message) {
block := msg.GetData().(*types.BlockDetail).Block
if block.Height > mem.Height() || (block.Height == 0 && mem.Height() == 0) {
header := &types.Header{}
header.BlockTime = block.BlockTime
header.Height = block.Height
header.StateHash = block.StateHash
mem.setHeader(header)
}
mem.RemoveTxsOfBlock(block)
}
// EventGetMempoolSize 获取mempool大小
func (mem *Mempool) eventGetMempoolSize(msg queue.Message) {
memSize := int64(mem.Size())
msg.Reply(mem.client.NewMessage("rpc", types.EventMempoolSize,
&types.MempoolSize{Size: memSize}))
}
// EventGetLastMempool 获取最新十条加入到mempool的交易
func (mem *Mempool) eventGetLastMempool(msg queue.Message) {
txList := mem.GetLatestTx()
msg.Reply(mem.client.NewMessage("rpc", types.EventReplyTxList,
&types.ReplyTxList{Txs: txList}))
}
// EventDelBlock 回滚区块,把该区块内交易重新加回mempool
func (mem *Mempool) eventDelBlock(msg queue.Message) {
block := msg.GetData().(*types.BlockDetail).Block
if block.Height != mem.GetHeader().GetHeight() {
return
}
lastHeader, err := mem.GetLastHeader()
if err != nil {
mlog.Error(err.Error())
return
}
h := lastHeader.(queue.Message).Data.(*types.Header)
mem.setHeader(h)
mem.delBlock(block)
}
// eventGetAddrTxs 获取mempool中对应账户(组)所有交易
func (mem *Mempool) eventGetAddrTxs(msg queue.Message) {
addrs := msg.GetData().(*types.ReqAddrs)
txlist := mem.GetAccTxs(addrs)
msg.Reply(mem.client.NewMessage("", types.EventReplyAddrTxs, txlist))
}
func (mem *Mempool) checkSign(data queue.Message) queue.Message {
tx, ok := data.GetData().(types.TxGroup)
if ok && tx.CheckSign() {
return data
}
mlog.Error("wrong tx", "err", types.ErrSign)
data.Data = types.ErrSign
return data
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package init
import (
_ "github.com/33cn/chain33/system/mempool/timeline" //最简单的排队模式,按照时间
)
package mempool
import (
"github.com/33cn/chain33/common/listmap"
"github.com/33cn/chain33/types"
)
//LastTxCache 最后放入cache的交易
type LastTxCache struct {
max int
l *listmap.ListMap
}
//NewLastTxCache 创建最后交易的cache
func NewLastTxCache(size int) *LastTxCache {
return &LastTxCache{
max: size,
l: listmap.New(),
}
}
//GetLatestTx 返回最新十条加入到txCache的交易
func (cache *LastTxCache) GetLatestTx() (txs []*types.Transaction) {
cache.l.Walk(func(v interface{}) bool {
txs = append(txs, v.(*types.Transaction))
return true
})
return txs
}
//Remove remove tx of last cache
func (cache *LastTxCache) Remove(tx *types.Transaction) {
cache.l.Remove(string(tx.Hash()))
}
//Push tx into LastTxCache
func (cache *LastTxCache) Push(tx *types.Transaction) {
if cache.l.Size() >= cache.max {
v := cache.l.GetTop()
if v != nil {
cache.Remove(v.(*types.Transaction))
}
}
cache.l.Push(string(tx.Hash()), tx)
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package mempool
import (
"github.com/33cn/chain33/queue"
"github.com/33cn/chain33/types"
)
//Create 创建一个mempool模块
type Create func(cfg *types.Mempool, sub []byte) queue.Module
var regMempool = make(map[string]Create)
//Reg 注册一个create
func Reg(name string, create Create) {
if create == nil {
panic("Mempool: Register driver is nil")
}
if _, dup := regMempool[name]; dup {
panic("Mempool: Register called twice for driver " + name)
}
regMempool[name] = create
}
//Load 加载一个create
func Load(name string) (create Create, err error) {
if driver, ok := regMempool[name]; ok {
return driver, nil
}
return nil, types.ErrNotFound
}
...@@ -14,12 +14,17 @@ import ( ...@@ -14,12 +14,17 @@ import (
"github.com/33cn/chain33/common/address" "github.com/33cn/chain33/common/address"
"github.com/33cn/chain33/common/crypto" "github.com/33cn/chain33/common/crypto"
"github.com/33cn/chain33/common/limits" "github.com/33cn/chain33/common/limits"
"github.com/33cn/chain33/common/log"
"github.com/33cn/chain33/executor" "github.com/33cn/chain33/executor"
"github.com/33cn/chain33/queue" "github.com/33cn/chain33/queue"
"github.com/33cn/chain33/store" "github.com/33cn/chain33/store"
_ "github.com/33cn/chain33/system" _ "github.com/33cn/chain33/system/consensus/init"
_ "github.com/33cn/chain33/system/crypto/init"
cty "github.com/33cn/chain33/system/dapp/coins/types" cty "github.com/33cn/chain33/system/dapp/coins/types"
_ "github.com/33cn/chain33/system/dapp/init"
_ "github.com/33cn/chain33/system/store/init"
"github.com/33cn/chain33/types" "github.com/33cn/chain33/types"
"github.com/stretchr/testify/assert"
) )
//----------------------------- data for testing --------------------------------- //----------------------------- data for testing ---------------------------------
...@@ -63,10 +68,6 @@ var blk = &types.Block{ ...@@ -63,10 +68,6 @@ var blk = &types.Block{
Txs: []*types.Transaction{tx3, tx5}, Txs: []*types.Transaction{tx3, tx5},
} }
func mergeList(done <-chan struct{}, cs ...<-chan queue.Message) <-chan queue.Message {
return merge(done, cs)
}
func init() { func init() {
err := limits.SetLimits() err := limits.SetLimits()
if err != nil { if err != nil {
...@@ -74,14 +75,7 @@ func init() { ...@@ -74,14 +75,7 @@ func init() {
} }
random = rand.New(rand.NewSource(types.Now().UnixNano())) random = rand.New(rand.NewSource(types.Now().UnixNano()))
queue.DisableLog() queue.DisableLog()
// DisableLog() // 不输出任何log log.SetLogLevel("err") // 输出WARN(含)以下log
// SetLogLevel("debug") // 输出DBUG(含)以下log
// SetLogLevel("info") // 输出INFO(含)以下log
SetLogLevel("info") // 输出WARN(含)以下log
// SetLogLevel("eror") // 输出EROR(含)以下log
// SetLogLevel("crit") // 输出CRIT(含)以下log
// SetLogLevel("") // 输出所有log
// maxTxNumPerAccount = 10000
mainPriv = getprivkey("CC38546E9E659D15E6B4893F0AB32A06D103931A8230B0BDE71459D2B27D6944") mainPriv = getprivkey("CC38546E9E659D15E6B4893F0AB32A06D103931A8230B0BDE71459D2B27D6944")
tx1.Sign(types.SECP256K1, privKey) tx1.Sign(types.SECP256K1, privKey)
tx2.Sign(types.SECP256K1, privKey) tx2.Sign(types.SECP256K1, privKey)
...@@ -98,7 +92,6 @@ func init() { ...@@ -98,7 +92,6 @@ func init() {
tx13.Sign(types.SECP256K1, privKey) tx13.Sign(types.SECP256K1, privKey)
tx14.Sign(types.SECP256K1, privKey) tx14.Sign(types.SECP256K1, privKey)
tx15.Sign(types.SECP256K1, privKey) tx15.Sign(types.SECP256K1, privKey)
} }
func getprivkey(key string) crypto.PrivKey { func getprivkey(key string) crypto.PrivKey {
...@@ -119,7 +112,8 @@ func getprivkey(key string) crypto.PrivKey { ...@@ -119,7 +112,8 @@ func getprivkey(key string) crypto.PrivKey {
func initEnv3() (queue.Queue, queue.Module, queue.Module, *Mempool) { func initEnv3() (queue.Queue, queue.Module, queue.Module, *Mempool) {
var q = queue.New("channel") var q = queue.New("channel")
cfg, sub := types.InitCfg("../cmd/chain33/chain33.test.toml") cfg, sub := types.InitCfg("../../cmd/chain33/chain33.test.toml")
types.Init(cfg.Title, cfg)
cfg.Consensus.Minerstart = false cfg.Consensus.Minerstart = false
chain := blockchain.New(cfg.BlockChain) chain := blockchain.New(cfg.BlockChain)
chain.SetQueueClient(q.Client()) chain.SetQueueClient(q.Client())
...@@ -130,43 +124,45 @@ func initEnv3() (queue.Queue, queue.Module, queue.Module, *Mempool) { ...@@ -130,43 +124,45 @@ func initEnv3() (queue.Queue, queue.Module, queue.Module, *Mempool) {
types.SetMinFee(0) types.SetMinFee(0)
s := store.New(cfg.Store, sub.Store) s := store.New(cfg.Store, sub.Store)
s.SetQueueClient(q.Client()) s.SetQueueClient(q.Client())
mem := New(cfg.MemPool) mem := NewMempool(cfg.Mempool)
mem.SetQueueCache(NewSimpleQueue(int(cfg.Mempool.PoolCacheSize)))
mem.SetQueueClient(q.Client()) mem.SetQueueClient(q.Client())
mem.setSync(true) mem.Wait()
mem.WaitPollLastHeader()
return q, chain, s, mem return q, chain, s, mem
} }
func initEnv2(size int) (queue.Queue, *Mempool) { func initEnv2(size int) (queue.Queue, *Mempool) {
var q = queue.New("channel") var q = queue.New("channel")
cfg, _ := types.InitCfg("../cmd/chain33/chain33.test.toml") cfg, _ := types.InitCfg("../../cmd/chain33/chain33.test.toml")
types.Init(cfg.Title, cfg)
blockchainProcess(q) blockchainProcess(q)
execProcess(q) execProcess(q)
mem := New(cfg.MemPool) cfg.Mempool.PoolCacheSize = int64(size)
mem := NewMempool(cfg.Mempool)
mem.SetQueueCache(NewSimpleQueue(size))
mem.SetQueueClient(q.Client()) mem.SetQueueClient(q.Client())
mem.setSync(true) mem.setSync(true)
if size > 0 {
mem.Resize(size)
}
mem.SetMinFee(0) mem.SetMinFee(0)
mem.WaitPollLastHeader() mem.Wait()
return q, mem return q, mem
} }
func initEnv(size int) (queue.Queue, *Mempool) { func initEnv(size int) (queue.Queue, *Mempool) {
if size == 0 {
size = 100
}
var q = queue.New("channel") var q = queue.New("channel")
cfg, _ := types.InitCfg("../cmd/chain33/chain33.test.toml") cfg, _ := types.InitCfg("../../cmd/chain33/chain33.test.toml")
types.Init(cfg.Title, cfg)
blockchainProcess(q) blockchainProcess(q)
execProcess(q) execProcess(q)
mem := New(cfg.MemPool) cfg.Mempool.PoolCacheSize = int64(size)
mem := NewMempool(cfg.Mempool)
mem.SetQueueCache(NewSimpleQueue(size))
mem.SetQueueClient(q.Client()) mem.SetQueueClient(q.Client())
mem.setSync(true) mem.setSync(true)
if size > 0 {
mem.Resize(size)
}
mem.SetMinFee(types.GInt("MinFee")) mem.SetMinFee(types.GInt("MinFee"))
mem.WaitPollLastHeader() mem.Wait()
return q, mem return q, mem
} }
...@@ -211,21 +207,19 @@ func TestAddEmptyTx(t *testing.T) { ...@@ -211,21 +207,19 @@ func TestAddEmptyTx(t *testing.T) {
} }
func TestAddTx(t *testing.T) { func TestAddTx(t *testing.T) {
q, mem := initEnv(0) q, mem := initEnv(1)
defer q.Close() defer q.Close()
defer mem.Close() defer mem.Close()
msg := mem.client.NewMessage("mempool", types.EventTx, tx2) msg := mem.client.NewMessage("mempool", types.EventTx, tx2)
mem.client.Send(msg, true) mem.client.Send(msg, true)
mem.client.Wait(msg) mem.client.Wait(msg)
if mem.Size() != 1 { if mem.Size() != 1 {
t.Error("TestAddTx failed") t.Error("TestAddTx failed")
} }
} }
func TestAddDuplicatedTx(t *testing.T) { func TestAddDuplicatedTx(t *testing.T) {
q, mem := initEnv(0) q, mem := initEnv(100)
defer q.Close() defer q.Close()
defer mem.Close() defer mem.Close()
...@@ -330,47 +324,16 @@ func add10Tx(client queue.Client) error { ...@@ -330,47 +324,16 @@ func add10Tx(client queue.Client) error {
if err != nil { if err != nil {
return err return err
} }
txs := []*types.Transaction{tx5, tx6, tx7, tx8, tx9, tx10}
msg5 := client.NewMessage("mempool", types.EventTx, tx5) for _, tx := range txs {
msg6 := client.NewMessage("mempool", types.EventTx, tx6) msg := client.NewMessage("mempool", types.EventTx, tx)
msg7 := client.NewMessage("mempool", types.EventTx, tx7) client.Send(msg, true)
msg8 := client.NewMessage("mempool", types.EventTx, tx8) _, err = client.Wait(msg)
msg9 := client.NewMessage("mempool", types.EventTx, tx9) if err != nil {
msg10 := client.NewMessage("mempool", types.EventTx, tx10) return err
}
client.Send(msg5, true)
_, err = client.Wait(msg5)
if err != nil {
return err
}
client.Send(msg6, true)
_, err = client.Wait(msg6)
if err != nil {
return err
}
client.Send(msg7, true)
_, err = client.Wait(msg7)
if err != nil {
return err
}
client.Send(msg8, true)
_, err = client.Wait(msg8)
if err != nil {
return err
}
client.Send(msg9, true)
_, err = client.Wait(msg9)
if err != nil {
return err
} }
return nil
client.Send(msg10, true)
_, err = client.Wait(msg10)
return err
} }
func TestGetTxList(t *testing.T) { func TestGetTxList(t *testing.T) {
...@@ -471,8 +434,8 @@ func TestAddMoreTxThanPoolSize(t *testing.T) { ...@@ -471,8 +434,8 @@ func TestAddMoreTxThanPoolSize(t *testing.T) {
mem.client.Send(msg5, true) mem.client.Send(msg5, true)
mem.client.Wait(msg5) mem.client.Wait(msg5)
if mem.Size() != 4 || mem.cache.Exists(tx5.Hash()) { if mem.Size() != 4 || mem.cache.Exist(string(tx5.Hash())) {
t.Error("TestAddMoreTxThanPoolSize failed", mem.Size(), mem.cache.Exists(tx5.Hash())) t.Error("TestAddMoreTxThanPoolSize failed", mem.Size(), mem.cache.Exist(string(tx5.Hash())))
} }
} }
...@@ -513,30 +476,18 @@ func TestAddBlockedTx(t *testing.T) { ...@@ -513,30 +476,18 @@ func TestAddBlockedTx(t *testing.T) {
msg1 := mem.client.NewMessage("mempool", types.EventTx, tx3) msg1 := mem.client.NewMessage("mempool", types.EventTx, tx3)
err := mem.client.Send(msg1, true) err := mem.client.Send(msg1, true)
if err != nil { assert.Nil(t, err)
t.Error(err)
return
}
_, err = mem.client.Wait(msg1) _, err = mem.client.Wait(msg1)
if err != nil { assert.Nil(t, err)
t.Error(err)
return
}
blkDetail := &types.BlockDetail{Block: blk} blkDetail := &types.BlockDetail{Block: blk}
msg2 := mem.client.NewMessage("mempool", types.EventAddBlock, blkDetail) msg2 := mem.client.NewMessage("mempool", types.EventAddBlock, blkDetail)
mem.client.Send(msg2, false) mem.client.Send(msg2, false)
msg3 := mem.client.NewMessage("mempool", types.EventTx, tx3) msg3 := mem.client.NewMessage("mempool", types.EventTx, tx3)
err = mem.client.Send(msg3, true) err = mem.client.Send(msg3, true)
if err != nil { assert.Nil(t, err)
t.Error(err)
return
}
resp, err := mem.client.Wait(msg3) resp, err := mem.client.Wait(msg3)
if err != nil { assert.Nil(t, err)
t.Error(err)
return
}
if string(resp.GetData().(*types.Reply).GetMsg()) != types.ErrDupTx.Error() { if string(resp.GetData().(*types.Reply).GetMsg()) != types.ErrDupTx.Error() {
t.Error("TestAddBlockedTx failed") t.Error("TestAddBlockedTx failed")
} }
...@@ -553,7 +504,7 @@ func TestDuplicateMempool(t *testing.T) { ...@@ -553,7 +504,7 @@ func TestDuplicateMempool(t *testing.T) {
t.Error("add tx error", err.Error()) t.Error("add tx error", err.Error())
return return
} }
assert.Equal(t, mem.Size(), 10)
msg := mem.client.NewMessage("mempool", types.EventGetMempool, nil) msg := mem.client.NewMessage("mempool", types.EventGetMempool, nil)
mem.client.Send(msg, true) mem.client.Send(msg, true)
...@@ -674,6 +625,23 @@ func TestCheckExpire2(t *testing.T) { ...@@ -674,6 +625,23 @@ func TestCheckExpire2(t *testing.T) {
} }
} }
func TestCheckExpire3(t *testing.T) {
q, mem := initEnv(0)
defer q.Close()
defer mem.Close()
// add tx
err := add4Tx(mem.client)
if err != nil {
t.Error("add tx error", err.Error())
return
}
mem.setHeader(&types.Header{Height: 50, BlockTime: 1e9 + 1})
assert.Equal(t, mem.Size(), 4)
mem.removeExpired()
assert.Equal(t, mem.Size(), 3)
}
func TestWrongToAddr(t *testing.T) { func TestWrongToAddr(t *testing.T) {
q, mem := initEnv(0) q, mem := initEnv(0)
defer q.Close() defer q.Close()
...@@ -773,9 +741,7 @@ func TestAddTxGroup(t *testing.T) { ...@@ -773,9 +741,7 @@ func TestAddTxGroup(t *testing.T) {
q, mem := initEnv(0) q, mem := initEnv(0)
defer q.Close() defer q.Close()
defer mem.Close() defer mem.Close()
//copytx //copytx
ctx2 := *tx2 ctx2 := *tx2
ctx3 := *tx3 ctx3 := *tx3
ctx4 := *tx4 ctx4 := *tx4
...@@ -790,10 +756,9 @@ func TestAddTxGroup(t *testing.T) { ...@@ -790,10 +756,9 @@ func TestAddTxGroup(t *testing.T) {
} }
func BenchmarkMempool(b *testing.B) { func BenchmarkMempool(b *testing.B) {
q, mem := initEnv(0) q, mem := initEnv(10240)
defer q.Close() defer q.Close()
defer mem.Close() defer mem.Close()
maxTxNumPerAccount = 100000 maxTxNumPerAccount = 100000
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
to, _ := genaddress() to, _ := genaddress()
...@@ -813,6 +778,7 @@ func BenchmarkMempool(b *testing.B) { ...@@ -813,6 +778,7 @@ func BenchmarkMempool(b *testing.B) {
} }
func blockchainProcess(q queue.Queue) { func blockchainProcess(q queue.Queue) {
dup := make(map[string]bool)
go func() { go func() {
client := q.Client() client := q.Client()
client.Sub("blockchain") client.Sub("blockchain")
...@@ -821,6 +787,17 @@ func blockchainProcess(q queue.Queue) { ...@@ -821,6 +787,17 @@ func blockchainProcess(q queue.Queue) {
msg.Reply(client.NewMessage("", types.EventHeader, &types.Header{Height: 1, BlockTime: 1})) msg.Reply(client.NewMessage("", types.EventHeader, &types.Header{Height: 1, BlockTime: 1}))
} else if msg.Ty == types.EventIsSync { } else if msg.Ty == types.EventIsSync {
msg.Reply(client.NewMessage("", types.EventReplyIsSync, &types.IsCaughtUp{Iscaughtup: true})) msg.Reply(client.NewMessage("", types.EventReplyIsSync, &types.IsCaughtUp{Iscaughtup: true}))
} else if msg.Ty == types.EventTxHashList {
txs := msg.Data.(*types.TxHashList)
var hashlist [][]byte
for _, hash := range txs.Hashes {
if dup[string(hash)] {
hashlist = append(hashlist, hash)
continue
}
dup[string(hash)] = true
}
msg.Reply(client.NewMessage("consensus", types.EventTxHashListReply, &types.TxHashList{Hashes: hashlist}))
} }
} }
}() }()
......
...@@ -101,3 +101,7 @@ func BenchmarkStepMerge(b *testing.B) { ...@@ -101,3 +101,7 @@ func BenchmarkStepMerge(b *testing.B) {
} }
close(done) close(done)
} }
func mergeList(done <-chan struct{}, cs ...<-chan queue.Message) <-chan queue.Message {
return merge(done, cs)
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package mempool
import (
"github.com/33cn/chain33/common/listmap"
"github.com/33cn/chain33/types"
)
//SimpleQueue 简单队列模式(默认提供一个队列,便于测试)
type SimpleQueue struct {
txList *listmap.ListMap
maxsize int
}
//NewSimpleQueue 创建队列
func NewSimpleQueue(cacheSize int) *SimpleQueue {
return &SimpleQueue{
txList: listmap.New(),
maxsize: cacheSize,
}
}
//Exist 是否存在
func (cache *SimpleQueue) Exist(hash string) bool {
return cache.txList.Exist(hash)
}
//GetItem 获取数据通过 key
func (cache *SimpleQueue) GetItem(hash string) (*Item, error) {
item, err := cache.txList.GetItem(hash)
if err != nil {
return nil, err
}
return item.(*Item), nil
}
// Push 把给定tx添加到SimpleQueue;如果tx已经存在SimpleQueue中或Mempool已满则返回对应error
func (cache *SimpleQueue) Push(tx *Item) error {
hash := tx.Value.Hash()
if cache.Exist(string(hash)) {
return types.ErrTxExist
}
if cache.txList.Size() >= cache.maxsize {
return types.ErrMemFull
}
cache.txList.Push(string(hash), tx)
return nil
}
// Remove 删除数据
func (cache *SimpleQueue) Remove(hash string) error {
cache.txList.Remove(hash)
return nil
}
// Size 数据总数
func (cache *SimpleQueue) Size() int {
return cache.txList.Size()
}
// Walk 遍历整个队列
func (cache *SimpleQueue) Walk(count int, cb func(value *Item) bool) {
i := 0
cache.txList.Walk(func(item interface{}) bool {
if !cb(item.(*Item)) {
return false
}
i++
return i != count
})
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package mempool
import (
"testing"
"github.com/33cn/chain33/types"
"github.com/stretchr/testify/assert"
)
func TestCache(t *testing.T) {
cache := NewSimpleQueue(1)
tx := &types.Transaction{Payload: []byte("123")}
hash := string(tx.Hash())
assert.Equal(t, false, cache.Exist(hash))
item1 := &Item{Value: tx, Priority: tx.Fee, EnterTime: types.Now().Unix()}
err := cache.Push(item1)
assert.Nil(t, err)
assert.Equal(t, true, cache.Exist(hash))
it, err := cache.GetItem(hash)
assert.Nil(t, err)
assert.Equal(t, item1, it)
_, err = cache.GetItem(hash + ":")
assert.Equal(t, types.ErrNotFound, err)
err = cache.Push(item1)
assert.Equal(t, types.ErrTxExist, err)
tx2 := &types.Transaction{Payload: []byte("1234")}
item2 := &Item{Value: tx2, Priority: tx.Fee, EnterTime: types.Now().Unix()}
err = cache.Push(item2)
assert.Equal(t, types.ErrMemFull, err)
cache.Remove(hash)
assert.Equal(t, 0, cache.Size())
//push to item
cache = NewSimpleQueue(2)
cache.Push(item1)
cache.Push(item2)
assert.Equal(t, 2, cache.Size())
var data [2]*Item
i := 0
cache.Walk(1, func(value *Item) bool {
data[i] = value
i++
return true
})
assert.Equal(t, 1, i)
assert.Equal(t, data[0], item1)
i = 0
cache.Walk(2, func(value *Item) bool {
data[i] = value
i++
return true
})
assert.Equal(t, 2, i)
assert.Equal(t, data[0], item1)
assert.Equal(t, data[1], item2)
i = 0
cache.Walk(2, func(value *Item) bool {
data[i] = value
i++
return false
})
assert.Equal(t, 1, i)
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found In the LICENSE file.
package timeline
import (
"github.com/33cn/chain33/queue"
drivers "github.com/33cn/chain33/system/mempool"
"github.com/33cn/chain33/types"
)
func init() {
drivers.Reg("timeline", New)
}
type subConfig struct {
PoolCacheSize int64 `json:"poolCacheSize"`
}
//New 创建timeline cache 结构的 mempool
func New(cfg *types.Mempool, sub []byte) queue.Module {
c := drivers.NewMempool(cfg)
var subcfg subConfig
types.MustDecode(sub, &subcfg)
if subcfg.PoolCacheSize == 0 {
subcfg.PoolCacheSize = cfg.PoolCacheSize
}
c.SetQueueCache(drivers.NewSimpleQueue(int(subcfg.PoolCacheSize)))
return c
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package timeline
import (
"encoding/json"
"testing"
"github.com/33cn/chain33/system/mempool"
"github.com/33cn/chain33/types"
)
func TestNewMempool(t *testing.T) {
sub, _ := json.Marshal(&subConfig{PoolCacheSize: 2})
module := New(&types.Mempool{}, sub)
mem := module.(*mempool.Mempool)
mem.Close()
}
...@@ -84,6 +84,9 @@ func (store *BaseStore) SetQueueClient(c queue.Client) { ...@@ -84,6 +84,9 @@ func (store *BaseStore) SetQueueClient(c queue.Client) {
}() }()
} }
//Wait wait for basestore ready
func (store *BaseStore) Wait() {}
func (store *BaseStore) processMessage(msg queue.Message) { func (store *BaseStore) processMessage(msg queue.Message) {
client := store.qclient client := store.qclient
if msg.Ty == types.EventStoreSet { if msg.Ty == types.EventStoreSet {
...@@ -216,14 +219,11 @@ func (t *StorelistQuery) IterateCallBack(key, value []byte) bool { ...@@ -216,14 +219,11 @@ func (t *StorelistQuery) IterateCallBack(key, value []byte) bool {
return false return false
} }
return false return false
} }
return false return false
} }
slog.Error("StoreListReply.IterateCallBack unsupported mode", "mode", t.Mode) slog.Error("StoreListReply.IterateCallBack unsupported mode", "mode", t.Mode)
return true return true
} }
func cloneByte(v []byte) []byte { func cloneByte(v []byte) []byte {
......
...@@ -11,7 +11,7 @@ type Config struct { ...@@ -11,7 +11,7 @@ type Config struct {
Log *Log `protobuf:"bytes,2,opt,name=log" json:"log,omitempty"` Log *Log `protobuf:"bytes,2,opt,name=log" json:"log,omitempty"`
Store *Store `protobuf:"bytes,3,opt,name=store" json:"store,omitempty"` Store *Store `protobuf:"bytes,3,opt,name=store" json:"store,omitempty"`
Consensus *Consensus `protobuf:"bytes,5,opt,name=consensus" json:"consensus,omitempty"` Consensus *Consensus `protobuf:"bytes,5,opt,name=consensus" json:"consensus,omitempty"`
MemPool *MemPool `protobuf:"bytes,6,opt,name=memPool" json:"memPool,omitempty"` Mempool *Mempool `protobuf:"bytes,6,opt,name=mempool" json:"memPool,omitempty"`
BlockChain *BlockChain `protobuf:"bytes,7,opt,name=blockChain" json:"blockChain,omitempty"` BlockChain *BlockChain `protobuf:"bytes,7,opt,name=blockChain" json:"blockChain,omitempty"`
Wallet *Wallet `protobuf:"bytes,8,opt,name=wallet" json:"wallet,omitempty"` Wallet *Wallet `protobuf:"bytes,8,opt,name=wallet" json:"wallet,omitempty"`
P2P *P2P `protobuf:"bytes,9,opt,name=p2p" json:"p2p,omitempty"` P2P *P2P `protobuf:"bytes,9,opt,name=p2p" json:"p2p,omitempty"`
...@@ -52,12 +52,14 @@ type Log struct { ...@@ -52,12 +52,14 @@ type Log struct {
CallerFunction bool `protobuf:"varint,10,opt,name=callerFunction" json:"callerFunction,omitempty"` CallerFunction bool `protobuf:"varint,10,opt,name=callerFunction" json:"callerFunction,omitempty"`
} }
// MemPool 配置 // Mempool 配置
type MemPool struct { type Mempool struct {
PoolCacheSize int64 `protobuf:"varint,1,opt,name=poolCacheSize" json:"poolCacheSize,omitempty"` Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"`
MinTxFee int64 `protobuf:"varint,2,opt,name=minTxFee" json:"minTxFee,omitempty"` PoolCacheSize int64 `protobuf:"varint,1,opt,name=poolCacheSize" json:"poolCacheSize,omitempty"`
ForceAccept bool `protobuf:"varint,3,opt,name=forceAccept" json:"forceAccept,omitempty"` MinTxFee int64 `protobuf:"varint,2,opt,name=minTxFee" json:"minTxFee,omitempty"`
MaxTxNumPerAccount int64 `protobuf:"varint,4,opt,name=maxTxNumPerAccount" json:"maxTxNumPerAccount,omitempty"` ForceAccept bool `protobuf:"varint,3,opt,name=forceAccept" json:"forceAccept,omitempty"`
MaxTxNumPerAccount int64 `protobuf:"varint,4,opt,name=maxTxNumPerAccount" json:"maxTxNumPerAccount,omitempty"`
MaxTxLast int64 `protobuf:"varint,4,opt,name=maxTxLast" json:"maxTxLast,omitempty"`
} }
// Consensus 配置 // Consensus 配置
......
...@@ -225,6 +225,13 @@ func S(key string, value interface{}) { ...@@ -225,6 +225,13 @@ func S(key string, value interface{}) {
setChainConfig(key, value) setChainConfig(key, value)
} }
//SetTitleOnlyForTest set title only for test use
func SetTitleOnlyForTest(ti string) {
mu.Lock()
defer mu.Unlock()
title = ti
}
// Init 初始化 // Init 初始化
func Init(t string, cfg *Config) { func Init(t string, cfg *Config) {
mu.Lock() mu.Lock()
...@@ -243,7 +250,7 @@ func Init(t string, cfg *Config) { ...@@ -243,7 +250,7 @@ func Init(t string, cfg *Config) {
} else { } else {
setTestNet(cfg.TestNet) setTestNet(cfg.TestNet)
} }
if cfg.Exec.MinExecFee > cfg.MemPool.MinTxFee || cfg.MemPool.MinTxFee > cfg.Wallet.MinFee { if cfg.Exec.MinExecFee > cfg.Mempool.MinTxFee || cfg.Mempool.MinTxFee > cfg.Wallet.MinFee {
panic("config must meet: wallet.minFee >= mempool.minTxFee >= exec.minExecFee") panic("config must meet: wallet.minFee >= mempool.minTxFee >= exec.minExecFee")
} }
setMinFee(cfg.Exec.MinExecFee) setMinFee(cfg.Exec.MinExecFee)
...@@ -303,7 +310,6 @@ func SetMinFee(fee int64) { ...@@ -303,7 +310,6 @@ func SetMinFee(fee int64) {
} }
func isPara() bool { func isPara() bool {
//user.p.guodun.
return strings.Count(title, ".") == 3 && strings.HasPrefix(title, ParaKeyX) return strings.Count(title, ".") == 3 && strings.HasPrefix(title, ParaKeyX)
} }
...@@ -524,6 +530,7 @@ type subModule struct { ...@@ -524,6 +530,7 @@ type subModule struct {
Exec map[string]interface{} Exec map[string]interface{}
Consensus map[string]interface{} Consensus map[string]interface{}
Wallet map[string]interface{} Wallet map[string]interface{}
Mempool map[string]interface{}
} }
func readFile(path string) string { func readFile(path string) string {
...@@ -548,6 +555,7 @@ func parseSubModule(cfg *subModule) (*ConfigSubModule, error) { ...@@ -548,6 +555,7 @@ func parseSubModule(cfg *subModule) (*ConfigSubModule, error) {
subcfg.Exec = parseItem(cfg.Exec) subcfg.Exec = parseItem(cfg.Exec)
subcfg.Consensus = parseItem(cfg.Consensus) subcfg.Consensus = parseItem(cfg.Consensus)
subcfg.Wallet = parseItem(cfg.Wallet) subcfg.Wallet = parseItem(cfg.Wallet)
subcfg.Mempool = parseItem(cfg.Mempool)
return &subcfg, nil return &subcfg, nil
} }
......
...@@ -9,6 +9,7 @@ import ( ...@@ -9,6 +9,7 @@ import (
) )
var slash = []byte("-") var slash = []byte("-")
var sharp = []byte("#")
//Debug 调试开关 //Debug 调试开关
var Debug = false var Debug = false
......
...@@ -109,8 +109,8 @@ func CallExecNewTx(execName, action string, param interface{}) ([]byte, error) { ...@@ -109,8 +109,8 @@ func CallExecNewTx(execName, action string, param interface{}) ([]byte, error) {
return FormatTxEncode(execName, tx) return FormatTxEncode(execName, tx)
} }
// CallCreateTx 构造交易信息 //CallCreateTransaction 创建一个交易
func CallCreateTx(execName, action string, param Message) ([]byte, error) { func CallCreateTransaction(execName, action string, param Message) (*Transaction, error) {
exec := LoadExecutorType(execName) exec := LoadExecutorType(execName)
if exec == nil { if exec == nil {
tlog.Error("CallCreateTx", "Error", "exec not found") tlog.Error("CallCreateTx", "Error", "exec not found")
...@@ -121,9 +121,13 @@ func CallCreateTx(execName, action string, param Message) ([]byte, error) { ...@@ -121,9 +121,13 @@ func CallCreateTx(execName, action string, param Message) ([]byte, error) {
tlog.Error("CallCreateTx", "Error", "param in nil") tlog.Error("CallCreateTx", "Error", "param in nil")
return nil, ErrInvalidParam return nil, ErrInvalidParam
} }
tx, err := exec.Create(action, param) return exec.Create(action, param)
}
// CallCreateTx 构造交易信息
func CallCreateTx(execName, action string, param Message) ([]byte, error) {
tx, err := CallCreateTransaction(execName, action, param)
if err != nil { if err != nil {
tlog.Error("CallCreateTx", "Error", err)
return nil, err return nil, err
} }
return FormatTxEncode(execName, tx) return FormatTxEncode(execName, tx)
......
...@@ -132,5 +132,4 @@ func TestCallCreateTx(t *testing.T) { ...@@ -132,5 +132,4 @@ func TestCallCreateTx(t *testing.T) {
assert.Equal(t, tx.Execer, []byte("manage")) assert.Equal(t, tx.Execer, []byte("manage"))
fee, _ = tx.GetRealFee(GInt("MinFee")) fee, _ = tx.GetRealFee(GInt("MinFee"))
assert.Equal(t, tx.Fee, fee) assert.Equal(t, tx.Fee, fee)
} }
...@@ -91,7 +91,7 @@ func (f *Forks) GetFork(title, key string) int64 { ...@@ -91,7 +91,7 @@ func (f *Forks) GetFork(title, key string) int64 {
if title == "local" { if title == "local" {
panic("title not exisit -> " + title) panic("title not exisit -> " + title)
} else { } else {
tlog.Error("getfork title not exisit -> " + title) tlog.Error("getfork title not exisit -> ", "title", title, "key", key)
} }
return MaxHeight return MaxHeight
} }
......
...@@ -49,6 +49,7 @@ message CreateTx { ...@@ -49,6 +49,7 @@ message CreateTx {
bool isToken = 6; bool isToken = 6;
string tokenSymbol = 7; string tokenSymbol = 7;
string execName = 8; string execName = 8;
string execer = 9;
} }
message CreateTransactionGroup { message CreateTransactionGroup {
......
...@@ -10,4 +10,5 @@ type ConfigSubModule struct { ...@@ -10,4 +10,5 @@ type ConfigSubModule struct {
Exec map[string][]byte Exec map[string][]byte
Consensus map[string][]byte Consensus map[string][]byte
Wallet map[string][]byte Wallet map[string][]byte
Mempool map[string][]byte
} }
...@@ -41,6 +41,9 @@ type TxGroup interface { ...@@ -41,6 +41,9 @@ type TxGroup interface {
//ExecName 执行器name //ExecName 执行器name
func ExecName(name string) string { func ExecName(name string) string {
if len(name) > 1 && name[0] == '#' {
return name[1:]
}
if IsParaExecName(name) { if IsParaExecName(name) {
return name return name
} }
...@@ -61,7 +64,7 @@ func IsAllowExecName(name []byte, execer []byte) bool { ...@@ -61,7 +64,7 @@ func IsAllowExecName(name []byte, execer []byte) bool {
return false return false
} }
// name中不允许有 "-" // name中不允许有 "-"
if bytes.Contains(name, slash) { if bytes.Contains(name, slash) || bytes.Contains(name, sharp) {
return false return false
} }
if !bytes.Equal(name, execer) && !bytes.Equal(name, GetRealExecName(execer)) { if !bytes.Equal(name, execer) && !bytes.Equal(name, GetRealExecName(execer)) {
...@@ -435,6 +438,9 @@ func MustPBToJSON(req Message) []byte { ...@@ -435,6 +438,9 @@ func MustPBToJSON(req Message) []byte {
// MustDecode 数据是否已经编码 // MustDecode 数据是否已经编码
func MustDecode(data []byte, v interface{}) { func MustDecode(data []byte, v interface{}) {
if data == nil {
return
}
err := json.Unmarshal(data, v) err := json.Unmarshal(data, v)
if err != nil { if err != nil {
panic(err) panic(err)
......
...@@ -5,5 +5,48 @@ ...@@ -5,5 +5,48 @@
package types_test package types_test
import ( import (
"testing"
"github.com/33cn/chain33/common/address"
_ "github.com/33cn/chain33/system" _ "github.com/33cn/chain33/system"
"github.com/33cn/chain33/types"
"github.com/stretchr/testify/assert"
) )
//how to create transafer for para
func TestCallCreateTxPara(t *testing.T) {
ti := types.GetTitle()
defer types.SetTitleOnlyForTest(ti)
types.SetTitleOnlyForTest("user.p.sto.")
req := &types.CreateTx{
To: "184wj4nsgVxKyz2NhM3Yb5RK5Ap6AFRFq2",
Amount: 10,
Fee: 1,
Note: []byte("12312"),
IsWithdraw: false,
IsToken: false,
TokenSymbol: "",
ExecName: types.ExecName("coins"),
}
assert.True(t, types.IsPara())
tx, err := types.CallCreateTransaction("coins", "", req)
assert.Nil(t, err)
tx, err = types.FormatTx("coins", tx)
assert.Nil(t, err)
assert.Equal(t, "coins", string(tx.Execer))
assert.Equal(t, address.ExecAddress("coins"), tx.To)
tx, err = types.FormatTx(types.ExecName("coins"), tx)
assert.Nil(t, err)
assert.Equal(t, "user.p.sto.coins", string(tx.Execer))
assert.Equal(t, address.ExecAddress("user.p.sto.coins"), tx.To)
}
func TestExecName(t *testing.T) {
assert.Equal(t, types.ExecName("coins"), "coins")
ti := types.GetTitle()
defer types.SetTitleOnlyForTest(ti)
types.SetTitleOnlyForTest("user.p.sto.")
assert.Equal(t, types.ExecName("coins"), "user.p.sto.coins")
//#在exec前面加一个 # 表示不重写执行器
assert.Equal(t, types.ExecName("#coins"), "coins")
}
...@@ -46,6 +46,12 @@ func TestAllowExecName(t *testing.T) { ...@@ -46,6 +46,12 @@ func TestAllowExecName(t *testing.T) {
isok = IsAllowExecName([]byte("coins"), []byte("user.p.guodun.user.coins")) isok = IsAllowExecName([]byte("coins"), []byte("user.p.guodun.user.coins"))
assert.Equal(t, isok, true) assert.Equal(t, isok, true)
isok = IsAllowExecName([]byte("#coins"), []byte("user.p.guodun.user.coins"))
assert.Equal(t, isok, false)
isok = IsAllowExecName([]byte("coins-"), []byte("user.p.guodun.user.coins"))
assert.Equal(t, isok, false)
} }
func BenchmarkExecName(b *testing.B) { func BenchmarkExecName(b *testing.B) {
......
...@@ -130,7 +130,7 @@ func RunChain33(name string) { ...@@ -130,7 +130,7 @@ func RunChain33(name string) {
q := queue.New("channel") q := queue.New("channel")
log.Info("loading mempool module") log.Info("loading mempool module")
mem := mempool.New(cfg.MemPool) mem := mempool.New(cfg.Mempool, sub.Mempool)
mem.SetQueueClient(q.Client()) mem.SetQueueClient(q.Client())
log.Info("loading execs module") log.Info("loading execs module")
......
...@@ -26,7 +26,7 @@ var rootCmd = &cobra.Command{ ...@@ -26,7 +26,7 @@ var rootCmd = &cobra.Command{
var sendCmd = &cobra.Command{ var sendCmd = &cobra.Command{
Use: "send", Use: "send",
Short: "Send transaction in one move", Short: "Send transaction in one step",
Run: func(cmd *cobra.Command, args []string) {}, Run: func(cmd *cobra.Command, args []string) {},
} }
......
...@@ -105,6 +105,22 @@ func checkTxDupInner(txs []*types.TransactionCache) (ret []*types.TransactionCac ...@@ -105,6 +105,22 @@ func checkTxDupInner(txs []*types.TransactionCache) (ret []*types.TransactionCac
return ret return ret
} }
//CheckDupTx : check use txs []*types.Transaction and not []*types.TransactionCache
func CheckDupTx(client queue.Client, txs []*types.Transaction, height int64) (transactions []*types.Transaction, err error) {
txcache := make([]*types.TransactionCache, len(txs))
for i := 0; i < len(txcache); i++ {
txcache[i] = &types.TransactionCache{Transaction: txs[i]}
}
cache, err := CheckTxDup(client, txcache, height)
if err != nil {
return nil, err
}
for i := 0; i < len(cache); i++ {
transactions = append(transactions, cache[i].Transaction)
}
return transactions, nil
}
//CheckTxDup : check whether the tx is duplicated within the while chain //CheckTxDup : check whether the tx is duplicated within the while chain
func CheckTxDup(client queue.Client, txs []*types.TransactionCache, height int64) (transactions []*types.TransactionCache, err error) { func CheckTxDup(client queue.Client, txs []*types.TransactionCache, height int64) (transactions []*types.TransactionCache, err error) {
var checkHashList types.TxHashList var checkHashList types.TxHashList
......
...@@ -68,6 +68,7 @@ jrpcFuncWhitelist=["*"] ...@@ -68,6 +68,7 @@ jrpcFuncWhitelist=["*"]
grpcFuncWhitelist=["*"] grpcFuncWhitelist=["*"]
[mempool] [mempool]
name="timeline"
poolCacheSize=10240 poolCacheSize=10240
minTxFee=100000 minTxFee=100000
maxTxNumPerAccount=10000 maxTxNumPerAccount=10000
......
...@@ -57,7 +57,7 @@ type Chain33Mock struct { ...@@ -57,7 +57,7 @@ type Chain33Mock struct {
client queue.Client client queue.Client
api client.QueueProtocolAPI api client.QueueProtocolAPI
chain *blockchain.BlockChain chain *blockchain.BlockChain
mem *mempool.Mempool mem queue.Module
cs queue.Module cs queue.Module
exec *executor.Executor exec *executor.Executor
wallet queue.Module wallet queue.Module
...@@ -103,10 +103,10 @@ func newWithConfig(cfg *types.Config, sub *types.ConfigSubModule, mockapi client ...@@ -103,10 +103,10 @@ func newWithConfig(cfg *types.Config, sub *types.ConfigSubModule, mockapi client
mock.cs.SetQueueClient(q.Client()) mock.cs.SetQueueClient(q.Client())
lognode.Info("init consensus " + cfg.Consensus.Name) lognode.Info("init consensus " + cfg.Consensus.Name)
mock.mem = mempool.New(cfg.MemPool) mock.mem = mempool.New(cfg.Mempool, sub.Mempool)
mock.mem.SetQueueClient(q.Client()) mock.mem.SetQueueClient(q.Client())
mock.mem.Wait()
lognode.Info("init mempool") lognode.Info("init mempool")
mock.mem.WaitPollLastHeader()
if cfg.P2P.Enable { if cfg.P2P.Enable {
mock.network = p2p.New(cfg.P2P) mock.network = p2p.New(cfg.P2P)
mock.network.SetQueueClient(q.Client()) mock.network.SetQueueClient(q.Client())
...@@ -169,7 +169,7 @@ func (mock *Chain33Mock) GetBlockChain() *blockchain.BlockChain { ...@@ -169,7 +169,7 @@ func (mock *Chain33Mock) GetBlockChain() *blockchain.BlockChain {
func setFee(cfg *types.Config, fee int64) { func setFee(cfg *types.Config, fee int64) {
cfg.Exec.MinExecFee = fee cfg.Exec.MinExecFee = fee
cfg.MemPool.MinTxFee = fee cfg.Mempool.MinTxFee = fee
cfg.Wallet.MinFee = fee cfg.Wallet.MinFee = fee
if fee == 0 { if fee == 0 {
cfg.Exec.IsFree = true cfg.Exec.IsFree = true
...@@ -394,6 +394,9 @@ func (m *mockP2P) SetQueueClient(client queue.Client) { ...@@ -394,6 +394,9 @@ func (m *mockP2P) SetQueueClient(client queue.Client) {
}() }()
} }
//Wait for ready
func (m *mockP2P) Wait() {}
//Close : //Close :
func (m *mockP2P) Close() { func (m *mockP2P) Close() {
} }
...@@ -119,6 +119,9 @@ func New(cfg *types.Wallet, sub map[string][]byte) *Wallet { ...@@ -119,6 +119,9 @@ func New(cfg *types.Wallet, sub map[string][]byte) *Wallet {
return wallet return wallet
} }
//Wait for wallet ready
func (wallet *Wallet) Wait() {}
// RegisterMineStatusReporter 向钱包注册状态回报 // RegisterMineStatusReporter 向钱包注册状态回报
func (wallet *Wallet) RegisterMineStatusReporter(reporter wcom.MineStatusReport) error { func (wallet *Wallet) RegisterMineStatusReporter(reporter wcom.MineStatusReport) error {
if reporter == nil { if reporter == nil {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment