Commit fd1b29b1 authored by vipwzw's avatar vipwzw

fix mempool full panic

parent 4a005994
package price package price
import ( import (
"container/list"
"fmt"
"github.com/33cn/chain33/common/skiplist" "github.com/33cn/chain33/common/skiplist"
"github.com/33cn/chain33/system/mempool" "github.com/33cn/chain33/system/mempool"
"github.com/33cn/chain33/types" "github.com/33cn/chain33/types"
...@@ -12,56 +9,41 @@ import ( ...@@ -12,56 +9,41 @@ import (
// Queue 价格队列模式(价格=手续费/交易字节数,价格高者优先,同价则时间早优先) // Queue 价格队列模式(价格=手续费/交易字节数,价格高者优先,同价则时间早优先)
type Queue struct { type Queue struct {
txMap map[string]*list.Element *skiplist.Queue
txList *skiplist.SkipList
subConfig subConfig subConfig subConfig
} }
// NewQueue 创建队列 type priceScore struct {
func NewQueue(subcfg subConfig) *Queue { *mempool.Item
return &Queue{
make(map[string]*list.Element),
skiplist.NewSkipList(&skiplist.SkipValue{Score: -1, Value: nil}),
subcfg,
}
}
/*
为了处理相同 Score 的问题,需要一个队列保存相同 Score 下面的交易
*/
func (cache *Queue) insertSkipValue(item *mempool.Item) *list.Element {
txSize := proto.Size(item.Value)
skvalue := &skiplist.SkipValue{Score: item.Value.Fee / int64(txSize)}
value := cache.txList.Find(skvalue)
var orderlist *list.List
if value == nil { //new OrderList
orderlist = list.New()
skvalue.Value = orderlist
cache.txList.Insert(skvalue)
} else {
orderlist = value.Value.(*list.List)
}
return orderlist.PushBack(item)
} }
func (cache *Queue) newSkipValue(item *mempool.Item) *skiplist.SkipValue { func (item *priceScore) GetScore() int64 {
txSize := proto.Size(item.Value) txSize := proto.Size(item.Value)
skvalue := &skiplist.SkipValue{Score: item.Value.Fee / int64(txSize)} return item.Value.Fee / int64(txSize)
return skvalue
} }
//Exist 是否存在 // NewQueue 创建队列
func (cache *Queue) Exist(hash string) bool { func NewQueue(subcfg subConfig) *Queue {
_, exists := cache.txMap[hash] return &Queue{
return exists Queue: skiplist.NewQueue(),
subConfig: subcfg,
}
} }
//GetItem 获取数据通过 key //GetItem 获取数据通过 key
func (cache *Queue) GetItem(hash string) (*mempool.Item, error) { func (cache *Queue) GetItem(hash string) (*mempool.Item, error) {
if k, exist := cache.txMap[hash]; exist { item, err := cache.Queue.GetItem(hash)
return k.Value.(*mempool.Item), nil if err != nil {
return nil, err
} }
return nil, types.ErrNotFound return item.(*priceScore).Item, nil
}
//Walk 获取数据通过 key
func (cache *Queue) Walk(count int, cb func(tx *mempool.Item) bool) {
cache.Queue.Walk(count, func(item skiplist.Scorer) bool {
return cb(item.(*priceScore).Item)
})
} }
// Push 把给定tx添加到Queue;如果tx已经存在Queue中或Mempool已满则返回对应error // Push 把给定tx添加到Queue;如果tx已经存在Queue中或Mempool已满则返回对应error
...@@ -70,21 +52,17 @@ func (cache *Queue) Push(item *mempool.Item) error { ...@@ -70,21 +52,17 @@ func (cache *Queue) Push(item *mempool.Item) error {
if cache.Exist(string(hash)) { if cache.Exist(string(hash)) {
return types.ErrTxExist return types.ErrTxExist
} }
it := &mempool.Item{Value: item.Value, Priority: item.Value.Fee, EnterTime: item.EnterTime} sv := cache.CreateSkipValue(&priceScore{Item: item})
sv := cache.newSkipValue(it) if int64(cache.Size()) >= cache.subConfig.PoolCacheSize {
if int64(cache.txList.Len()) >= cache.subConfig.PoolCacheSize { tail := cache.Last().(*priceScore)
tail := cache.txList.GetIterator().Last() lasthash := string(tail.Value.Hash())
lasthash := string(tail.Value.(*mempool.Item).Value.Hash())
printhash("remove tail", []byte(lasthash))
printhash("push hash", hash)
fmt.Println("compare", sv.Compare(tail))
//价格高存留 //价格高存留
switch sv.Compare(tail) { switch sv.Compare(cache.CreateSkipValue(tail)) {
case -1: case -1:
cache.Remove(lasthash) cache.Queue.Remove(lasthash)
case 0: case 0:
if sv.Value.(*mempool.Item).EnterTime < tail.Value.(*mempool.Item).EnterTime { if item.EnterTime < tail.EnterTime {
cache.Remove(lasthash) cache.Queue.Remove(lasthash)
break break
} }
return types.ErrMemFull return types.ErrMemFull
...@@ -94,42 +72,10 @@ func (cache *Queue) Push(item *mempool.Item) error { ...@@ -94,42 +72,10 @@ func (cache *Queue) Push(item *mempool.Item) error {
return types.ErrMemFull return types.ErrMemFull
} }
} }
cache.add(string(hash), sv) cache.Queue.Push(string(hash), &priceScore{Item: item})
return nil
}
func (cache *Queue) add(hash string, item *list.Element) error {
cache.txMap[string(hash)] = cache.insertSkipValue(item)
return nil
}
// Remove 删除数据
func (cache *Queue) Remove(hash string) error {
retcode := cache.txList.Delete()
if retcode == 0 { //not found
printhash("remove error", []byte(hash))
}
delete(cache.txMap, hash)
return nil return nil
} }
// Size 数据总数
func (cache *Queue) Size() int {
return cache.txList.Len()
}
// Walk 遍历整个队列
func (cache *Queue) Walk(count int, cb func(value *mempool.Item) bool) {
i := 0
cache.txList.Walk(func(item interface{}) bool {
if !cb(item.(*mempool.Item)) {
return false
}
i++
return i != count
})
}
// GetProperFee 获取合适的手续费率,取前100的平均手续费率 // GetProperFee 获取合适的手续费率,取前100的平均手续费率
func (cache *Queue) GetProperFee() int64 { func (cache *Queue) GetProperFee() int64 {
var sumFeeRate int64 var sumFeeRate int64
...@@ -140,12 +86,9 @@ func (cache *Queue) GetProperFee() int64 { ...@@ -140,12 +86,9 @@ func (cache *Queue) GetProperFee() int64 {
i := 0 i := 0
var txSize int var txSize int
var feeRate int64 var feeRate int64
cache.txList.Walk(func(tx interface{}) bool { cache.Walk(100, func(item *mempool.Item) bool {
if i == 100 { txSize = proto.Size(item.Value)
return false feeRate = item.Value.Fee / int64(txSize/1000+1)
}
txSize = proto.Size(tx.(*mempool.Item).Value)
feeRate = tx.(*mempool.Item).Value.Fee / int64(txSize/1000+1)
sumFeeRate += feeRate sumFeeRate += feeRate
i++ i++
return true return true
...@@ -153,7 +96,3 @@ func (cache *Queue) GetProperFee() int64 { ...@@ -153,7 +96,3 @@ func (cache *Queue) GetProperFee() int64 {
properFeeRate = sumFeeRate / int64(i) properFeeRate = sumFeeRate / int64(i)
return properFeeRate return properFeeRate
} }
func printhash(title string, hash []byte) {
fmt.Printf(title+" %x \n", hash)
}
...@@ -139,12 +139,11 @@ func TestQueueDirection(t *testing.T) { ...@@ -139,12 +139,11 @@ func TestQueueDirection(t *testing.T) {
cache.Push(item3) cache.Push(item3)
cache.Push(item4) cache.Push(item4)
cache.Push(item5) cache.Push(item5)
cache.txList.Print()
i := 0 i := 0
lastScore := cache.txList.GetIterator().First().Score lastScore := cache.First().GetScore()
var tmpScore int64 var tmpScore int64
cache.Walk(5, func(value *drivers.Item) bool { cache.Walk(5, func(value *drivers.Item) bool {
tmpScore = cache.txMap[string(value.Value.Hash())].Score tmpScore = cache.CreateSkipValue(&priceScore{Item: value}).Score
if lastScore < tmpScore { if lastScore < tmpScore {
return false return false
} }
...@@ -153,7 +152,7 @@ func TestQueueDirection(t *testing.T) { ...@@ -153,7 +152,7 @@ func TestQueueDirection(t *testing.T) {
return true return true
}) })
assert.Equal(t, 5, i) assert.Equal(t, 5, i)
assert.Equal(t, true, lastScore == cache.txList.GetIterator().Last().Score) assert.Equal(t, true, lastScore == cache.Last().GetScore())
} }
func TestGetProperFee(t *testing.T) { func TestGetProperFee(t *testing.T) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment