Commit 63a555bf authored by 陈德海's avatar 陈德海

complate trade

parent 6e169f63
...@@ -60,10 +60,25 @@ whitelist=["127.0.0.1"] ...@@ -60,10 +60,25 @@ whitelist=["127.0.0.1"]
jrpcFuncWhitelist=["*"] jrpcFuncWhitelist=["*"]
grpcFuncWhitelist=["*"] grpcFuncWhitelist=["*"]
[mempool] [mempool]
name="trade"
poolCacheSize=10240
minTxFee=100000
maxTxNumPerAccount=10000
[mempool.sub.timeline]
poolCacheSize=10240
minTxFee=100000
maxTxNumPerAccount=10000
[mempool.sub.trade]
poolCacheSize=10240 poolCacheSize=10240
minTxFee=100000 minTxFee=100000
maxTxNumPerAccount=10000 maxTxNumPerAccount=10000
timeParam=1 #时间占价格比例
priceConstant=1 #一个合适的常量
pricePower=0 #手续费占常量比例
[consensus] [consensus]
name="ticket" name="ticket"
......
...@@ -117,9 +117,9 @@ type RoundState struct { ...@@ -117,9 +117,9 @@ type RoundState struct {
// RoundStateMessage ... // RoundStateMessage ...
func (rs *RoundState) RoundStateMessage() *tmtypes.NewRoundStepMsg { func (rs *RoundState) RoundStateMessage() *tmtypes.NewRoundStepMsg {
return &tmtypes.NewRoundStepMsg{ return &tmtypes.NewRoundStepMsg{
Height: rs.Height, Height: rs.Height,
Round: int32(rs.Round), Round: int32(rs.Round),
Step: int32(rs.Step), Step: int32(rs.Step),
SecondsSinceStartTime: int32(time.Since(rs.StartTime).Seconds()), SecondsSinceStartTime: int32(time.Since(rs.StartTime).Seconds()),
LastCommitRound: int32(rs.LastCommit.Round()), LastCommitRound: int32(rs.LastCommit.Round()),
} }
......
...@@ -78,9 +78,9 @@ func ParseX509CertificateToSm2(x509Cert *x509.Certificate) *sm2.Certificate { ...@@ -78,9 +78,9 @@ func ParseX509CertificateToSm2(x509Cert *x509.Certificate) *sm2.Certificate {
UnknownExtKeyUsage: x509Cert.UnknownExtKeyUsage, UnknownExtKeyUsage: x509Cert.UnknownExtKeyUsage,
BasicConstraintsValid: x509Cert.BasicConstraintsValid, BasicConstraintsValid: x509Cert.BasicConstraintsValid,
IsCA: x509Cert.IsCA, IsCA: x509Cert.IsCA,
MaxPathLen: x509Cert.MaxPathLen, MaxPathLen: x509Cert.MaxPathLen,
MaxPathLenZero: x509Cert.MaxPathLenZero, MaxPathLenZero: x509Cert.MaxPathLenZero,
SubjectKeyId: x509Cert.SubjectKeyId, SubjectKeyId: x509Cert.SubjectKeyId,
AuthorityKeyId: x509Cert.AuthorityKeyId, AuthorityKeyId: x509Cert.AuthorityKeyId,
...@@ -141,9 +141,9 @@ func ParseSm2CertificateToX509(sm2Cert *sm2.Certificate) *x509.Certificate { ...@@ -141,9 +141,9 @@ func ParseSm2CertificateToX509(sm2Cert *sm2.Certificate) *x509.Certificate {
UnknownExtKeyUsage: sm2Cert.UnknownExtKeyUsage, UnknownExtKeyUsage: sm2Cert.UnknownExtKeyUsage,
BasicConstraintsValid: sm2Cert.BasicConstraintsValid, BasicConstraintsValid: sm2Cert.BasicConstraintsValid,
IsCA: sm2Cert.IsCA, IsCA: sm2Cert.IsCA,
MaxPathLen: sm2Cert.MaxPathLen, MaxPathLen: sm2Cert.MaxPathLen,
MaxPathLenZero: sm2Cert.MaxPathLenZero, MaxPathLenZero: sm2Cert.MaxPathLenZero,
SubjectKeyId: sm2Cert.SubjectKeyId, SubjectKeyId: sm2Cert.SubjectKeyId,
AuthorityKeyId: sm2Cert.AuthorityKeyId, AuthorityKeyId: sm2Cert.AuthorityKeyId,
......
...@@ -4,5 +4,6 @@ import ( ...@@ -4,5 +4,6 @@ import (
_ "github.com/33cn/plugin/plugin/consensus/init" //consensus init _ "github.com/33cn/plugin/plugin/consensus/init" //consensus init
_ "github.com/33cn/plugin/plugin/crypto/init" //crypto init _ "github.com/33cn/plugin/plugin/crypto/init" //crypto init
_ "github.com/33cn/plugin/plugin/dapp/init" //dapp init _ "github.com/33cn/plugin/plugin/dapp/init" //dapp init
_ "github.com/33cn/plugin/plugin/mempool/init" //mempool init
_ "github.com/33cn/plugin/plugin/store/init" //store init _ "github.com/33cn/plugin/plugin/store/init" //store init
) )
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package init
import (
_ "github.com/33cn/plugin/plugin/mempool/trade"
)
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package trade
import (
"bytes"
"encoding/gob"
"github.com/33cn/chain33/system/mempool"
"github.com/33cn/chain33/types"
)
//TradeQueue 简单队列模式(默认提供一个队列,便于测试)
type TradeQueue struct {
txMap map[string]*SkipValue
txList *SkipList
subConfig subConfig
}
//NewTradeQueue 创建队列
func NewTradeQueue(subcfg subConfig) *TradeQueue {
return &TradeQueue{
txMap: make(map[string]*SkipValue, subcfg.PoolCacheSize),
txList: NewSkipList(&SkipValue{-1, nil}),
subConfig: subcfg,
}
}
func (cache *TradeQueue) newSkipValue(item *mempool.Item) (*SkipValue, error) {
//tx := item.value
buf := bytes.NewBuffer(nil)
enc := gob.NewEncoder(buf)
err := enc.Encode(item.Value)
if err != nil {
return nil, err
}
size := len(buf.Bytes())
return &SkipValue{Score: cache.subConfig.TimeParam*item.EnterTime - cache.subConfig.PriceConstant*(item.Value.Fee/int64(size))*cache.subConfig.PricePower, Value: item}, nil
}
//Exist 是否存在
func (cache *TradeQueue) Exist(hash string) bool {
_, exists := cache.txMap[string(hash)]
return exists
}
//GetItem 获取数据通过 key
func (cache *TradeQueue) GetItem(hash string) (*mempool.Item, error) {
if k, exist := cache.txMap[string(hash)]; exist {
return k.Value.(*mempool.Item), nil
}
return nil, types.ErrNotFound
}
// Push 把给定tx添加到TradeQueue;如果tx已经存在TradeQueue中或Mempool已满则返回对应error
func (cache *TradeQueue) Push(item *mempool.Item) error {
hash := item.Value.Hash()
if cache.Exist(string(hash)) {
s := cache.txMap[string(hash)]
addedItem := s.Value.(*mempool.Item)
addedTime := addedItem.EnterTime
if types.Now().Unix()-addedTime < mempoolDupResendInterval {
return types.ErrTxExist
} else {
// 超过2分钟之后的重发交易返回nil,再次发送给P2P,但是不再次加入mempool
// 并修改其enterTime,以避免该交易一直在节点间被重发
newEnterTime := types.Now().Unix()
resendItem := &mempool.Item{Value: item.Value, Priority: item.Value.Fee, EnterTime: newEnterTime}
var err error
sv, err := cache.newSkipValue(resendItem)
if err != nil {
return err
}
cache.Remove(string(hash))
cache.txList.Insert(sv)
cache.txMap[string(hash)] = sv
// ------------------
return nil
}
}
it := &mempool.Item{Value: item.Value, Priority: item.Value.Fee, EnterTime: types.Now().Unix()}
sv, err := cache.newSkipValue(it)
if err != nil {
return err
}
if cache.txList.Len() >= cache.Size() {
tail := cache.txList.tail
//价格高
if sv.Compare(tail.Value) == 1 {
cache.Remove(string(tail.Value.Value.(*mempool.Item).Value.Hash()))
} else {
return types.ErrMemFull
}
}
cache.txList.Insert(sv)
cache.txMap[string(hash)] = sv
return nil
}
// Remove 删除数据
func (cache *TradeQueue) Remove(hash string) error {
cache.txList.Delete(cache.txMap[hash])
delete(cache.txMap, hash)
return nil
}
// Size 数据总数
func (cache *TradeQueue) Size() int {
return cache.txList.Len()
}
// Walk 遍历整个队列
func (cache *TradeQueue) Walk(count int, cb func(value *mempool.Item) bool) {
i := 0
cache.txList.Walk(func(item interface{}) bool {
if !cb(item.(*mempool.Item)) {
return false
}
i++
return i != count
})
}
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style // Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file. // license that can be found in the LICENSE file.
package mempool package trade
import ( import (
"runtime" "runtime"
......
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package trade
import (
clog "github.com/33cn/chain33/common/log"
log "github.com/33cn/chain33/common/log/log15"
"github.com/33cn/chain33/queue"
drivers "github.com/33cn/chain33/system/mempool"
"github.com/33cn/chain33/types"
)
func SetLogLevel(level string) {
clog.SetLogLevel(level)
}
func DisableLog() {
mlog.SetHandler(log.DiscardHandler())
}
//--------------------------------------------------------------------------------
// Module Mempool
type Mempool struct {
subConfig subConfig
}
type subConfig struct {
PoolCacheSize int64 `json:"poolCacheSize"`
MinTxFee int64 `json:"minTxFee"`
MaxTxNumPerAccount int64 `json:"maxTxNumPerAccount"`
TimeParam int64 `json:"timeParam"`
PriceConstant int64 `json:"priceConstant"`
PricePower int64 `json:"pricePower"`
}
func init() {
drivers.Reg("trade", New)
}
//New 创建timeline cache 结构的 mempool
func New(cfg *types.Mempool, sub []byte) queue.Module {
c := drivers.NewMempool(cfg)
var subcfg subConfig
types.MustDecode(sub, &subcfg)
if subcfg.PoolCacheSize == 0 {
subcfg.PoolCacheSize = cfg.PoolCacheSize
}
c.SetQueueCache(NewTradeQueue(subcfg))
return c
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package trade
import (
"encoding/json"
"testing"
"github.com/33cn/chain33/system/mempool"
"github.com/33cn/chain33/types"
)
func TestNewMempool(t *testing.T) {
sub, _ := json.Marshal(&subConfig{
PoolCacheSize: 2,
MinTxFee: 100000,
MaxTxNumPerAccount: 10000,
TimeParam: 1,
PriceConstant: 1,
PricePower: 1,
})
module := New(&types.Mempool{}, sub)
mem := module.(*mempool.Mempool)
mem.Close()
}
package trade
import (
"fmt"
"math/rand"
)
const maxLevel = 32
const prob = 0.35
type SkipValue struct {
Score int64
Value interface{}
}
func (v *SkipValue) Compare(value *SkipValue) int {
f1 := v.Score
f2 := value.Score
if f1 > f2 {
return 1
} else if f1 == f2 {
return 0
}
return -1
}
type skipListNode struct {
next []*skipListNode
prev *skipListNode
Value *SkipValue
}
type SkipList struct {
header, tail *skipListNode
findcount int
count int
level int
}
type SkipListIterator struct {
list *SkipList
node *skipListNode
}
func (sli *SkipListIterator) First() *SkipValue {
if sli.list.header.next[0] == nil {
return nil
}
sli.node = sli.list.header.next[0]
return sli.node.Value
}
func (sli *SkipListIterator) Last() *SkipValue {
if sli.list.tail == nil {
return nil
}
sli.node = sli.list.tail
return sli.node.Value
}
func (sli *SkipListIterator) Current() *SkipValue {
if sli.node == nil {
return nil
}
return sli.node.Value
}
func (node *skipListNode) Prev() *skipListNode {
if node == nil || node.prev == nil {
return nil
}
return node.prev
}
func (node *skipListNode) Next() *skipListNode {
if node == nil || node.next[0] == nil {
return nil
}
return node.next[0]
}
func (sli *SkipListIterator) Seek(value *SkipValue) *SkipValue {
x := sli.list.find(value)
if x.next[0] == nil {
return nil
}
sli.node = x.next[0]
return sli.node.Value
}
func newskipListNode(level int, value *SkipValue) *skipListNode {
node := &skipListNode{}
node.next = make([]*skipListNode, level)
node.Value = value
return node
}
//构建一个value的最小值
func NewSkipList(min *SkipValue) *SkipList {
sl := &SkipList{}
sl.level = 1
sl.header = newskipListNode(maxLevel, min)
return sl
}
func randomLevel() int {
level := 1
t := prob * 0xFFFF
for rand.Int()&0xFFFF < int(t) {
level += 1
if level == maxLevel {
break
}
}
return level
}
func (sl *SkipList) GetIterator() *SkipListIterator {
it := &SkipListIterator{}
it.list = sl
it.First()
return it
}
func (sl *SkipList) Len() int {
return sl.count
}
func (sl *SkipList) Level() int {
return sl.level
}
func (sl *SkipList) find(value *SkipValue) *skipListNode {
x := sl.header
for i := sl.level - 1; i >= 0; i-- {
for x.next[i] != nil && x.next[i].Value.Compare(value) < 0 {
sl.findcount++
x = x.next[i]
}
}
return x
}
func (sl *SkipList) FindCount() int {
return sl.findcount
}
func (sl *SkipList) Find(value *SkipValue) *SkipValue {
x := sl.find(value)
if x.next[0] != nil && x.next[0].Value.Compare(value) == 0 {
return x.next[0].Value
}
return nil
}
func (sl *SkipList) FindGreaterOrEqual(value *SkipValue) *SkipValue {
x := sl.find(value)
if x.next[0] != nil {
return x.next[0].Value
}
return nil
}
func (sl *SkipList) Insert(value *SkipValue) int {
var update [maxLevel]*skipListNode
x := sl.header
for i := sl.level - 1; i >= 0; i-- {
for x.next[i] != nil && x.next[i].Value.Compare(value) <= 0 {
x = x.next[i]
}
update[i] = x
}
//if x.next[0] != nil && x.next[0].Value.Compare(value) == 0 { //update
// x.next[0].Value = value
// return 0
//}
level := randomLevel()
if level > sl.level {
for i := sl.level; i < level; i++ {
update[i] = sl.header
}
sl.level = level
}
x = newskipListNode(level, value)
for i := 0; i < level; i++ {
x.next[i] = update[i].next[i]
update[i].next[i] = x
}
//形成一个双向链表
if update[0] != sl.header {
x.prev = update[0]
}
if x.next[0] != nil {
x.next[0].prev = x
} else {
sl.tail = x
}
sl.count++
return 1
}
func (sl *SkipList) Delete(value *SkipValue) int {
var update [maxLevel]*skipListNode
x := sl.header
for i := sl.level - 1; i >= 0; i-- {
for x.next[i] != nil && x.next[i].Value.Compare(value) < 0 {
x = x.next[i]
}
update[i] = x
}
if x.next[0] == nil || x.next[0].Value.Compare(value) != 0 { //not find
return 0
}
x = x.next[0]
for i := 0; i < sl.level; i++ {
if update[i].next[i] == x {
update[i].next[i] = x.next[i]
}
}
if x.next[0] != nil {
x.next[0].prev = x.prev
} else {
sl.tail = x.prev
}
for sl.level > 1 && sl.header.next[sl.level-1] == nil {
sl.level--
}
sl.count--
return 1
}
//测试用的输出函数
func (l *SkipList) Print() {
if l.count > 0 {
r := l.header
for i := l.level - 1; i >= 0; i-- {
e := r.next[i]
//fmt.Print(i)
for e != nil {
fmt.Print(e.Value)
fmt.Print(" ")
e = e.next[i]
}
fmt.Println()
}
} else {
fmt.Println("空")
}
}
//Walk 遍历整个结构,如果cb 返回false 那么停止遍历
func (lm *SkipList) Walk(cb func(value interface{}) bool) {
for e := lm.header.Next(); e != nil; e = e.Next() {
if cb == nil {
return
}
if !cb(e.Value.Value) {
return
}
}
}
...@@ -84,7 +84,7 @@ git merge upstream/master ...@@ -84,7 +84,7 @@ git merge upstream/master
git fetch upstream git fetch upstream
git checkout master git checkout master
git merge upstream/master git merge upstream/master
git branch -a "fixbug_ci" git branch -b "fixbug_ci"
``` ```
* 开发完成后, push 到 `vipwzw/chain33` * 开发完成后, push 到 `vipwzw/chain33`
......
...@@ -207,6 +207,9 @@ func (chain *BlockChain) SetQueueClient(client queue.Client) { ...@@ -207,6 +207,9 @@ func (chain *BlockChain) SetQueueClient(client queue.Client) {
go chain.ProcRecvMsg() go chain.ProcRecvMsg()
} }
//Wait for ready
func (chain *BlockChain) Wait() {}
//GetStore only used for test //GetStore only used for test
func (chain *BlockChain) GetStore() *BlockStore { func (chain *BlockChain) GetStore() *BlockStore {
return chain.blockStore return chain.blockStore
......
...@@ -64,10 +64,24 @@ certFile="cert.pem" ...@@ -64,10 +64,24 @@ certFile="cert.pem"
keyFile="key.pem" keyFile="key.pem"
[mempool] [mempool]
name="trade"
poolCacheSize=10240 poolCacheSize=10240
minTxFee=1000000 minTxFee=100000
maxTxNumPerAccount=10000 maxTxNumPerAccount=10000
[mempool.sub.timeline]
poolCacheSize=10240
minTxFee=100000
maxTxNumPerAccount=10000
[mempool.sub.trade]
poolCacheSize=10240
minTxFee=100000
maxTxNumPerAccount=10000
timeParam=1 #时间占价格比例
priceConstant=1 #一个合适的常量
pricePower=1 #手续费占常量比例
[consensus] [consensus]
name="solo" name="solo"
minerstart=true minerstart=true
......
...@@ -64,10 +64,24 @@ certFile="cert.pem" ...@@ -64,10 +64,24 @@ certFile="cert.pem"
keyFile="key.pem" keyFile="key.pem"
[mempool] [mempool]
name="timeline"
poolCacheSize=10240 poolCacheSize=10240
minTxFee=100000 minTxFee=100000
maxTxNumPerAccount=10000 maxTxNumPerAccount=10000
[mempool.sub.timeline]
poolCacheSize=10240
minTxFee=100000
maxTxNumPerAccount=10000
[mempool.sub.trade]
poolCacheSize=10240
minTxFee=100000
maxTxNumPerAccount=10000
timeParam=1 #时间占价格比例
priceConstant=1 #一个合适的常量
pricePower=0 #手续费占常量比例
[consensus] [consensus]
name="solo" name="solo"
minerstart=true minerstart=true
......
package listmap
import (
"container/list"
"github.com/33cn/chain33/types"
)
//ListMap list 和 map 组合的一个数据结构体
type ListMap struct {
m map[string]*list.Element
l *list.List
}
//New 创建一个新的数据结构
func New() *ListMap {
return &ListMap{
m: make(map[string]*list.Element),
l: list.New(),
}
}
//Size 结构中的item 数目
func (lm *ListMap) Size() int {
return len(lm.m)
}
//Exist 是否存在这个元素
func (lm *ListMap) Exist(key string) bool {
_, ok := lm.m[key]
return ok
}
//GetItem 通过key 获取这个 item
func (lm *ListMap) GetItem(key string) (interface{}, error) {
item, ok := lm.m[key]
if !ok {
return nil, types.ErrNotFound
}
return item.Value, nil
}
//Push 在队伍尾部插入
func (lm *ListMap) Push(key string, value interface{}) {
if elm, ok := lm.m[key]; ok {
elm.Value = value
return
}
elm := lm.l.PushBack(value)
lm.m[key] = elm
}
//GetTop 获取队列头部的数据
func (lm *ListMap) GetTop() interface{} {
elm := lm.l.Front()
if elm == nil {
return nil
}
return elm.Value
}
//Remove 删除某个key
func (lm *ListMap) Remove(key string) interface{} {
if elm, ok := lm.m[key]; ok {
value := lm.l.Remove(elm)
delete(lm.m, key)
return value
}
return nil
}
//Walk 遍历整个结构,如果cb 返回false 那么停止遍历
func (lm *ListMap) Walk(cb func(value interface{}) bool) {
for e := lm.l.Front(); e != nil; e = e.Next() {
if cb == nil {
return
}
if !cb(e.Value) {
return
}
}
}
package listmap
import (
"testing"
"github.com/33cn/chain33/types"
"github.com/stretchr/testify/assert"
)
func TestListMap(t *testing.T) {
l := New()
l.Push("1", "1")
assert.Equal(t, 1, l.Size())
l.Push("1", "11")
assert.Equal(t, 1, l.Size())
value, err := l.GetItem("1")
assert.Equal(t, err, nil)
assert.Equal(t, "11", value.(string))
l.Remove("1")
assert.Equal(t, 0, l.Size())
assert.Equal(t, false, l.Exist("1"))
v := l.GetTop()
assert.Equal(t, nil, v)
_, err = l.GetItem("11")
assert.Equal(t, types.ErrNotFound, err)
l.Push("1", "11")
assert.Equal(t, true, l.Exist("1"))
l.Push("2", "2")
assert.Equal(t, "11", l.GetTop().(string))
var data [2]string
i := 0
l.Walk(func(value interface{}) bool {
data[i] = value.(string)
i++
return true
})
assert.Equal(t, data[0], "11")
assert.Equal(t, data[1], "2")
var data2 [2]string
i = 0
l.Walk(func(value interface{}) bool {
data2[i] = value.(string)
i++
return false
})
assert.Equal(t, data2[0], "11")
assert.Equal(t, data2[1], "")
l.Walk(nil)
l.Remove("xxxx")
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package mempool
import (
"container/list"
"github.com/33cn/chain33/types"
)
//--------------------------------------------------------------------------------
// Module txCache
type txCache struct {
size int
txMap map[string]*list.Element
txList *list.List
txFrontTen []*types.Transaction
accMap map[string][]*types.Transaction
}
// Item 为Mempool中包装交易的数据结构
type Item struct {
value *types.Transaction
priority int64
enterTime int64
}
// NewTxCache初始化txCache
func newTxCache(cacheSize int64) *txCache {
return &txCache{
size: int(cacheSize),
txMap: make(map[string]*list.Element, cacheSize),
txList: list.New(),
txFrontTen: make([]*types.Transaction, 0),
accMap: make(map[string][]*types.Transaction),
}
}
// txCache.TxNumOfAccount返回账户在Mempool中交易数量
func (cache *txCache) TxNumOfAccount(addr string) int64 {
return int64(len(cache.accMap[addr]))
}
// txCache.Exists判断txCache中是否存在给定tx
func (cache *txCache) Exists(hash []byte) bool {
_, exists := cache.txMap[string(hash)]
return exists
}
// txCache.Push把给定tx添加到txCache;如果tx已经存在txCache中或Mempool已满则返回对应error
func (cache *txCache) Push(tx *types.Transaction) error {
hash := tx.Hash()
if cache.Exists(hash) {
addedItem, ok := cache.txMap[string(hash)].Value.(*Item)
if !ok {
mlog.Error("mempoolItemError", "item", cache.txMap[string(hash)].Value)
return types.ErrTxExist
}
addedTime := addedItem.enterTime
if types.Now().Unix()-addedTime < mempoolDupResendInterval {
return types.ErrTxExist
}
// 超过2分钟之后的重发交易返回nil,再次发送给P2P,但是不再次加入mempool
// 并修改其enterTime,以避免该交易一直在节点间被重发
newEnterTime := types.Now().Unix()
resendItem := &Item{value: tx, priority: tx.Fee, enterTime: newEnterTime}
newItem := cache.txList.InsertAfter(resendItem, cache.txMap[string(hash)])
cache.txList.Remove(cache.txMap[string(hash)])
cache.txMap[string(hash)] = newItem
// ------------------
return nil
}
if cache.txList.Len() >= cache.size {
return types.ErrMemFull
}
it := &Item{value: tx, priority: tx.Fee, enterTime: types.Now().Unix()}
txElement := cache.txList.PushBack(it)
cache.txMap[string(hash)] = txElement
// 账户交易数量
accountAddr := tx.From()
cache.accMap[accountAddr] = append(cache.accMap[accountAddr], tx)
if len(cache.txFrontTen) >= 10 {
cache.txFrontTen = cache.txFrontTen[len(cache.txFrontTen)-9:]
}
cache.txFrontTen = append(cache.txFrontTen, tx)
return nil
}
// txCache.GetLatestTx返回最新十条加入到txCache的交易
func (cache *txCache) GetLatestTx() []*types.Transaction {
return cache.txFrontTen
}
// txCache.Remove移除txCache中给定tx
func (cache *txCache) Remove(hash []byte) {
value := cache.txList.Remove(cache.txMap[string(hash)])
delete(cache.txMap, string(hash))
// 账户交易数量减1
if value == nil {
return
}
tx := value.(*Item).value
addr := tx.From()
if cache.TxNumOfAccount(addr) > 0 {
cache.AccountTxNumDecrease(addr, hash)
}
}
// txCache.Size返回txCache中已存tx数目
func (cache *txCache) Size() int {
return cache.txList.Len()
}
// txCache.SetSize用来设置Mempool容量
func (cache *txCache) SetSize(newSize int) {
if cache.txList.Len() > 0 {
panic("only can set a empty size")
}
cache.size = newSize
}
// txCache.GetAccTxs用来获取对应账户地址(列表)中的全部交易详细信息
func (cache *txCache) GetAccTxs(addrs *types.ReqAddrs) *types.TransactionDetails {
res := &types.TransactionDetails{}
for _, addr := range addrs.Addrs {
if value, ok := cache.accMap[addr]; ok {
for _, v := range value {
txAmount, err := v.Amount()
if err != nil {
// continue
txAmount = 0
}
res.Txs = append(res.Txs,
&types.TransactionDetail{
Tx: v,
Amount: txAmount,
Fromaddr: addr,
ActionName: v.ActionName(),
})
}
}
}
return res
}
// txCache.AccountTxNumDecrease根据交易哈希删除对应账户的对应交易
func (cache *txCache) AccountTxNumDecrease(addr string, hash []byte) {
if value, ok := cache.accMap[addr]; ok {
for i, t := range value {
if string(t.Hash()) == string(hash) {
cache.accMap[addr] = append(cache.accMap[addr][:i], cache.accMap[addr][i+1:]...)
if len(cache.accMap[addr]) == 0 {
delete(cache.accMap, addr)
}
return
}
}
}
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package mempool package mempool
//mempool 模块的功能:实现交易暂存的功能。主要是解决共识模块可能比rpc模块速度慢的问题。 //交易打包排序相关的模块
//模块的接口的设计: //模块功能:模块主要的功能是实现共识排序的功能,包括完整的共识的实现。
//接口设计:
...@@ -76,6 +76,9 @@ func New(cfg *types.P2P) *P2p { ...@@ -76,6 +76,9 @@ func New(cfg *types.P2P) *P2p {
return p2p return p2p
} }
//Wait wait for ready
func (network *P2p) Wait() {}
func (network *P2p) isClose() bool { func (network *P2p) isClose() bool {
return atomic.LoadInt32(&network.closed) == 1 return atomic.LoadInt32(&network.closed) == 1
} }
......
...@@ -43,6 +43,8 @@ type Client interface { ...@@ -43,6 +43,8 @@ type Client interface {
// Module be used for module interface // Module be used for module interface
type Module interface { type Module interface {
SetQueueClient(client Client) SetQueueClient(client Client)
//wait for ready
Wait()
Close() Close()
} }
......
...@@ -105,6 +105,9 @@ func (bc *BaseClient) InitMiner() { ...@@ -105,6 +105,9 @@ func (bc *BaseClient) InitMiner() {
bc.once.Do(bc.minerstartCB) bc.once.Do(bc.minerstartCB)
} }
//Wait wait for ready
func (bc *BaseClient) Wait() {}
//SetQueueClient 设置客户端队列 //SetQueueClient 设置客户端队列
func (bc *BaseClient) SetQueueClient(c queue.Client) { func (bc *BaseClient) SetQueueClient(c queue.Client) {
bc.InitClient(c, func() { bc.InitClient(c, func() {
......
...@@ -12,6 +12,7 @@ import ( ...@@ -12,6 +12,7 @@ import (
//加载系统内置store, 不要依赖plugin //加载系统内置store, 不要依赖plugin
_ "github.com/33cn/chain33/system/dapp/init" _ "github.com/33cn/chain33/system/dapp/init"
_ "github.com/33cn/chain33/system/mempool/init"
_ "github.com/33cn/chain33/system/store/init" _ "github.com/33cn/chain33/system/store/init"
) )
......
...@@ -293,7 +293,7 @@ func getblockbyhashs(cmd *cobra.Command, args []string) { ...@@ -293,7 +293,7 @@ func getblockbyhashs(cmd *cobra.Command, args []string) {
Hashes: hashesArr, Hashes: hashesArr,
} }
var res types.BlockDetails var res rpctypes.BlockDetails
ctx := jsonclient.NewRPCCtx(rpcLaddr, "Chain33.GetBlockByHashes", params, &res) ctx := jsonclient.NewRPCCtx(rpcLaddr, "Chain33.GetBlockByHashes", params, &res)
//ctx.SetResultCb(parseQueryTxsByHashesRes) //ctx.SetResultCb(parseQueryTxsByHashesRes)
ctx.Run() ctx.Run()
......
...@@ -9,5 +9,6 @@ import ( ...@@ -9,5 +9,6 @@ import (
_ "github.com/33cn/chain33/system/consensus/init" //register consensus init package _ "github.com/33cn/chain33/system/consensus/init" //register consensus init package
_ "github.com/33cn/chain33/system/crypto/init" _ "github.com/33cn/chain33/system/crypto/init"
_ "github.com/33cn/chain33/system/dapp/init" _ "github.com/33cn/chain33/system/dapp/init"
_ "github.com/33cn/chain33/system/mempool/init"
_ "github.com/33cn/chain33/system/store/init" _ "github.com/33cn/chain33/system/store/init"
) )
package mempool
import (
"github.com/33cn/chain33/common/listmap"
"github.com/33cn/chain33/types"
)
//AccountTxIndex 账户和交易索引
type AccountTxIndex struct {
maxperaccount int
accMap map[string]*listmap.ListMap
}
//NewAccountTxIndex 创建一个新的索引
func NewAccountTxIndex(maxperaccount int) *AccountTxIndex {
return &AccountTxIndex{
maxperaccount: maxperaccount,
accMap: make(map[string]*listmap.ListMap),
}
}
// TxNumOfAccount 返回账户在Mempool中交易数量
func (cache *AccountTxIndex) TxNumOfAccount(addr string) int {
if _, ok := cache.accMap[addr]; ok {
return cache.accMap[addr].Size()
}
return 0
}
// GetAccTxs 用来获取对应账户地址(列表)中的全部交易详细信息
func (cache *AccountTxIndex) GetAccTxs(addrs *types.ReqAddrs) *types.TransactionDetails {
res := &types.TransactionDetails{}
for _, addr := range addrs.Addrs {
if value, ok := cache.accMap[addr]; ok {
value.Walk(func(val interface{}) bool {
v := val.(*types.Transaction)
txAmount, err := v.Amount()
if err != nil {
txAmount = 0
}
res.Txs = append(res.Txs,
&types.TransactionDetail{
Tx: v,
Amount: txAmount,
Fromaddr: addr,
ActionName: v.ActionName(),
})
return true
})
}
}
return res
}
//Remove 根据交易哈希删除对应账户的对应交易
func (cache *AccountTxIndex) Remove(tx *types.Transaction) {
addr := tx.From()
if lm, ok := cache.accMap[addr]; ok {
lm.Remove(string(tx.Hash()))
if lm.Size() == 0 {
delete(cache.accMap, addr)
}
}
}
// Push push transaction to AccountTxIndex
func (cache *AccountTxIndex) Push(tx *types.Transaction) error {
addr := tx.From()
_, ok := cache.accMap[addr]
if !ok {
cache.accMap[addr] = listmap.New()
}
if cache.accMap[addr].Size() >= cache.maxperaccount {
return types.ErrManyTx
}
cache.accMap[addr].Push(string(tx.Hash()), tx)
return nil
}
//CanPush 是否可以push 进 account index
func (cache *AccountTxIndex) CanPush(tx *types.Transaction) bool {
if item, ok := cache.accMap[tx.From()]; ok {
return item.Size() < cache.maxperaccount
}
return true
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package mempool
import (
"sync"
"sync/atomic"
"time"
"github.com/33cn/chain33/common"
log "github.com/33cn/chain33/common/log/log15"
"github.com/33cn/chain33/queue"
"github.com/33cn/chain33/types"
)
var mlog = log.New("module", "mempool.base")
//Mempool mempool 基础类
type Mempool struct {
proxyMtx sync.Mutex
in chan queue.Message
out <-chan queue.Message
client queue.Client
header *types.Header
sync bool
cfg *types.Mempool
poolHeader chan struct{}
isclose int32
wg sync.WaitGroup
done chan struct{}
removeBlockTicket *time.Ticker
cache *txCache
}
//GetSync 判断是否mempool 同步
func (mem *Mempool) getSync() bool {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
return mem.sync
}
//NewMempool 新建mempool 实例
func NewMempool(cfg *types.Mempool) *Mempool {
pool := &Mempool{}
if cfg.MaxTxNumPerAccount == 0 {
cfg.MaxTxNumPerAccount = maxTxNumPerAccount
}
if cfg.MaxTxLast == 0 {
cfg.MaxTxLast = maxTxLast
}
if cfg.PoolCacheSize == 0 {
cfg.PoolCacheSize = poolCacheSize
}
pool.in = make(chan queue.Message)
pool.out = make(<-chan queue.Message)
pool.done = make(chan struct{})
pool.cfg = cfg
pool.poolHeader = make(chan struct{}, 2)
pool.removeBlockTicket = time.NewTicker(time.Minute)
pool.cache = newCache(cfg.MaxTxNumPerAccount, cfg.MaxTxLast)
return pool
}
//Close 关闭mempool
func (mem *Mempool) Close() {
if mem.isClose() {
return
}
atomic.StoreInt32(&mem.isclose, 1)
close(mem.done)
if mem.client != nil {
mem.client.Close()
}
mem.removeBlockTicket.Stop()
mlog.Info("mempool module closing")
mem.wg.Wait()
mlog.Info("mempool module closed")
}
//SetQueueClient 初始化mempool模块
func (mem *Mempool) SetQueueClient(client queue.Client) {
mem.client = client
mem.client.Sub("mempool")
mem.wg.Add(1)
go mem.pollLastHeader()
mem.wg.Add(1)
go mem.checkSync()
mem.wg.Add(1)
go mem.removeBlockedTxs()
mem.wg.Add(1)
go mem.eventProcess()
}
// Size 返回mempool中txCache大小
func (mem *Mempool) Size() int {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
return mem.cache.Size()
}
// SetMinFee 设置最小交易费用
func (mem *Mempool) SetMinFee(fee int64) {
mem.proxyMtx.Lock()
mem.cfg.MinTxFee = fee
mem.proxyMtx.Unlock()
}
//SetQueueCache 设置排队策略
func (mem *Mempool) SetQueueCache(qcache QueueCache) {
mem.cache.SetQueueCache(qcache)
}
// GetTxList 从txCache中返回给定数目的tx
func (mem *Mempool) getTxList(filterList *types.TxHashList) (txs []*types.Transaction) {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
count := filterList.GetCount()
dupMap := make(map[string]bool)
for i := 0; i < len(filterList.GetHashes()); i++ {
dupMap[string(filterList.GetHashes()[i])] = true
}
return mem.filterTxList(count, dupMap)
}
func (mem *Mempool) filterTxList(count int64, dupMap map[string]bool) (txs []*types.Transaction) {
height := mem.header.GetHeight()
blocktime := mem.header.GetBlockTime()
mem.cache.Walk(int(count), func(tx *Item) bool {
if dupMap != nil {
if _, ok := dupMap[string(tx.Value.Hash())]; ok {
return true
}
}
if isExpired(tx, height, blocktime) {
return true
}
txs = append(txs, tx.Value)
return true
})
return txs
}
// RemoveTxs 从mempool中删除给定Hash的txs
func (mem *Mempool) RemoveTxs(hashList *types.TxHashList) error {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
for _, hash := range hashList.Hashes {
exist := mem.cache.Exist(string(hash))
if exist {
mem.cache.Remove(string(hash))
}
}
return nil
}
// PushTx 将交易推入mempool,并返回结果(error)
func (mem *Mempool) PushTx(tx *types.Transaction) error {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
err := mem.cache.Push(tx)
return err
}
// setHeader设置mempool.header
func (mem *Mempool) setHeader(h *types.Header) {
mem.proxyMtx.Lock()
mem.header = h
mem.proxyMtx.Unlock()
}
// GetHeader 获取Mempool.header
func (mem *Mempool) GetHeader() *types.Header {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
return mem.header
}
//IsClose 判断是否mempool 关闭
func (mem *Mempool) isClose() bool {
return atomic.LoadInt32(&mem.isclose) == 1
}
// GetLastHeader 获取LastHeader的height和blockTime
func (mem *Mempool) GetLastHeader() (interface{}, error) {
if mem.client == nil {
panic("client not bind message queue.")
}
msg := mem.client.NewMessage("blockchain", types.EventGetLastHeader, nil)
err := mem.client.Send(msg, true)
if err != nil {
mlog.Error("blockchain closed", "err", err.Error())
return nil, err
}
return mem.client.Wait(msg)
}
// GetAccTxs 用来获取对应账户地址(列表)中的全部交易详细信息
func (mem *Mempool) GetAccTxs(addrs *types.ReqAddrs) *types.TransactionDetails {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
return mem.cache.GetAccTxs(addrs)
}
// TxNumOfAccount 返回账户在mempool中交易数量
func (mem *Mempool) TxNumOfAccount(addr string) int64 {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
return int64(mem.cache.TxNumOfAccount(addr))
}
// GetLatestTx 返回最新十条加入到mempool的交易
func (mem *Mempool) GetLatestTx() []*types.Transaction {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
return mem.cache.GetLatestTx()
}
// pollLastHeader在初始化后循环获取LastHeader,直到获取成功后,返回
func (mem *Mempool) pollLastHeader() {
defer mem.wg.Done()
defer func() {
mlog.Info("pollLastHeader quit")
mem.poolHeader <- struct{}{}
}()
for {
if mem.isClose() {
return
}
lastHeader, err := mem.GetLastHeader()
if err != nil {
mlog.Error(err.Error())
time.Sleep(time.Second)
continue
}
h := lastHeader.(queue.Message).Data.(*types.Header)
mem.setHeader(h)
return
}
}
func (mem *Mempool) removeExpired() {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
mem.cache.removeExpiredTx(mem.header.GetHeight(), mem.header.GetBlockTime())
}
// removeBlockedTxs 每隔1分钟清理一次已打包的交易
func (mem *Mempool) removeBlockedTxs() {
defer mem.wg.Done()
defer mlog.Info("RemoveBlockedTxs quit")
if mem.client == nil {
panic("client not bind message queue.")
}
for {
select {
case <-mem.removeBlockTicket.C:
if mem.isClose() {
return
}
mem.removeExpired()
case <-mem.done:
return
}
}
}
// RemoveTxsOfBlock 移除mempool中已被Blockchain打包的tx
func (mem *Mempool) RemoveTxsOfBlock(block *types.Block) bool {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
for _, tx := range block.Txs {
hash := tx.Hash()
exist := mem.cache.Exist(string(hash))
if exist {
mem.cache.Remove(string(hash))
}
}
return true
}
// Mempool.DelBlock将回退的区块内的交易重新加入mempool中
func (mem *Mempool) delBlock(block *types.Block) {
if len(block.Txs) <= 0 {
return
}
blkTxs := block.Txs
header := mem.GetHeader()
for i := 0; i < len(blkTxs); i++ {
tx := blkTxs[i]
//当前包括ticket和平行链的第一笔挖矿交易,统一actionName为miner
if i == 0 && tx.ActionName() == types.MinerAction {
continue
}
groupCount := int(tx.GetGroupCount())
if groupCount > 1 && i+groupCount <= len(blkTxs) {
group := types.Transactions{Txs: blkTxs[i : i+groupCount]}
tx = group.Tx()
i = i + groupCount - 1
}
err := tx.Check(header.GetHeight(), mem.cfg.MinTxFee)
if err != nil {
continue
}
if !mem.checkExpireValid(tx) {
continue
}
mem.PushTx(tx)
}
}
// Height 获取区块高度
func (mem *Mempool) Height() int64 {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
if mem.header == nil {
return -1
}
return mem.header.GetHeight()
}
// Wait wait mempool ready
func (mem *Mempool) Wait() {
<-mem.poolHeader
//wait sync
<-mem.poolHeader
}
// SendTxToP2P 向"p2p"发送消息
func (mem *Mempool) sendTxToP2P(tx *types.Transaction) {
if mem.client == nil {
panic("client not bind message queue.")
}
msg := mem.client.NewMessage("p2p", types.EventTxBroadcast, tx)
mem.client.Send(msg, false)
mlog.Debug("tx sent to p2p", "tx.Hash", common.ToHex(tx.Hash()))
}
// Mempool.checkSync检查并获取mempool同步状态
func (mem *Mempool) checkSync() {
defer func() {
mlog.Info("getsync quit")
mem.poolHeader <- struct{}{}
}()
defer mem.wg.Done()
if mem.getSync() {
return
}
if mem.cfg.ForceAccept {
mem.setSync(true)
}
for {
if mem.isClose() {
return
}
if mem.client == nil {
panic("client not bind message queue.")
}
msg := mem.client.NewMessage("blockchain", types.EventIsSync, nil)
err := mem.client.Send(msg, true)
if err != nil {
time.Sleep(time.Second)
continue
}
resp, err := mem.client.Wait(msg)
if err != nil {
time.Sleep(time.Second)
continue
}
if resp.GetData().(*types.IsCaughtUp).GetIscaughtup() {
mem.setSync(true)
return
}
time.Sleep(time.Second)
continue
}
}
func (mem *Mempool) setSync(status bool) {
mem.proxyMtx.Lock()
mem.sync = status
mem.proxyMtx.Unlock()
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found In the LICENSE file.
package mempool
import (
"github.com/33cn/chain33/types"
)
//QueueCache 排队交易处理
type QueueCache interface {
Exist(hash string) bool
GetItem(hash string) (*Item, error)
Push(tx *Item) error
Remove(hash string) error
Size() int
Walk(count int, cb func(tx *Item) bool)
}
// Item 为Mempool中包装交易的数据结构
type Item struct {
Value *types.Transaction
Priority int64
EnterTime int64
}
//TxCache 管理交易cache 包括账户索引,最后的交易,排队策略缓存
type txCache struct {
*AccountTxIndex
*LastTxCache
qcache QueueCache
}
//NewTxCache init accountIndex and last cache
func newCache(maxTxPerAccount int64, sizeLast int64) *txCache {
return &txCache{
AccountTxIndex: NewAccountTxIndex(int(maxTxPerAccount)),
LastTxCache: NewLastTxCache(int(sizeLast)),
}
}
//SetQueueCache set queue cache , 这个接口可以扩展
func (cache *txCache) SetQueueCache(qcache QueueCache) {
cache.qcache = qcache
}
//Remove 移除txCache中给定tx
func (cache *txCache) Remove(hash string) {
item, err := cache.qcache.GetItem(hash)
if err != nil {
return
}
tx := item.Value
cache.qcache.Remove(hash)
cache.AccountTxIndex.Remove(tx)
cache.LastTxCache.Remove(tx)
}
//Exist 是否存在
func (cache *txCache) Exist(hash string) bool {
if cache.qcache == nil {
return false
}
return cache.qcache.Exist(hash)
}
//Size cache tx num
func (cache *txCache) Size() int {
if cache.qcache == nil {
return 0
}
return cache.qcache.Size()
}
//Walk iter all txs
func (cache *txCache) Walk(count int, cb func(tx *Item) bool) {
if cache.qcache == nil {
return
}
cache.qcache.Walk(count, cb)
}
//RemoveTxs 删除一组交易
func (cache *txCache) RemoveTxs(txs []string) {
for _, t := range txs {
cache.Remove(t)
}
}
//Push 存入交易到cache 中
func (cache *txCache) Push(tx *types.Transaction) error {
if !cache.AccountTxIndex.CanPush(tx) {
return types.ErrManyTx
}
item := &Item{Value: tx, Priority: tx.Fee, EnterTime: types.Now().Unix()}
err := cache.qcache.Push(item)
if err != nil {
return err
}
cache.AccountTxIndex.Push(tx)
cache.LastTxCache.Push(tx)
return nil
}
func (cache *txCache) removeExpiredTx(height, blocktime int64) {
var txs []string
cache.qcache.Walk(0, func(tx *Item) bool {
if isExpired(tx, height, blocktime) {
txs = append(txs, string(tx.Value.Hash()))
}
return true
})
cache.RemoveTxs(txs)
}
//判断交易是否过期
func isExpired(item *Item, height, blockTime int64) bool {
if types.Now().Unix()-item.EnterTime >= mempoolExpiredInterval {
return true
}
if item.Value.IsExpire(height, blockTime) {
return true
}
return false
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package mempool package mempool
import ( import (
"errors" "errors"
"time"
"github.com/33cn/chain33/common/address" "github.com/33cn/chain33/common/address"
"github.com/33cn/chain33/queue" "github.com/33cn/chain33/queue"
"github.com/33cn/chain33/types" "github.com/33cn/chain33/types"
"github.com/33cn/chain33/util"
) )
// Mempool.CheckTxList初步检查并筛选交易消息 // CheckExpireValid 检查交易过期有效性,过期返回false,未过期返回true
func (mem *Mempool) CheckExpireValid(msg queue.Message) (bool, error) {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
if mem.header == nil {
return false, types.ErrHeaderNotSet
}
tx := msg.GetData().(types.TxGroup).Tx()
ok := mem.checkExpireValid(tx)
if !ok {
return ok, types.ErrTxExpire
}
return ok, nil
}
// checkTxListRemote 发送消息给执行模块检查交易
func (mem *Mempool) checkTxListRemote(txlist *types.ExecTxList) (*types.ReceiptCheckTxList, error) {
if mem.client == nil {
panic("client not bind message queue.")
}
msg := mem.client.NewMessage("execs", types.EventCheckTx, txlist)
err := mem.client.Send(msg, true)
if err != nil {
mlog.Error("execs closed", "err", err.Error())
return nil, err
}
msg, err = mem.client.Wait(msg)
if err != nil {
return nil, err
}
return msg.GetData().(*types.ReceiptCheckTxList), nil
}
func (mem *Mempool) checkExpireValid(tx *types.Transaction) bool {
if tx.IsExpire(mem.header.GetHeight(), mem.header.GetBlockTime()) {
return false
}
if tx.Expire > 1000000000 && tx.Expire < types.Now().Unix()+int64(time.Minute/time.Second) {
return false
}
return true
}
// CheckTx 初步检查并筛选交易消息
func (mem *Mempool) checkTx(msg queue.Message) queue.Message { func (mem *Mempool) checkTx(msg queue.Message) queue.Message {
tx := msg.GetData().(types.TxGroup).Tx() tx := msg.GetData().(types.TxGroup).Tx()
// 检查接收地址是否合法 // 检查接收地址是否合法
...@@ -20,13 +61,7 @@ func (mem *Mempool) checkTx(msg queue.Message) queue.Message { ...@@ -20,13 +61,7 @@ func (mem *Mempool) checkTx(msg queue.Message) queue.Message {
msg.Data = types.ErrInvalidAddress msg.Data = types.ErrInvalidAddress
return msg return msg
} }
// 检查交易是否为重复交易 // 检查交易账户在mempool中是否存在过多交易
if mem.addedTxs.Contains(string(tx.Hash())) {
msg.Data = types.ErrDupTx
return msg
}
// 检查交易账户在Mempool中是否存在过多交易
from := tx.From() from := tx.From()
if mem.TxNumOfAccount(from) >= maxTxNumPerAccount { if mem.TxNumOfAccount(from) >= maxTxNumPerAccount {
msg.Data = types.ErrManyTx msg.Data = types.ErrManyTx
...@@ -42,7 +77,7 @@ func (mem *Mempool) checkTx(msg queue.Message) queue.Message { ...@@ -42,7 +77,7 @@ func (mem *Mempool) checkTx(msg queue.Message) queue.Message {
} }
// CheckTxs 初步检查并筛选交易消息 // CheckTxs 初步检查并筛选交易消息
func (mem *Mempool) CheckTxs(msg queue.Message) queue.Message { func (mem *Mempool) checkTxs(msg queue.Message) queue.Message {
// 判断消息是否含有nil交易 // 判断消息是否含有nil交易
if msg.GetData() == nil { if msg.GetData() == nil {
msg.Data = types.ErrEmptyTx msg.Data = types.ErrEmptyTx
...@@ -52,7 +87,7 @@ func (mem *Mempool) CheckTxs(msg queue.Message) queue.Message { ...@@ -52,7 +87,7 @@ func (mem *Mempool) CheckTxs(msg queue.Message) queue.Message {
txmsg := msg.GetData().(*types.Transaction) txmsg := msg.GetData().(*types.Transaction)
//普通的交易 //普通的交易
tx := types.NewTransactionCache(txmsg) tx := types.NewTransactionCache(txmsg)
err := tx.Check(header.GetHeight(), mem.minFee) err := tx.Check(header.GetHeight(), mem.cfg.MinTxFee)
if err != nil { if err != nil {
msg.Data = err msg.Data = err
return msg return msg
...@@ -79,12 +114,12 @@ func (mem *Mempool) CheckTxs(msg queue.Message) queue.Message { ...@@ -79,12 +114,12 @@ func (mem *Mempool) CheckTxs(msg queue.Message) queue.Message {
return msg return msg
} }
// Mempool.checkTxList检查账户余额是否足够,并加入到Mempool,成功则传入goodChan,若加入Mempool失败则传入badChan //checkTxList 检查账户余额是否足够,并加入到Mempool,成功则传入goodChan,若加入Mempool失败则传入badChan
func (mem *Mempool) checkTxRemote(msg queue.Message) queue.Message { func (mem *Mempool) checkTxRemote(msg queue.Message) queue.Message {
tx := msg.GetData().(types.TxGroup) tx := msg.GetData().(types.TxGroup)
txlist := &types.ExecTxList{} txlist := &types.ExecTxList{}
txlist.Txs = append(txlist.Txs, tx.Tx()) txlist.Txs = append(txlist.Txs, tx.Tx())
//检查是否重复
lastheader := mem.GetHeader() lastheader := mem.GetHeader()
txlist.BlockTime = lastheader.BlockTime txlist.BlockTime = lastheader.BlockTime
txlist.Height = lastheader.Height txlist.Height = lastheader.Height
...@@ -92,6 +127,16 @@ func (mem *Mempool) checkTxRemote(msg queue.Message) queue.Message { ...@@ -92,6 +127,16 @@ func (mem *Mempool) checkTxRemote(msg queue.Message) queue.Message {
// 增加这个属性,在执行器中会使用到 // 增加这个属性,在执行器中会使用到
txlist.Difficulty = uint64(lastheader.Difficulty) txlist.Difficulty = uint64(lastheader.Difficulty)
txlist.IsMempool = true txlist.IsMempool = true
//add check dup tx
newtxs, err := util.CheckDupTx(mem.client, txlist.Txs, txlist.Height)
if err != nil {
msg.Data = err
return msg
}
if len(newtxs) != len(txlist.Txs) {
msg.Data = types.ErrDupTx
return msg
}
result, err := mem.checkTxListRemote(txlist) result, err := mem.checkTxListRemote(txlist)
if err != nil { if err != nil {
msg.Data = err msg.Data = err
......
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package mempool
import (
"runtime"
//log "github.com/33cn/chain33/common/log/log15"
)
var (
poolCacheSize int64 = 10240 // mempool容量
mempoolExpiredInterval int64 = 600 // mempool内交易过期时间,10分钟
maxTxNumPerAccount int64 = 100 // TODO 每个账户在mempool中最大交易数量,10
maxTxLast int64 = 10
processNum int
)
// TODO
func init() {
processNum = runtime.NumCPU()
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package mempool
//mempool 模块的功能:实现交易暂存的功能。主要是解决共识模块可能比rpc模块速度慢的问题。
//模块的接口的设计:
package mempool
import (
"github.com/33cn/chain33/queue"
"github.com/33cn/chain33/types"
)
func (mem *Mempool) reply() {
defer mlog.Info("piple line quit")
defer mem.wg.Done()
for m := range mem.out {
if m.Err() != nil {
m.Reply(mem.client.NewMessage("rpc", types.EventReply,
&types.Reply{IsOk: false, Msg: []byte(m.Err().Error())}))
} else {
mem.sendTxToP2P(m.GetData().(types.TxGroup).Tx())
m.Reply(mem.client.NewMessage("rpc", types.EventReply, &types.Reply{IsOk: true, Msg: nil}))
}
}
}
func (mem *Mempool) pipeLine() <-chan queue.Message {
//check sign
step1 := func(data queue.Message) queue.Message {
if data.Err() != nil {
return data
}
return mem.checkSign(data)
}
chs := make([]<-chan queue.Message, processNum)
for i := 0; i < processNum; i++ {
chs[i] = step(mem.done, mem.in, step1)
}
out1 := merge(mem.done, chs)
//checktx remote
step2 := func(data queue.Message) queue.Message {
if data.Err() != nil {
return data
}
return mem.checkTxRemote(data)
}
chs2 := make([]<-chan queue.Message, processNum)
for i := 0; i < processNum; i++ {
chs2[i] = step(mem.done, out1, step2)
}
return merge(mem.done, chs2)
}
// 处理其他模块的消息
func (mem *Mempool) eventProcess() {
defer mem.wg.Done()
defer close(mem.in)
//event process
mem.out = mem.pipeLine()
mlog.Info("mempool piple line start")
mem.wg.Add(1)
go mem.reply()
for msg := range mem.client.Recv() {
mlog.Debug("mempool recv", "msgid", msg.ID, "msg", types.GetEventName(int(msg.Ty)))
beg := types.Now()
switch msg.Ty {
case types.EventTx:
mem.eventTx(msg)
case types.EventGetMempool:
// 消息类型EventGetMempool:获取mempool内所有交易
mem.eventGetMempool(msg)
case types.EventTxList:
// 消息类型EventTxList:获取mempool中一定数量交易
mem.eventTxList(msg)
case types.EventDelTxList:
// 消息类型EventDelTxList:获取mempool中一定数量交易,并把这些交易从mempool中删除
mem.eventDelTxList(msg)
case types.EventAddBlock:
// 消息类型EventAddBlock:将添加到区块内的交易从mempool中删除
mem.eventAddBlock(msg)
case types.EventGetMempoolSize:
// 消息类型EventGetMempoolSize:获取mempool大小
mem.eventGetMempoolSize(msg)
case types.EventGetLastMempool:
// 消息类型EventGetLastMempool:获取最新十条加入到mempool的交易
mem.eventGetLastMempool(msg)
case types.EventDelBlock:
// 回滚区块,把该区块内交易重新加回mempool
mem.eventDelBlock(msg)
case types.EventGetAddrTxs:
// 获取mempool中对应账户(组)所有交易
mem.eventGetAddrTxs(msg)
default:
}
mlog.Debug("mempool", "cost", types.Since(beg), "msg", types.GetEventName(int(msg.Ty)))
}
}
//EventTx 初步筛选后存入mempool
func (mem *Mempool) eventTx(msg queue.Message) {
if !mem.getSync() {
msg.Reply(mem.client.NewMessage("", types.EventReply, &types.Reply{Msg: []byte(types.ErrNotSync.Error())}))
mlog.Error("wrong tx", "err", types.ErrNotSync.Error())
} else {
checkedMsg := mem.checkTxs(msg)
select {
case mem.in <- checkedMsg:
case <-mem.done:
}
}
}
// EventGetMempool 获取Mempool内所有交易
func (mem *Mempool) eventGetMempool(msg queue.Message) {
msg.Reply(mem.client.NewMessage("rpc", types.EventReplyTxList,
&types.ReplyTxList{Txs: mem.filterTxList(0, nil)}))
}
// EventDelTxList 获取Mempool中一定数量交易,并把这些交易从Mempool中删除
func (mem *Mempool) eventDelTxList(msg queue.Message) {
hashList := msg.GetData().(*types.TxHashList)
if len(hashList.GetHashes()) == 0 {
msg.ReplyErr("EventDelTxList", types.ErrSize)
} else {
err := mem.RemoveTxs(hashList)
msg.ReplyErr("EventDelTxList", err)
}
}
// EventTxList 获取mempool中一定数量交易
func (mem *Mempool) eventTxList(msg queue.Message) {
hashList := msg.GetData().(*types.TxHashList)
if hashList.Count <= 0 {
msg.Reply(mem.client.NewMessage("", types.EventReplyTxList, types.ErrSize))
mlog.Error("not an valid size", "msg", msg)
} else {
txList := mem.getTxList(hashList)
msg.Reply(mem.client.NewMessage("", types.EventReplyTxList, &types.ReplyTxList{Txs: txList}))
}
}
// EventAddBlock 将添加到区块内的交易从mempool中删除
func (mem *Mempool) eventAddBlock(msg queue.Message) {
block := msg.GetData().(*types.BlockDetail).Block
if block.Height > mem.Height() || (block.Height == 0 && mem.Height() == 0) {
header := &types.Header{}
header.BlockTime = block.BlockTime
header.Height = block.Height
header.StateHash = block.StateHash
mem.setHeader(header)
}
mem.RemoveTxsOfBlock(block)
}
// EventGetMempoolSize 获取mempool大小
func (mem *Mempool) eventGetMempoolSize(msg queue.Message) {
memSize := int64(mem.Size())
msg.Reply(mem.client.NewMessage("rpc", types.EventMempoolSize,
&types.MempoolSize{Size: memSize}))
}
// EventGetLastMempool 获取最新十条加入到mempool的交易
func (mem *Mempool) eventGetLastMempool(msg queue.Message) {
txList := mem.GetLatestTx()
msg.Reply(mem.client.NewMessage("rpc", types.EventReplyTxList,
&types.ReplyTxList{Txs: txList}))
}
// EventDelBlock 回滚区块,把该区块内交易重新加回mempool
func (mem *Mempool) eventDelBlock(msg queue.Message) {
block := msg.GetData().(*types.BlockDetail).Block
if block.Height != mem.GetHeader().GetHeight() {
return
}
lastHeader, err := mem.GetLastHeader()
if err != nil {
mlog.Error(err.Error())
return
}
h := lastHeader.(queue.Message).Data.(*types.Header)
mem.setHeader(h)
mem.delBlock(block)
}
// eventGetAddrTxs 获取mempool中对应账户(组)所有交易
func (mem *Mempool) eventGetAddrTxs(msg queue.Message) {
addrs := msg.GetData().(*types.ReqAddrs)
txlist := mem.GetAccTxs(addrs)
msg.Reply(mem.client.NewMessage("", types.EventReplyAddrTxs, txlist))
}
func (mem *Mempool) checkSign(data queue.Message) queue.Message {
tx, ok := data.GetData().(types.TxGroup)
if ok && tx.CheckSign() {
return data
}
mlog.Error("wrong tx", "err", types.ErrSign)
data.Data = types.ErrSign
return data
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package init
import (
_ "github.com/33cn/chain33/system/mempool/timeline" //最简单的排队模式,按照时间
)
package mempool
import (
"github.com/33cn/chain33/common/listmap"
"github.com/33cn/chain33/types"
)
//LastTxCache 最后放入cache的交易
type LastTxCache struct {
max int
l *listmap.ListMap
}
//NewLastTxCache 创建最后交易的cache
func NewLastTxCache(size int) *LastTxCache {
return &LastTxCache{
max: size,
l: listmap.New(),
}
}
//GetLatestTx 返回最新十条加入到txCache的交易
func (cache *LastTxCache) GetLatestTx() (txs []*types.Transaction) {
cache.l.Walk(func(v interface{}) bool {
txs = append(txs, v.(*types.Transaction))
return true
})
return txs
}
//Remove remove tx of last cache
func (cache *LastTxCache) Remove(tx *types.Transaction) {
cache.l.Remove(string(tx.Hash()))
}
//Push tx into LastTxCache
func (cache *LastTxCache) Push(tx *types.Transaction) {
if cache.l.Size() >= cache.max {
v := cache.l.GetTop()
if v != nil {
cache.Remove(v.(*types.Transaction))
}
}
cache.l.Push(string(tx.Hash()), tx)
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package mempool
import (
"github.com/33cn/chain33/queue"
"github.com/33cn/chain33/types"
)
//Create 创建一个mempool模块
type Create func(cfg *types.Mempool, sub []byte) queue.Module
var regMempool = make(map[string]Create)
//Reg 注册一个create
func Reg(name string, create Create) {
if create == nil {
panic("Mempool: Register driver is nil")
}
if _, dup := regMempool[name]; dup {
panic("Mempool: Register called twice for driver " + name)
}
regMempool[name] = create
}
//Load 加载一个create
func Load(name string) (create Create, err error) {
if driver, ok := regMempool[name]; ok {
return driver, nil
}
return nil, types.ErrNotFound
}
...@@ -14,12 +14,17 @@ import ( ...@@ -14,12 +14,17 @@ import (
"github.com/33cn/chain33/common/address" "github.com/33cn/chain33/common/address"
"github.com/33cn/chain33/common/crypto" "github.com/33cn/chain33/common/crypto"
"github.com/33cn/chain33/common/limits" "github.com/33cn/chain33/common/limits"
"github.com/33cn/chain33/common/log"
"github.com/33cn/chain33/executor" "github.com/33cn/chain33/executor"
"github.com/33cn/chain33/queue" "github.com/33cn/chain33/queue"
"github.com/33cn/chain33/store" "github.com/33cn/chain33/store"
_ "github.com/33cn/chain33/system" _ "github.com/33cn/chain33/system/consensus/init"
_ "github.com/33cn/chain33/system/crypto/init"
cty "github.com/33cn/chain33/system/dapp/coins/types" cty "github.com/33cn/chain33/system/dapp/coins/types"
_ "github.com/33cn/chain33/system/dapp/init"
_ "github.com/33cn/chain33/system/store/init"
"github.com/33cn/chain33/types" "github.com/33cn/chain33/types"
"github.com/stretchr/testify/assert"
) )
//----------------------------- data for testing --------------------------------- //----------------------------- data for testing ---------------------------------
...@@ -63,10 +68,6 @@ var blk = &types.Block{ ...@@ -63,10 +68,6 @@ var blk = &types.Block{
Txs: []*types.Transaction{tx3, tx5}, Txs: []*types.Transaction{tx3, tx5},
} }
func mergeList(done <-chan struct{}, cs ...<-chan queue.Message) <-chan queue.Message {
return merge(done, cs)
}
func init() { func init() {
err := limits.SetLimits() err := limits.SetLimits()
if err != nil { if err != nil {
...@@ -74,14 +75,7 @@ func init() { ...@@ -74,14 +75,7 @@ func init() {
} }
random = rand.New(rand.NewSource(types.Now().UnixNano())) random = rand.New(rand.NewSource(types.Now().UnixNano()))
queue.DisableLog() queue.DisableLog()
// DisableLog() // 不输出任何log log.SetLogLevel("err") // 输出WARN(含)以下log
// SetLogLevel("debug") // 输出DBUG(含)以下log
// SetLogLevel("info") // 输出INFO(含)以下log
SetLogLevel("info") // 输出WARN(含)以下log
// SetLogLevel("eror") // 输出EROR(含)以下log
// SetLogLevel("crit") // 输出CRIT(含)以下log
// SetLogLevel("") // 输出所有log
// maxTxNumPerAccount = 10000
mainPriv = getprivkey("CC38546E9E659D15E6B4893F0AB32A06D103931A8230B0BDE71459D2B27D6944") mainPriv = getprivkey("CC38546E9E659D15E6B4893F0AB32A06D103931A8230B0BDE71459D2B27D6944")
tx1.Sign(types.SECP256K1, privKey) tx1.Sign(types.SECP256K1, privKey)
tx2.Sign(types.SECP256K1, privKey) tx2.Sign(types.SECP256K1, privKey)
...@@ -98,7 +92,6 @@ func init() { ...@@ -98,7 +92,6 @@ func init() {
tx13.Sign(types.SECP256K1, privKey) tx13.Sign(types.SECP256K1, privKey)
tx14.Sign(types.SECP256K1, privKey) tx14.Sign(types.SECP256K1, privKey)
tx15.Sign(types.SECP256K1, privKey) tx15.Sign(types.SECP256K1, privKey)
} }
func getprivkey(key string) crypto.PrivKey { func getprivkey(key string) crypto.PrivKey {
...@@ -119,7 +112,8 @@ func getprivkey(key string) crypto.PrivKey { ...@@ -119,7 +112,8 @@ func getprivkey(key string) crypto.PrivKey {
func initEnv3() (queue.Queue, queue.Module, queue.Module, *Mempool) { func initEnv3() (queue.Queue, queue.Module, queue.Module, *Mempool) {
var q = queue.New("channel") var q = queue.New("channel")
cfg, sub := types.InitCfg("../cmd/chain33/chain33.test.toml") cfg, sub := types.InitCfg("../../cmd/chain33/chain33.test.toml")
types.Init(cfg.Title, cfg)
cfg.Consensus.Minerstart = false cfg.Consensus.Minerstart = false
chain := blockchain.New(cfg.BlockChain) chain := blockchain.New(cfg.BlockChain)
chain.SetQueueClient(q.Client()) chain.SetQueueClient(q.Client())
...@@ -130,43 +124,45 @@ func initEnv3() (queue.Queue, queue.Module, queue.Module, *Mempool) { ...@@ -130,43 +124,45 @@ func initEnv3() (queue.Queue, queue.Module, queue.Module, *Mempool) {
types.SetMinFee(0) types.SetMinFee(0)
s := store.New(cfg.Store, sub.Store) s := store.New(cfg.Store, sub.Store)
s.SetQueueClient(q.Client()) s.SetQueueClient(q.Client())
mem := New(cfg.MemPool) mem := NewMempool(cfg.Mempool)
mem.SetQueueCache(NewSimpleQueue(int(cfg.Mempool.PoolCacheSize)))
mem.SetQueueClient(q.Client()) mem.SetQueueClient(q.Client())
mem.setSync(true) mem.Wait()
mem.WaitPollLastHeader()
return q, chain, s, mem return q, chain, s, mem
} }
func initEnv2(size int) (queue.Queue, *Mempool) { func initEnv2(size int) (queue.Queue, *Mempool) {
var q = queue.New("channel") var q = queue.New("channel")
cfg, _ := types.InitCfg("../cmd/chain33/chain33.test.toml") cfg, _ := types.InitCfg("../../cmd/chain33/chain33.test.toml")
types.Init(cfg.Title, cfg)
blockchainProcess(q) blockchainProcess(q)
execProcess(q) execProcess(q)
mem := New(cfg.MemPool) cfg.Mempool.PoolCacheSize = int64(size)
mem := NewMempool(cfg.Mempool)
mem.SetQueueCache(NewSimpleQueue(size))
mem.SetQueueClient(q.Client()) mem.SetQueueClient(q.Client())
mem.setSync(true) mem.setSync(true)
if size > 0 {
mem.Resize(size)
}
mem.SetMinFee(0) mem.SetMinFee(0)
mem.WaitPollLastHeader() mem.Wait()
return q, mem return q, mem
} }
func initEnv(size int) (queue.Queue, *Mempool) { func initEnv(size int) (queue.Queue, *Mempool) {
if size == 0 {
size = 100
}
var q = queue.New("channel") var q = queue.New("channel")
cfg, _ := types.InitCfg("../cmd/chain33/chain33.test.toml") cfg, _ := types.InitCfg("../../cmd/chain33/chain33.test.toml")
types.Init(cfg.Title, cfg)
blockchainProcess(q) blockchainProcess(q)
execProcess(q) execProcess(q)
mem := New(cfg.MemPool) cfg.Mempool.PoolCacheSize = int64(size)
mem := NewMempool(cfg.Mempool)
mem.SetQueueCache(NewSimpleQueue(size))
mem.SetQueueClient(q.Client()) mem.SetQueueClient(q.Client())
mem.setSync(true) mem.setSync(true)
if size > 0 {
mem.Resize(size)
}
mem.SetMinFee(types.GInt("MinFee")) mem.SetMinFee(types.GInt("MinFee"))
mem.WaitPollLastHeader() mem.Wait()
return q, mem return q, mem
} }
...@@ -211,21 +207,19 @@ func TestAddEmptyTx(t *testing.T) { ...@@ -211,21 +207,19 @@ func TestAddEmptyTx(t *testing.T) {
} }
func TestAddTx(t *testing.T) { func TestAddTx(t *testing.T) {
q, mem := initEnv(0) q, mem := initEnv(1)
defer q.Close() defer q.Close()
defer mem.Close() defer mem.Close()
msg := mem.client.NewMessage("mempool", types.EventTx, tx2) msg := mem.client.NewMessage("mempool", types.EventTx, tx2)
mem.client.Send(msg, true) mem.client.Send(msg, true)
mem.client.Wait(msg) mem.client.Wait(msg)
if mem.Size() != 1 { if mem.Size() != 1 {
t.Error("TestAddTx failed") t.Error("TestAddTx failed")
} }
} }
func TestAddDuplicatedTx(t *testing.T) { func TestAddDuplicatedTx(t *testing.T) {
q, mem := initEnv(0) q, mem := initEnv(100)
defer q.Close() defer q.Close()
defer mem.Close() defer mem.Close()
...@@ -330,47 +324,16 @@ func add10Tx(client queue.Client) error { ...@@ -330,47 +324,16 @@ func add10Tx(client queue.Client) error {
if err != nil { if err != nil {
return err return err
} }
txs := []*types.Transaction{tx5, tx6, tx7, tx8, tx9, tx10}
msg5 := client.NewMessage("mempool", types.EventTx, tx5) for _, tx := range txs {
msg6 := client.NewMessage("mempool", types.EventTx, tx6) msg := client.NewMessage("mempool", types.EventTx, tx)
msg7 := client.NewMessage("mempool", types.EventTx, tx7) client.Send(msg, true)
msg8 := client.NewMessage("mempool", types.EventTx, tx8) _, err = client.Wait(msg)
msg9 := client.NewMessage("mempool", types.EventTx, tx9) if err != nil {
msg10 := client.NewMessage("mempool", types.EventTx, tx10) return err
}
client.Send(msg5, true)
_, err = client.Wait(msg5)
if err != nil {
return err
}
client.Send(msg6, true)
_, err = client.Wait(msg6)
if err != nil {
return err
}
client.Send(msg7, true)
_, err = client.Wait(msg7)
if err != nil {
return err
}
client.Send(msg8, true)
_, err = client.Wait(msg8)
if err != nil {
return err
}
client.Send(msg9, true)
_, err = client.Wait(msg9)
if err != nil {
return err
} }
return nil
client.Send(msg10, true)
_, err = client.Wait(msg10)
return err
} }
func TestGetTxList(t *testing.T) { func TestGetTxList(t *testing.T) {
...@@ -471,8 +434,8 @@ func TestAddMoreTxThanPoolSize(t *testing.T) { ...@@ -471,8 +434,8 @@ func TestAddMoreTxThanPoolSize(t *testing.T) {
mem.client.Send(msg5, true) mem.client.Send(msg5, true)
mem.client.Wait(msg5) mem.client.Wait(msg5)
if mem.Size() != 4 || mem.cache.Exists(tx5.Hash()) { if mem.Size() != 4 || mem.cache.Exist(string(tx5.Hash())) {
t.Error("TestAddMoreTxThanPoolSize failed", mem.Size(), mem.cache.Exists(tx5.Hash())) t.Error("TestAddMoreTxThanPoolSize failed", mem.Size(), mem.cache.Exist(string(tx5.Hash())))
} }
} }
...@@ -513,30 +476,18 @@ func TestAddBlockedTx(t *testing.T) { ...@@ -513,30 +476,18 @@ func TestAddBlockedTx(t *testing.T) {
msg1 := mem.client.NewMessage("mempool", types.EventTx, tx3) msg1 := mem.client.NewMessage("mempool", types.EventTx, tx3)
err := mem.client.Send(msg1, true) err := mem.client.Send(msg1, true)
if err != nil { assert.Nil(t, err)
t.Error(err)
return
}
_, err = mem.client.Wait(msg1) _, err = mem.client.Wait(msg1)
if err != nil { assert.Nil(t, err)
t.Error(err)
return
}
blkDetail := &types.BlockDetail{Block: blk} blkDetail := &types.BlockDetail{Block: blk}
msg2 := mem.client.NewMessage("mempool", types.EventAddBlock, blkDetail) msg2 := mem.client.NewMessage("mempool", types.EventAddBlock, blkDetail)
mem.client.Send(msg2, false) mem.client.Send(msg2, false)
msg3 := mem.client.NewMessage("mempool", types.EventTx, tx3) msg3 := mem.client.NewMessage("mempool", types.EventTx, tx3)
err = mem.client.Send(msg3, true) err = mem.client.Send(msg3, true)
if err != nil { assert.Nil(t, err)
t.Error(err)
return
}
resp, err := mem.client.Wait(msg3) resp, err := mem.client.Wait(msg3)
if err != nil { assert.Nil(t, err)
t.Error(err)
return
}
if string(resp.GetData().(*types.Reply).GetMsg()) != types.ErrDupTx.Error() { if string(resp.GetData().(*types.Reply).GetMsg()) != types.ErrDupTx.Error() {
t.Error("TestAddBlockedTx failed") t.Error("TestAddBlockedTx failed")
} }
...@@ -553,7 +504,7 @@ func TestDuplicateMempool(t *testing.T) { ...@@ -553,7 +504,7 @@ func TestDuplicateMempool(t *testing.T) {
t.Error("add tx error", err.Error()) t.Error("add tx error", err.Error())
return return
} }
assert.Equal(t, mem.Size(), 10)
msg := mem.client.NewMessage("mempool", types.EventGetMempool, nil) msg := mem.client.NewMessage("mempool", types.EventGetMempool, nil)
mem.client.Send(msg, true) mem.client.Send(msg, true)
...@@ -674,6 +625,23 @@ func TestCheckExpire2(t *testing.T) { ...@@ -674,6 +625,23 @@ func TestCheckExpire2(t *testing.T) {
} }
} }
func TestCheckExpire3(t *testing.T) {
q, mem := initEnv(0)
defer q.Close()
defer mem.Close()
// add tx
err := add4Tx(mem.client)
if err != nil {
t.Error("add tx error", err.Error())
return
}
mem.setHeader(&types.Header{Height: 50, BlockTime: 1e9 + 1})
assert.Equal(t, mem.Size(), 4)
mem.removeExpired()
assert.Equal(t, mem.Size(), 3)
}
func TestWrongToAddr(t *testing.T) { func TestWrongToAddr(t *testing.T) {
q, mem := initEnv(0) q, mem := initEnv(0)
defer q.Close() defer q.Close()
...@@ -773,9 +741,7 @@ func TestAddTxGroup(t *testing.T) { ...@@ -773,9 +741,7 @@ func TestAddTxGroup(t *testing.T) {
q, mem := initEnv(0) q, mem := initEnv(0)
defer q.Close() defer q.Close()
defer mem.Close() defer mem.Close()
//copytx //copytx
ctx2 := *tx2 ctx2 := *tx2
ctx3 := *tx3 ctx3 := *tx3
ctx4 := *tx4 ctx4 := *tx4
...@@ -790,10 +756,9 @@ func TestAddTxGroup(t *testing.T) { ...@@ -790,10 +756,9 @@ func TestAddTxGroup(t *testing.T) {
} }
func BenchmarkMempool(b *testing.B) { func BenchmarkMempool(b *testing.B) {
q, mem := initEnv(0) q, mem := initEnv(10240)
defer q.Close() defer q.Close()
defer mem.Close() defer mem.Close()
maxTxNumPerAccount = 100000 maxTxNumPerAccount = 100000
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
to, _ := genaddress() to, _ := genaddress()
...@@ -813,6 +778,7 @@ func BenchmarkMempool(b *testing.B) { ...@@ -813,6 +778,7 @@ func BenchmarkMempool(b *testing.B) {
} }
func blockchainProcess(q queue.Queue) { func blockchainProcess(q queue.Queue) {
dup := make(map[string]bool)
go func() { go func() {
client := q.Client() client := q.Client()
client.Sub("blockchain") client.Sub("blockchain")
...@@ -821,6 +787,17 @@ func blockchainProcess(q queue.Queue) { ...@@ -821,6 +787,17 @@ func blockchainProcess(q queue.Queue) {
msg.Reply(client.NewMessage("", types.EventHeader, &types.Header{Height: 1, BlockTime: 1})) msg.Reply(client.NewMessage("", types.EventHeader, &types.Header{Height: 1, BlockTime: 1}))
} else if msg.Ty == types.EventIsSync { } else if msg.Ty == types.EventIsSync {
msg.Reply(client.NewMessage("", types.EventReplyIsSync, &types.IsCaughtUp{Iscaughtup: true})) msg.Reply(client.NewMessage("", types.EventReplyIsSync, &types.IsCaughtUp{Iscaughtup: true}))
} else if msg.Ty == types.EventTxHashList {
txs := msg.Data.(*types.TxHashList)
var hashlist [][]byte
for _, hash := range txs.Hashes {
if dup[string(hash)] {
hashlist = append(hashlist, hash)
continue
}
dup[string(hash)] = true
}
msg.Reply(client.NewMessage("consensus", types.EventTxHashListReply, &types.TxHashList{Hashes: hashlist}))
} }
} }
}() }()
......
...@@ -101,3 +101,7 @@ func BenchmarkStepMerge(b *testing.B) { ...@@ -101,3 +101,7 @@ func BenchmarkStepMerge(b *testing.B) {
} }
close(done) close(done)
} }
func mergeList(done <-chan struct{}, cs ...<-chan queue.Message) <-chan queue.Message {
return merge(done, cs)
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package mempool
import (
"github.com/33cn/chain33/common/listmap"
"github.com/33cn/chain33/types"
)
//SimpleQueue 简单队列模式(默认提供一个队列,便于测试)
type SimpleQueue struct {
txList *listmap.ListMap
maxsize int
}
//NewSimpleQueue 创建队列
func NewSimpleQueue(cacheSize int) *SimpleQueue {
return &SimpleQueue{
txList: listmap.New(),
maxsize: cacheSize,
}
}
//Exist 是否存在
func (cache *SimpleQueue) Exist(hash string) bool {
return cache.txList.Exist(hash)
}
//GetItem 获取数据通过 key
func (cache *SimpleQueue) GetItem(hash string) (*Item, error) {
item, err := cache.txList.GetItem(hash)
if err != nil {
return nil, err
}
return item.(*Item), nil
}
// Push 把给定tx添加到SimpleQueue;如果tx已经存在SimpleQueue中或Mempool已满则返回对应error
func (cache *SimpleQueue) Push(tx *Item) error {
hash := tx.Value.Hash()
if cache.Exist(string(hash)) {
return types.ErrTxExist
}
if cache.txList.Size() >= cache.maxsize {
return types.ErrMemFull
}
cache.txList.Push(string(hash), tx)
return nil
}
// Remove 删除数据
func (cache *SimpleQueue) Remove(hash string) error {
cache.txList.Remove(hash)
return nil
}
// Size 数据总数
func (cache *SimpleQueue) Size() int {
return cache.txList.Size()
}
// Walk 遍历整个队列
func (cache *SimpleQueue) Walk(count int, cb func(value *Item) bool) {
i := 0
cache.txList.Walk(func(item interface{}) bool {
if !cb(item.(*Item)) {
return false
}
i++
return i != count
})
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package mempool
import (
"testing"
"github.com/33cn/chain33/types"
"github.com/stretchr/testify/assert"
)
func TestCache(t *testing.T) {
cache := NewSimpleQueue(1)
tx := &types.Transaction{Payload: []byte("123")}
hash := string(tx.Hash())
assert.Equal(t, false, cache.Exist(hash))
item1 := &Item{Value: tx, Priority: tx.Fee, EnterTime: types.Now().Unix()}
err := cache.Push(item1)
assert.Nil(t, err)
assert.Equal(t, true, cache.Exist(hash))
it, err := cache.GetItem(hash)
assert.Nil(t, err)
assert.Equal(t, item1, it)
_, err = cache.GetItem(hash + ":")
assert.Equal(t, types.ErrNotFound, err)
err = cache.Push(item1)
assert.Equal(t, types.ErrTxExist, err)
tx2 := &types.Transaction{Payload: []byte("1234")}
item2 := &Item{Value: tx2, Priority: tx.Fee, EnterTime: types.Now().Unix()}
err = cache.Push(item2)
assert.Equal(t, types.ErrMemFull, err)
cache.Remove(hash)
assert.Equal(t, 0, cache.Size())
//push to item
cache = NewSimpleQueue(2)
cache.Push(item1)
cache.Push(item2)
assert.Equal(t, 2, cache.Size())
var data [2]*Item
i := 0
cache.Walk(1, func(value *Item) bool {
data[i] = value
i++
return true
})
assert.Equal(t, 1, i)
assert.Equal(t, data[0], item1)
i = 0
cache.Walk(2, func(value *Item) bool {
data[i] = value
i++
return true
})
assert.Equal(t, 2, i)
assert.Equal(t, data[0], item1)
assert.Equal(t, data[1], item2)
i = 0
cache.Walk(2, func(value *Item) bool {
data[i] = value
i++
return false
})
assert.Equal(t, 1, i)
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found In the LICENSE file.
package timeline
import (
"github.com/33cn/chain33/queue"
drivers "github.com/33cn/chain33/system/mempool"
"github.com/33cn/chain33/types"
)
func init() {
drivers.Reg("timeline", New)
}
type subConfig struct {
PoolCacheSize int64 `json:"poolCacheSize"`
}
//New 创建timeline cache 结构的 mempool
func New(cfg *types.Mempool, sub []byte) queue.Module {
c := drivers.NewMempool(cfg)
var subcfg subConfig
types.MustDecode(sub, &subcfg)
if subcfg.PoolCacheSize == 0 {
subcfg.PoolCacheSize = cfg.PoolCacheSize
}
c.SetQueueCache(drivers.NewSimpleQueue(int(subcfg.PoolCacheSize)))
return c
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package timeline
import (
"encoding/json"
"testing"
"github.com/33cn/chain33/system/mempool"
"github.com/33cn/chain33/types"
)
func TestNewMempool(t *testing.T) {
sub, _ := json.Marshal(&subConfig{PoolCacheSize: 2})
module := New(&types.Mempool{}, sub)
mem := module.(*mempool.Mempool)
mem.Close()
}
...@@ -84,6 +84,9 @@ func (store *BaseStore) SetQueueClient(c queue.Client) { ...@@ -84,6 +84,9 @@ func (store *BaseStore) SetQueueClient(c queue.Client) {
}() }()
} }
//Wait wait for basestore ready
func (store *BaseStore) Wait() {}
func (store *BaseStore) processMessage(msg queue.Message) { func (store *BaseStore) processMessage(msg queue.Message) {
client := store.qclient client := store.qclient
if msg.Ty == types.EventStoreSet { if msg.Ty == types.EventStoreSet {
...@@ -216,14 +219,11 @@ func (t *StorelistQuery) IterateCallBack(key, value []byte) bool { ...@@ -216,14 +219,11 @@ func (t *StorelistQuery) IterateCallBack(key, value []byte) bool {
return false return false
} }
return false return false
} }
return false return false
} }
slog.Error("StoreListReply.IterateCallBack unsupported mode", "mode", t.Mode) slog.Error("StoreListReply.IterateCallBack unsupported mode", "mode", t.Mode)
return true return true
} }
func cloneByte(v []byte) []byte { func cloneByte(v []byte) []byte {
......
...@@ -11,7 +11,7 @@ type Config struct { ...@@ -11,7 +11,7 @@ type Config struct {
Log *Log `protobuf:"bytes,2,opt,name=log" json:"log,omitempty"` Log *Log `protobuf:"bytes,2,opt,name=log" json:"log,omitempty"`
Store *Store `protobuf:"bytes,3,opt,name=store" json:"store,omitempty"` Store *Store `protobuf:"bytes,3,opt,name=store" json:"store,omitempty"`
Consensus *Consensus `protobuf:"bytes,5,opt,name=consensus" json:"consensus,omitempty"` Consensus *Consensus `protobuf:"bytes,5,opt,name=consensus" json:"consensus,omitempty"`
MemPool *MemPool `protobuf:"bytes,6,opt,name=memPool" json:"memPool,omitempty"` Mempool *Mempool `protobuf:"bytes,6,opt,name=mempool" json:"memPool,omitempty"`
BlockChain *BlockChain `protobuf:"bytes,7,opt,name=blockChain" json:"blockChain,omitempty"` BlockChain *BlockChain `protobuf:"bytes,7,opt,name=blockChain" json:"blockChain,omitempty"`
Wallet *Wallet `protobuf:"bytes,8,opt,name=wallet" json:"wallet,omitempty"` Wallet *Wallet `protobuf:"bytes,8,opt,name=wallet" json:"wallet,omitempty"`
P2P *P2P `protobuf:"bytes,9,opt,name=p2p" json:"p2p,omitempty"` P2P *P2P `protobuf:"bytes,9,opt,name=p2p" json:"p2p,omitempty"`
...@@ -52,12 +52,14 @@ type Log struct { ...@@ -52,12 +52,14 @@ type Log struct {
CallerFunction bool `protobuf:"varint,10,opt,name=callerFunction" json:"callerFunction,omitempty"` CallerFunction bool `protobuf:"varint,10,opt,name=callerFunction" json:"callerFunction,omitempty"`
} }
// MemPool 配置 // Mempool 配置
type MemPool struct { type Mempool struct {
PoolCacheSize int64 `protobuf:"varint,1,opt,name=poolCacheSize" json:"poolCacheSize,omitempty"` Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"`
MinTxFee int64 `protobuf:"varint,2,opt,name=minTxFee" json:"minTxFee,omitempty"` PoolCacheSize int64 `protobuf:"varint,1,opt,name=poolCacheSize" json:"poolCacheSize,omitempty"`
ForceAccept bool `protobuf:"varint,3,opt,name=forceAccept" json:"forceAccept,omitempty"` MinTxFee int64 `protobuf:"varint,2,opt,name=minTxFee" json:"minTxFee,omitempty"`
MaxTxNumPerAccount int64 `protobuf:"varint,4,opt,name=maxTxNumPerAccount" json:"maxTxNumPerAccount,omitempty"` ForceAccept bool `protobuf:"varint,3,opt,name=forceAccept" json:"forceAccept,omitempty"`
MaxTxNumPerAccount int64 `protobuf:"varint,4,opt,name=maxTxNumPerAccount" json:"maxTxNumPerAccount,omitempty"`
MaxTxLast int64 `protobuf:"varint,4,opt,name=maxTxLast" json:"maxTxLast,omitempty"`
} }
// Consensus 配置 // Consensus 配置
......
...@@ -243,7 +243,7 @@ func Init(t string, cfg *Config) { ...@@ -243,7 +243,7 @@ func Init(t string, cfg *Config) {
} else { } else {
setTestNet(cfg.TestNet) setTestNet(cfg.TestNet)
} }
if cfg.Exec.MinExecFee > cfg.MemPool.MinTxFee || cfg.MemPool.MinTxFee > cfg.Wallet.MinFee { if cfg.Exec.MinExecFee > cfg.Mempool.MinTxFee || cfg.Mempool.MinTxFee > cfg.Wallet.MinFee {
panic("config must meet: wallet.minFee >= mempool.minTxFee >= exec.minExecFee") panic("config must meet: wallet.minFee >= mempool.minTxFee >= exec.minExecFee")
} }
setMinFee(cfg.Exec.MinExecFee) setMinFee(cfg.Exec.MinExecFee)
...@@ -524,6 +524,7 @@ type subModule struct { ...@@ -524,6 +524,7 @@ type subModule struct {
Exec map[string]interface{} Exec map[string]interface{}
Consensus map[string]interface{} Consensus map[string]interface{}
Wallet map[string]interface{} Wallet map[string]interface{}
Mempool map[string]interface{}
} }
func readFile(path string) string { func readFile(path string) string {
...@@ -548,6 +549,7 @@ func parseSubModule(cfg *subModule) (*ConfigSubModule, error) { ...@@ -548,6 +549,7 @@ func parseSubModule(cfg *subModule) (*ConfigSubModule, error) {
subcfg.Exec = parseItem(cfg.Exec) subcfg.Exec = parseItem(cfg.Exec)
subcfg.Consensus = parseItem(cfg.Consensus) subcfg.Consensus = parseItem(cfg.Consensus)
subcfg.Wallet = parseItem(cfg.Wallet) subcfg.Wallet = parseItem(cfg.Wallet)
subcfg.Mempool = parseItem(cfg.Mempool)
return &subcfg, nil return &subcfg, nil
} }
......
...@@ -91,7 +91,7 @@ func (f *Forks) GetFork(title, key string) int64 { ...@@ -91,7 +91,7 @@ func (f *Forks) GetFork(title, key string) int64 {
if title == "local" { if title == "local" {
panic("title not exisit -> " + title) panic("title not exisit -> " + title)
} else { } else {
tlog.Error("getfork title not exisit -> " + title) tlog.Error("getfork title not exisit -> ", "title", title, "key", key)
} }
return MaxHeight return MaxHeight
} }
......
...@@ -10,4 +10,5 @@ type ConfigSubModule struct { ...@@ -10,4 +10,5 @@ type ConfigSubModule struct {
Exec map[string][]byte Exec map[string][]byte
Consensus map[string][]byte Consensus map[string][]byte
Wallet map[string][]byte Wallet map[string][]byte
Mempool map[string][]byte
} }
...@@ -435,6 +435,9 @@ func MustPBToJSON(req Message) []byte { ...@@ -435,6 +435,9 @@ func MustPBToJSON(req Message) []byte {
// MustDecode 数据是否已经编码 // MustDecode 数据是否已经编码
func MustDecode(data []byte, v interface{}) { func MustDecode(data []byte, v interface{}) {
if data == nil {
return
}
err := json.Unmarshal(data, v) err := json.Unmarshal(data, v)
if err != nil { if err != nil {
panic(err) panic(err)
......
...@@ -130,7 +130,7 @@ func RunChain33(name string) { ...@@ -130,7 +130,7 @@ func RunChain33(name string) {
q := queue.New("channel") q := queue.New("channel")
log.Info("loading mempool module") log.Info("loading mempool module")
mem := mempool.New(cfg.MemPool) mem := mempool.New(cfg.Mempool, sub.Mempool)
mem.SetQueueClient(q.Client()) mem.SetQueueClient(q.Client())
log.Info("loading execs module") log.Info("loading execs module")
......
...@@ -26,7 +26,7 @@ var rootCmd = &cobra.Command{ ...@@ -26,7 +26,7 @@ var rootCmd = &cobra.Command{
var sendCmd = &cobra.Command{ var sendCmd = &cobra.Command{
Use: "send", Use: "send",
Short: "Send transaction in one move", Short: "Send transaction in one step",
Run: func(cmd *cobra.Command, args []string) {}, Run: func(cmd *cobra.Command, args []string) {},
} }
......
...@@ -105,6 +105,22 @@ func checkTxDupInner(txs []*types.TransactionCache) (ret []*types.TransactionCac ...@@ -105,6 +105,22 @@ func checkTxDupInner(txs []*types.TransactionCache) (ret []*types.TransactionCac
return ret return ret
} }
//CheckDupTx : check use txs []*types.Transaction and not []*types.TransactionCache
func CheckDupTx(client queue.Client, txs []*types.Transaction, height int64) (transactions []*types.Transaction, err error) {
txcache := make([]*types.TransactionCache, len(txs))
for i := 0; i < len(txcache); i++ {
txcache[i] = &types.TransactionCache{Transaction: txs[i]}
}
cache, err := CheckTxDup(client, txcache, height)
if err != nil {
return nil, err
}
for i := 0; i < len(cache); i++ {
transactions = append(transactions, cache[i].Transaction)
}
return transactions, nil
}
//CheckTxDup : check whether the tx is duplicated within the while chain //CheckTxDup : check whether the tx is duplicated within the while chain
func CheckTxDup(client queue.Client, txs []*types.TransactionCache, height int64) (transactions []*types.TransactionCache, err error) { func CheckTxDup(client queue.Client, txs []*types.TransactionCache, height int64) (transactions []*types.TransactionCache, err error) {
var checkHashList types.TxHashList var checkHashList types.TxHashList
......
...@@ -68,6 +68,7 @@ jrpcFuncWhitelist=["*"] ...@@ -68,6 +68,7 @@ jrpcFuncWhitelist=["*"]
grpcFuncWhitelist=["*"] grpcFuncWhitelist=["*"]
[mempool] [mempool]
name="timeline"
poolCacheSize=10240 poolCacheSize=10240
minTxFee=100000 minTxFee=100000
maxTxNumPerAccount=10000 maxTxNumPerAccount=10000
......
...@@ -57,7 +57,7 @@ type Chain33Mock struct { ...@@ -57,7 +57,7 @@ type Chain33Mock struct {
client queue.Client client queue.Client
api client.QueueProtocolAPI api client.QueueProtocolAPI
chain *blockchain.BlockChain chain *blockchain.BlockChain
mem *mempool.Mempool mem queue.Module
cs queue.Module cs queue.Module
exec *executor.Executor exec *executor.Executor
wallet queue.Module wallet queue.Module
...@@ -103,10 +103,10 @@ func newWithConfig(cfg *types.Config, sub *types.ConfigSubModule, mockapi client ...@@ -103,10 +103,10 @@ func newWithConfig(cfg *types.Config, sub *types.ConfigSubModule, mockapi client
mock.cs.SetQueueClient(q.Client()) mock.cs.SetQueueClient(q.Client())
lognode.Info("init consensus " + cfg.Consensus.Name) lognode.Info("init consensus " + cfg.Consensus.Name)
mock.mem = mempool.New(cfg.MemPool) mock.mem = mempool.New(cfg.Mempool, sub.Mempool)
mock.mem.SetQueueClient(q.Client()) mock.mem.SetQueueClient(q.Client())
mock.mem.Wait()
lognode.Info("init mempool") lognode.Info("init mempool")
mock.mem.WaitPollLastHeader()
if cfg.P2P.Enable { if cfg.P2P.Enable {
mock.network = p2p.New(cfg.P2P) mock.network = p2p.New(cfg.P2P)
mock.network.SetQueueClient(q.Client()) mock.network.SetQueueClient(q.Client())
...@@ -169,7 +169,7 @@ func (mock *Chain33Mock) GetBlockChain() *blockchain.BlockChain { ...@@ -169,7 +169,7 @@ func (mock *Chain33Mock) GetBlockChain() *blockchain.BlockChain {
func setFee(cfg *types.Config, fee int64) { func setFee(cfg *types.Config, fee int64) {
cfg.Exec.MinExecFee = fee cfg.Exec.MinExecFee = fee
cfg.MemPool.MinTxFee = fee cfg.Mempool.MinTxFee = fee
cfg.Wallet.MinFee = fee cfg.Wallet.MinFee = fee
if fee == 0 { if fee == 0 {
cfg.Exec.IsFree = true cfg.Exec.IsFree = true
...@@ -394,6 +394,9 @@ func (m *mockP2P) SetQueueClient(client queue.Client) { ...@@ -394,6 +394,9 @@ func (m *mockP2P) SetQueueClient(client queue.Client) {
}() }()
} }
//Wait for ready
func (m *mockP2P) Wait() {}
//Close : //Close :
func (m *mockP2P) Close() { func (m *mockP2P) Close() {
} }
...@@ -119,6 +119,9 @@ func New(cfg *types.Wallet, sub map[string][]byte) *Wallet { ...@@ -119,6 +119,9 @@ func New(cfg *types.Wallet, sub map[string][]byte) *Wallet {
return wallet return wallet
} }
//Wait for wallet ready
func (wallet *Wallet) Wait() {}
// RegisterMineStatusReporter 向钱包注册状态回报 // RegisterMineStatusReporter 向钱包注册状态回报
func (wallet *Wallet) RegisterMineStatusReporter(reporter wcom.MineStatusReport) error { func (wallet *Wallet) RegisterMineStatusReporter(reporter wcom.MineStatusReport) error {
if reporter == nil { if reporter == nil {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment