Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
P
plugin
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
JIRA
JIRA
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
link33
plugin
Commits
3b2ed068
Unverified
Commit
3b2ed068
authored
Dec 15, 2018
by
33cn
Committed by
GitHub
Dec 15, 2018
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #136 from CCCCCDH/mempool
Mempool trade
parents
afbd0de6
96867718
Show whitespace changes
Inline
Side-by-side
Showing
11 changed files
with
1042 additions
and
0 deletions
+1042
-0
chain33.toml
chain33.toml
+19
-0
init.go
plugin/init.go
+1
-0
init.go
plugin/mempool/init/init.go
+6
-0
cache.go
plugin/mempool/price/cache.go
+132
-0
cache_test.go
plugin/mempool/price/cache_test.go
+151
-0
chain33.test.toml
plugin/mempool/price/chain33.test.toml
+198
-0
mempool.go
plugin/mempool/price/mempool.go
+32
-0
cache.go
plugin/mempool/score/cache.go
+124
-0
cache_test.go
plugin/mempool/score/cache_test.go
+151
-0
chain33.test.toml
plugin/mempool/score/chain33.test.toml
+193
-0
mempool.go
plugin/mempool/score/mempool.go
+35
-0
No files found.
chain33.toml
View file @
3b2ed068
...
...
@@ -60,12 +60,31 @@ whitelist=["127.0.0.1"]
jrpcFuncWhitelist
=
["*"]
grpcFuncWhitelist
=
["*"]
[mempool]
name
=
"timeline"
poolCacheSize
=
10240
minTxFee
=
100000
maxTxNumPerAccount
=
10000
[mempool.sub.timeline]
poolCacheSize
=
10240
minTxFee
=
100000
maxTxNumPerAccount
=
10000
[mempool.sub.score]
poolCacheSize
=
10240
minTxFee
=
100000
maxTxNumPerAccount
=
10000
timeParam
=
1
#时间占价格比例
priceConstant
=
1544
#手续费相对于时间的一个合适的常量,取当前unxi时间戳前四位数,排序时手续费高1e-5~=快1s
pricePower
=
1
#常量比例
[mempool.sub.price]
poolCacheSize
=
10240
minTxFee
=
100000
maxTxNumPerAccount
=
10000
[consensus]
name
=
"ticket"
minerstart
=
true
...
...
plugin/init.go
View file @
3b2ed068
...
...
@@ -4,5 +4,6 @@ import (
_
"github.com/33cn/plugin/plugin/consensus/init"
//consensus init
_
"github.com/33cn/plugin/plugin/crypto/init"
//crypto init
_
"github.com/33cn/plugin/plugin/dapp/init"
//dapp init
_
"github.com/33cn/plugin/plugin/mempool/init"
//mempool init
_
"github.com/33cn/plugin/plugin/store/init"
//store init
)
plugin/mempool/init/init.go
0 → 100644
View file @
3b2ed068
package
init
import
(
_
"github.com/33cn/plugin/plugin/mempool/price"
//auto gen
_
"github.com/33cn/plugin/plugin/mempool/score"
//auto gen
)
plugin/mempool/price/cache.go
0 → 100644
View file @
3b2ed068
package
price
import
(
"bytes"
"encoding/gob"
"github.com/33cn/chain33/common/skiplist"
"github.com/33cn/chain33/system/mempool"
"github.com/33cn/chain33/types"
)
var
mempoolDupResendInterval
int64
=
600
// mempool内交易过期时间,10分钟
// Queue 价格队列模式(价格=手续费/交易字节数,价格高者优先,同价则时间早优先)
type
Queue
struct
{
txMap
map
[
string
]
*
skiplist
.
SkipValue
txList
*
skiplist
.
SkipList
subConfig
subConfig
}
// NewQueue 创建队列
func
NewQueue
(
subcfg
subConfig
)
*
Queue
{
return
&
Queue
{
make
(
map
[
string
]
*
skiplist
.
SkipValue
,
subcfg
.
PoolCacheSize
),
skiplist
.
NewSkipList
(
&
skiplist
.
SkipValue
{
Score
:
-
1
,
Value
:
nil
}),
subcfg
,
}
}
func
(
cache
*
Queue
)
newSkipValue
(
item
*
mempool
.
Item
)
(
*
skiplist
.
SkipValue
,
error
)
{
//tx := item.value
buf
:=
bytes
.
NewBuffer
(
nil
)
enc
:=
gob
.
NewEncoder
(
buf
)
err
:=
enc
.
Encode
(
item
.
Value
)
if
err
!=
nil
{
return
nil
,
err
}
size
:=
len
(
buf
.
Bytes
())
return
&
skiplist
.
SkipValue
{
Score
:
item
.
Value
.
Fee
/
int64
(
size
),
Value
:
item
},
nil
}
//Exist 是否存在
func
(
cache
*
Queue
)
Exist
(
hash
string
)
bool
{
_
,
exists
:=
cache
.
txMap
[
hash
]
return
exists
}
//GetItem 获取数据通过 key
func
(
cache
*
Queue
)
GetItem
(
hash
string
)
(
*
mempool
.
Item
,
error
)
{
if
k
,
exist
:=
cache
.
txMap
[
hash
];
exist
{
return
k
.
Value
.
(
*
mempool
.
Item
),
nil
}
return
nil
,
types
.
ErrNotFound
}
// Push 把给定tx添加到Queue;如果tx已经存在Queue中或Mempool已满则返回对应error
func
(
cache
*
Queue
)
Push
(
item
*
mempool
.
Item
)
error
{
hash
:=
item
.
Value
.
Hash
()
if
cache
.
Exist
(
string
(
hash
))
{
s
:=
cache
.
txMap
[
string
(
hash
)]
addedItem
:=
s
.
Value
.
(
*
mempool
.
Item
)
addedTime
:=
addedItem
.
EnterTime
if
types
.
Now
()
.
Unix
()
-
addedTime
<
mempoolDupResendInterval
{
return
types
.
ErrTxExist
}
// 超过2分钟之后的重发交易返回nil,再次发送给P2P,但是不再次加入mempool
// 并修改其enterTime,以避免该交易一直在节点间被重发
newEnterTime
:=
types
.
Now
()
.
Unix
()
resendItem
:=
&
mempool
.
Item
{
Value
:
item
.
Value
,
Priority
:
item
.
Value
.
Fee
,
EnterTime
:
newEnterTime
}
var
err
error
sv
,
err
:=
cache
.
newSkipValue
(
resendItem
)
if
err
!=
nil
{
return
err
}
cache
.
Remove
(
string
(
hash
))
cache
.
txList
.
Insert
(
sv
)
cache
.
txMap
[
string
(
hash
)]
=
sv
// ------------------
return
nil
}
it
:=
&
mempool
.
Item
{
Value
:
item
.
Value
,
Priority
:
item
.
Value
.
Fee
,
EnterTime
:
item
.
EnterTime
}
sv
,
err
:=
cache
.
newSkipValue
(
it
)
if
err
!=
nil
{
return
err
}
if
int64
(
cache
.
txList
.
Len
())
>=
cache
.
subConfig
.
PoolCacheSize
{
tail
:=
cache
.
txList
.
GetIterator
()
.
Last
()
//价格高存留
switch
sv
.
Compare
(
tail
)
{
case
-
1
:
cache
.
Remove
(
string
(
tail
.
Value
.
(
*
mempool
.
Item
)
.
Value
.
Hash
()))
case
0
:
if
sv
.
Value
.
(
*
mempool
.
Item
)
.
EnterTime
<
tail
.
Value
.
(
*
mempool
.
Item
)
.
EnterTime
{
cache
.
Remove
(
string
(
tail
.
Value
.
(
*
mempool
.
Item
)
.
Value
.
Hash
()))
break
}
return
types
.
ErrMemFull
case
1
:
return
types
.
ErrMemFull
default
:
return
types
.
ErrMemFull
}
}
cache
.
txList
.
Insert
(
sv
)
cache
.
txMap
[
string
(
hash
)]
=
sv
return
nil
}
// Remove 删除数据
func
(
cache
*
Queue
)
Remove
(
hash
string
)
error
{
cache
.
txList
.
Delete
(
cache
.
txMap
[
hash
])
delete
(
cache
.
txMap
,
hash
)
return
nil
}
// Size 数据总数
func
(
cache
*
Queue
)
Size
()
int
{
return
cache
.
txList
.
Len
()
}
// Walk 遍历整个队列
func
(
cache
*
Queue
)
Walk
(
count
int
,
cb
func
(
value
*
mempool
.
Item
)
bool
)
{
i
:=
0
cache
.
txList
.
Walk
(
func
(
item
interface
{})
bool
{
if
!
cb
(
item
.
(
*
mempool
.
Item
))
{
return
false
}
i
++
return
i
!=
count
})
}
plugin/mempool/price/cache_test.go
0 → 100644
View file @
3b2ed068
package
price
import
(
"testing"
"github.com/33cn/chain33/common"
"github.com/33cn/chain33/common/address"
"github.com/33cn/chain33/common/crypto"
cty
"github.com/33cn/chain33/system/dapp/coins/types"
drivers
"github.com/33cn/chain33/system/mempool"
"github.com/33cn/chain33/types"
"github.com/stretchr/testify/assert"
)
var
(
c
,
_
=
crypto
.
New
(
types
.
GetSignName
(
""
,
types
.
SECP256K1
))
hex
=
"CC38546E9E659D15E6B4893F0AB32A06D103931A8230B0BDE71459D2B27D6944"
a
,
_
=
common
.
FromHex
(
hex
)
privKey
,
_
=
c
.
PrivKeyFromBytes
(
a
)
toAddr
=
address
.
PubKeyToAddress
(
privKey
.
PubKey
()
.
Bytes
())
.
String
()
amount
=
int64
(
1e8
)
v
=
&
cty
.
CoinsAction_Transfer
{
Transfer
:
&
types
.
AssetsTransfer
{
Amount
:
amount
}}
transfer
=
&
cty
.
CoinsAction
{
Value
:
v
,
Ty
:
cty
.
CoinsActionTransfer
}
tx1
=
&
types
.
Transaction
{
Execer
:
[]
byte
(
"coins"
),
Payload
:
types
.
Encode
(
transfer
),
Fee
:
1000000
,
Expire
:
1
,
To
:
toAddr
}
tx2
=
&
types
.
Transaction
{
Execer
:
[]
byte
(
"coins"
),
Payload
:
types
.
Encode
(
transfer
),
Fee
:
1000000
,
Expire
:
2
,
To
:
toAddr
}
tx3
=
&
types
.
Transaction
{
Execer
:
[]
byte
(
"coins"
),
Payload
:
types
.
Encode
(
transfer
),
Fee
:
1000000
,
Expire
:
3
,
To
:
toAddr
}
tx4
=
&
types
.
Transaction
{
Execer
:
[]
byte
(
"coins"
),
Payload
:
types
.
Encode
(
transfer
),
Fee
:
2000000
,
Expire
:
4
,
To
:
toAddr
}
tx5
=
&
types
.
Transaction
{
Execer
:
[]
byte
(
"coins"
),
Payload
:
types
.
Encode
(
transfer
),
Fee
:
1000000
,
Expire
:
5
,
To
:
toAddr
}
item1
=
&
drivers
.
Item
{
Value
:
tx1
,
Priority
:
tx1
.
Fee
,
EnterTime
:
types
.
Now
()
.
Unix
()}
item2
=
&
drivers
.
Item
{
Value
:
tx2
,
Priority
:
tx2
.
Fee
,
EnterTime
:
types
.
Now
()
.
Unix
()}
item3
=
&
drivers
.
Item
{
Value
:
tx3
,
Priority
:
tx3
.
Fee
,
EnterTime
:
types
.
Now
()
.
Unix
()
-
1000
}
item4
=
&
drivers
.
Item
{
Value
:
tx4
,
Priority
:
tx4
.
Fee
,
EnterTime
:
types
.
Now
()
.
Unix
()
-
1000
}
item5
=
&
drivers
.
Item
{
Value
:
tx5
,
Priority
:
tx5
.
Fee
,
EnterTime
:
types
.
Now
()
.
Unix
()
-
1000
}
)
func
initEnv
(
size
int64
)
*
Queue
{
if
size
==
0
{
size
=
100
}
_
,
sub
:=
types
.
InitCfg
(
"chain33.test.toml"
)
var
subcfg
subConfig
types
.
MustDecode
(
sub
.
Mempool
[
"price"
],
&
subcfg
)
subcfg
.
PoolCacheSize
=
size
cache
:=
NewQueue
(
subcfg
)
return
cache
}
func
TestMemFull
(
t
*
testing
.
T
)
{
cache
:=
initEnv
(
1
)
hash
:=
string
(
tx1
.
Hash
())
err
:=
cache
.
Push
(
item1
)
assert
.
Nil
(
t
,
err
)
assert
.
Equal
(
t
,
true
,
cache
.
Exist
(
hash
))
it
,
err
:=
cache
.
GetItem
(
hash
)
assert
.
Nil
(
t
,
err
)
assert
.
Equal
(
t
,
item1
,
it
)
_
,
err
=
cache
.
GetItem
(
hash
+
":"
)
assert
.
Equal
(
t
,
types
.
ErrNotFound
,
err
)
err
=
cache
.
Push
(
item1
)
assert
.
Equal
(
t
,
types
.
ErrTxExist
,
err
)
err
=
cache
.
Push
(
item2
)
assert
.
Equal
(
t
,
types
.
ErrMemFull
,
err
)
cache
.
Remove
(
hash
)
assert
.
Equal
(
t
,
0
,
cache
.
Size
())
}
func
TestWalk
(
t
*
testing
.
T
)
{
//push to item
cache
:=
initEnv
(
2
)
cache
.
Push
(
item1
)
cache
.
Push
(
item2
)
assert
.
Equal
(
t
,
2
,
cache
.
Size
())
var
data
[
2
]
*
drivers
.
Item
i
:=
0
cache
.
Walk
(
1
,
func
(
value
*
drivers
.
Item
)
bool
{
data
[
i
]
=
value
i
++
return
true
})
assert
.
Equal
(
t
,
1
,
i
)
assert
.
Equal
(
t
,
data
[
0
],
item1
)
i
=
0
cache
.
Walk
(
2
,
func
(
value
*
drivers
.
Item
)
bool
{
data
[
i
]
=
value
i
++
return
true
})
assert
.
Equal
(
t
,
2
,
i
)
assert
.
Equal
(
t
,
data
[
0
],
item1
)
assert
.
Equal
(
t
,
data
[
1
],
item2
)
i
=
0
cache
.
Walk
(
2
,
func
(
value
*
drivers
.
Item
)
bool
{
data
[
i
]
=
value
i
++
return
false
})
assert
.
Equal
(
t
,
1
,
i
)
}
func
TestTimeCompetition
(
t
*
testing
.
T
)
{
cache
:=
initEnv
(
1
)
cache
.
Push
(
item1
)
cache
.
Push
(
item3
)
assert
.
Equal
(
t
,
false
,
cache
.
Exist
(
string
(
item1
.
Value
.
Hash
())))
assert
.
Equal
(
t
,
true
,
cache
.
Exist
(
string
(
item3
.
Value
.
Hash
())))
}
func
TestPriceCompetition
(
t
*
testing
.
T
)
{
cache
:=
initEnv
(
1
)
cache
.
Push
(
item1
)
cache
.
Push
(
item4
)
assert
.
Equal
(
t
,
false
,
cache
.
Exist
(
string
(
item1
.
Value
.
Hash
())))
assert
.
Equal
(
t
,
true
,
cache
.
Exist
(
string
(
item4
.
Value
.
Hash
())))
}
func
TestAddDuplicateItem
(
t
*
testing
.
T
)
{
cache
:=
initEnv
(
1
)
cache
.
Push
(
item1
)
err
:=
cache
.
Push
(
item1
)
assert
.
Equal
(
t
,
types
.
ErrTxExist
,
err
)
}
func
TestQueueDirection
(
t
*
testing
.
T
)
{
cache
:=
initEnv
(
0
)
cache
.
Push
(
item1
)
cache
.
Push
(
item2
)
cache
.
Push
(
item3
)
cache
.
Push
(
item4
)
cache
.
Push
(
item5
)
cache
.
txList
.
Print
()
i
:=
0
lastScore
:=
cache
.
txList
.
GetIterator
()
.
First
()
.
Score
var
tmpScore
int64
cache
.
Walk
(
5
,
func
(
value
*
drivers
.
Item
)
bool
{
tmpScore
=
cache
.
txMap
[
string
(
value
.
Value
.
Hash
())]
.
Score
if
lastScore
<
tmpScore
{
return
false
}
lastScore
=
tmpScore
i
++
return
true
})
assert
.
Equal
(
t
,
5
,
i
)
assert
.
Equal
(
t
,
true
,
lastScore
==
cache
.
txList
.
GetIterator
()
.
Last
()
.
Score
)
}
plugin/mempool/price/chain33.test.toml
0 → 100644
View file @
3b2ed068
Title
=
"chain33"
TestNet
=
true
[log]
# 日志级别,支持debug(dbug)/info/warn/error(eror)/crit
loglevel
=
"debug"
logConsoleLevel
=
"info"
# 日志文件名,可带目录,所有生成的日志文件都放到此目录下
logFile
=
"logs/chain33.log"
# 单个日志文件的最大值(单位:兆)
maxFileSize
=
20
# 最多保存的历史日志文件个数
maxBackups
=
20
# 最多保存的历史日志消息(单位:天)
maxAge
=
28
# 日志文件名是否使用本地事件(否则使用UTC时间)
localTime
=
true
# 历史日志文件是否压缩(压缩格式为gz)
compress
=
false
# 是否打印调用源文件和行号
callerFile
=
true
# 是否打印调用方法
callerFunction
=
true
[blockchain]
defCacheSize
=
128
maxFetchBlockNum
=
128
timeoutSeconds
=
5
batchBlockNum
=
128
driver
=
"memdb"
dbPath
=
"datadir"
dbCache
=
64
isStrongConsistency
=
true
singleMode
=
true
batchsync
=
false
isRecordBlockSequence
=
true
isParaChain
=
false
enableTxQuickIndex
=
false
[p2p]
port
=
13802
seeds
=
["47.104.125.151:13802","47.104.125.97:13802","47.104.125.177:13802"]
enable
=
true
isSeed
=
true
serverStart
=
true
msgCacheSize
=
10240
driver
=
"memdb"
dbPath
=
"datadir/addrbook"
dbCache
=
4
grpcLogFile
=
"grpc33.log"
version
=
216
verMix
=
216
verMax
=
217
[rpc]
jrpcBindAddr
=
"localhost:8801"
grpcBindAddr
=
"localhost:8802"
whitelist
=
["127.0.0.1"]
jrpcFuncWhitelist
=
["*"]
grpcFuncWhitelist
=
["*"]
enableTLS
=
false
certFile
=
"cert.pem"
keyFile
=
"key.pem"
[mempool]
name
=
"price"
poolCacheSize
=
10240
minTxFee
=
100000
maxTxNumPerAccount
=
10000
[mempool.sub.timeline]
poolCacheSize
=
10240
minTxFee
=
100000
maxTxNumPerAccount
=
10000
[mempool.sub.score]
poolCacheSize
=
10240
minTxFee
=
100000
maxTxNumPerAccount
=
10000
timeParam
=
1
#时间占价格比例
priceConstant
=
1544
#手续费相对于时间的一个合适的常量,取当前unxi时间戳前四位数,排序时手续费高1e-5~=快1s
pricePower
=
1
#常量比例
[mempool.sub.price]
poolCacheSize
=
10240
minTxFee
=
100000
maxTxNumPerAccount
=
10000
[consensus]
name
=
"solo"
minerstart
=
true
genesisBlockTime
=
1514533394
genesis
=
"14KEKbYtKKQm4wMthSK9J4La4nAiidGozt"
[mver.consensus]
fundKeyAddr
=
"1BQXS6TxaYYG5mADaWij4AxhZZUTpw95a5"
coinReward
=
18
coinDevFund
=
12
ticketPrice
=
10000
powLimitBits
=
"0x1f00ffff"
retargetAdjustmentFactor
=
4
futureBlockTime
=
16
ticketFrozenTime
=
5
#5s only for test
ticketWithdrawTime
=
10
#10s only for test
ticketMinerWaitTime
=
2
#2s only for test
maxTxNumber
=
1600
#160
targetTimespan
=
2304
targetTimePerBlock
=
16
[mver.consensus.ForkChainParamV1]
maxTxNumber
=
10000
targetTimespan
=
288
#only for test
targetTimePerBlock
=
2
[mver.consensus.ForkChainParamV2]
powLimitBits
=
"0x1f2fffff"
[consensus.sub.solo]
genesis
=
"14KEKbYtKKQm4wMthSK9J4La4nAiidGozt"
genesisBlockTime
=
1514533394
hotkeyAddr
=
"12qyocayNF7Lv6C9qW4avxs2E7U41fKSfv"
waitTxMs
=
10
[consensus.sub.ticket]
genesisBlockTime
=
1514533394
[[consensus.sub.ticket.genesis]]
minerAddr
=
"12qyocayNF7Lv6C9qW4avxs2E7U41fKSfv"
returnAddr
=
"14KEKbYtKKQm4wMthSK9J4La4nAiidGozt"
count
=
10000
[[consensus.sub.ticket.genesis]]
minerAddr
=
"1PUiGcbsccfxW3zuvHXZBJfznziph5miAo"
returnAddr
=
"1EbDHAXpoiewjPLX9uqoz38HsKqMXayZrF"
count
=
10000
[[consensus.sub.ticket.genesis]]
minerAddr
=
"1EDnnePAZN48aC2hiTDzhkczfF39g1pZZX"
returnAddr
=
"1KcCVZLSQYRUwE5EXTsAoQs9LuJW6xwfQa"
count
=
10000
[store]
name
=
"mavl"
driver
=
"memdb"
dbPath
=
"datadir/mavltree"
dbCache
=
128
[store.sub.mavl]
enableMavlPrefix
=
false
enableMVCC
=
false
enableMavlPrune
=
false
pruneHeight
=
10000
[wallet]
minFee
=
1000000
driver
=
"memdb"
dbPath
=
"datadir/wallet"
dbCache
=
16
signType
=
"secp256k1"
[wallet.sub.ticket]
minerwhitelist
=
["*"]
[exec]
isFree
=
false
minExecFee
=
100000
enableStat
=
false
enableMVCC
=
false
[exec.sub.token]
saveTokenTxList
=
true
tokenApprs
=
[
"1Bsg9j6gW83sShoee1fZAt9TkUjcrCgA9S"
,
"1Q8hGLfoGe63efeWa8fJ4Pnukhkngt6poK"
,
"1LY8GFia5EiyoTodMLfkB5PHNNpXRqxhyB"
,
"1GCzJDS6HbgTQ2emade7mEJGGWFfA15pS9"
,
"1JYB8sxi4He5pZWHCd3Zi2nypQ4JMB6AxN"
,
"12qyocayNF7Lv6C9qW4avxs2E7U41fKSfv"
,
]
[exec.sub.relay]
genesis
=
"14KEKbYtKKQm4wMthSK9J4La4nAiidGozt"
[exec.sub.cert]
# 是否启用证书验证和签名
enable
=
false
# 加密文件路径
cryptoPath
=
"authdir/crypto"
# 带证书签名类型,支持"auth_ecdsa", "auth_sm2"
signType
=
"auth_ecdsa"
[exec.sub.manage]
superManager
=[
"1Bsg9j6gW83sShoee1fZAt9TkUjcrCgA9S"
,
"12qyocayNF7Lv6C9qW4avxs2E7U41fKSfv"
,
"1Q8hGLfoGe63efeWa8fJ4Pnukhkngt6poK"
]
\ No newline at end of file
plugin/mempool/price/mempool.go
0 → 100644
View file @
3b2ed068
package
price
import
(
"github.com/33cn/chain33/queue"
drivers
"github.com/33cn/chain33/system/mempool"
"github.com/33cn/chain33/types"
)
//--------------------------------------------------------------------------------
// Module Mempool
type
subConfig
struct
{
PoolCacheSize
int64
`json:"poolCacheSize"`
MinTxFee
int64
`json:"minTxFee"`
MaxTxNumPerAccount
int64
`json:"maxTxNumPerAccount"`
}
func
init
()
{
drivers
.
Reg
(
"price"
,
New
)
}
//New 创建price cache 结构的 mempool
func
New
(
cfg
*
types
.
Mempool
,
sub
[]
byte
)
queue
.
Module
{
c
:=
drivers
.
NewMempool
(
cfg
)
var
subcfg
subConfig
types
.
MustDecode
(
sub
,
&
subcfg
)
if
subcfg
.
PoolCacheSize
==
0
{
subcfg
.
PoolCacheSize
=
cfg
.
PoolCacheSize
}
c
.
SetQueueCache
(
NewQueue
(
subcfg
))
return
c
}
plugin/mempool/score/cache.go
0 → 100644
View file @
3b2ed068
package
score
import
(
"bytes"
"encoding/gob"
"github.com/33cn/chain33/common/skiplist"
"github.com/33cn/chain33/system/mempool"
"github.com/33cn/chain33/types"
)
var
mempoolDupResendInterval
int64
=
600
// mempool内交易过期时间,10分钟
// Queue 分数队列模式(分数=常量a*手续费/交易字节数-常量b*时间*定量c,按分数排队,高的优先,常量a,b和定量c可配置)
type
Queue
struct
{
txMap
map
[
string
]
*
skiplist
.
SkipValue
txList
*
skiplist
.
SkipList
subConfig
subConfig
}
// NewQueue 创建队列
func
NewQueue
(
subcfg
subConfig
)
*
Queue
{
return
&
Queue
{
txMap
:
make
(
map
[
string
]
*
skiplist
.
SkipValue
,
subcfg
.
PoolCacheSize
),
txList
:
skiplist
.
NewSkipList
(
&
skiplist
.
SkipValue
{
Score
:
-
1
,
Value
:
nil
}),
subConfig
:
subcfg
,
}
}
func
(
cache
*
Queue
)
newSkipValue
(
item
*
mempool
.
Item
)
(
*
skiplist
.
SkipValue
,
error
)
{
//tx := item.value
buf
:=
bytes
.
NewBuffer
(
nil
)
enc
:=
gob
.
NewEncoder
(
buf
)
err
:=
enc
.
Encode
(
item
.
Value
)
if
err
!=
nil
{
return
nil
,
err
}
size
:=
len
(
buf
.
Bytes
())
return
&
skiplist
.
SkipValue
{
Score
:
cache
.
subConfig
.
PriceConstant
*
(
item
.
Value
.
Fee
/
int64
(
size
))
*
cache
.
subConfig
.
PricePower
-
cache
.
subConfig
.
TimeParam
*
item
.
EnterTime
,
Value
:
item
},
nil
}
// Exist 是否存在
func
(
cache
*
Queue
)
Exist
(
hash
string
)
bool
{
_
,
exists
:=
cache
.
txMap
[
hash
]
return
exists
}
//GetItem 获取数据通过 key
func
(
cache
*
Queue
)
GetItem
(
hash
string
)
(
*
mempool
.
Item
,
error
)
{
if
k
,
exist
:=
cache
.
txMap
[
hash
];
exist
{
return
k
.
Value
.
(
*
mempool
.
Item
),
nil
}
return
nil
,
types
.
ErrNotFound
}
// Push 把给定tx添加到Queue;如果tx已经存在Queue中或Mempool已满则返回对应error
func
(
cache
*
Queue
)
Push
(
item
*
mempool
.
Item
)
error
{
hash
:=
item
.
Value
.
Hash
()
if
cache
.
Exist
(
string
(
hash
))
{
s
:=
cache
.
txMap
[
string
(
hash
)]
addedItem
:=
s
.
Value
.
(
*
mempool
.
Item
)
addedTime
:=
addedItem
.
EnterTime
if
types
.
Now
()
.
Unix
()
-
addedTime
<
mempoolDupResendInterval
{
return
types
.
ErrTxExist
}
// 超过2分钟之后的重发交易返回nil,再次发送给P2P,但是不再次加入mempool
// 并修改其enterTime,以避免该交易一直在节点间被重发
newEnterTime
:=
types
.
Now
()
.
Unix
()
resendItem
:=
&
mempool
.
Item
{
Value
:
item
.
Value
,
Priority
:
item
.
Value
.
Fee
,
EnterTime
:
newEnterTime
}
var
err
error
sv
,
err
:=
cache
.
newSkipValue
(
resendItem
)
if
err
!=
nil
{
return
err
}
cache
.
Remove
(
string
(
hash
))
cache
.
txList
.
Insert
(
sv
)
cache
.
txMap
[
string
(
hash
)]
=
sv
// ------------------
return
nil
}
it
:=
&
mempool
.
Item
{
Value
:
item
.
Value
,
Priority
:
item
.
Value
.
Fee
,
EnterTime
:
item
.
EnterTime
}
sv
,
err
:=
cache
.
newSkipValue
(
it
)
if
err
!=
nil
{
return
err
}
if
int64
(
cache
.
txList
.
Len
())
>=
cache
.
subConfig
.
PoolCacheSize
{
tail
:=
cache
.
txList
.
GetIterator
()
.
Last
()
//分数高存留
if
sv
.
Compare
(
tail
)
==
-
1
{
cache
.
Remove
(
string
(
tail
.
Value
.
(
*
mempool
.
Item
)
.
Value
.
Hash
()))
}
else
{
return
types
.
ErrMemFull
}
}
cache
.
txList
.
Insert
(
sv
)
cache
.
txMap
[
string
(
hash
)]
=
sv
return
nil
}
// Remove 删除数据
func
(
cache
*
Queue
)
Remove
(
hash
string
)
error
{
cache
.
txList
.
Delete
(
cache
.
txMap
[
hash
])
delete
(
cache
.
txMap
,
hash
)
return
nil
}
// Size 数据总数
func
(
cache
*
Queue
)
Size
()
int
{
return
cache
.
txList
.
Len
()
}
// Walk 遍历整个队列
func
(
cache
*
Queue
)
Walk
(
count
int
,
cb
func
(
value
*
mempool
.
Item
)
bool
)
{
i
:=
0
cache
.
txList
.
Walk
(
func
(
item
interface
{})
bool
{
if
!
cb
(
item
.
(
*
mempool
.
Item
))
{
return
false
}
i
++
return
i
!=
count
})
}
plugin/mempool/score/cache_test.go
0 → 100644
View file @
3b2ed068
package
score
import
(
"testing"
"github.com/33cn/chain33/common"
"github.com/33cn/chain33/common/address"
"github.com/33cn/chain33/common/crypto"
cty
"github.com/33cn/chain33/system/dapp/coins/types"
drivers
"github.com/33cn/chain33/system/mempool"
"github.com/33cn/chain33/types"
"github.com/stretchr/testify/assert"
)
var
(
c
,
_
=
crypto
.
New
(
types
.
GetSignName
(
""
,
types
.
SECP256K1
))
hex
=
"CC38546E9E659D15E6B4893F0AB32A06D103931A8230B0BDE71459D2B27D6944"
a
,
_
=
common
.
FromHex
(
hex
)
privKey
,
_
=
c
.
PrivKeyFromBytes
(
a
)
toAddr
=
address
.
PubKeyToAddress
(
privKey
.
PubKey
()
.
Bytes
())
.
String
()
amount
=
int64
(
1e8
)
v
=
&
cty
.
CoinsAction_Transfer
{
Transfer
:
&
types
.
AssetsTransfer
{
Amount
:
amount
}}
transfer
=
&
cty
.
CoinsAction
{
Value
:
v
,
Ty
:
cty
.
CoinsActionTransfer
}
tx1
=
&
types
.
Transaction
{
Execer
:
[]
byte
(
"coins"
),
Payload
:
types
.
Encode
(
transfer
),
Fee
:
1000000
,
Expire
:
1
,
To
:
toAddr
}
tx2
=
&
types
.
Transaction
{
Execer
:
[]
byte
(
"coins"
),
Payload
:
types
.
Encode
(
transfer
),
Fee
:
1000000
,
Expire
:
2
,
To
:
toAddr
}
tx3
=
&
types
.
Transaction
{
Execer
:
[]
byte
(
"coins"
),
Payload
:
types
.
Encode
(
transfer
),
Fee
:
1000000
,
Expire
:
3
,
To
:
toAddr
}
tx4
=
&
types
.
Transaction
{
Execer
:
[]
byte
(
"coins"
),
Payload
:
types
.
Encode
(
transfer
),
Fee
:
2000000
,
Expire
:
4
,
To
:
toAddr
}
tx5
=
&
types
.
Transaction
{
Execer
:
[]
byte
(
"coins"
),
Payload
:
types
.
Encode
(
transfer
),
Fee
:
1000000
,
Expire
:
5
,
To
:
toAddr
}
item1
=
&
drivers
.
Item
{
Value
:
tx1
,
Priority
:
tx1
.
Fee
,
EnterTime
:
types
.
Now
()
.
Unix
()}
item2
=
&
drivers
.
Item
{
Value
:
tx2
,
Priority
:
tx2
.
Fee
,
EnterTime
:
types
.
Now
()
.
Unix
()}
item3
=
&
drivers
.
Item
{
Value
:
tx3
,
Priority
:
tx3
.
Fee
,
EnterTime
:
types
.
Now
()
.
Unix
()
-
1000
}
item4
=
&
drivers
.
Item
{
Value
:
tx4
,
Priority
:
tx4
.
Fee
,
EnterTime
:
types
.
Now
()
.
Unix
()
-
1000
}
item5
=
&
drivers
.
Item
{
Value
:
tx5
,
Priority
:
tx5
.
Fee
,
EnterTime
:
types
.
Now
()
.
Unix
()
-
1000
}
)
func
initEnv
(
size
int64
)
*
Queue
{
if
size
==
0
{
size
=
100
}
_
,
sub
:=
types
.
InitCfg
(
"chain33.test.toml"
)
var
subcfg
subConfig
types
.
MustDecode
(
sub
.
Mempool
[
"score"
],
&
subcfg
)
subcfg
.
PoolCacheSize
=
size
cache
:=
NewQueue
(
subcfg
)
return
cache
}
func
TestMemFull
(
t
*
testing
.
T
)
{
cache
:=
initEnv
(
1
)
hash
:=
string
(
tx1
.
Hash
())
err
:=
cache
.
Push
(
item1
)
assert
.
Nil
(
t
,
err
)
assert
.
Equal
(
t
,
true
,
cache
.
Exist
(
hash
))
it
,
err
:=
cache
.
GetItem
(
hash
)
assert
.
Nil
(
t
,
err
)
assert
.
Equal
(
t
,
item1
,
it
)
_
,
err
=
cache
.
GetItem
(
hash
+
":"
)
assert
.
Equal
(
t
,
types
.
ErrNotFound
,
err
)
err
=
cache
.
Push
(
item1
)
assert
.
Equal
(
t
,
types
.
ErrTxExist
,
err
)
err
=
cache
.
Push
(
item2
)
assert
.
Equal
(
t
,
types
.
ErrMemFull
,
err
)
cache
.
Remove
(
hash
)
assert
.
Equal
(
t
,
0
,
cache
.
Size
())
}
func
TestWalk
(
t
*
testing
.
T
)
{
//push to item
cache
:=
initEnv
(
2
)
cache
.
Push
(
item1
)
cache
.
Push
(
item2
)
assert
.
Equal
(
t
,
2
,
cache
.
Size
())
var
data
[
2
]
*
drivers
.
Item
i
:=
0
cache
.
Walk
(
1
,
func
(
value
*
drivers
.
Item
)
bool
{
data
[
i
]
=
value
i
++
return
true
})
assert
.
Equal
(
t
,
1
,
i
)
assert
.
Equal
(
t
,
data
[
0
],
item1
)
i
=
0
cache
.
Walk
(
2
,
func
(
value
*
drivers
.
Item
)
bool
{
data
[
i
]
=
value
i
++
return
true
})
assert
.
Equal
(
t
,
2
,
i
)
assert
.
Equal
(
t
,
data
[
0
],
item1
)
assert
.
Equal
(
t
,
data
[
1
],
item2
)
i
=
0
cache
.
Walk
(
2
,
func
(
value
*
drivers
.
Item
)
bool
{
data
[
i
]
=
value
i
++
return
false
})
assert
.
Equal
(
t
,
1
,
i
)
}
func
TestTimeCompetition
(
t
*
testing
.
T
)
{
cache
:=
initEnv
(
1
)
cache
.
Push
(
item1
)
cache
.
Push
(
item3
)
assert
.
Equal
(
t
,
false
,
cache
.
Exist
(
string
(
item1
.
Value
.
Hash
())))
assert
.
Equal
(
t
,
true
,
cache
.
Exist
(
string
(
item3
.
Value
.
Hash
())))
}
func
TestPriceCompetition
(
t
*
testing
.
T
)
{
cache
:=
initEnv
(
1
)
cache
.
Push
(
item1
)
cache
.
Push
(
item4
)
assert
.
Equal
(
t
,
false
,
cache
.
Exist
(
string
(
item1
.
Value
.
Hash
())))
assert
.
Equal
(
t
,
true
,
cache
.
Exist
(
string
(
item4
.
Value
.
Hash
())))
}
func
TestAddDuplicateItem
(
t
*
testing
.
T
)
{
cache
:=
initEnv
(
1
)
cache
.
Push
(
item1
)
err
:=
cache
.
Push
(
item1
)
assert
.
Equal
(
t
,
types
.
ErrTxExist
,
err
)
}
func
TestQueueDirection
(
t
*
testing
.
T
)
{
cache
:=
initEnv
(
0
)
cache
.
Push
(
item1
)
cache
.
Push
(
item2
)
cache
.
Push
(
item3
)
cache
.
Push
(
item4
)
cache
.
Push
(
item5
)
cache
.
txList
.
Print
()
i
:=
0
lastScore
:=
cache
.
txList
.
GetIterator
()
.
First
()
.
Score
var
tmpScore
int64
cache
.
Walk
(
5
,
func
(
value
*
drivers
.
Item
)
bool
{
tmpScore
=
cache
.
txMap
[
string
(
value
.
Value
.
Hash
())]
.
Score
if
lastScore
<
tmpScore
{
return
false
}
lastScore
=
tmpScore
i
++
return
true
})
assert
.
Equal
(
t
,
5
,
i
)
assert
.
Equal
(
t
,
true
,
lastScore
==
cache
.
txList
.
GetIterator
()
.
Last
()
.
Score
)
}
plugin/mempool/score/chain33.test.toml
0 → 100644
View file @
3b2ed068
Title
=
"chain33"
TestNet
=
true
[log]
# 日志级别,支持debug(dbug)/info/warn/error(eror)/crit
loglevel
=
"debug"
logConsoleLevel
=
"info"
# 日志文件名,可带目录,所有生成的日志文件都放到此目录下
logFile
=
"logs/chain33.log"
# 单个日志文件的最大值(单位:兆)
maxFileSize
=
20
# 最多保存的历史日志文件个数
maxBackups
=
20
# 最多保存的历史日志消息(单位:天)
maxAge
=
28
# 日志文件名是否使用本地事件(否则使用UTC时间)
localTime
=
true
# 历史日志文件是否压缩(压缩格式为gz)
compress
=
false
# 是否打印调用源文件和行号
callerFile
=
true
# 是否打印调用方法
callerFunction
=
true
[blockchain]
defCacheSize
=
128
maxFetchBlockNum
=
128
timeoutSeconds
=
5
batchBlockNum
=
128
driver
=
"memdb"
dbPath
=
"datadir"
dbCache
=
64
isStrongConsistency
=
true
singleMode
=
true
batchsync
=
false
isRecordBlockSequence
=
true
isParaChain
=
false
enableTxQuickIndex
=
false
[p2p]
port
=
13802
seeds
=
["47.104.125.151:13802","47.104.125.97:13802","47.104.125.177:13802"]
enable
=
true
isSeed
=
true
serverStart
=
true
msgCacheSize
=
10240
driver
=
"memdb"
dbPath
=
"datadir/addrbook"
dbCache
=
4
grpcLogFile
=
"grpc33.log"
version
=
216
verMix
=
216
verMax
=
217
[rpc]
jrpcBindAddr
=
"localhost:8801"
grpcBindAddr
=
"localhost:8802"
whitelist
=
["127.0.0.1"]
jrpcFuncWhitelist
=
["*"]
grpcFuncWhitelist
=
["*"]
enableTLS
=
false
certFile
=
"cert.pem"
keyFile
=
"key.pem"
[mempool]
name
=
"score"
poolCacheSize
=
10240
minTxFee
=
100000
maxTxNumPerAccount
=
10000
[mempool.sub.timeline]
poolCacheSize
=
10240
minTxFee
=
100000
maxTxNumPerAccount
=
10000
[mempool.sub.score]
poolCacheSize
=
10240
minTxFee
=
100000
maxTxNumPerAccount
=
10000
timeParam
=
1
#时间占价格比例
priceConstant
=
1544
#手续费相对于时间的一个合适的常量,取当前unxi时间戳前四位数,排队时手续费高1e-5的分数~=快1s的分数
pricePower
=
1
#常量比例
[consensus]
name
=
"solo"
minerstart
=
true
genesisBlockTime
=
1514533394
genesis
=
"14KEKbYtKKQm4wMthSK9J4La4nAiidGozt"
[mver.consensus]
fundKeyAddr
=
"1BQXS6TxaYYG5mADaWij4AxhZZUTpw95a5"
coinReward
=
18
coinDevFund
=
12
ticketPrice
=
10000
powLimitBits
=
"0x1f00ffff"
retargetAdjustmentFactor
=
4
futureBlockTime
=
16
ticketFrozenTime
=
5
#5s only for test
ticketWithdrawTime
=
10
#10s only for test
ticketMinerWaitTime
=
2
#2s only for test
maxTxNumber
=
1600
#160
targetTimespan
=
2304
targetTimePerBlock
=
16
[mver.consensus.ForkChainParamV1]
maxTxNumber
=
10000
targetTimespan
=
288
#only for test
targetTimePerBlock
=
2
[mver.consensus.ForkChainParamV2]
powLimitBits
=
"0x1f2fffff"
[consensus.sub.solo]
genesis
=
"14KEKbYtKKQm4wMthSK9J4La4nAiidGozt"
genesisBlockTime
=
1514533394
hotkeyAddr
=
"12qyocayNF7Lv6C9qW4avxs2E7U41fKSfv"
waitTxMs
=
10
[consensus.sub.ticket]
genesisBlockTime
=
1514533394
[[consensus.sub.ticket.genesis]]
minerAddr
=
"12qyocayNF7Lv6C9qW4avxs2E7U41fKSfv"
returnAddr
=
"14KEKbYtKKQm4wMthSK9J4La4nAiidGozt"
count
=
10000
[[consensus.sub.ticket.genesis]]
minerAddr
=
"1PUiGcbsccfxW3zuvHXZBJfznziph5miAo"
returnAddr
=
"1EbDHAXpoiewjPLX9uqoz38HsKqMXayZrF"
count
=
10000
[[consensus.sub.ticket.genesis]]
minerAddr
=
"1EDnnePAZN48aC2hiTDzhkczfF39g1pZZX"
returnAddr
=
"1KcCVZLSQYRUwE5EXTsAoQs9LuJW6xwfQa"
count
=
10000
[store]
name
=
"mavl"
driver
=
"memdb"
dbPath
=
"datadir/mavltree"
dbCache
=
128
[store.sub.mavl]
enableMavlPrefix
=
false
enableMVCC
=
false
enableMavlPrune
=
false
pruneHeight
=
10000
[wallet]
minFee
=
1000000
driver
=
"memdb"
dbPath
=
"datadir/wallet"
dbCache
=
16
signType
=
"secp256k1"
[wallet.sub.ticket]
minerwhitelist
=
["*"]
[exec]
isFree
=
false
minExecFee
=
100000
enableStat
=
false
enableMVCC
=
false
[exec.sub.token]
saveTokenTxList
=
true
tokenApprs
=
[
"1Bsg9j6gW83sShoee1fZAt9TkUjcrCgA9S"
,
"1Q8hGLfoGe63efeWa8fJ4Pnukhkngt6poK"
,
"1LY8GFia5EiyoTodMLfkB5PHNNpXRqxhyB"
,
"1GCzJDS6HbgTQ2emade7mEJGGWFfA15pS9"
,
"1JYB8sxi4He5pZWHCd3Zi2nypQ4JMB6AxN"
,
"12qyocayNF7Lv6C9qW4avxs2E7U41fKSfv"
,
]
[exec.sub.relay]
genesis
=
"14KEKbYtKKQm4wMthSK9J4La4nAiidGozt"
[exec.sub.cert]
# 是否启用证书验证和签名
enable
=
false
# 加密文件路径
cryptoPath
=
"authdir/crypto"
# 带证书签名类型,支持"auth_ecdsa", "auth_sm2"
signType
=
"auth_ecdsa"
[exec.sub.manage]
superManager
=[
"1Bsg9j6gW83sShoee1fZAt9TkUjcrCgA9S"
,
"12qyocayNF7Lv6C9qW4avxs2E7U41fKSfv"
,
"1Q8hGLfoGe63efeWa8fJ4Pnukhkngt6poK"
]
\ No newline at end of file
plugin/mempool/score/mempool.go
0 → 100644
View file @
3b2ed068
package
score
import
(
"github.com/33cn/chain33/queue"
drivers
"github.com/33cn/chain33/system/mempool"
"github.com/33cn/chain33/types"
)
//--------------------------------------------------------------------------------
// Module Mempool
type
subConfig
struct
{
PoolCacheSize
int64
`json:"poolCacheSize"`
MinTxFee
int64
`json:"minTxFee"`
MaxTxNumPerAccount
int64
`json:"maxTxNumPerAccount"`
TimeParam
int64
`json:"timeParam"`
PriceConstant
int64
`json:"priceConstant"`
PricePower
int64
`json:"pricePower"`
}
func
init
()
{
drivers
.
Reg
(
"score"
,
New
)
}
//New 创建score cache 结构的 mempool
func
New
(
cfg
*
types
.
Mempool
,
sub
[]
byte
)
queue
.
Module
{
c
:=
drivers
.
NewMempool
(
cfg
)
var
subcfg
subConfig
types
.
MustDecode
(
sub
,
&
subcfg
)
if
subcfg
.
PoolCacheSize
==
0
{
subcfg
.
PoolCacheSize
=
cfg
.
PoolCacheSize
}
c
.
SetQueueCache
(
NewQueue
(
subcfg
))
return
c
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment