Commit 6c8cc82f authored by 陈德海's avatar 陈德海 Committed by vipwzw

fix score

parent 1c612528
...@@ -5,136 +5,96 @@ import ( ...@@ -5,136 +5,96 @@ import (
"github.com/33cn/chain33/common/skiplist" "github.com/33cn/chain33/common/skiplist"
"github.com/33cn/chain33/system/mempool" "github.com/33cn/chain33/system/mempool"
"github.com/33cn/chain33/types" "github.com/golang/protobuf/proto"
) )
var mempoolDupResendInterval int64 = 600 // mempool内交易过期时间,10分钟
// Queue 分数队列模式(分数=定量a*常量b*手续费/交易字节数-常量c*时间,按分数排队,高的优先,定量a和常量b,c可配置) // Queue 分数队列模式(分数=定量a*常量b*手续费/交易字节数-常量c*时间,按分数排队,高的优先,定量a和常量b,c可配置)
type Queue struct { type Queue struct {
txMap map[string]*skiplist.SkipValue *skiplist.Queue
txList *skiplist.SkipList
subConfig subConfig subConfig subConfig
} }
// NewQueue 创建队列 type scoreScore struct {
func NewQueue(subcfg subConfig) *Queue { *mempool.Item
return &Queue{ subConfig subConfig
txMap: make(map[string]*skiplist.SkipValue, subcfg.PoolCacheSize),
txList: skiplist.NewSkipList(&skiplist.SkipValue{Score: -1, Value: nil}),
subConfig: subcfg,
}
} }
func (cache *Queue) newSkipValue(item *mempool.Item) (*skiplist.SkipValue, error) { func (item *scoreScore) GetScore() int64 {
buf := types.Encode(item.Value) size := proto.Size(item.Value)
size := len(buf) score := item.subConfig.PriceConstant*(item.Value.Fee/int64(size))*
return &skiplist.SkipValue{Score: cache.subConfig.PriceConstant*(item.Value.Fee/int64(size))* item.subConfig.PricePower - item.subConfig.TimeParam*item.EnterTime
cache.subConfig.PricePower - cache.subConfig.TimeParam*item.EnterTime, Value: item}, nil return score
} }
// Exist 是否存在 func (item *scoreScore) Hash() []byte {
func (cache *Queue) Exist(hash string) bool { return item.Value.Hash()
_, exists := cache.txMap[hash]
return exists
} }
//GetItem 获取数据通过 key func (item *scoreScore) Compare(cmp skiplist.Scorer) int {
func (cache *Queue) GetItem(hash string) (*mempool.Item, error) { it := cmp.(*scoreScore)
if k, exist := cache.txMap[hash]; exist { //时间越小,权重越高
return k.Value.(*mempool.Item), nil if item.EnterTime < it.EnterTime {
return skiplist.Big
} }
return nil, types.ErrNotFound if item.EnterTime == it.EnterTime {
return skiplist.Equal
}
return skiplist.Small
} }
// Push 把给定tx添加到Queue;如果tx已经存在Queue中或Mempool已满则返回对应error // NewQueue 创建队列
func (cache *Queue) Push(item *mempool.Item) error { func NewQueue(subcfg subConfig) *Queue {
hash := item.Value.Hash() return &Queue{
if cache.Exist(string(hash)) { Queue: skiplist.NewQueue(subcfg.PoolCacheSize),
s := cache.txMap[string(hash)] subConfig: subcfg,
addedItem := s.Value.(*mempool.Item)
addedTime := addedItem.EnterTime
if types.Now().Unix()-addedTime < mempoolDupResendInterval {
return types.ErrTxExist
}
// 超过2分钟之后的重发交易返回nil,再次发送给P2P,但是不再次加入mempool
// 并修改其enterTime,以避免该交易一直在节点间被重发
newEnterTime := types.Now().Unix()
resendItem := &mempool.Item{Value: item.Value, Priority: item.Value.Fee, EnterTime: newEnterTime}
var err error
sv, err := cache.newSkipValue(resendItem)
if err != nil {
return err
}
cache.Remove(string(hash))
cache.txList.Insert(sv)
cache.txMap[string(hash)] = sv
// ------------------
return nil
} }
}
//func (cache *Queue) newSkipValue(item *mempool.Item) (*skiplist.SkipValue, error) {
// size := proto.Size(item.Value)
// return &skiplist.SkipValue{Score: cache.subConfig.PriceConstant*(item.Value.Fee/int64(size))*
// cache.subConfig.PricePower - cache.subConfig.TimeParam*item.EnterTime, Value: item}, nil
//}
it := &mempool.Item{Value: item.Value, Priority: item.Value.Fee, EnterTime: item.EnterTime} //GetItem 获取数据通过 key
sv, err := cache.newSkipValue(it) func (cache *Queue) GetItem(hash string) (*mempool.Item, error) {
item, err := cache.Queue.GetItem(hash)
if err != nil { if err != nil {
return err return nil, err
}
if int64(cache.txList.Len()) >= cache.subConfig.PoolCacheSize {
tail := cache.txList.GetIterator().Last()
//分数高存留
if sv.Compare(tail) == -1 {
cache.Remove(string(tail.Value.(*mempool.Item).Value.Hash()))
} else {
return types.ErrMemFull
}
} }
cache.txList.Insert(sv) return item.(*scoreScore).Item, nil
cache.txMap[string(hash)] = sv
return nil
} }
// Remove 删除数据 // Push 把给定tx添加到Queue;如果tx已经存在Queue中或Mempool已满则返回对应error
func (cache *Queue) Remove(hash string) error { func (cache *Queue) Push(item *mempool.Item) error {
cache.txList.Delete(cache.txMap[hash]) return cache.Queue.Push(&scoreScore{Item: item, subConfig: cache.subConfig})
delete(cache.txMap, hash)
return nil
}
// Size 数据总数
func (cache *Queue) Size() int {
return cache.txList.Len()
} }
// Walk 遍历整个队列 // Walk 遍历整个队列
func (cache *Queue) Walk(count int, cb func(value *mempool.Item) bool) { func (cache *Queue) Walk(count int, cb func(value *mempool.Item) bool) {
i := 0 cache.Queue.Walk(count, func(item skiplist.Scorer) bool {
cache.txList.Walk(func(item interface{}) bool { return cb(item.(*scoreScore).Item)
if !cb(item.(*mempool.Item)) {
return false
}
i++
return i != count
}) })
} }
// GetProperFee 获取合适的手续费 // GetProperFee 获取合适的手续费
func (cache *Queue) GetProperFee() int64 { func (cache *Queue) GetProperFee() int64 {
var sumScore int64 var sumScore int64
var properFee int64 var properFeerate int64
if cache.Size() == 0 { if cache.Size() == 0 {
return cache.subConfig.ProperFee return cache.subConfig.ProperFee
} }
i := 0 i := 0
cache.txList.WalkS(func(node interface{}) bool { cache.Queue.Walk(0, func(score skiplist.Scorer) bool {
if i == 100 { if i == 100 {
return false return false
} }
sumScore += node.(*skiplist.SkipValue).Score sumScore += score.GetScore()
i++ i++
return true return true
}) })
//这里的int64(250)是一般交易的大小 //这里的int64(100)是一般交易的大小
properFee = (sumScore/int64(i) + cache.subConfig.TimeParam*time.Now().Unix()) * int64(250) / properFeerate = (sumScore/int64(i) + cache.subConfig.TimeParam*time.Now().Unix()) * int64(100) /
(cache.subConfig.PriceConstant * cache.subConfig.PricePower) (cache.subConfig.PriceConstant * cache.subConfig.PricePower)
return properFee return properFeerate
} }
package score package score
import ( import (
"log"
"testing" "testing"
"time" "time"
...@@ -10,7 +11,12 @@ import ( ...@@ -10,7 +11,12 @@ import (
cty "github.com/33cn/chain33/system/dapp/coins/types" cty "github.com/33cn/chain33/system/dapp/coins/types"
drivers "github.com/33cn/chain33/system/mempool" drivers "github.com/33cn/chain33/system/mempool"
"github.com/33cn/chain33/types" "github.com/33cn/chain33/types"
"github.com/33cn/chain33/util"
"github.com/33cn/chain33/util/testnode"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
_ "github.com/33cn/chain33/system"
) )
var ( var (
...@@ -22,11 +28,11 @@ var ( ...@@ -22,11 +28,11 @@ var (
amount = int64(1e8) amount = int64(1e8)
v = &cty.CoinsAction_Transfer{Transfer: &types.AssetsTransfer{Amount: amount}} v = &cty.CoinsAction_Transfer{Transfer: &types.AssetsTransfer{Amount: amount}}
transfer = &cty.CoinsAction{Value: v, Ty: cty.CoinsActionTransfer} transfer = &cty.CoinsAction{Value: v, Ty: cty.CoinsActionTransfer}
tx1 = &types.Transaction{Execer: []byte("coins"), Payload: types.Encode(transfer), Fee: 100000, Expire: 1, To: toAddr} tx1 = &types.Transaction{Execer: []byte("coins"), Payload: types.Encode(transfer), Fee: 1000000, Expire: 1, To: toAddr}
tx2 = &types.Transaction{Execer: []byte("coins"), Payload: types.Encode(transfer), Fee: 100000, Expire: 2, To: toAddr} tx2 = &types.Transaction{Execer: []byte("coins"), Payload: types.Encode(transfer), Fee: 1000000, Expire: 2, To: toAddr}
tx3 = &types.Transaction{Execer: []byte("coins"), Payload: types.Encode(transfer), Fee: 100000, Expire: 3, To: toAddr} tx3 = &types.Transaction{Execer: []byte("coins"), Payload: types.Encode(transfer), Fee: 1000000, Expire: 3, To: toAddr}
tx4 = &types.Transaction{Execer: []byte("coins"), Payload: types.Encode(transfer), Fee: 200000, Expire: 4, To: toAddr} tx4 = &types.Transaction{Execer: []byte("coins"), Payload: types.Encode(transfer), Fee: 2000000, Expire: 4, To: toAddr}
tx5 = &types.Transaction{Execer: []byte("coins"), Payload: types.Encode(transfer), Fee: 100000, Expire: 5, To: toAddr} tx5 = &types.Transaction{Execer: []byte("coins"), Payload: types.Encode(transfer), Fee: 1000000, Expire: 5, To: toAddr}
item1 = &drivers.Item{Value: tx1, Priority: tx1.Fee, EnterTime: types.Now().Unix()} item1 = &drivers.Item{Value: tx1, Priority: tx1.Fee, EnterTime: types.Now().Unix()}
item2 = &drivers.Item{Value: tx2, Priority: tx2.Fee, EnterTime: types.Now().Unix()} item2 = &drivers.Item{Value: tx2, Priority: tx2.Fee, EnterTime: types.Now().Unix()}
item3 = &drivers.Item{Value: tx3, Priority: tx3.Fee, EnterTime: types.Now().Unix() - 1000} item3 = &drivers.Item{Value: tx3, Priority: tx3.Fee, EnterTime: types.Now().Unix() - 1000}
...@@ -134,12 +140,11 @@ func TestQueueDirection(t *testing.T) { ...@@ -134,12 +140,11 @@ func TestQueueDirection(t *testing.T) {
cache.Push(item3) cache.Push(item3)
cache.Push(item4) cache.Push(item4)
cache.Push(item5) cache.Push(item5)
cache.txList.Print()
i := 0 i := 0
lastScore := cache.txList.GetIterator().First().Score lastScore := cache.First().GetScore()
var tmpScore int64 var tmpScore int64
cache.Walk(5, func(value *drivers.Item) bool { cache.Walk(5, func(value *drivers.Item) bool {
tmpScore = cache.txMap[string(value.Value.Hash())].Score tmpScore = cache.CreateSkipValue(&scoreScore{Item: value, subConfig: cache.subConfig}).Score
if lastScore < tmpScore { if lastScore < tmpScore {
return false return false
} }
...@@ -148,25 +153,91 @@ func TestQueueDirection(t *testing.T) { ...@@ -148,25 +153,91 @@ func TestQueueDirection(t *testing.T) {
return true return true
}) })
assert.Equal(t, 5, i) assert.Equal(t, 5, i)
assert.Equal(t, true, lastScore == cache.txList.GetIterator().Last().Score) assert.Equal(t, true, lastScore == cache.Last().GetScore())
}
func TestRealNodeMempool(t *testing.T) {
mock33 := testnode.New("chain33.test.toml", nil)
defer mock33.Close()
mock33.Listen()
mock33.WaitHeight(0)
mock33.SendHot()
mock33.WaitHeight(1)
n := 300
done := make(chan struct{}, n)
keys := make([]crypto.PrivKey, n)
for i := 0; i < n; i++ {
addr, priv := util.Genaddress()
tx := util.CreateCoinsTx(mock33.GetHotKey(), addr, 10*types.Coin)
mock33.SendTx(tx)
keys[i] = priv
}
mock33.Wait()
for i := 0; i < n; i++ {
go func(priv crypto.PrivKey) {
for i := 0; i < 100; i++ {
tx := util.CreateCoinsTx(priv, mock33.GetGenesisAddress(), types.Coin/1000)
reply, err := mock33.GetAPI().SendTx(tx)
if err != nil {
log.Println(err)
continue
}
//发送交易组
tx1 := util.CreateCoinsTx(priv, mock33.GetGenesisAddress(), types.Coin/1000)
tx2 := util.CreateCoinsTx(priv, mock33.GetGenesisAddress(), types.Coin/1000)
txgroup, err := types.CreateTxGroup([]*types.Transaction{tx1, tx2})
if err != nil {
log.Println(err)
continue
}
for i := 0; i < len(txgroup.GetTxs()); i++ {
err = txgroup.SignN(i, types.SECP256K1, priv)
if err != nil {
t.Error(err)
return
}
}
reply, err = mock33.GetAPI().SendTx(txgroup.Tx())
if err != nil {
log.Println(err)
continue
}
mock33.SetLastSend(reply.GetMsg())
}
done <- struct{}{}
}(keys[i])
}
for i := 0; i < n; i++ {
<-done
}
for {
txs, err := mock33.GetAPI().GetMempool()
assert.Nil(t, err)
println("len", len(txs.GetTxs()))
if len(txs.GetTxs()) > 0 {
mock33.Wait()
continue
}
break
}
peer, err := mock33.GetAPI().PeerInfo()
assert.Nil(t, err)
assert.Equal(t, len(peer.Peers), 1)
assert.Equal(t, peer.Peers[0].MempoolSize, int32(0))
} }
func TestGetProperFee(t *testing.T) { func TestGetProperFee(t *testing.T) {
cache := initEnv(0) cache := initEnv(0)
assert.Equal(t, cache.subConfig.ProperFee, cache.GetProperFee()) assert.Equal(t, cache.subConfig.ProperFee, cache.GetProperFee())
cache.Push(item3) cache.Push(item3)
cache.Push(item4) cache.Push(item4)
cache.GetProperFee() size3 := proto.Size(item3.Value)
buf3 := types.Encode(item3.Value) size4 := proto.Size(item3.Value)
size3 := len(buf3) score3 := cache.subConfig.PriceConstant*cache.subConfig.PricePower*(item3.Value.Fee/int64(size3)) -
buf4 := types.Encode(item4.Value)
size4 := len(buf4)
score3 := item3.Value.Fee*cache.subConfig.PriceConstant*cache.subConfig.PricePower/int64(size3) -
item3.EnterTime*cache.subConfig.TimeParam item3.EnterTime*cache.subConfig.TimeParam
score4 := item4.Value.Fee*cache.subConfig.PriceConstant*cache.subConfig.PricePower/int64(size4) - score4 := cache.subConfig.PriceConstant*cache.subConfig.PricePower*(item4.Value.Fee/int64(size4)) -
item4.EnterTime*cache.subConfig.TimeParam item4.EnterTime*cache.subConfig.TimeParam
properFee := ((score3+score4)/2 + time.Now().Unix()*cache.subConfig.TimeParam) * int64(250) / properFee := ((score3+score4)/2 + time.Now().Unix()*cache.subConfig.TimeParam) * int64(100) /
(cache.subConfig.PriceConstant * cache.subConfig.PricePower) (cache.subConfig.PriceConstant * cache.subConfig.PricePower)
assert.Equal(t, int64(1), properFee/cache.GetProperFee()) assert.Equal(t, int64(1), properFee/cache.GetProperFee())
} }
Title="chain33" Title="local"
TestNet=true TestNet=true
[log] [log]
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment