Commit 008a0d7f authored by vipwzw's avatar vipwzw

skiplist 中抽象了一个通用的queue

parent fd1b29b1
...@@ -3,7 +3,6 @@ package price ...@@ -3,7 +3,6 @@ package price
import ( import (
"github.com/33cn/chain33/common/skiplist" "github.com/33cn/chain33/common/skiplist"
"github.com/33cn/chain33/system/mempool" "github.com/33cn/chain33/system/mempool"
"github.com/33cn/chain33/types"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
) )
...@@ -22,10 +21,26 @@ func (item *priceScore) GetScore() int64 { ...@@ -22,10 +21,26 @@ func (item *priceScore) GetScore() int64 {
return item.Value.Fee / int64(txSize) return item.Value.Fee / int64(txSize)
} }
func (item *priceScore) Hash() []byte {
return item.Value.Hash()
}
func (item *priceScore) Compare(cmp skiplist.Scorer) int {
it := cmp.(*priceScore)
//时间越小,权重越高
if item.EnterTime < it.EnterTime {
return skiplist.Big
}
if item.EnterTime == it.EnterTime {
return skiplist.Equal
}
return skiplist.Small
}
// NewQueue 创建队列 // NewQueue 创建队列
func NewQueue(subcfg subConfig) *Queue { func NewQueue(subcfg subConfig) *Queue {
return &Queue{ return &Queue{
Queue: skiplist.NewQueue(), Queue: skiplist.NewQueue(subcfg.PoolCacheSize),
subConfig: subcfg, subConfig: subcfg,
} }
} }
...@@ -39,6 +54,11 @@ func (cache *Queue) GetItem(hash string) (*mempool.Item, error) { ...@@ -39,6 +54,11 @@ func (cache *Queue) GetItem(hash string) (*mempool.Item, error) {
return item.(*priceScore).Item, nil return item.(*priceScore).Item, nil
} }
//Push 加入数据到队列
func (cache *Queue) Push(item *mempool.Item) error {
return cache.Queue.Push(&priceScore{Item: item})
}
//Walk 获取数据通过 key //Walk 获取数据通过 key
func (cache *Queue) Walk(count int, cb func(tx *mempool.Item) bool) { func (cache *Queue) Walk(count int, cb func(tx *mempool.Item) bool) {
cache.Queue.Walk(count, func(item skiplist.Scorer) bool { cache.Queue.Walk(count, func(item skiplist.Scorer) bool {
...@@ -46,36 +66,6 @@ func (cache *Queue) Walk(count int, cb func(tx *mempool.Item) bool) { ...@@ -46,36 +66,6 @@ func (cache *Queue) Walk(count int, cb func(tx *mempool.Item) bool) {
}) })
} }
// Push 把给定tx添加到Queue;如果tx已经存在Queue中或Mempool已满则返回对应error
func (cache *Queue) Push(item *mempool.Item) error {
hash := item.Value.Hash()
if cache.Exist(string(hash)) {
return types.ErrTxExist
}
sv := cache.CreateSkipValue(&priceScore{Item: item})
if int64(cache.Size()) >= cache.subConfig.PoolCacheSize {
tail := cache.Last().(*priceScore)
lasthash := string(tail.Value.Hash())
//价格高存留
switch sv.Compare(cache.CreateSkipValue(tail)) {
case -1:
cache.Queue.Remove(lasthash)
case 0:
if item.EnterTime < tail.EnterTime {
cache.Queue.Remove(lasthash)
break
}
return types.ErrMemFull
case 1:
return types.ErrMemFull
default:
return types.ErrMemFull
}
}
cache.Queue.Push(string(hash), &priceScore{Item: item})
return nil
}
// GetProperFee 获取合适的手续费率,取前100的平均手续费率 // GetProperFee 获取合适的手续费率,取前100的平均手续费率
func (cache *Queue) GetProperFee() int64 { func (cache *Queue) GetProperFee() int64 {
var sumFeeRate int64 var sumFeeRate int64
......
package skiplist
import (
"container/list"
"github.com/33cn/chain33/types"
)
//Scorer 接口实现 Value的 Score 功能
type Scorer interface {
GetScore() int64
Hash() []byte
Compare(Scorer) int
}
// Queue 价格队列模式(价格=手续费/交易字节数,价格高者优先,同价则时间早优先)
type Queue struct {
txMap map[string]*list.Element
txList *SkipList
maxsize int64
}
// NewQueue 创建队列
func NewQueue(maxsize int64) *Queue {
return &Queue{
txMap: make(map[string]*list.Element),
txList: NewSkipList(&SkipValue{Score: -1, Value: nil}),
maxsize: maxsize,
}
}
/*
为了处理相同 Score 的问题,需要一个队列保存相同 Score 下面的交易
*/
func (cache *Queue) insertSkipValue(item Scorer) *list.Element {
skvalue := cache.CreateSkipValue(item)
value := cache.txList.Find(skvalue)
var txlist *list.List
if value == nil {
txlist = list.New()
skvalue.Value = txlist
cache.txList.Insert(skvalue)
} else {
txlist = value.Value.(*list.List)
}
return txlist.PushBack(item)
}
func (cache *Queue) deleteSkipValue(item *list.Element) error {
if item == nil {
return nil
}
skvalue := cache.CreateSkipValue(item.Value.(Scorer))
value := cache.txList.Find(skvalue)
var txlist *list.List
if value == nil {
return types.ErrNotFound
}
txlist = value.Value.(*list.List)
txlist.Remove(item)
if txlist.Len() == 0 {
cache.txList.Delete(value)
}
return nil
}
//CreateSkipValue 创建一个 仅仅有 score 的Value
func (cache *Queue) CreateSkipValue(item Scorer) *SkipValue {
skvalue := &SkipValue{Score: item.GetScore()}
return skvalue
}
//Exist 是否存在
func (cache *Queue) Exist(hash string) bool {
_, exists := cache.txMap[hash]
return exists
}
//GetItem 获取数据通过 key
func (cache *Queue) GetItem(hash string) (Scorer, error) {
if k, exist := cache.txMap[hash]; exist {
return k.Value.(Scorer), nil
}
return nil, types.ErrNotFound
}
//Insert Scorer item to queue
func (cache *Queue) Insert(hash string, item Scorer) error {
cache.txMap[string(hash)] = cache.insertSkipValue(item)
return nil
}
// Push 把给定tx添加到Queue;如果tx已经存在Queue中或Mempool已满则返回对应error
func (cache *Queue) Push(item Scorer) error {
hash := item.Hash()
if cache.Exist(string(hash)) {
return types.ErrTxExist
}
sv := cache.CreateSkipValue(item)
if int64(cache.Size()) >= cache.maxsize {
tail := cache.Last()
lasthash := string(tail.Hash())
//价格高存留
switch sv.Compare(cache.CreateSkipValue(tail)) {
case Big:
cache.Remove(lasthash)
case Equal:
//再score 相同的情况下,item 之间的比较方法
//权重大的留下来
if item.Compare(tail) == Big {
cache.Remove(lasthash)
break
}
return types.ErrMemFull
case Small:
return types.ErrMemFull
default:
return types.ErrMemFull
}
}
cache.Insert(string(hash), item)
return nil
}
// Remove 删除数据
func (cache *Queue) Remove(hash string) error {
elm, ok := cache.txMap[hash]
if !ok {
return types.ErrNotFound
}
err := cache.deleteSkipValue(elm)
if err != nil {
return err
}
delete(cache.txMap, hash)
return nil
}
// Size 数据总数
func (cache *Queue) Size() int {
return len(cache.txMap)
}
//Last 取出最后一个交易
func (cache *Queue) Last() Scorer {
if cache.Size() == 0 {
return nil
}
tailqueue := cache.txList.GetIterator().Last()
tail := tailqueue.Value.(*list.List).Back().Value.(Scorer)
return tail
}
//First 取出第一个交易
func (cache *Queue) First() Scorer {
if cache.Size() == 0 {
return nil
}
tailqueue := cache.txList.GetIterator().First()
tail := tailqueue.Value.(*list.List).Front().Value.(Scorer)
return tail
}
// Walk 遍历整个队列
func (cache *Queue) Walk(count int, cb func(value Scorer) bool) {
i := 0
cache.txList.Walk(func(item interface{}) bool {
l := item.(*list.List)
for e := l.Front(); e != nil; e = e.Next() {
if !cb(e.Value.(Scorer)) {
return false
}
i++
if i == count {
return false
}
}
return true
})
}
...@@ -14,14 +14,21 @@ type SkipValue struct { ...@@ -14,14 +14,21 @@ type SkipValue struct {
Value interface{} Value interface{}
} }
//Compare Const
const (
Big = -1
Small = 1
Equal = 0
)
// Compare 比较函数,这样的比较排序是从大到小 // Compare 比较函数,这样的比较排序是从大到小
func (v *SkipValue) Compare(value *SkipValue) int { func (v *SkipValue) Compare(value *SkipValue) int {
if v.Score > value.Score { if v.Score > value.Score {
return -1 return Big
} else if v.Score == value.Score { } else if v.Score == value.Score {
return 0 return Equal
} }
return 1 return Small
} }
// skipListNode 跳跃表节点 // skipListNode 跳跃表节点
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment