Commit 095909dc authored by 陈德海's avatar 陈德海

add mempool price queue, change trade queue to score queue, add some mempool test

parent 61843ab8
......@@ -72,7 +72,7 @@ poolCacheSize=10240
minTxFee=100000
maxTxNumPerAccount=10000
[mempool.sub.trade]
[mempool.sub.score]
poolCacheSize=10240
minTxFee=100000
maxTxNumPerAccount=10000
......@@ -80,6 +80,11 @@ timeParam=1 #时间占价格比例
priceConstant=1544 #手续费相对于时间的一个合适的常量,取当前unxi时间戳前四位数,排序时手续费高1e-5~=快1s
pricePower=1 #常量比例
[mempool.sub.price]
poolCacheSize=10240
minTxFee=100000
maxTxNumPerAccount=10000
[consensus]
name="ticket"
minerstart=true
......
......@@ -117,9 +117,9 @@ type RoundState struct {
// RoundStateMessage ...
func (rs *RoundState) RoundStateMessage() *tmtypes.NewRoundStepMsg {
return &tmtypes.NewRoundStepMsg{
Height: rs.Height,
Round: int32(rs.Round),
Step: int32(rs.Step),
Height: rs.Height,
Round: int32(rs.Round),
Step: int32(rs.Step),
SecondsSinceStartTime: int32(time.Since(rs.StartTime).Seconds()),
LastCommitRound: int32(rs.LastCommit.Round()),
}
......
......@@ -78,9 +78,9 @@ func ParseX509CertificateToSm2(x509Cert *x509.Certificate) *sm2.Certificate {
UnknownExtKeyUsage: x509Cert.UnknownExtKeyUsage,
BasicConstraintsValid: x509Cert.BasicConstraintsValid,
IsCA: x509Cert.IsCA,
MaxPathLen: x509Cert.MaxPathLen,
MaxPathLenZero: x509Cert.MaxPathLenZero,
IsCA: x509Cert.IsCA,
MaxPathLen: x509Cert.MaxPathLen,
MaxPathLenZero: x509Cert.MaxPathLenZero,
SubjectKeyId: x509Cert.SubjectKeyId,
AuthorityKeyId: x509Cert.AuthorityKeyId,
......@@ -141,9 +141,9 @@ func ParseSm2CertificateToX509(sm2Cert *sm2.Certificate) *x509.Certificate {
UnknownExtKeyUsage: sm2Cert.UnknownExtKeyUsage,
BasicConstraintsValid: sm2Cert.BasicConstraintsValid,
IsCA: sm2Cert.IsCA,
MaxPathLen: sm2Cert.MaxPathLen,
MaxPathLenZero: sm2Cert.MaxPathLenZero,
IsCA: sm2Cert.IsCA,
MaxPathLen: sm2Cert.MaxPathLen,
MaxPathLenZero: sm2Cert.MaxPathLenZero,
SubjectKeyId: sm2Cert.SubjectKeyId,
AuthorityKeyId: sm2Cert.AuthorityKeyId,
......
......@@ -5,5 +5,5 @@
package init
import (
_ "github.com/33cn/plugin/plugin/mempool/trade"
_ "github.com/33cn/plugin/plugin/mempool/score"
)
package price
import (
"bytes"
"encoding/gob"
"github.com/33cn/chain33/system/mempool"
"github.com/33cn/chain33/types"
)
//PriceQueue 简单队列模式(默认提供一个队列,便于测试)
type PriceQueue struct {
txMap map[string]*SkipValue
txList *SkipList
subConfig subConfig
}
//NewPriceQueue 创建队列
func NewPriceQueue(subcfg subConfig) *PriceQueue {
return &PriceQueue{
txMap: make(map[string]*SkipValue, subcfg.PoolCacheSize),
txList: NewSkipList(&SkipValue{-1, nil}),
subConfig: subcfg,
}
}
func (cache *PriceQueue) newSkipValue(item *mempool.Item) (*SkipValue, error) {
//tx := item.value
buf := bytes.NewBuffer(nil)
enc := gob.NewEncoder(buf)
err := enc.Encode(item.Value)
if err != nil {
return nil, err
}
size := len(buf.Bytes())
return &SkipValue{Price: item.Value.Fee / int64(size), Value: item}, nil
}
//Exist 是否存在
func (cache *PriceQueue) Exist(hash string) bool {
_, exists := cache.txMap[hash]
return exists
}
//GetItem 获取数据通过 key
func (cache *PriceQueue) GetItem(hash string) (*mempool.Item, error) {
if k, exist := cache.txMap[string(hash)]; exist {
return k.Value.(*mempool.Item), nil
}
return nil, types.ErrNotFound
}
// Push 把给定tx添加到PriceQueue;如果tx已经存在PriceQueue中或Mempool已满则返回对应error
func (cache *PriceQueue) Push(item *mempool.Item) error {
hash := item.Value.Hash()
if cache.Exist(string(hash)) {
s := cache.txMap[string(hash)]
addedItem := s.Value.(*mempool.Item)
addedTime := addedItem.EnterTime
if types.Now().Unix()-addedTime < mempoolDupResendInterval {
return types.ErrTxExist
} else {
// 超过2分钟之后的重发交易返回nil,再次发送给P2P,但是不再次加入mempool
// 并修改其enterTime,以避免该交易一直在节点间被重发
newEnterTime := types.Now().Unix()
resendItem := &mempool.Item{Value: item.Value, Priority: item.Value.Fee, EnterTime: newEnterTime}
var err error
sv, err := cache.newSkipValue(resendItem)
if err != nil {
return err
}
cache.Remove(string(hash))
cache.txList.Insert(sv)
cache.txMap[string(hash)] = sv
// ------------------
return nil
}
}
it := &mempool.Item{Value: item.Value, Priority: item.Value.Fee, EnterTime: item.EnterTime}
sv, err := cache.newSkipValue(it)
if err != nil {
return err
}
if int64(cache.txList.Len()) >= cache.subConfig.PoolCacheSize {
tail := cache.txList.GetIterator().Last()
//价格高
if sv.Compare(tail) == -1 {
cache.Remove(string(tail.Value.(*mempool.Item).Value.Hash()))
} else {
return types.ErrMemFull
}
}
cache.txList.Insert(sv)
cache.txMap[string(hash)] = sv
return nil
}
// Remove 删除数据
func (cache *PriceQueue) Remove(hash string) error {
cache.txList.Delete(cache.txMap[hash])
delete(cache.txMap, hash)
return nil
}
// Size 数据总数
func (cache *PriceQueue) Size() int {
return cache.txList.Len()
}
// Walk 遍历整个队列
func (cache *PriceQueue) Walk(count int, cb func(value *mempool.Item) bool) {
i := 0
cache.txList.Walk(func(item interface{}) bool {
if !cb(item.(*mempool.Item)) {
return false
}
i++
return i != count
})
}
package trade
package price
import (
"testing"
......@@ -24,22 +24,24 @@ var (
tx1 = &types.Transaction{Execer: []byte("coins"), Payload: types.Encode(transfer), Fee: 1000000, Expire: 1, To: toAddr}
tx2 = &types.Transaction{Execer: []byte("coins"), Payload: types.Encode(transfer), Fee: 1000000, Expire: 2, To: toAddr}
tx3 = &types.Transaction{Execer: []byte("coins"), Payload: types.Encode(transfer), Fee: 1000000, Expire: 3, To: toAddr}
tx4 = &types.Transaction{Execer: []byte("coins"), Payload: types.Encode(transfer), Fee: 2000000, Expire: 2, To: toAddr}
tx4 = &types.Transaction{Execer: []byte("coins"), Payload: types.Encode(transfer), Fee: 2000000, Expire: 4, To: toAddr}
tx5 = &types.Transaction{Execer: []byte("coins"), Payload: types.Encode(transfer), Fee: 1000000, Expire: 5, To: toAddr}
item1 = &drivers.Item{Value: tx1, Priority: tx1.Fee, EnterTime: types.Now().Unix()}
item2 = &drivers.Item{Value: tx2, Priority: tx2.Fee, EnterTime: types.Now().Unix()}
item3 = &drivers.Item{Value: tx3, Priority: tx3.Fee, EnterTime: types.Now().Unix() - 1000}
item4 = &drivers.Item{Value: tx4, Priority: tx4.Fee, EnterTime: types.Now().Unix() - 1000}
item5 = &drivers.Item{Value: tx5, Priority: tx5.Fee, EnterTime: types.Now().Unix() - 1000}
)
func initEnv(size int64) *TradeQueue {
func initEnv(size int64) *PriceQueue {
if size == 0 {
size = 100
}
_, sub := types.InitCfg("chain33.test.toml")
var subcfg subConfig
types.MustDecode(sub.Mempool["trade"], &subcfg)
types.MustDecode(sub.Mempool["price"], &subcfg)
subcfg.PoolCacheSize = size
cache := NewTradeQueue(subcfg)
cache := NewPriceQueue(subcfg)
return cache
}
......@@ -105,16 +107,32 @@ func TestTimeCompetition(t *testing.T) {
cache := initEnv(1)
cache.Push(item1)
cache.Push(item3)
if cache.Exist(string(item1.Value.Hash())) || !cache.Exist(string(item3.Value.Hash())) {
t.Error("queue not by time")
}
assert.Equal(t, false, cache.Exist(string(item1.Value.Hash())))
assert.Equal(t, true, cache.Exist(string(item3.Value.Hash())))
}
func TestPriceCompetition(t *testing.T) {
cache := initEnv(1)
cache.Push(item1)
cache.Push(item4)
if cache.Exist(string(item1.Value.Hash())) || !cache.Exist(string(item4.Value.Hash())) {
t.Error("queue not by price")
}
assert.Equal(t, false, cache.Exist(string(item1.Value.Hash())))
assert.Equal(t, true, cache.Exist(string(item4.Value.Hash())))
}
func TestAddDuplicateItem(t *testing.T) {
cache := initEnv(1)
cache.Push(item1)
err := cache.Push(item1)
assert.Equal(t, types.ErrTxExist, err)
}
func TestQueueDirection(t *testing.T) {
cache := initEnv(0)
cache.Push(item1)
cache.Push(item2)
cache.Push(item3)
cache.Push(item4)
cache.Push(item5)
cache.txList.Print()
assert.Equal(t, true, cache.txList.GetIterator().First().Price >= cache.txList.GetIterator().Last().Price)
}
Title="chain33"
TestNet=true
[log]
# 日志级别,支持debug(dbug)/info/warn/error(eror)/crit
loglevel = "debug"
logConsoleLevel = "info"
# 日志文件名,可带目录,所有生成的日志文件都放到此目录下
logFile = "logs/chain33.log"
# 单个日志文件的最大值(单位:兆)
maxFileSize = 20
# 最多保存的历史日志文件个数
maxBackups = 20
# 最多保存的历史日志消息(单位:天)
maxAge = 28
# 日志文件名是否使用本地事件(否则使用UTC时间)
localTime = true
# 历史日志文件是否压缩(压缩格式为gz)
compress = false
# 是否打印调用源文件和行号
callerFile = true
# 是否打印调用方法
callerFunction = true
[blockchain]
defCacheSize=128
maxFetchBlockNum=128
timeoutSeconds=5
batchBlockNum=128
driver="memdb"
dbPath="datadir"
dbCache=64
isStrongConsistency=true
singleMode=true
batchsync=false
isRecordBlockSequence=true
isParaChain=false
enableTxQuickIndex=false
[p2p]
port=13802
seeds=["47.104.125.151:13802","47.104.125.97:13802","47.104.125.177:13802"]
enable=true
isSeed=true
serverStart=true
msgCacheSize=10240
driver="memdb"
dbPath="datadir/addrbook"
dbCache=4
grpcLogFile="grpc33.log"
version=216
verMix=216
verMax=217
[rpc]
jrpcBindAddr="localhost:8801"
grpcBindAddr="localhost:8802"
whitelist=["127.0.0.1"]
jrpcFuncWhitelist=["*"]
grpcFuncWhitelist=["*"]
enableTLS=false
certFile="cert.pem"
keyFile="key.pem"
[mempool]
name="price"
poolCacheSize=10240
minTxFee=100000
maxTxNumPerAccount=10000
[mempool.sub.timeline]
poolCacheSize=10240
minTxFee=100000
maxTxNumPerAccount=10000
[mempool.sub.score]
poolCacheSize=10240
minTxFee=100000
maxTxNumPerAccount=10000
timeParam=1 #时间占价格比例
priceConstant=1544 #手续费相对于时间的一个合适的常量,取当前unxi时间戳前四位数,排序时手续费高1e-5~=快1s
pricePower=1 #常量比例
[mempool.sub.price]
poolCacheSize=10240
minTxFee=100000
maxTxNumPerAccount=10000
[consensus]
name="solo"
minerstart=true
genesisBlockTime=1514533394
genesis="14KEKbYtKKQm4wMthSK9J4La4nAiidGozt"
[mver.consensus]
fundKeyAddr = "1BQXS6TxaYYG5mADaWij4AxhZZUTpw95a5"
coinReward = 18
coinDevFund = 12
ticketPrice = 10000
powLimitBits = "0x1f00ffff"
retargetAdjustmentFactor = 4
futureBlockTime = 16
ticketFrozenTime = 5 #5s only for test
ticketWithdrawTime = 10 #10s only for test
ticketMinerWaitTime = 2 #2s only for test
maxTxNumber = 1600 #160
targetTimespan = 2304
targetTimePerBlock = 16
[mver.consensus.ForkChainParamV1]
maxTxNumber = 10000
targetTimespan = 288 #only for test
targetTimePerBlock = 2
[mver.consensus.ForkChainParamV2]
powLimitBits = "0x1f2fffff"
[consensus.sub.solo]
genesis="14KEKbYtKKQm4wMthSK9J4La4nAiidGozt"
genesisBlockTime=1514533394
hotkeyAddr="12qyocayNF7Lv6C9qW4avxs2E7U41fKSfv"
waitTxMs=10
[consensus.sub.ticket]
genesisBlockTime=1514533394
[[consensus.sub.ticket.genesis]]
minerAddr="12qyocayNF7Lv6C9qW4avxs2E7U41fKSfv"
returnAddr="14KEKbYtKKQm4wMthSK9J4La4nAiidGozt"
count=10000
[[consensus.sub.ticket.genesis]]
minerAddr="1PUiGcbsccfxW3zuvHXZBJfznziph5miAo"
returnAddr="1EbDHAXpoiewjPLX9uqoz38HsKqMXayZrF"
count=10000
[[consensus.sub.ticket.genesis]]
minerAddr="1EDnnePAZN48aC2hiTDzhkczfF39g1pZZX"
returnAddr="1KcCVZLSQYRUwE5EXTsAoQs9LuJW6xwfQa"
count=10000
[store]
name="mavl"
driver="memdb"
dbPath="datadir/mavltree"
dbCache=128
[store.sub.mavl]
enableMavlPrefix=false
enableMVCC=false
enableMavlPrune=false
pruneHeight=10000
[wallet]
minFee=1000000
driver="memdb"
dbPath="datadir/wallet"
dbCache=16
signType="secp256k1"
[wallet.sub.ticket]
minerwhitelist=["*"]
[exec]
isFree=false
minExecFee=100000
enableStat=false
enableMVCC=false
[exec.sub.token]
saveTokenTxList=true
tokenApprs = [
"1Bsg9j6gW83sShoee1fZAt9TkUjcrCgA9S",
"1Q8hGLfoGe63efeWa8fJ4Pnukhkngt6poK",
"1LY8GFia5EiyoTodMLfkB5PHNNpXRqxhyB",
"1GCzJDS6HbgTQ2emade7mEJGGWFfA15pS9",
"1JYB8sxi4He5pZWHCd3Zi2nypQ4JMB6AxN",
"12qyocayNF7Lv6C9qW4avxs2E7U41fKSfv",
]
[exec.sub.relay]
genesis="14KEKbYtKKQm4wMthSK9J4La4nAiidGozt"
[exec.sub.cert]
# 是否启用证书验证和签名
enable=false
# 加密文件路径
cryptoPath="authdir/crypto"
# 带证书签名类型,支持"auth_ecdsa", "auth_sm2"
signType="auth_ecdsa"
[exec.sub.manage]
superManager=[
"1Bsg9j6gW83sShoee1fZAt9TkUjcrCgA9S",
"12qyocayNF7Lv6C9qW4avxs2E7U41fKSfv",
"1Q8hGLfoGe63efeWa8fJ4Pnukhkngt6poK"
]
\ No newline at end of file
......@@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package trade
package price
import (
"runtime"
......
package price
import (
clog "github.com/33cn/chain33/common/log"
log "github.com/33cn/chain33/common/log/log15"
"github.com/33cn/chain33/queue"
drivers "github.com/33cn/chain33/system/mempool"
"github.com/33cn/chain33/types"
)
func SetLogLevel(level string) {
clog.SetLogLevel(level)
}
func DisableLog() {
mlog.SetHandler(log.DiscardHandler())
}
//--------------------------------------------------------------------------------
// Module Mempool
type Mempool struct {
subConfig subConfig
}
type subConfig struct {
PoolCacheSize int64 `json:"poolCacheSize"`
MinTxFee int64 `json:"minTxFee"`
MaxTxNumPerAccount int64 `json:"maxTxNumPerAccount"`
}
func init() {
drivers.Reg("price", New)
}
//New 创建timeline cache 结构的 mempool
func New(cfg *types.Mempool, sub []byte) queue.Module {
c := drivers.NewMempool(cfg)
var subcfg subConfig
types.MustDecode(sub, &subcfg)
if subcfg.PoolCacheSize == 0 {
subcfg.PoolCacheSize = cfg.PoolCacheSize
}
c.SetQueueCache(NewPriceQueue(subcfg))
return c
}
package price
import (
"fmt"
"math/rand"
"github.com/33cn/chain33/system/mempool"
)
const maxLevel = 32
const prob = 0.35
type SkipValue struct {
Price int64
Value interface{}
}
func (v *SkipValue) Compare(value *SkipValue) int {
if v.Price > value.Price {
return -1
} else if v.Price == value.Price {
if v.Value.(*mempool.Item).EnterTime < value.Value.(*mempool.Item).EnterTime {
return -1
}
return 0
}
return 1
}
type skipListNode struct {
next []*skipListNode
prev *skipListNode
Value *SkipValue
}
type SkipList struct {
header, tail *skipListNode
findcount int
count int
level int
}
type SkipListIterator struct {
list *SkipList
node *skipListNode
}
func (sli *SkipListIterator) First() *SkipValue {
if sli.list.header.next[0] == nil {
return nil
}
sli.node = sli.list.header.next[0]
return sli.node.Value
}
func (sli *SkipListIterator) Last() *SkipValue {
if sli.list.tail == nil {
return nil
}
sli.node = sli.list.tail
return sli.node.Value
}
func (sli *SkipListIterator) Current() *SkipValue {
if sli.node == nil {
return nil
}
return sli.node.Value
}
func (node *skipListNode) Prev() *skipListNode {
if node == nil || node.prev == nil {
return nil
}
return node.prev
}
func (node *skipListNode) Next() *skipListNode {
if node == nil || node.next[0] == nil {
return nil
}
return node.next[0]
}
func (sli *SkipListIterator) Seek(value *SkipValue) *SkipValue {
x := sli.list.find(value)
if x.next[0] == nil {
return nil
}
sli.node = x.next[0]
return sli.node.Value
}
func newskipListNode(level int, value *SkipValue) *skipListNode {
node := &skipListNode{}
node.next = make([]*skipListNode, level)
node.Value = value
return node
}
//构建一个value的最小值
func NewSkipList(min *SkipValue) *SkipList {
sl := &SkipList{}
sl.level = 1
sl.header = newskipListNode(maxLevel, min)
return sl
}
func randomLevel() int {
level := 1
t := prob * 0xFFFF
for rand.Int()&0xFFFF < int(t) {
level += 1
if level == maxLevel {
break
}
}
return level
}
func (sl *SkipList) GetIterator() *SkipListIterator {
it := &SkipListIterator{}
it.list = sl
it.First()
return it
}
func (sl *SkipList) Len() int {
return sl.count
}
func (sl *SkipList) Level() int {
return sl.level
}
func (sl *SkipList) find(value *SkipValue) *skipListNode {
x := sl.header
for i := sl.level - 1; i >= 0; i-- {
for x.next[i] != nil && x.next[i].Value.Compare(value) < 0 {
sl.findcount++
x = x.next[i]
}
}
return x
}
func (sl *SkipList) FindCount() int {
return sl.findcount
}
func (sl *SkipList) Find(value *SkipValue) *SkipValue {
x := sl.find(value)
if x.next[0] != nil && x.next[0].Value.Compare(value) == 0 {
return x.next[0].Value
}
return nil
}
func (sl *SkipList) FindGreaterOrEqual(value *SkipValue) *SkipValue {
x := sl.find(value)
if x.next[0] != nil {
return x.next[0].Value
}
return nil
}
func (sl *SkipList) Insert(value *SkipValue) int {
var update [maxLevel]*skipListNode
x := sl.header
for i := sl.level - 1; i >= 0; i-- {
for x.next[i] != nil && x.next[i].Value.Compare(value) <= 0 {
x = x.next[i]
}
update[i] = x
}
//if x.next[0] != nil && x.next[0].Value.Compare(value) == 0 { //update
// x.next[0].Value = value
// return 0
//}
level := randomLevel()
if level > sl.level {
for i := sl.level; i < level; i++ {
update[i] = sl.header
}
sl.level = level
}
x = newskipListNode(level, value)
for i := 0; i < level; i++ {
x.next[i] = update[i].next[i]
update[i].next[i] = x
}
//形成一个双向链表
if update[0] != sl.header {
x.prev = update[0]
}
if x.next[0] != nil {
x.next[0].prev = x
} else {
sl.tail = x
}
sl.count++
return 1
}
func (sl *SkipList) Delete(value *SkipValue) int {
var update [maxLevel]*skipListNode
x := sl.header
for i := sl.level - 1; i >= 0; i-- {
for x.next[i] != nil && x.next[i].Value.Compare(value) < 0 {
x = x.next[i]
}
update[i] = x
}
if x.next[0] == nil || x.next[0].Value.Compare(value) != 0 { //not find
return 0
}
x = x.next[0]
for i := 0; i < sl.level; i++ {
if update[i].next[i] == x {
update[i].next[i] = x.next[i]
}
}
if x.next[0] != nil {
x.next[0].prev = x.prev
} else {
sl.tail = x.prev
}
for sl.level > 1 && sl.header.next[sl.level-1] == nil {
sl.level--
}
sl.count--
return 1
}
//测试用的输出函数
func (l *SkipList) Print() {
if l.count > 0 {
r := l.header
for i := l.level - 1; i >= 0; i-- {
e := r.next[i]
//fmt.Print(i)
for e != nil {
fmt.Print(e.Value.Price)
fmt.Print(" ")
fmt.Print(e.Value)
fmt.Println("")
e = e.next[i]
}
fmt.Println()
}
} else {
fmt.Println("空")
}
}
//Walk 遍历整个结构,如果cb 返回false 那么停止遍历
func (lm *SkipList) Walk(cb func(value interface{}) bool) {
for e := lm.header.Next(); e != nil; e = e.Next() {
if cb == nil {
return
}
if !cb(e.Value.Value) {
return
}
}
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package trade
package score
import (
"bytes"
......@@ -12,23 +8,23 @@ import (
"github.com/33cn/chain33/types"
)
//TradeQueue 简单队列模式(默认提供一个队列,便于测试)
type TradeQueue struct {
//ScoreQueue 简单队列模式(默认提供一个队列,便于测试)
type ScoreQueue struct {
txMap map[string]*SkipValue
txList *SkipList
subConfig subConfig
}
//NewTradeQueue 创建队列
func NewTradeQueue(subcfg subConfig) *TradeQueue {
return &TradeQueue{
//NewScoreQueue 创建队列
func NewScoreQueue(subcfg subConfig) *ScoreQueue {
return &ScoreQueue{
txMap: make(map[string]*SkipValue, subcfg.PoolCacheSize),
txList: NewSkipList(&SkipValue{-1, nil}),
subConfig: subcfg,
}
}
func (cache *TradeQueue) newSkipValue(item *mempool.Item) (*SkipValue, error) {
func (cache *ScoreQueue) newSkipValue(item *mempool.Item) (*SkipValue, error) {
//tx := item.value
buf := bytes.NewBuffer(nil)
enc := gob.NewEncoder(buf)
......@@ -37,25 +33,25 @@ func (cache *TradeQueue) newSkipValue(item *mempool.Item) (*SkipValue, error) {
return nil, err
}
size := len(buf.Bytes())
return &SkipValue{Score: cache.subConfig.TimeParam*item.EnterTime - cache.subConfig.PriceConstant*(item.Value.Fee/int64(size))*cache.subConfig.PricePower, Value: item}, nil
return &SkipValue{Score: cache.subConfig.PriceConstant*(item.Value.Fee/int64(size))*cache.subConfig.PricePower - cache.subConfig.TimeParam*item.EnterTime, Value: item}, nil
}
//Exist 是否存在
func (cache *TradeQueue) Exist(hash string) bool {
func (cache *ScoreQueue) Exist(hash string) bool {
_, exists := cache.txMap[hash]
return exists
}
//GetItem 获取数据通过 key
func (cache *TradeQueue) GetItem(hash string) (*mempool.Item, error) {
func (cache *ScoreQueue) GetItem(hash string) (*mempool.Item, error) {
if k, exist := cache.txMap[string(hash)]; exist {
return k.Value.(*mempool.Item), nil
}
return nil, types.ErrNotFound
}
// Push 把给定tx添加到TradeQueue;如果tx已经存在TradeQueue中或Mempool已满则返回对应error
func (cache *TradeQueue) Push(item *mempool.Item) error {
// Push 把给定tx添加到ScoreQueue;如果tx已经存在ScoreQueue中或Mempool已满则返回对应error
func (cache *ScoreQueue) Push(item *mempool.Item) error {
hash := item.Value.Hash()
if cache.Exist(string(hash)) {
s := cache.txMap[string(hash)]
......@@ -90,7 +86,7 @@ func (cache *TradeQueue) Push(item *mempool.Item) error {
if int64(cache.txList.Len()) >= cache.subConfig.PoolCacheSize {
tail := cache.txList.GetIterator().Last()
//价格高
if sv.Compare(tail) == 1 {
if sv.Compare(tail) == -1 {
cache.Remove(string(tail.Value.(*mempool.Item).Value.Hash()))
} else {
return types.ErrMemFull
......@@ -102,19 +98,19 @@ func (cache *TradeQueue) Push(item *mempool.Item) error {
}
// Remove 删除数据
func (cache *TradeQueue) Remove(hash string) error {
func (cache *ScoreQueue) Remove(hash string) error {
cache.txList.Delete(cache.txMap[hash])
delete(cache.txMap, hash)
return nil
}
// Size 数据总数
func (cache *TradeQueue) Size() int {
func (cache *ScoreQueue) Size() int {
return cache.txList.Len()
}
// Walk 遍历整个队列
func (cache *TradeQueue) Walk(count int, cb func(value *mempool.Item) bool) {
func (cache *ScoreQueue) Walk(count int, cb func(value *mempool.Item) bool) {
i := 0
cache.txList.Walk(func(item interface{}) bool {
if !cb(item.(*mempool.Item)) {
......
package score
import (
"testing"
"github.com/33cn/chain33/common"
"github.com/33cn/chain33/common/address"
"github.com/33cn/chain33/common/crypto"
cty "github.com/33cn/chain33/system/dapp/coins/types"
drivers "github.com/33cn/chain33/system/mempool"
"github.com/33cn/chain33/types"
"github.com/stretchr/testify/assert"
)
var (
c, _ = crypto.New(types.GetSignName("", types.SECP256K1))
hex = "CC38546E9E659D15E6B4893F0AB32A06D103931A8230B0BDE71459D2B27D6944"
a, _ = common.FromHex(hex)
privKey, _ = c.PrivKeyFromBytes(a)
toAddr = address.PubKeyToAddress(privKey.PubKey().Bytes()).String()
amount = int64(1e8)
v = &cty.CoinsAction_Transfer{Transfer: &types.AssetsTransfer{Amount: amount}}
transfer = &cty.CoinsAction{Value: v, Ty: cty.CoinsActionTransfer}
tx1 = &types.Transaction{Execer: []byte("coins"), Payload: types.Encode(transfer), Fee: 1000000, Expire: 1, To: toAddr}
tx2 = &types.Transaction{Execer: []byte("coins"), Payload: types.Encode(transfer), Fee: 1000000, Expire: 2, To: toAddr}
tx3 = &types.Transaction{Execer: []byte("coins"), Payload: types.Encode(transfer), Fee: 1000000, Expire: 3, To: toAddr}
tx4 = &types.Transaction{Execer: []byte("coins"), Payload: types.Encode(transfer), Fee: 2000000, Expire: 4, To: toAddr}
tx5 = &types.Transaction{Execer: []byte("coins"), Payload: types.Encode(transfer), Fee: 1000000, Expire: 5, To: toAddr}
item1 = &drivers.Item{Value: tx1, Priority: tx1.Fee, EnterTime: types.Now().Unix()}
item2 = &drivers.Item{Value: tx2, Priority: tx2.Fee, EnterTime: types.Now().Unix()}
item3 = &drivers.Item{Value: tx3, Priority: tx3.Fee, EnterTime: types.Now().Unix() - 1000}
item4 = &drivers.Item{Value: tx4, Priority: tx4.Fee, EnterTime: types.Now().Unix() - 1000}
item5 = &drivers.Item{Value: tx5, Priority: tx5.Fee, EnterTime: types.Now().Unix() - 1000}
)
func initEnv(size int64) *ScoreQueue {
if size == 0 {
size = 100
}
_, sub := types.InitCfg("chain33.test.toml")
var subcfg subConfig
types.MustDecode(sub.Mempool["score"], &subcfg)
subcfg.PoolCacheSize = size
cache := NewScoreQueue(subcfg)
return cache
}
func TestMemFull(t *testing.T) {
cache := initEnv(1)
hash := string(tx1.Hash())
err := cache.Push(item1)
assert.Nil(t, err)
assert.Equal(t, true, cache.Exist(hash))
it, err := cache.GetItem(hash)
assert.Nil(t, err)
assert.Equal(t, item1, it)
_, err = cache.GetItem(hash + ":")
assert.Equal(t, types.ErrNotFound, err)
err = cache.Push(item1)
assert.Equal(t, types.ErrTxExist, err)
err = cache.Push(item2)
assert.Equal(t, types.ErrMemFull, err)
cache.Remove(hash)
assert.Equal(t, 0, cache.Size())
}
func TestWalk(t *testing.T) {
//push to item
cache := initEnv(2)
cache.Push(item1)
cache.Push(item2)
assert.Equal(t, 2, cache.Size())
var data [2]*drivers.Item
i := 0
cache.Walk(1, func(value *drivers.Item) bool {
data[i] = value
i++
return true
})
assert.Equal(t, 1, i)
assert.Equal(t, data[0], item1)
i = 0
cache.Walk(2, func(value *drivers.Item) bool {
data[i] = value
i++
return true
})
assert.Equal(t, 2, i)
assert.Equal(t, data[0], item1)
assert.Equal(t, data[1], item2)
i = 0
cache.Walk(2, func(value *drivers.Item) bool {
data[i] = value
i++
return false
})
assert.Equal(t, 1, i)
}
func TestTimeCompetition(t *testing.T) {
cache := initEnv(1)
cache.Push(item1)
cache.Push(item3)
assert.Equal(t, false, cache.Exist(string(item1.Value.Hash())))
assert.Equal(t, true, cache.Exist(string(item3.Value.Hash())))
}
func TestPriceCompetition(t *testing.T) {
cache := initEnv(1)
cache.Push(item1)
cache.Push(item4)
assert.Equal(t, false, cache.Exist(string(item1.Value.Hash())))
assert.Equal(t, true, cache.Exist(string(item4.Value.Hash())))
}
func TestAddDuplicateItem(t *testing.T) {
cache := initEnv(1)
cache.Push(item1)
err := cache.Push(item1)
assert.Equal(t, types.ErrTxExist, err)
}
func TestQueueDirection(t *testing.T) {
cache := initEnv(0)
cache.Push(item1)
cache.Push(item2)
cache.Push(item3)
cache.Push(item4)
cache.Push(item5)
cache.txList.Print()
assert.Equal(t, true, cache.txList.GetIterator().First().Score >= cache.txList.GetIterator().Last().Score)
}
......@@ -64,7 +64,7 @@ certFile="cert.pem"
keyFile="key.pem"
[mempool]
name="trade"
name="score"
poolCacheSize=10240
minTxFee=100000
maxTxNumPerAccount=10000
......@@ -74,12 +74,12 @@ poolCacheSize=10240
minTxFee=100000
maxTxNumPerAccount=10000
[mempool.sub.trade]
[mempool.sub.score]
poolCacheSize=10240
minTxFee=100000
maxTxNumPerAccount=10000
timeParam=1 #时间占价格比例
priceConstant=1544 #手续费相对于时间的一个合适的常量,取当前unxi时间戳前四位数,排序时手续费高1e-5~=快1s
priceConstant=1544 #手续费相对于时间的一个合适的常量,取当前unxi时间戳前四位数,排队时手续费高1e-5的分数~=快1s的分数
pricePower=1 #常量比例
[consensus]
......
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package score
import (
"runtime"
log "github.com/33cn/chain33/common/log/log15"
)
var (
mlog = log.New("module", "mempool")
poolCacheSize int64 = 10240 // mempool容量
mempoolExpiredInterval int64 = 600 // mempool内交易过期时间,10分钟
mempoolReSendInterval int64 = 60 // mempool内交易重发时间,1分钟
mempoolDupResendInterval int64 = 120 // mempool重复交易可再次发送间隔,120秒
mempoolAddedTxSize = 102400 // 已添加过的交易缓存大小
maxTxNumPerAccount int64 = 100 // TODO 每个账户在mempool中最大交易数量,10
processNum int
)
// TODO
func init() {
processNum = runtime.NumCPU()
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package trade
package score
import (
clog "github.com/33cn/chain33/common/log"
......@@ -48,6 +44,6 @@ func New(cfg *types.Mempool, sub []byte) queue.Module {
if subcfg.PoolCacheSize == 0 {
subcfg.PoolCacheSize = cfg.PoolCacheSize
}
c.SetQueueCache(NewTradeQueue(subcfg))
c.SetQueueCache(NewScoreQueue(subcfg))
return c
}
package trade
package score
import (
"fmt"
......@@ -16,12 +16,12 @@ type SkipValue struct {
func (v *SkipValue) Compare(value *SkipValue) int {
f1 := v.Score
f2 := value.Score
if f1 < f2 {
return 1
if f1 > f2 {
return -1
} else if f1 == f2 {
return 0
}
return -1
return 1
}
type skipListNode struct {
......@@ -237,8 +237,10 @@ func (l *SkipList) Print() {
e := r.next[i]
//fmt.Print(i)
for e != nil {
fmt.Print(e.Value)
fmt.Print(e.Value.Score)
fmt.Print(" ")
fmt.Print(e.Value)
fmt.Println("")
e = e.next[i]
}
fmt.Println()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment