Commit 853a4bad authored by kingwang's avatar kingwang Committed by 33cn

update 190710

parent 406bddef
......@@ -1334,3 +1334,28 @@ func (bs *BlockStore) CreateSequences(batchSize int64) {
}
storeLog.Info("CreateSequences done")
}
//SetConsensusPara 设置kv到数据库,当value是空时需要delete操作
func (bs *BlockStore) SetConsensusPara(kvs *types.LocalDBSet) error {
var isSync bool
if kvs.GetTxid() != 0 {
isSync = true
}
batch := bs.db.NewBatch(isSync)
for i := 0; i < len(kvs.KV); i++ {
if types.CheckConsensusParaTxsKey(kvs.KV[i].Key) {
if kvs.KV[i].Value == nil {
batch.Delete(kvs.KV[i].Key)
} else {
batch.Set(kvs.KV[i].Key, kvs.KV[i].Value)
}
} else {
storeLog.Error("Set:CheckConsensusParaTxsKey:fail", "key", string(kvs.KV[i].Key))
}
}
err := batch.Write()
if err != nil {
panic(err)
}
return err
}
......@@ -503,3 +503,13 @@ func (chain *BlockChain) ProcFutureBlocks() {
}
}
}
//SetValueByKey 设置kv对到blockchain数据库
func (chain *BlockChain) SetValueByKey(kvs *types.LocalDBSet) error {
return chain.blockStore.SetConsensusPara(kvs)
}
//GetValueByKey 通过key值从blockchain数据库中获取value值
func (chain *BlockChain) GetValueByKey(keys *types.LocalDBGet) *types.LocalReplyValue {
return chain.blockStore.Get(keys)
}
......@@ -1141,3 +1141,125 @@ func testProcMainSeqMsg(t *testing.T, blockchain *blockchain.BlockChain) {
chainlog.Info("testProcMainSeqMsg end --------------------")
}
//测试kv对的读写
func TestSetValueByKey(t *testing.T) {
mock33 := testnode.New("", nil)
defer func() {
defer mock33.Close()
}()
chainlog.Info("TestSetValueByKey begin --------------------")
blockchain := mock33.GetBlockChain()
//设置kv对到数据库,key的前缀错误
var kvs types.LocalDBSet
var kv types.KeyValue
//ConsensusParaTxsPrefix = []byte("LODB:Consensus:Para:")
key_1 := []byte("LODBP:Consensus:Parakey-1")
value_1 := []byte("value-1")
kv.Key = key_1
kv.Value = value_1
kvs.KV = append(kvs.KV, &kv)
err := blockchain.SetValueByKey(&kvs)
if err != nil {
t.Error("TestSetValueByKey:SetValueByKey1", "err", err)
}
//读取数据为空
var keys types.LocalDBGet
key := []byte("LODBP:Consensus:Parakey-1")
keys.Keys = append(keys.Keys, key)
values := blockchain.GetValueByKey(&keys)
for _, value := range values.Values {
if bytes.Equal(value, value_1) {
t.Error("TestSetValueByKey:GetValueByKey1", "value", string(value))
}
}
//设置kv对到数据库
var kvs2 types.LocalDBSet
var kv2 types.KeyValue
key_1 = types.CalcConsensusParaTxsKey([]byte("key-1"))
kv2.Key = key_1
kv2.Value = value_1
kvs2.KV = append(kvs2.KV, &kv2)
err = blockchain.SetValueByKey(&kvs2)
if err != nil {
t.Error("TestSetValueByKey:SetValueByKey2", "err", err)
}
//读取数据ok
var keys2 types.LocalDBGet
var key_1_exist bool
keys2.Keys = append(keys2.Keys, key_1)
values = blockchain.GetValueByKey(&keys2)
for _, value := range values.Values {
if bytes.Equal(value, value_1) {
key_1_exist = true
} else {
t.Error("TestSetValueByKey:GetValueByKey", "value", string(value))
}
}
if !key_1_exist {
t.Error("TestSetValueByKey:GetValueByKey:key_1", "key_1", string(key_1))
}
//删除key_1对应的数据,set时设置key_1对应的value为nil
for _, kv := range kvs2.KV {
if bytes.Equal(kv.GetKey(), key_1) {
kv.Value = nil
}
}
err = blockchain.SetValueByKey(&kvs2)
if err != nil {
t.Error("TestSetValueByKey:SetValueByKey3", "err", err)
}
values = blockchain.GetValueByKey(&keys2)
for _, value := range values.Values {
if bytes.Equal(value, value_1) {
t.Error("TestSetValueByKey:GetValueByKey4", "value", string(value))
}
}
//插入多个kv对到数据库并获取
var kvs3 types.LocalDBSet
var kv3 types.KeyValue
key_1 = types.CalcConsensusParaTxsKey([]byte("key-1"))
value_1 = []byte("test-1")
key_2 := types.CalcConsensusParaTxsKey([]byte("key-2"))
value_2 := []byte("test-2")
key_3 := types.CalcConsensusParaTxsKey([]byte("key-3"))
value_3 := []byte("test-3")
kv.Key = key_1
kv.Value = value_1
kv2.Key = key_2
kv2.Value = value_2
kv3.Key = key_3
kv3.Value = value_3
kvs3.KV = append(kvs2.KV, &kv, &kv2, &kv3)
err = blockchain.SetValueByKey(&kvs3)
if err != nil {
t.Error("TestSetValueByKey:SetValueByKey4", "err", err)
}
//读取数据ok
var keys3 types.LocalDBGet
count := 0
for _, kv := range kvs3.GetKV() {
keys3.Keys = append(keys3.Keys, kv.GetKey())
}
values = blockchain.GetValueByKey(&keys3)
for i, value := range values.Values {
if bytes.Equal(value, kvs3.GetKV()[i].GetValue()) {
count++
} else {
t.Error("TestSetValueByKey:GetValueByKey", "value", string(value))
}
}
if count < 3 {
t.Error("TestSetValueByKey:GetValueByKey:fail")
}
chainlog.Info("TestSetValueByKey end --------------------")
}
......@@ -95,6 +95,12 @@ func (chain *BlockChain) ProcRecvMsg() {
case types.EventGetMainSeqByHash:
go chain.processMsg(msg, reqnum, chain.GetMainSeqByHash)
//para共识模块操作blockchain db的事件
case types.EventSetValueByKey:
go chain.processMsg(msg, reqnum, chain.setValueByKey)
case types.EventGetValueByKey:
go chain.processMsg(msg, reqnum, chain.getValueByKey)
default:
go chain.processMsg(msg, reqnum, chain.unknowMsg)
}
......@@ -549,3 +555,34 @@ func (chain *BlockChain) GetMainSeqByHash(msg *queue.Message) {
}
msg.Reply(chain.client.NewMessage("rpc", types.EventReplyMainSeqByHash, &types.Int64{Data: seq}))
}
//setValueByKey 设置kv对到blockchain db中
func (chain *BlockChain) setValueByKey(msg *queue.Message) {
var reply types.Reply
reply.IsOk = true
if !chain.isParaChain {
reply.IsOk = false
reply.Msg = []byte("Must Para Chain Support!")
msg.Reply(chain.client.NewMessage("", types.EventReply, &reply))
return
}
kvs := (msg.Data).(*types.LocalDBSet)
err := chain.SetValueByKey(kvs)
if err != nil {
chainlog.Error("setValueByKey", "err", err.Error())
reply.IsOk = false
reply.Msg = []byte(err.Error())
}
msg.Reply(chain.client.NewMessage("", types.EventReply, &reply))
}
//GetValueByKey 获取value通过key从blockchain db中
func (chain *BlockChain) getValueByKey(msg *queue.Message) {
if !chain.isParaChain {
msg.Reply(chain.client.NewMessage("", types.EventLocalReplyValue, nil))
return
}
keys := (msg.Data).(*types.LocalDBGet)
values := chain.GetValueByKey(keys)
msg.Reply(chain.client.NewMessage("", types.EventLocalReplyValue, values))
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package blockchain
import (
"fmt"
"syscall"
"github.com/33cn/chain33/common"
"github.com/33cn/chain33/common/db"
"github.com/33cn/chain33/types"
)
// Rollbackblock chain Rollbackblock
func (chain *BlockChain) Rollbackblock() {
tipnode := chain.bestChain.Tip()
if chain.cfg.RollbackBlock > 0 {
if chain.NeedRollback(tipnode.height, chain.cfg.RollbackBlock) {
chainlog.Info("chain rollback start")
chain.Rollback()
chainlog.Info("chain rollback end")
}
syscall.Exit(0)
}
}
// NeedRollback need Rollback
func (chain *BlockChain) NeedRollback(curHeight, rollHeight int64) bool {
if curHeight <= rollHeight {
chainlog.Info("curHeight is small than rollback height, no need rollback")
return false
}
kvmvccMavlFork := types.GetDappFork("store-kvmvccmavl", "ForkKvmvccmavl")
if curHeight >= kvmvccMavlFork+10000 && rollHeight <= kvmvccMavlFork {
chainlog.Info("because ForkKvmvccmavl", "current height", curHeight, "not support rollback to", rollHeight)
return false
}
return true
}
// Rollback chain Rollback
func (chain *BlockChain) Rollback() {
//获取当前的tip节点
tipnode := chain.bestChain.Tip()
startHeight := tipnode.height
for i := startHeight; i > chain.cfg.RollbackBlock; i-- {
blockdetail, err := chain.blockStore.LoadBlockByHeight(i)
if err != nil {
panic(fmt.Sprintln("rollback LoadBlockByHeight err :", err))
}
sequence := int64(-1)
if chain.isParaChain {
// 获取平行链的seq
sequence, err = chain.ProcGetMainSeqByHash(blockdetail.Block.Hash())
if err != nil {
chainlog.Error("chain rollback get main seq fail", "height: ", i, "err", err, "hash", common.ToHex(blockdetail.Block.Hash()))
}
}
err = chain.disBlock(blockdetail, sequence)
if err != nil {
panic(fmt.Sprintln("rollback block fail ", "height", blockdetail.Block.Height, "blockHash:", common.ToHex(blockdetail.Block.Hash())))
}
// 删除storedb中的状态高度
chain.sendDelStore(blockdetail.Block.StateHash, blockdetail.Block.Height)
chainlog.Info("chain rollback ", "height: ", i, "blockheight", blockdetail.Block.Height, "hash", common.ToHex(blockdetail.Block.Hash()), "state hash", common.ToHex(blockdetail.Block.StateHash))
}
}
// 删除blocks
func (chain *BlockChain) disBlock(blockdetail *types.BlockDetail, sequence int64) error {
var lastSequence int64
//批量删除block的信息从磁盘中
newbatch := chain.blockStore.NewBatch(true)
//从db中删除tx相关的信息
err := chain.blockStore.DelTxs(newbatch, blockdetail)
if err != nil {
chainlog.Error("disBlock DelTxs:", "height", blockdetail.Block.Height, "err", err)
return err
}
//从db中删除block相关的信息
lastSequence, err = chain.blockStore.DelBlock(newbatch, blockdetail, sequence)
if err != nil {
chainlog.Error("disBlock DelBlock:", "height", blockdetail.Block.Height, "err", err)
return err
}
db.MustWrite(newbatch)
//更新最新的高度和header为上一个块
chain.blockStore.UpdateHeight()
chain.blockStore.UpdateLastBlock(blockdetail.Block.ParentHash)
//通知共识,mempool和钱包删除block
err = chain.SendDelBlockEvent(blockdetail)
if err != nil {
chainlog.Error("disBlock SendDelBlockEvent", "err", err)
}
//删除缓存中的block信息
chain.cache.delBlockFromCache(blockdetail.Block.Height)
//目前非平行链并开启isRecordBlockSequence功能
if chain.isRecordBlockSequence {
chain.pushseq.updateSeq(lastSequence)
}
return nil
}
// 通知store删除区块,主要针对kvmvcc
func (chain *BlockChain) sendDelStore(hash []byte, height int64) {
storeDel := &types.StoreDel{StateHash: hash, Height: height}
msg := chain.client.NewMessage("store", types.EventStoreDel, storeDel)
err := chain.client.Send(msg, true)
if err != nil {
chainlog.Debug("sendDelStoreEvent -->>store", "err", err)
}
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package blockchain_test
import (
"testing"
"time"
"github.com/33cn/chain33/types"
"github.com/33cn/chain33/util"
"github.com/33cn/chain33/util/testnode"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func init() {
types.SetDappFork("local", "store-kvmvccmavl", "ForkKvmvccmavl", 20*10000)
}
func TestRollbackblock(t *testing.T) {
cfg, sub := testnode.GetDefaultConfig()
cfg.BlockChain.RollbackBlock = 0
mock33 := testnode.NewWithConfig(cfg, sub, nil)
//发送交易
chain := mock33.GetBlockChain()
chain.Rollbackblock()
db := chain.GetDB()
kvs := getAllKeys(db)
assert.Equal(t, len(kvs), 22)
defer mock33.Close()
txs := util.GenCoinsTxs(mock33.GetGenesisKey(), 10)
for i := 0; i < len(txs); i++ {
reply, err := mock33.GetAPI().SendTx(txs[i])
assert.Nil(t, err)
assert.Equal(t, reply.IsOk, true)
}
mock33.WaitHeight(1)
txs = util.GenCoinsTxs(mock33.GetGenesisKey(), 10)
for i := 0; i < len(txs); i++ {
reply, err := mock33.GetAPI().SendTx(txs[i])
assert.Nil(t, err)
assert.Equal(t, reply.IsOk, true)
}
mock33.WaitHeight(2)
txs = util.GenNoneTxs(mock33.GetGenesisKey(), 1)
for i := 0; i < len(txs); i++ {
reply, err := mock33.GetAPI().SendTx(txs[i])
assert.Nil(t, err)
assert.Equal(t, reply.IsOk, true)
}
mock33.WaitHeight(3)
txs = util.GenNoneTxs(mock33.GetGenesisKey(), 2)
for i := 0; i < len(txs); i++ {
reply, err := mock33.GetAPI().SendTx(txs[i])
assert.Nil(t, err)
assert.Equal(t, reply.IsOk, true)
}
mock33.WaitHeight(4)
time.Sleep(time.Second)
chain.Rollbackblock()
}
func TestNeedRollback(t *testing.T) {
cfg, sub := testnode.GetDefaultConfig()
mock33 := testnode.NewWithConfig(cfg, sub, nil)
chain := mock33.GetBlockChain()
curHeight := int64(5)
rollHeight := int64(5)
ok := chain.NeedRollback(curHeight, rollHeight)
require.Equal(t, false, ok)
curHeight = int64(5)
rollHeight = int64(6)
ok = chain.NeedRollback(curHeight, rollHeight)
require.Equal(t, false, ok)
curHeight = int64(10 * 10000)
rollHeight = int64(5 * 10000)
ok = chain.NeedRollback(curHeight, rollHeight)
require.Equal(t, true, ok)
curHeight = int64(20*10000 + 1)
rollHeight = int64(20 * 10000)
ok = chain.NeedRollback(curHeight, rollHeight)
require.Equal(t, true, ok)
curHeight = int64(22 * 10000)
rollHeight = int64(20 * 10000)
ok = chain.NeedRollback(curHeight, rollHeight)
require.Equal(t, false, ok)
curHeight = int64(22 * 10000)
rollHeight = int64(20*10000 + 1)
ok = chain.NeedRollback(curHeight, rollHeight)
require.Equal(t, true, ok)
}
func TestRollback(t *testing.T) {
cfg, sub := testnode.GetDefaultConfig()
cfg.BlockChain.RollbackBlock = 2
mock33 := testnode.NewWithConfig(cfg, sub, nil)
//发送交易
chain := mock33.GetBlockChain()
db := chain.GetDB()
kvs := getAllKeys(db)
assert.Equal(t, len(kvs), 22)
defer mock33.Close()
txs := util.GenCoinsTxs(mock33.GetGenesisKey(), 10)
for i := 0; i < len(txs); i++ {
reply, err := mock33.GetAPI().SendTx(txs[i])
assert.Nil(t, err)
assert.Equal(t, reply.IsOk, true)
}
mock33.WaitHeight(1)
txs = util.GenCoinsTxs(mock33.GetGenesisKey(), 10)
for i := 0; i < len(txs); i++ {
reply, err := mock33.GetAPI().SendTx(txs[i])
assert.Nil(t, err)
assert.Equal(t, reply.IsOk, true)
}
mock33.WaitHeight(2)
txs = util.GenNoneTxs(mock33.GetGenesisKey(), 1)
for i := 0; i < len(txs); i++ {
reply, err := mock33.GetAPI().SendTx(txs[i])
assert.Nil(t, err)
assert.Equal(t, reply.IsOk, true)
}
mock33.WaitHeight(3)
txs = util.GenNoneTxs(mock33.GetGenesisKey(), 2)
for i := 0; i < len(txs); i++ {
reply, err := mock33.GetAPI().SendTx(txs[i])
assert.Nil(t, err)
assert.Equal(t, reply.IsOk, true)
}
mock33.WaitHeight(4)
time.Sleep(time.Second)
chain.Rollback()
require.Equal(t, int64(2), chain.GetBlockHeight())
}
func TestRollbackPara(t *testing.T) {
cfg, sub := testnode.GetDefaultConfig()
cfg.BlockChain.RollbackBlock = 2
cfg.BlockChain.IsParaChain = true
mock33 := testnode.NewWithConfig(cfg, sub, nil)
//发送交易
chain := mock33.GetBlockChain()
defer mock33.Close()
txs := util.GenCoinsTxs(mock33.GetGenesisKey(), 10)
for i := 0; i < len(txs); i++ {
reply, err := mock33.GetAPI().SendTx(txs[i])
assert.Nil(t, err)
assert.Equal(t, reply.IsOk, true)
}
mock33.WaitHeight(1)
txs = util.GenCoinsTxs(mock33.GetGenesisKey(), 10)
for i := 0; i < len(txs); i++ {
reply, err := mock33.GetAPI().SendTx(txs[i])
assert.Nil(t, err)
assert.Equal(t, reply.IsOk, true)
}
mock33.WaitHeight(2)
txs = util.GenNoneTxs(mock33.GetGenesisKey(), 1)
for i := 0; i < len(txs); i++ {
reply, err := mock33.GetAPI().SendTx(txs[i])
assert.Nil(t, err)
assert.Equal(t, reply.IsOk, true)
}
mock33.WaitHeight(3)
txs = util.GenNoneTxs(mock33.GetGenesisKey(), 2)
for i := 0; i < len(txs); i++ {
reply, err := mock33.GetAPI().SendTx(txs[i])
assert.Nil(t, err)
assert.Equal(t, reply.IsOk, true)
}
mock33.WaitHeight(4)
time.Sleep(time.Second)
chain.Rollback()
require.Equal(t, int64(2), chain.GetBlockHeight())
}
......@@ -155,7 +155,21 @@ func IsQueueError(err error) bool {
return false
}
//IsFatalError 是否是必须停止执行的系统错误
func IsFatalError(err error) bool {
if err == nil {
return false
}
if err == errAPIEnv {
return true
}
if err == types.ErrConsensusHashErr {
return true
}
return false
}
//IsAPIEnvError 是否是api执行环境的错误
func IsAPIEnvError(err error) bool {
return IsGrpcError(err) || IsQueueError(err)
return IsGrpcError(err) || IsQueueError(err) || IsFatalError(err)
}
......@@ -66,6 +66,8 @@ func TestAPI(t *testing.T) {
assert.Equal(t, false, IsGrpcError(nil))
assert.Equal(t, false, IsGrpcError(errors.New("xxxx")))
assert.Equal(t, true, eapi.IsErr())
assert.Equal(t, true, IsFatalError(types.ErrConsensusHashErr))
assert.Equal(t, false, IsFatalError(errors.New("xxxx")))
gapi2, err := grpcclient.NewMainChainClient("127.0.0.1:8003")
assert.Nil(t, err)
......
......@@ -476,13 +476,13 @@ func (_m *QueueProtocolAPI) GetMainSequenceByHash(param *types.ReqHash) (*types.
return r0, r1
}
// GetMempool provides a mock function with given fields:
func (_m *QueueProtocolAPI) GetMempool() (*types.ReplyTxList, error) {
ret := _m.Called()
// GetMempool provides a mock function with given fields: req
func (_m *QueueProtocolAPI) GetMempool(req *types.ReqGetMempool) (*types.ReplyTxList, error) {
ret := _m.Called(req)
var r0 *types.ReplyTxList
if rf, ok := ret.Get(0).(func() *types.ReplyTxList); ok {
r0 = rf()
if rf, ok := ret.Get(0).(func(*types.ReqGetMempool) *types.ReplyTxList); ok {
r0 = rf(req)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.ReplyTxList)
......@@ -490,8 +490,8 @@ func (_m *QueueProtocolAPI) GetMempool() (*types.ReplyTxList, error) {
}
var r1 error
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
if rf, ok := ret.Get(1).(func(*types.ReqGetMempool) error); ok {
r1 = rf(req)
} else {
r1 = ret.Error(1)
}
......
......@@ -224,8 +224,8 @@ func (q *QueueProtocol) GetTransactionByHash(param *types.ReqHashes) (*types.Tra
}
// GetMempool get transactions from mempool
func (q *QueueProtocol) GetMempool() (*types.ReplyTxList, error) {
msg, err := q.query(mempoolKey, types.EventGetMempool, &types.ReqNil{})
func (q *QueueProtocol) GetMempool(req *types.ReqGetMempool) (*types.ReplyTxList, error) {
msg, err := q.query(mempoolKey, types.EventGetMempool, req)
if err != nil {
log.Error("GetMempool", "Error", err.Error())
return nil, err
......
......@@ -526,7 +526,8 @@ func testWalletGetAccountList(t *testing.T, api client.QueueProtocolAPI) {
}
func testGetMempool(t *testing.T, api client.QueueProtocolAPI) {
_, err := api.GetMempool()
req := types.ReqGetMempool{IsAll: false}
_, err := api.GetMempool(&req)
if err != nil {
t.Error("Call GetMempool Failed.", err)
}
......@@ -1097,7 +1098,7 @@ func testGetAccountsGRPC(t *testing.T, rpc *mockGRPCSystem) {
func testGetMemPoolGRPC(t *testing.T, rpc *mockGRPCSystem) {
var res types.ReplyTxList
err := rpc.newRpcCtx("GetMemPool", &types.ReqNil{}, &res)
err := rpc.newRpcCtx("GetMemPool", &types.ReqGetMempool{}, &res)
if err != nil {
t.Error("Call GetMemPool Failed.", err)
}
......
......@@ -21,7 +21,7 @@ type QueueProtocolAPI interface {
// types.EventTxList
GetTxList(param *types.TxHashList) (*types.ReplyTxList, error)
// types.EventGetMempool
GetMempool() (*types.ReplyTxList, error)
GetMempool(req *types.ReqGetMempool) (*types.ReplyTxList, error)
// types.EventGetLastMempool
GetLastMempool() (*types.ReplyTxList, error)
// types.EventGetProperFee
......
......@@ -138,7 +138,7 @@ func (c *GrpcCtx) Run() (err error) {
}
errRet = err
case "GetMemPool":
reply, err := rpc.GetMemPool(context.Background(), c.Params.(*types.ReqNil))
reply, err := rpc.GetMemPool(context.Background(), c.Params.(*types.ReqGetMempool))
if err == nil {
*c.Res.(*types.ReplyTxList) = *reply
}
......
......@@ -50,11 +50,15 @@ driver="memdb"
dbPath="datadir/addrbook"
dbCache=4
grpcLogFile="grpc33.log"
version=216
verMix=216
verMax=217
channel=216
#waitPid 等待seed导入
waitPid=false
#交易开始采用哈希传播的ttl
lightTxTTL=3
#最大交易传播ttl
maxTTL=20
#是否固定只连接seeds中配置的节点
fixedSeed=false
[rpc]
jrpcBindAddr="localhost:8801"
......
......@@ -19,8 +19,8 @@ import (
rpctypes "github.com/33cn/chain33/rpc/types"
)
const secondsPerBlock = 15
const btyPreBlock = 18
const secondsPerBlock = 5
const btyPreBlock = 5
const statInterval = 3600
const monitorBtyLowLimit = 3 * 1e7 * types.Coin
......
......@@ -92,6 +92,47 @@ func testDBIteratorAllKey(t *testing.T, db DB) {
it.Close()
}
func testDBIteratorReserverExample(t *testing.T, db DB) {
var datas = [][]byte{
[]byte("aa0"), []byte("aa1"), []byte("bb0"), []byte("bb1"), []byte("cc0"), []byte("cc1"),
}
for _, v := range datas {
db.Set(v, v)
}
// 从尾部到头一次遍历
it := db.Iterator(nil, types.EmptyValue, true)
i := 5
for it.Rewind(); it.Valid(); it.Next() {
assert.Equal(t, it.Key(), datas[i])
//fmt.Println(i, string(it.Key()))
i--
}
it.Close()
assert.Equal(t, i, -1)
// 从bb0开始从后到前遍历,end需要填入bb0的下一个,才可以遍历到bb0
it = db.Iterator(nil, []byte("bb1"), true)
i = 2
for it.Rewind(); it.Valid(); it.Next() {
assert.Equal(t, it.Key(), datas[i])
//fmt.Println(i, string(it.Key()))
i--
}
it.Close()
assert.Equal(t, i, -1)
// 反向前缀查找
it = db.Iterator([]byte("bb"), nil, true)
i = 3
for it.Rewind(); it.Valid(); it.Next() {
assert.Equal(t, it.Key(), datas[i])
// fmt.Println(string(it.Key()))
i--
}
it.Close()
assert.Equal(t, i, 1)
}
// 迭代测试
func testDBIterator(t *testing.T, db DB) {
t.Log("test Set")
......
......@@ -39,6 +39,16 @@ func TestGoLevelDBIteratorAll(t *testing.T) {
testDBIteratorAllKey(t, leveldb)
}
func TestGoLevelDBIteratorReserverExample(t *testing.T) {
dir, err := ioutil.TempDir("", "goleveldb")
require.NoError(t, err)
t.Log(dir)
leveldb, err := NewGoLevelDB("goleveldb", dir, 128)
require.NoError(t, err)
defer leveldb.Close()
testDBIteratorReserverExample(t, leveldb)
}
func TestGoLevelDBIteratorDel(t *testing.T) {
dir, err := ioutil.TempDir("", "goleveldb")
require.NoError(t, err)
......
package skiplist
import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
)
var (
s1 = &SkipValue{1, "111"}
s2 = &SkipValue{2, "222"}
s3 = &SkipValue{3, "333"}
s1 = &SkipValue{1, "111"}
s2 = &SkipValue{2, "222"}
s3 = &SkipValue{3, "333"}
s4 = &SkipValue{4, "444"}
s5 = &SkipValue{5, "555"}
s6 = &SkipValue{6, "666"}
s7 = &SkipValue{7, "777"}
s8 = &SkipValue{8, "888"}
s9 = &SkipValue{9, "999"}
s10 = &SkipValue{10, "101010"}
s11 = &SkipValue{11, "111111"}
s12 = &SkipValue{12, "121212"}
)
func TestInsert(t *testing.T) {
......@@ -50,42 +60,61 @@ func TestWalk(t *testing.T) {
l := NewSkipList(nil)
l.Insert(s1)
l.Insert(s2)
var data [2]string
l.Insert(s3)
l.Insert(s4)
l.Insert(s5)
l.Insert(s6)
l.Insert(s7)
l.Insert(s8)
l.Insert(s9)
l.Insert(s10)
l.Insert(s11)
l.Insert(s12)
var data [100]string
i := 0
l.Walk(func(value interface{}) bool {
data[i] = value.(string)
i++
return true
})
assert.Equal(t, data[0], "222")
assert.Equal(t, data[1], "111")
assert.Equal(t, data[0], "121212")
assert.Equal(t, data[1], "111111")
var data2 [2]string
var data2 [100]string
i = 0
l.Walk(func(value interface{}) bool {
data2[i] = value.(string)
i++
return false
})
assert.Equal(t, data2[0], "222")
assert.Equal(t, data2[0], "121212")
assert.Equal(t, data2[1], "")
l.Walk(nil)
iter := l.GetIterator()
assert.Equal(t, int64(2), iter.First().Score)
assert.Equal(t, "222", iter.First().Value.(string))
assert.Equal(t, int64(12), iter.First().Score)
assert.Equal(t, "121212", iter.First().Value.(string))
assert.Equal(t, int64(1), iter.Last().Score)
assert.Equal(t, "111", iter.Last().Value.(string))
l.Print()
}
func TestWalkS(t *testing.T) {
l := NewSkipList(nil)
l.Insert(s1)
l.Insert(s2)
var score [2]int64
var data [2]string
l.Insert(s3)
l.Insert(s4)
l.Insert(s5)
l.Insert(s6)
l.Insert(s7)
l.Insert(s8)
l.Insert(s9)
l.Insert(s10)
l.Insert(s11)
l.Insert(s12)
var score [100]int64
var data [100]string
i := 0
l.WalkS(func(value interface{}) bool {
score[i] = value.(*SkipValue).Score
......@@ -93,10 +122,10 @@ func TestWalkS(t *testing.T) {
i++
return true
})
assert.Equal(t, data[0], "222")
assert.Equal(t, data[1], "111")
assert.Equal(t, int64(2), score[0])
assert.Equal(t, int64(1), score[1])
assert.Equal(t, data[0], "121212")
assert.Equal(t, data[1], "111111")
assert.Equal(t, int64(12), score[0])
assert.Equal(t, int64(11), score[1])
var score2 [2]int64
var data2 [2]string
......@@ -107,8 +136,19 @@ func TestWalkS(t *testing.T) {
i++
return false
})
assert.Equal(t, "222", data2[0])
assert.Equal(t, "121212", data2[0])
assert.Equal(t, "", data2[1])
assert.Equal(t, int64(2), score2[0])
assert.Equal(t, int64(12), score2[0])
assert.Equal(t, int64(0), score2[1])
l.WalkS(func(value interface{}) bool {
e := value.(*SkipValue)
if e != nil {
fmt.Print(e.Score)
fmt.Print(" ")
fmt.Print(e.Value)
fmt.Println("")
return true
}
return false
})
}
package p2p
import (
"sync"
lru "github.com/hashicorp/golang-lru"
)
var (
totalBlockCache = newSpaceLimitCache(BlockCacheNum, MaxBlockCacheByteSize)
ltBlockCache = newSpaceLimitCache(BlockCacheNum/2, MaxBlockCacheByteSize/2)
)
// lru缓存封装, 控制占用空间大小
type spaceLimitCache struct {
maxSize int64
currSize int64
sizeMap map[interface{}]int64
data *lru.Cache
lock *sync.RWMutex
}
func newSpaceLimitCache(num int, maxByteSize int64) *spaceLimitCache {
cache := &spaceLimitCache{maxSize: maxByteSize}
cache.sizeMap = make(map[interface{}]int64)
cache.lock = &sync.RWMutex{}
var err error
cache.data, err = lru.New(num)
if err != nil {
panic(err)
}
return cache
}
func (c *spaceLimitCache) add(key interface{}, val interface{}, size int64) bool {
c.lock.Lock()
defer c.lock.Unlock()
//如果存在先删除
if c.data.Contains(key) {
c.data.Remove(key)
c.currSize -= c.sizeMap[key]
delete(c.sizeMap, key)
}
//单个值超过最大大小
if size > c.maxSize {
return false
}
c.currSize += size
keys := c.data.Keys()
//超过最大大小, 移除最早的值
for i := 0; i < len(keys) && c.currSize > c.maxSize; i++ {
c.currSize -= c.sizeMap[keys[i]]
c.data.RemoveOldest()
delete(c.sizeMap, keys[i])
}
evicted := c.data.Add(key, val)
c.sizeMap[key] = size
//触发最早数据被移除, 更新目前大小
if evicted {
c.currSize -= c.sizeMap[keys[0]]
delete(c.sizeMap, keys[0])
}
return true
}
func (c *spaceLimitCache) get(key interface{}) interface{} {
v, _ := c.data.Get(key)
return v
}
func (c *spaceLimitCache) del(key interface{}) (interface{}, bool) {
c.lock.Lock()
defer c.lock.Unlock()
val, exist := c.data.Get(key)
if exist {
c.data.Remove(key)
c.currSize -= c.sizeMap[key]
delete(c.sizeMap, key)
}
return val, exist
}
func (c *spaceLimitCache) contains(key interface{}) bool {
return c.data.Contains(key)
}
......@@ -25,7 +25,7 @@ var P2pComm Comm
type Comm struct{}
// AddrRouteble address router ,return enbale address
func (Comm) AddrRouteble(addrs []string) []string {
func (Comm) AddrRouteble(addrs []string, version int32) []string {
var enableAddrs []string
for _, addr := range addrs {
......@@ -34,7 +34,7 @@ func (Comm) AddrRouteble(addrs []string) []string {
log.Error("AddrRouteble", "NewNetAddressString", err.Error())
continue
}
conn, err := netaddr.DialTimeout(VERSION)
conn, err := netaddr.DialTimeout(version)
if err != nil {
//log.Error("AddrRouteble", "DialTimeout", err.Error())
continue
......@@ -77,7 +77,7 @@ func (c Comm) GetLocalAddr() string {
func (c Comm) dialPeerWithAddress(addr *NetAddress, persistent bool, node *Node) (*Peer, error) {
log.Info("dialPeerWithAddress")
conn, err := addr.DialTimeout(node.nodeInfo.cfg.Version)
conn, err := addr.DialTimeout(node.nodeInfo.channelVersion)
if err != nil {
return nil, err
}
......@@ -113,7 +113,7 @@ func (c Comm) dialPeer(addr *NetAddress, node *Node) (*Peer, error) {
}
peer, err := c.dialPeerWithAddress(addr, persistent, node)
if err != nil {
log.Error("dialPeer", "dial peer err:", err.Error())
log.Error("dialPeer", "nodeListenAddr", node.nodeInfo.listenAddr.str, "peerAddr", addr.str, "dial peer err:", err.Error())
return nil, err
}
//获取远程节点的信息 peer
......@@ -239,7 +239,7 @@ func (c Comm) CheckSign(in *types.P2PPing) bool {
// CollectPeerStat collect peer stat and report
func (c Comm) CollectPeerStat(err error, peer *Peer) {
if err != nil {
if err == types.ErrVersion {
if err == types.ErrVersion || err == types.ErrP2PChannel {
peer.version.SetSupport(false)
}
peer.peerStat.NotOk()
......
......@@ -60,9 +60,19 @@ const (
privKeyTag = "privkey"
)
//TTL
const (
DefaultLtTxBroadCastTTL = 3
DefaultMaxTxBroadCastTTL = 25
)
// P2pCacheTxSize p2pcache size of transaction
const (
P2pCacheTxSize = 10240
PeerAddrCacheNum = 1000
TxHashCacheNum = 10240
BlockHashCacheNum = 100
BlockCacheNum = 10
MaxBlockCacheByteSize = 100 * 1024 * 1024
)
// TestNetSeeds test seeds of net
......
......@@ -258,7 +258,7 @@ func (d *DownloadJob) syncDownloadBlock(peer *Peer, inv *pb.Inventory, bchan cha
return fmt.Errorf("peer not running")
}
var p2pdata pb.P2PGetData
p2pdata.Version = d.p2pcli.network.node.nodeInfo.cfg.Version
p2pdata.Version = d.p2pcli.network.node.nodeInfo.channelVersion
p2pdata.Invs = []*pb.Inventory{inv}
beg := pb.Now()
resp, err := peer.mconn.gcli.GetData(context.Background(), &p2pdata, grpc.FailFast(true))
......
......@@ -14,13 +14,17 @@ import (
)
// Filter a Filter object
var Filter = NewFilter()
var (
peerAddrFilter = NewFilter(PeerAddrCacheNum)
txHashFilter = NewFilter(TxHashCacheNum)
blockHashFilter = NewFilter(BlockHashCacheNum)
)
// NewFilter produce a filter object
func NewFilter() *Filterdata {
func NewFilter(num int) *Filterdata {
filter := new(Filterdata)
var err error
filter.regRData, err = lru.New(P2pCacheTxSize)
filter.regRData, err = lru.New(num)
if err != nil {
panic(err)
}
......@@ -96,3 +100,15 @@ func (f *Filterdata) ManageRecvFilter() {
}
}
}
// Add add val
func (f *Filterdata) Add(key string, val interface{}) bool {
return f.regRData.Add(key, val)
}
// Get get val
func (f *Filterdata) Get(key string) interface{} {
val, _ := f.regRData.Get(key)
return val
}
......@@ -200,7 +200,7 @@ func (n *Node) getAddrFromOnline() {
continue
}
if !n.nodeInfo.blacklist.Has(addr) || !Filter.QueryRecvData(addr) {
if !n.nodeInfo.blacklist.Has(addr) || !peerAddrFilter.QueryRecvData(addr) {
if ticktimes < 10 {
//如果连接了其他节点,优先不连接种子节点
if _, ok := n.innerSeeds.Load(addr); !ok {
......@@ -339,7 +339,7 @@ func (n *Node) nodeReBalance() {
}
if MinCacheInBoundPeer != nil {
info, err := MinCacheInBoundPeer.GetPeerInfo(VERSION)
info, err := MinCacheInBoundPeer.GetPeerInfo()
if err != nil {
n.RemoveCachePeer(MinCacheInBoundPeer.Addr())
MinCacheInBoundPeer.Close()
......@@ -448,7 +448,7 @@ func (n *Node) monitorDialPeers() {
log.Info("monitorDialPeers", "loop", "done")
return
}
if Filter.QueryRecvData(addr.(string)) {
if peerAddrFilter.QueryRecvData(addr.(string)) {
//先查询有没有注册进去,避免同时重复连接相同的地址
continue
}
......@@ -485,15 +485,15 @@ func (n *Node) monitorDialPeers() {
}
dialCount++
//把待连接的节点增加到过滤容器中
Filter.RegRecvData(addr.(string))
peerAddrFilter.RegRecvData(addr.(string))
log.Info("monitorDialPeer", "dialCount", dialCount)
go func(netAddr *NetAddress) {
defer Filter.RemoveRecvData(netAddr.String())
defer peerAddrFilter.RemoveRecvData(netAddr.String())
peer, err := P2pComm.dialPeer(netAddr, n)
if err != nil {
//连接失败后
n.nodeInfo.addrBook.RemoveAddr(netAddr.String())
log.Error("monitorDialPeers", "Err", err.Error())
log.Error("monitorDialPeers", "peerAddr", netAddr.str, "Err", err.Error())
if err == types.ErrVersion { //版本不支持,加入黑名单12小时
peer.version.SetSupport(false)
P2pComm.CollectPeerStat(err, peer)
......@@ -566,7 +566,7 @@ func (n *Node) monitorBlackList() {
}
func (n *Node) monitorFilter() {
Filter.ManageRecvFilter()
peerAddrFilter.ManageRecvFilter()
}
//独立goroutine 监控配置的
......
package p2p
import (
"net"
"testing"
"github.com/stretchr/testify/assert"
)
func TestNetAddress(t *testing.T) {
tcpAddr := new(net.TCPAddr)
tcpAddr.IP = net.ParseIP("localhost")
tcpAddr.Port = 2223
nad := NewNetAddress(tcpAddr)
nad1 := nad.Copy()
nad.Equals(nad1)
nad2s, err := NewNetAddressStrings([]string{"localhost:3306"})
if err != nil {
return
}
nad.Less(nad2s[0])
}
func TestAddrRouteble(t *testing.T) {
resp := P2pComm.AddrRouteble([]string{"114.55.101.159:13802"}, calcChannelVersion(119))
t.Log(resp)
}
func TestGetLocalAddr(t *testing.T) {
t.Log(P2pComm.GetLocalAddr())
}
func TestP2pListen(t *testing.T) {
var node Node
node.listenPort = 3333
listen1 := NewListener("tcp", &node)
assert.Equal(t, true, listen1 != nil)
listen2 := NewListener("tcp", &node)
assert.Equal(t, true, listen2 != nil)
listen1.Close()
listen2.Close()
}
......@@ -40,7 +40,10 @@ func (n *Node) Start() {
// Close node listener
func (n *Node) Close() {
atomic.StoreInt32(&n.closed, 1)
//避免重复
if !atomic.CompareAndSwapInt32(&n.closed, 0, 1) {
return
}
if n.listener != nil {
n.listener.Close()
}
......@@ -49,11 +52,11 @@ func (n *Node) Close() {
n.nodeInfo.monitorChan <- nil
log.Debug("stop", "addrBook", "closed")
n.removeAll()
if Filter != nil {
Filter.Close()
if peerAddrFilter != nil {
peerAddrFilter.Close()
}
n.deleteNatMapPort()
n.pubsub.Shutdown()
log.Info("stop", "PeerRemoeAll", "closed")
}
......@@ -147,7 +150,7 @@ func (n *Node) doNat() {
}
testExaddr := fmt.Sprintf("%v:%v", n.nodeInfo.GetExternalAddr().IP.String(), n.listenPort)
log.Info("TestNetAddr", "testExaddr", testExaddr)
if len(P2pComm.AddrRouteble([]string{testExaddr})) != 0 {
if len(P2pComm.AddrRouteble([]string{testExaddr}, n.nodeInfo.channelVersion)) != 0 {
log.Info("node outside")
n.nodeInfo.SetNetSide(true)
if netexaddr, err := NewNetAddressString(testExaddr); err == nil {
......@@ -173,9 +176,8 @@ func (n *Node) doNat() {
p2pcli := NewNormalP2PCli()
//测试映射后的端口能否连通或者外网+本地端口
if p2pcli.CheckPeerNatOk(n.nodeInfo.GetExternalAddr().String()) ||
p2pcli.CheckPeerNatOk(fmt.Sprintf("%v:%v", n.nodeInfo.GetExternalAddr().IP.String(), n.listenPort)) {
if p2pcli.CheckPeerNatOk(n.nodeInfo.GetExternalAddr().String(), n.nodeInfo) ||
p2pcli.CheckPeerNatOk(fmt.Sprintf("%v:%v", n.nodeInfo.GetExternalAddr().IP.String(), n.listenPort), n.nodeInfo) {
n.nodeInfo.SetServiceTy(Service)
log.Info("doNat", "NatOk", "Support Service")
} else {
......@@ -333,15 +335,18 @@ func (n *Node) removeAll() {
func (n *Node) monitor() {
go n.monitorErrPeer()
go n.getAddrFromOnline()
go n.getAddrFromAddrBook()
//固定模式, 只连接seeds配置节点
go n.monitorCfgSeeds()
if !n.nodeInfo.cfg.FixedSeed {
go n.getAddrFromOnline()
go n.getAddrFromAddrBook()
}
go n.monitorPeerInfo()
go n.monitorDialPeers()
go n.monitorBlackList()
go n.monitorFilter()
go n.monitorPeers()
go n.nodeReBalance()
go n.monitorCfgSeeds()
}
func (n *Node) needMore() bool {
......@@ -388,7 +393,7 @@ func (n *Node) detectNodeAddr() {
externalPort = defalutNatPort
}
if err != nil {
log.Error("bookDb Get", "externalPortTag fail err:", err)
log.Error("bookDb Get", "nodePort", n.listenPort, "externalPortTag fail err:", err)
}
}
......@@ -422,7 +427,7 @@ func (n *Node) natMapPort() {
time.Sleep(time.Second)
}
var err error
if len(P2pComm.AddrRouteble([]string{n.nodeInfo.GetExternalAddr().String()})) != 0 { //判断能否连通要映射的端口
if len(P2pComm.AddrRouteble([]string{n.nodeInfo.GetExternalAddr().String()}, n.nodeInfo.channelVersion)) != 0 { //判断能否连通要映射的端口
log.Info("natMapPort", "addr", "routeble")
p2pcli := NewNormalP2PCli() //检查要映射的IP地址是否已经被映射成功
ok := p2pcli.CheckSelf(n.nodeInfo.GetExternalAddr().String(), n.nodeInfo)
......@@ -498,3 +503,7 @@ func (n *Node) deleteNatMapPort() {
func (n *Node) natNotice() {
<-n.nodeInfo.natNoticeChain
}
func (n *Node) verifyP2PChannel(channel int32) bool {
return channel == n.nodeInfo.cfg.Channel
}
......@@ -29,6 +29,7 @@ type NodeInfo struct {
natDone int32
outSide int32
ServiceType int32
channelVersion int32
}
// NewNodeInfo new a node object
......@@ -44,6 +45,7 @@ func NewNodeInfo(cfg *types.P2P) *NodeInfo {
nodeInfo.externalAddr = new(NetAddress)
nodeInfo.listenAddr = new(NetAddress)
nodeInfo.addrBook = NewAddrBook(cfg)
nodeInfo.channelVersion = calcChannelVersion(cfg.Channel)
return nodeInfo
}
......@@ -131,7 +133,7 @@ func (nf *NodeInfo) latestPeerInfo(n *Node) map[string]*types.Peer {
if peer.Addr() == n.nodeInfo.GetExternalAddr().String() { //fmt.Sprintf("%v:%v", ExternalIp, m.network.node.GetExterPort())
continue
}
peerinfo, err := peer.GetPeerInfo(nf.cfg.Version)
peerinfo, err := peer.GetPeerInfo()
if err != nil {
if err == types.ErrVersion {
peer.version.SetSupport(false)
......
......@@ -8,6 +8,7 @@ package p2p
import (
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
......@@ -34,6 +35,7 @@ type P2p struct {
txFactory chan struct{}
otherFactory chan struct{}
waitRestart chan struct{}
taskGroup *sync.WaitGroup
closed int32
restart int32
......@@ -42,27 +44,20 @@ type P2p struct {
// New produce a p2p object
func New(cfg *types.P2P) *P2p {
if cfg.Version == 0 {
if types.IsTestNet() {
cfg.Version = 119
cfg.VerMin = 118
cfg.VerMax = 128
} else {
cfg.Version = 10020
cfg.VerMin = 10020
cfg.VerMax = 11000
}
//主网的channel默认设为0, 测试网未配置时设为默认
if types.IsTestNet() && cfg.Channel == 0 {
cfg.Channel = defaultTestNetChannel
}
if cfg.VerMin == 0 {
cfg.VerMin = cfg.Version
//ttl至少设为2
if cfg.LightTxTTL <= 1 {
cfg.LightTxTTL = DefaultLtTxBroadCastTTL
}
if cfg.VerMax == 0 {
cfg.VerMax = cfg.VerMin + 1
if cfg.MaxTTL <= 0 {
cfg.MaxTTL = DefaultMaxTxBroadCastTTL
}
VERSION = cfg.Version
log.Info("p2p", "Version", VERSION, "IsTest", types.IsTestNet())
log.Info("p2p", "Channel", cfg.Channel, "Version", VERSION, "IsTest", types.IsTestNet())
if cfg.InnerBounds == 0 {
cfg.InnerBounds = 500
}
......@@ -81,6 +76,7 @@ func New(cfg *types.P2P) *P2p {
p2p.waitRestart = make(chan struct{}, 1)
p2p.txCapcity = 1000
p2p.cfg = cfg
p2p.taskGroup = &sync.WaitGroup{}
return p2p
}
......@@ -99,17 +95,12 @@ func (network *P2p) isRestart() bool {
func (network *P2p) Close() {
log.Info("p2p network start shutdown")
atomic.StoreInt32(&network.closed, 1)
//等待业务协程停止
network.waitTaskDone()
network.node.Close()
if network.client != nil {
if !network.isRestart() {
network.client.Close()
}
network.client.Close()
}
network.node.pubsub.Shutdown()
}
// SetQueueClient set the queue
......@@ -129,8 +120,8 @@ func (network *P2p) SetQueueClient(cli queue.Client) {
if p2p.isRestart() {
p2p.node.Start()
atomic.StoreInt32(&p2p.closed, 0)
atomic.StoreInt32(&p2p.restart, 0)
//开启业务处理协程
network.waitRestart <- struct{}{}
return
}
......@@ -304,13 +295,17 @@ func (network *P2p) genAirDropKeyFromWallet() error {
//ReStart p2p
func (network *P2p) ReStart() {
atomic.StoreInt32(&network.restart, 1)
network.Close()
//避免重复
if !atomic.CompareAndSwapInt32(&network.restart, 0, 1) {
return
}
log.Info("p2p restart, wait p2p task done")
network.waitTaskDone()
network.node.Close()
node, err := NewNode(network.cfg) //创建新的node节点
if err != nil {
panic(err.Error())
}
network.node = node
network.SetQueueClient(network.client)
......@@ -328,14 +323,6 @@ func (network *P2p) subP2pMsg() {
network.client.Sub("p2p")
for msg := range network.client.Recv() {
if network.isRestart() {
//wait for restart
log.Info("waitp2p restart....")
<-network.waitRestart
log.Info("p2p restart ok....")
}
if network.isClose() {
log.Debug("subP2pMsg", "loop", "done")
close(network.otherFactory)
......@@ -355,19 +342,19 @@ func (network *P2p) subP2pMsg() {
switch msg.Ty {
case types.EventTxBroadcast: //广播tx
go network.p2pCli.BroadCastTx(msg, taskIndex)
network.processEvent(msg, taskIndex, network.p2pCli.BroadCastTx)
case types.EventBlockBroadcast: //广播block
go network.p2pCli.BlockBroadcast(msg, taskIndex)
network.processEvent(msg, taskIndex, network.p2pCli.BlockBroadcast)
case types.EventFetchBlocks:
go network.p2pCli.GetBlocks(msg, taskIndex)
network.processEvent(msg, taskIndex, network.p2pCli.GetBlocks)
case types.EventGetMempool:
go network.p2pCli.GetMemPool(msg, taskIndex)
network.processEvent(msg, taskIndex, network.p2pCli.GetMemPool)
case types.EventPeerInfo:
go network.p2pCli.GetPeerInfo(msg, taskIndex)
network.processEvent(msg, taskIndex, network.p2pCli.GetPeerInfo)
case types.EventFetchBlockHeaders:
go network.p2pCli.GetHeaders(msg, taskIndex)
network.processEvent(msg, taskIndex, network.p2pCli.GetHeaders)
case types.EventGetNetInfo:
go network.p2pCli.GetNetInfo(msg, taskIndex)
network.processEvent(msg, taskIndex, network.p2pCli.GetNetInfo)
default:
log.Warn("unknown msgtype", "msg", msg)
msg.Reply(network.client.NewMessage("", msg.Ty, types.Reply{Msg: []byte("unknown msgtype")}))
......@@ -380,3 +367,32 @@ func (network *P2p) subP2pMsg() {
}()
}
func (network *P2p) processEvent(msg *queue.Message, taskIdx int64, eventFunc p2pEventFunc) {
//检测重启标志,停止分发事件,需要等待重启
if network.isRestart() {
log.Info("wait for p2p restart....")
<-network.waitRestart
log.Info("p2p restart ok....")
}
network.taskGroup.Add(1)
go func() {
defer network.taskGroup.Done()
eventFunc(msg, taskIdx)
}()
}
func (network *P2p) waitTaskDone() {
waitDone := make(chan struct{})
go func() {
defer close(waitDone)
network.taskGroup.Wait()
}()
select {
case <-waitDone:
case <-time.After(time.Second * 20):
log.Error("P2pWaitTaskDone", "err", "20s timeout")
}
}
......@@ -2,10 +2,9 @@ package p2p
import (
"encoding/hex"
//"fmt"
"net"
"sync/atomic"
"os"
"sort"
"strings"
"testing"
"time"
......@@ -22,15 +21,15 @@ import (
"google.golang.org/grpc"
)
var q queue.Queue
var p2pModule *P2p
var dataDir = "testdata"
var (
testChannel = int32(119)
)
func init() {
VERSION = 119
l.SetLogLevel("err")
q = queue.New("channel")
go q.Start()
}
func processMsg(q queue.Queue) {
go func() {
......@@ -110,24 +109,20 @@ func init() {
switch msg.Ty {
case types.EventGetMempoolSize:
msg.Reply(client.NewMessage("p2p", types.EventMempoolSize, &types.MempoolSize{Size: 0}))
}
}
}()
time.Sleep(time.Second)
p2pModule = initP2p(53802, dataDir)
p2pModule.Wait()
}
//初始化p2p模块
func initP2p(port int32, dbpath string) *P2p {
//new p2p
func newP2p(port int32, dbpath string, q queue.Queue) *P2p {
cfg := new(types.P2P)
cfg.Port = port
cfg.Enable = true
cfg.DbPath = dbpath
cfg.DbCache = 4
cfg.Version = 119
cfg.Channel = testChannel
cfg.ServerStart = true
cfg.Driver = "leveldb"
......@@ -137,12 +132,18 @@ func initP2p(port int32, dbpath string) *P2p {
p2pcli.node.nodeInfo.addrBook.bookDb.Set([]byte(privKeyTag), []byte(privkey))
p2pcli.node.nodeInfo.SetServiceTy(7)
p2pcli.SetQueueClient(q.Client())
return p2pcli
}
func TestP2PEvent(t *testing.T) {
qcli := q.Client()
//free P2p
func freeP2p(p2p *P2p) {
p2p.Close()
if err := os.RemoveAll(p2p.cfg.DbPath); err != nil {
log.Error("removeTestDbErr", "err", err)
}
}
func testP2PEvent(t *testing.T, qcli queue.Client) {
msg := qcli.NewMessage("p2p", types.EventBlockBroadcast, &types.Block{})
qcli.Send(msg, false)
......@@ -164,17 +165,17 @@ func TestP2PEvent(t *testing.T) {
qcli.Send(msg, false)
}
func TestNetInfo(t *testing.T) {
p2pModule.node.nodeInfo.IsNatDone()
p2pModule.node.nodeInfo.SetNatDone()
p2pModule.node.nodeInfo.Get()
p2pModule.node.nodeInfo.Set(p2pModule.node.nodeInfo)
assert.NotNil(t, p2pModule.node.nodeInfo.GetListenAddr())
assert.NotNil(t, p2pModule.node.nodeInfo.GetExternalAddr())
func testNetInfo(t *testing.T, p2p *P2p) {
p2p.node.nodeInfo.IsNatDone()
p2p.node.nodeInfo.SetNatDone()
p2p.node.nodeInfo.Get()
p2p.node.nodeInfo.Set(p2p.node.nodeInfo)
assert.NotNil(t, p2p.node.nodeInfo.GetListenAddr())
assert.NotNil(t, p2p.node.nodeInfo.GetExternalAddr())
}
//测试Peer
func TestPeer(t *testing.T) {
func testPeer(t *testing.T, p2p *P2p, q queue.Queue) {
conn, err := grpc.Dial("localhost:53802", grpc.WithInsecure(),
grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip")))
......@@ -184,9 +185,8 @@ func TestPeer(t *testing.T) {
remote, err := NewNetAddressString("127.0.0.1:53802")
assert.Nil(t, err)
localP2P := initP2p(43802, "testdata2")
defer os.RemoveAll("testdata2")
defer localP2P.Close()
localP2P := newP2p(43802, "testPeer", q)
defer freeP2p(localP2P)
t.Log(localP2P.node.CacheBoundsSize())
t.Log(localP2P.node.GetCacheBounds())
......@@ -209,7 +209,7 @@ func TestPeer(t *testing.T) {
p2pcli := NewNormalP2PCli()
localP2P.node.nodeInfo.peerInfos.SetPeerInfo(nil)
localP2P.node.nodeInfo.peerInfos.GetPeerInfo("1222")
t.Log(p2pModule.node.GetRegisterPeer("localhost:43802"))
t.Log(p2p.node.GetRegisterPeer("localhost:43802"))
//测试发送Ping消息
err = p2pcli.SendPing(peer, localP2P.node.nodeInfo)
assert.Nil(t, err)
......@@ -219,7 +219,7 @@ func TestPeer(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, 1, pnum)
_, err = peer.GetPeerInfo(VERSION)
_, err = peer.GetPeerInfo()
assert.Nil(t, err)
//获取节点列表
_, err = p2pcli.GetAddrList(peer)
......@@ -228,12 +228,12 @@ func TestPeer(t *testing.T) {
_, err = p2pcli.SendVersion(peer, localP2P.node.nodeInfo)
assert.Nil(t, err)
t.Log(p2pcli.CheckPeerNatOk("localhost:53802"))
t.Log(p2pcli.CheckPeerNatOk("localhost:53802", localP2P.node.nodeInfo))
t.Log("checkself:", p2pcli.CheckSelf("loadhost:43803", localP2P.node.nodeInfo))
_, err = p2pcli.GetAddr(peer)
assert.Nil(t, err)
localP2P.node.pubsub.FIFOPub(&types.P2PTx{Tx: &types.Transaction{}}, "tx")
localP2P.node.pubsub.FIFOPub(&types.P2PTx{Tx: &types.Transaction{}, Route: &types.P2PRoute{}}, "tx")
localP2P.node.pubsub.FIFOPub(&types.P2PBlock{Block: &types.Block{}}, "block")
// //测试获取高度
height, err := p2pcli.GetBlockHeight(localP2P.node.nodeInfo)
......@@ -260,23 +260,10 @@ func TestPeer(t *testing.T) {
job.setFreePeer(peer.GetPeerName())
job.removePeer(peer.GetPeerName())
job.CancelJob()
os.Remove(dataDir)
}
func TestSortArr(t *testing.T) {
var Inventorys = make(Invs, 0)
for i := 100; i >= 0; i-- {
var inv types.Inventory
inv.Ty = 111
inv.Height = int64(i)
Inventorys = append(Inventorys, &inv)
}
sort.Sort(Inventorys)
}
//测试grpc 多连接
func TestGrpcConns(t *testing.T) {
func testGrpcConns(t *testing.T) {
var conns []*grpc.ClientConn
for i := 0; i < maxSamIPNum; i++ {
......@@ -307,7 +294,7 @@ func TestGrpcConns(t *testing.T) {
}
//测试grpc 流多连接
func TestGrpcStreamConns(t *testing.T) {
func testGrpcStreamConns(t *testing.T, p2p *P2p) {
conn, err := grpc.Dial("localhost:53802", grpc.WithInsecure(),
grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip")))
......@@ -319,7 +306,7 @@ func TestGrpcStreamConns(t *testing.T) {
_, err = resp.Recv()
assert.Equal(t, true, strings.Contains(err.Error(), "no authorized"))
ping, err := P2pComm.NewPingData(p2pModule.node.nodeInfo)
ping, err := P2pComm.NewPingData(p2p.node.nodeInfo)
assert.Nil(t, err)
_, err = cli.ServerStreamSend(context.Background(), ping)
......@@ -336,53 +323,22 @@ func TestGrpcStreamConns(t *testing.T) {
}
func TestP2pComm(t *testing.T) {
func testP2pComm(t *testing.T, p2p *P2p) {
addrs := P2pComm.AddrRouteble([]string{"localhost:53802"})
addrs := P2pComm.AddrRouteble([]string{"localhost:53802"}, calcChannelVersion(testChannel))
t.Log(addrs)
i32 := P2pComm.BytesToInt32([]byte{0xff})
t.Log(i32)
_, _, err := P2pComm.GenPrivPubkey()
assert.Nil(t, err)
ping, err := P2pComm.NewPingData(p2pModule.node.nodeInfo)
ping, err := P2pComm.NewPingData(p2p.node.nodeInfo)
assert.Nil(t, err)
assert.Equal(t, true, P2pComm.CheckSign(ping))
assert.IsType(t, "string", P2pComm.GetLocalAddr())
assert.Equal(t, 5, len(P2pComm.RandStr(5)))
}
func TestFilter(t *testing.T) {
go Filter.ManageRecvFilter()
defer Filter.Close()
Filter.GetLock()
assert.Equal(t, true, Filter.RegRecvData("key"))
assert.Equal(t, true, Filter.QueryRecvData("key"))
Filter.RemoveRecvData("key")
assert.Equal(t, false, Filter.QueryRecvData("key"))
Filter.ReleaseLock()
}
func TestAddrRouteble(t *testing.T) {
resp := P2pComm.AddrRouteble([]string{"114.55.101.159:13802"})
t.Log(resp)
}
func TestRandStr(t *testing.T) {
t.Log(P2pComm.RandStr(5))
}
func TestGetLocalAddr(t *testing.T) {
t.Log(P2pComm.GetLocalAddr())
}
func TestAddrBook(t *testing.T) {
func testAddrBook(t *testing.T, p2p *P2p) {
prv, pub, err := P2pComm.GenPrivPubkey()
if err != nil {
......@@ -399,7 +355,7 @@ func TestAddrBook(t *testing.T) {
}
t.Log("GenPubkey:", pubstr)
addrBook := p2pModule.node.nodeInfo.addrBook
addrBook := p2p.node.nodeInfo.addrBook
addrBook.Size()
addrBook.saveToDb()
addrBook.GetPeerStat("locolhost:43802")
......@@ -413,50 +369,34 @@ func TestAddrBook(t *testing.T) {
addrBook.ResetPeerkey(hex.EncodeToString(prv), pubstr)
resetkey, _ := addrBook.GetPrivPubKey()
assert.NotEqual(t, resetkey, privkey)
}
func TestBytesToInt32(t *testing.T) {
t.Log(P2pComm.BytesToInt32([]byte{0xff}))
t.Log(P2pComm.Int32ToBytes(255))
}
func TestNetAddress(t *testing.T) {
tcpAddr := new(net.TCPAddr)
tcpAddr.IP = net.ParseIP("localhost")
tcpAddr.Port = 2223
nad := NewNetAddress(tcpAddr)
nad1 := nad.Copy()
nad.Equals(nad1)
nad2s, err := NewNetAddressStrings([]string{"localhost:3306"})
if err != nil {
return
}
nad.Less(nad2s[0])
func testRestart(t *testing.T, p2p *P2p) {
client := p2p.client
assert.False(t, p2p.isRestart())
p2p.txFactory <- struct{}{}
p2p.processEvent(client.NewMessage("p2p", types.EventTxBroadcast, &types.Transaction{}), 128, p2p.p2pCli.BroadCastTx)
atomic.StoreInt32(&p2p.restart, 1)
p2p.ReStart()
atomic.StoreInt32(&p2p.restart, 0)
p2p.ReStart()
}
func TestP2pListen(t *testing.T) {
var node Node
node.listenPort = 3333
listen1 := NewListener("tcp", &node)
assert.Equal(t, true, listen1 != nil)
listen2 := NewListener("tcp", &node)
assert.Equal(t, true, listen2 != nil)
func Test_p2p(t *testing.T) {
listen1.Close()
listen2.Close()
}
func TestP2pRestart(t *testing.T) {
assert.Equal(t, false, p2pModule.isClose())
assert.Equal(t, false, p2pModule.isRestart())
p2pModule.ReStart()
}
func TestP2pClose(t *testing.T) {
p2pModule.Close()
os.RemoveAll(dataDir)
q := queue.New("channel")
go q.Start()
processMsg(q)
p2p := newP2p(53802, "testP2p", q)
p2p.Wait()
defer freeP2p(p2p)
defer q.Close()
testP2PEvent(t, q.Client())
testNetInfo(t, p2p)
testPeer(t, p2p, q)
testGrpcConns(t)
testGrpcStreamConns(t, p2p)
testP2pComm(t, p2p)
testAddrBook(t, p2p)
testRestart(t, p2p)
}
......@@ -8,9 +8,8 @@ import (
"encoding/hex"
"fmt"
"io"
"net"
"math/rand"
"net"
"sync/atomic"
"time"
......@@ -21,6 +20,8 @@ import (
"google.golang.org/grpc"
)
type p2pEventFunc func(message *queue.Message, taskIndex int64)
// EventInterface p2p subscribe to the event hander interface
type EventInterface interface {
BroadCastTx(msg *queue.Message, taskindex int64)
......@@ -38,7 +39,7 @@ type NormalInterface interface {
SendVersion(peer *Peer, nodeinfo *NodeInfo) (string, error)
SendPing(peer *Peer, nodeinfo *NodeInfo) error
GetBlockHeight(nodeinfo *NodeInfo) (int64, error)
CheckPeerNatOk(addr string) bool
CheckPeerNatOk(addr string, nodeInfo *NodeInfo) bool
GetAddrList(peer *Peer) (map[string]int64, error)
GetInPeersNum(peer *Peer) (int, error)
CheckSelf(addr string, nodeinfo *NodeInfo) bool
......@@ -73,8 +74,18 @@ func (m *Cli) BroadCastTx(msg *queue.Message, taskindex int64) {
atomic.AddInt32(&m.network.txCapcity, 1)
log.Debug("BroadCastTx", "task complete:", taskindex)
}()
m.network.node.pubsub.FIFOPub(&pb.P2PTx{Tx: msg.GetData().(*pb.Transaction)}, "tx")
msg.Reply(m.network.client.NewMessage("mempool", pb.EventReply, pb.Reply{IsOk: true, Msg: []byte("ok")}))
if tx, ok := msg.GetData().(*pb.Transaction); ok {
txHash := hex.EncodeToString(tx.Hash())
//此处使用新分配结构,避免重复修改已保存的ttl
route := &pb.P2PRoute{TTL: 1}
//是否已存在记录,不存在表示本节点发起的交易
if ttl, exist := txHashFilter.Get(txHash).(*pb.P2PRoute); exist {
route.TTL = ttl.TTL + 1
}
m.network.node.pubsub.FIFOPub(&pb.P2PTx{Tx: tx, Route: route}, "tx")
msg.Reply(m.network.client.NewMessage("mempool", pb.EventReply, pb.Reply{IsOk: true, Msg: []byte("ok")}))
}
}
// GetMemPool get mempool contents
......@@ -90,7 +101,7 @@ func (m *Cli) GetMemPool(msg *queue.Message, taskindex int64) {
for _, peer := range peers {
//获取远程 peer invs
resp, err := peer.mconn.gcli.GetMemPool(context.Background(),
&pb.P2PGetMempool{Version: m.network.node.nodeInfo.cfg.Version}, grpc.FailFast(true))
&pb.P2PGetMempool{Version: m.network.node.nodeInfo.channelVersion}, grpc.FailFast(true))
P2pComm.CollectPeerStat(err, peer)
if err != nil {
if err == pb.ErrVersion {
......@@ -122,7 +133,7 @@ func (m *Cli) GetMemPool(msg *queue.Message, taskindex int64) {
}
//获取真正的交易Tx call GetData
datacli, dataerr := peer.mconn.gcli.GetData(context.Background(),
&pb.P2PGetData{Invs: ableInv, Version: m.network.node.nodeInfo.cfg.Version}, grpc.FailFast(true))
&pb.P2PGetData{Invs: ableInv, Version: m.network.node.nodeInfo.channelVersion}, grpc.FailFast(true))
P2pComm.CollectPeerStat(dataerr, peer)
if dataerr != nil {
continue
......@@ -248,7 +259,7 @@ func (m *Cli) SendVersion(peer *Peer, nodeinfo *NodeInfo) (string, error) {
}
addrfrom := nodeinfo.GetExternalAddr().String()
resp, err := peer.mconn.gcli.Version2(context.Background(), &pb.P2PVersion{Version: nodeinfo.cfg.Version, Service: int64(nodeinfo.ServiceTy()), Timestamp: pb.Now().Unix(),
resp, err := peer.mconn.gcli.Version2(context.Background(), &pb.P2PVersion{Version: nodeinfo.channelVersion, Service: int64(nodeinfo.ServiceTy()), Timestamp: pb.Now().Unix(),
AddrRecv: peer.Addr(), AddrFrom: addrfrom, Nonce: int64(rand.Int31n(102040)),
UserAgent: hex.EncodeToString(in.Sign.GetPubkey()), StartHeight: blockheight}, grpc.FailFast(true))
log.Debug("SendVersion", "resp", resp, "addrfrom", addrfrom, "sendto", peer.Addr())
......@@ -263,7 +274,8 @@ func (m *Cli) SendVersion(peer *Peer, nodeinfo *NodeInfo) (string, error) {
P2pComm.CollectPeerStat(err, peer)
log.Debug("SHOW VERSION BACK", "VersionBack", resp, "peer", peer.Addr())
peer.version.SetVersion(resp.GetVersion())
_, ver := decodeChannelVersion(resp.GetVersion())
peer.version.SetVersion(ver)
ip, _, err := net.SplitHostPort(resp.GetAddrRecv())
if err == nil {
......@@ -377,7 +389,7 @@ func (m *Cli) GetHeaders(msg *queue.Message, taskindex int64) {
var err error
pidIsActivePeer = true
headers, err := peer.mconn.gcli.GetHeaders(context.Background(), &pb.P2PGetHeaders{StartHeight: req.GetStart(), EndHeight: req.GetEnd(),
Version: m.network.node.nodeInfo.cfg.Version}, grpc.FailFast(true))
Version: m.network.node.nodeInfo.channelVersion}, grpc.FailFast(true))
P2pComm.CollectPeerStat(err, peer)
if err != nil {
log.Error("GetBlocks", "Err", err.Error())
......@@ -557,9 +569,9 @@ func (m *Cli) GetNetInfo(msg *queue.Message, taskindex int64) {
}
// CheckPeerNatOk check peer is ok or not
func (m *Cli) CheckPeerNatOk(addr string) bool {
func (m *Cli) CheckPeerNatOk(addr string, info *NodeInfo) bool {
//连接自己的地址信息做测试
return !(len(P2pComm.AddrRouteble([]string{addr})) == 0)
return !(len(P2pComm.AddrRouteble([]string{addr}, info.channelVersion)) == 0)
}
......@@ -570,14 +582,15 @@ func (m *Cli) CheckSelf(addr string, nodeinfo *NodeInfo) bool {
log.Error("AddrRouteble", "NewNetAddressString", err.Error())
return false
}
conn, err := netaddr.DialTimeout(VERSION)
conn, err := netaddr.DialTimeout(nodeinfo.channelVersion)
if err != nil {
return false
}
defer conn.Close()
cli := pb.NewP2PgserviceClient(conn)
resp, err := cli.GetPeerInfo(context.Background(), &pb.P2PGetPeerInfo{Version: VERSION}, grpc.FailFast(true))
resp, err := cli.GetPeerInfo(context.Background(),
&pb.P2PGetPeerInfo{Version: nodeinfo.channelVersion}, grpc.FailFast(true))
if err != nil {
return false
}
......
......@@ -5,7 +5,6 @@
package p2p
import (
"encoding/hex"
"strings"
"sync"
"sync/atomic"
......@@ -134,10 +133,10 @@ func (p *Peer) heartBeat() {
}
peername, err := pcli.SendVersion(p, p.node.nodeInfo)
P2pComm.CollectPeerStat(err, p)
if err == nil {
if err == nil || peername == "" {
log.Debug("sendVersion", "peer name", peername)
p.SetPeerName(peername) //设置连接的远程节点的节点名称
p.taskChan = p.node.pubsub.Sub("block", "tx")
p.taskChan = p.node.pubsub.Sub("block", "tx", peername)
go p.sendStream()
go p.readStream()
break
......@@ -173,8 +172,8 @@ func (p *Peer) GetInBouns() int32 {
}
// GetPeerInfo get peer information of peer
func (p *Peer) GetPeerInfo(version int32) (*pb.P2PPeerInfo, error) {
return p.mconn.gcli.GetPeerInfo(context.Background(), &pb.P2PGetPeerInfo{Version: version}, grpc.FailFast(true))
func (p *Peer) GetPeerInfo() (*pb.P2PPeerInfo, error) {
return p.mconn.gcli.GetPeerInfo(context.Background(), &pb.P2PGetPeerInfo{Version: p.node.nodeInfo.channelVersion}, grpc.FailFast(true))
}
func (p *Peer) sendStream() {
......@@ -220,7 +219,7 @@ func (p *Peer) sendStream() {
//send softversion&p2pversion
_, peername := p.node.nodeInfo.addrBook.GetPrivPubKey()
p2pdata.Value = &pb.BroadCastData_Version{Version: &pb.Versions{P2Pversion: p.node.nodeInfo.cfg.Version,
p2pdata.Value = &pb.BroadCastData_Version{Version: &pb.Versions{P2Pversion: p.node.nodeInfo.channelVersion,
Softversion: v.GetVersion(), Peername: peername}}
if err := resp.Send(p2pdata); err != nil {
......@@ -235,7 +234,6 @@ func (p *Peer) sendStream() {
}
timeout := time.NewTimer(time.Second * 2)
defer timeout.Stop()
var hash [64]byte
SEND_LOOP:
for {
......@@ -253,34 +251,11 @@ func (p *Peer) sendStream() {
log.Error("sendStream peer connect closed", "peerName", p.GetPeerName())
return
}
p2pdata := new(pb.BroadCastData)
if block, ok := task.(*pb.P2PBlock); ok {
height := block.GetBlock().GetHeight()
hex.Encode(hash[:], block.GetBlock().Hash())
blockhash := string(hash[:])
log.Debug("sendStream", "will send block", blockhash)
pinfo, err := p.GetPeerInfo(p.node.nodeInfo.cfg.Version)
P2pComm.CollectPeerStat(err, p)
if err == nil {
if pinfo.GetHeader().GetHeight() >= height {
log.Debug("sendStream", "find peer height>this broadblock ,send process", "break")
continue
}
}
p2pdata.Value = &pb.BroadCastData_Block{Block: block}
Filter.RegRecvData(blockhash)
} else if tx, ok := task.(*pb.P2PTx); ok {
hex.Encode(hash[:], tx.GetTx().Hash())
txhash := string(hash[:])
log.Debug("sendStream", "will send tx", txhash)
p2pdata.Value = &pb.BroadCastData_Tx{Tx: tx}
Filter.RegRecvData(txhash)
sendData, doSend := p.node.processSendP2P(task, p.version.GetVersion(), p.Addr())
if !doSend {
continue
}
err := resp.Send(p2pdata)
err := resp.Send(sendData)
P2pComm.CollectPeerStat(err, p)
if err != nil {
log.Error("sendStream", "send", err)
......@@ -309,17 +284,13 @@ func (p *Peer) sendStream() {
return
}
timeout.Reset(time.Second * 2)
}
}
}
}
func (p *Peer) readStream() {
pcli := NewNormalP2PCli()
for {
if !p.GetRunning() {
log.Debug("readstream", "loop", "done")
......@@ -339,7 +310,6 @@ func (p *Peer) readStream() {
}
log.Debug("SubStreamBlock", "Start", p.Addr())
var hash [64]byte
for {
if !p.GetRunning() {
errs := resp.CloseSend()
......@@ -370,60 +340,7 @@ func (p *Peer) readStream() {
break
}
if block := data.GetBlock(); block != nil {
if block.GetBlock() != nil {
//如果已经有登记过的消息记录,则不发送给本地blockchain
hex.Encode(hash[:], block.GetBlock().Hash())
blockhash := string(hash[:])
Filter.GetLock()
if Filter.QueryRecvData(blockhash) {
Filter.ReleaseLock()
continue
}
Filter.RegRecvData(blockhash)
Filter.ReleaseLock()
//判断比自己低的区块,则不发送给blockchain
height, err := pcli.GetBlockHeight(p.node.nodeInfo)
if err == nil {
if height >= block.GetBlock().GetHeight()+128 {
continue
}
}
log.Info("readStream", "block==+======+====+=>Height", block.GetBlock().GetHeight(), "from peer", p.Addr(),
"block size(KB)", float32(len(pb.Encode(block)))/1024, "block hash",
blockhash)
msg := p.node.nodeInfo.client.NewMessage("blockchain", pb.EventBroadcastAddBlock, &pb.BlockPid{Pid: p.GetPeerName(), Block: block.GetBlock()})
err = p.node.nodeInfo.client.Send(msg, false)
if err != nil {
log.Error("readStream", "send to blockchain Error", err.Error())
continue
}
//Filter.RegRecvData(blockhash) //添加发送登记,下次通过stream 接收同样的消息的时候可以过滤
}
} else if tx := data.GetTx(); tx != nil {
if tx.GetTx() != nil {
hex.Encode(hash[:], tx.Tx.Hash())
txhash := string(hash[:])
log.Debug("readStream", "tx", txhash)
Filter.GetLock()
if Filter.QueryRecvData(txhash) {
Filter.ReleaseLock()
continue //处理方式同上
}
Filter.RegRecvData(txhash)
Filter.ReleaseLock()
msg := p.node.nodeInfo.client.NewMessage("mempool", pb.EventTx, tx.GetTx())
errs := p.node.nodeInfo.client.Send(msg, false)
if errs != nil {
log.Error("send", "to mempool EventTx msg Error", errs.Error())
}
//Filter.RegRecvData(txhash) //登记
}
}
p.node.processRecvP2P(data, p.GetPeerName(), p.node.pubToPeer, p.Addr())
}
}
}
......
This diff is collapsed.
package p2p
import (
"bytes"
"encoding/hex"
"testing"
"time"
"github.com/33cn/chain33/common/merkle"
"github.com/33cn/chain33/queue"
"github.com/33cn/chain33/types"
"github.com/stretchr/testify/assert"
)
type versionData struct {
rawData interface{}
version int32
}
func Test_processP2P(t *testing.T) {
q := queue.New("channel")
go q.Start()
p2p := newP2p(12345, "testProcessP2p", q)
defer freeP2p(p2p)
defer q.Close()
node := p2p.node
client := p2p.client
pid := "testPid"
sendChan := make(chan interface{}, 1)
recvChan := make(chan *types.BroadCastData, 1)
testDone := make(chan struct{})
payload := []byte("testpayload")
minerTx := &types.Transaction{Execer: []byte("coins"), Payload: payload, Fee: 14600, Expire: 200}
tx := &types.Transaction{Execer: []byte("coins"), Payload: payload, Fee: 4600, Expire: 2}
tx1 := &types.Transaction{Execer: []byte("coins"), Payload: payload, Fee: 460000000, Expire: 0}
tx2 := &types.Transaction{Execer: []byte("coins"), Payload: payload, Fee: 100, Expire: 1}
txGroup, _ := types.CreateTxGroup([]*types.Transaction{tx1, tx2})
gtx := txGroup.Tx()
txList := append([]*types.Transaction{}, minerTx, tx, tx1, tx2)
memTxList := append([]*types.Transaction{}, tx, gtx)
block := &types.Block{
TxHash: []byte("123"),
Height: 10,
Txs: txList,
}
txHash := hex.EncodeToString(tx.Hash())
blockHash := hex.EncodeToString(block.Hash())
rootHash := merkle.CalcMerkleRoot(txList)
//mempool handler
go func() {
client := q.Client()
client.Sub("mempool")
for msg := range client.Recv() {
switch msg.Ty {
case types.EventTxListByHash:
query := msg.Data.(*types.ReqTxHashList)
var txs []*types.Transaction
if !query.IsShortHash {
txs = memTxList[:1]
} else {
txs = memTxList
}
msg.Reply(client.NewMessage("p2p", types.EventTxListByHash, &types.ReplyTxList{Txs: txs}))
}
}
}()
//测试发送
go func() {
for data := range sendChan {
verData, ok := data.(*versionData)
assert.True(t, ok)
sendData, doSend := node.processSendP2P(verData.rawData, verData.version, "testIP:port")
txHashFilter.regRData.Remove(txHash)
blockHashFilter.regRData.Remove(blockHash)
assert.True(t, doSend, "sendData:", verData.rawData)
recvChan <- sendData
}
}()
//测试接收
go func() {
for data := range recvChan {
txHashFilter.regRData.Remove(txHash)
blockHashFilter.regRData.Remove(blockHash)
handled := node.processRecvP2P(data, pid, node.pubToPeer, "testIP:port")
assert.True(t, handled)
}
}()
go func() {
p2pChan := node.pubsub.Sub("tx")
for data := range p2pChan {
if p2pTx, ok := data.(*types.P2PTx); ok {
sendChan <- &versionData{rawData: p2pTx, version: lightBroadCastVersion}
}
}
}()
//data test
go func() {
subChan := node.pubsub.Sub(pid)
//normal
sendChan <- &versionData{rawData: &types.P2PTx{Tx: tx, Route: &types.P2PRoute{}}, version: lightBroadCastVersion - 1}
assert.Nil(t, client.Send(client.NewMessage("p2p", types.EventTxBroadcast, tx), false))
sendChan <- &versionData{rawData: &types.P2PBlock{Block: block}, version: lightBroadCastVersion - 1}
//light broadcast
txHashFilter.Add(hex.EncodeToString(tx1.Hash()), &types.P2PRoute{TTL: DefaultLtTxBroadCastTTL})
_ = client.Send(client.NewMessage("p2p", types.EventTxBroadcast, tx1), false)
sendChan <- &versionData{rawData: &types.P2PTx{Tx: tx, Route: &types.P2PRoute{TTL: DefaultLtTxBroadCastTTL}}, version: lightBroadCastVersion}
<-subChan //query tx
sendChan <- &versionData{rawData: &types.P2PBlock{Block: block}, version: lightBroadCastVersion}
<-subChan //query block
for !ltBlockCache.contains(blockHash) {
}
ltBlock := ltBlockCache.get(blockHash).(*types.Block)
assert.True(t, bytes.Equal(rootHash, merkle.CalcMerkleRoot(ltBlock.Txs)))
//query tx
sendChan <- &versionData{rawData: &types.P2PQueryData{Value: &types.P2PQueryData_TxReq{TxReq: &types.P2PTxReq{TxHash: tx.Hash()}}}}
_, ok := (<-subChan).(*types.P2PTx)
assert.True(t, ok)
sendChan <- &versionData{rawData: &types.P2PQueryData{Value: &types.P2PQueryData_BlockTxReq{BlockTxReq: &types.P2PBlockTxReq{
BlockHash: blockHash,
TxIndices: []int32{1, 2},
}}}}
rep, ok := (<-subChan).(*types.P2PBlockTxReply)
assert.True(t, ok)
assert.Equal(t, 2, int(rep.TxIndices[1]))
sendChan <- &versionData{rawData: &types.P2PQueryData{Value: &types.P2PQueryData_BlockTxReq{BlockTxReq: &types.P2PBlockTxReq{
BlockHash: blockHash,
TxIndices: nil,
}}}}
rep, ok = (<-subChan).(*types.P2PBlockTxReply)
assert.True(t, ok)
assert.Nil(t, rep.TxIndices)
//query reply
sendChan <- &versionData{rawData: &types.P2PBlockTxReply{
BlockHash: blockHash,
TxIndices: []int32{1},
Txs: txList[1:2],
}}
<-subChan
assert.True(t, ltBlockCache.contains(blockHash))
ltBlock.TxHash = rootHash
sendChan <- &versionData{rawData: &types.P2PBlockTxReply{
BlockHash: blockHash,
Txs: txList[0:],
}}
for ltBlockCache.contains(blockHash) {
}
//max ttl
node.nodeInfo.cfg.MaxTTL = 0
_, doSend := node.processSendP2P(&types.P2PTx{Tx: tx, Route: &types.P2PRoute{TTL: 1}}, lightBroadCastVersion, "testIP:port")
assert.False(t, doSend)
close(testDone)
}()
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
select {
case <-testDone:
return
case <-ticker.C:
t.Error("TestP2PProcessTimeout")
return
}
}
}
package p2p
import (
"sort"
"testing"
"github.com/33cn/chain33/types"
"github.com/stretchr/testify/assert"
)
func Test_AddDelStream(t *testing.T) {
s := NewP2pServer()
peerName := "testpeer"
delChan := s.addStreamHandler(peerName)
//replace
dataChan := s.addStreamHandler(peerName)
_, ok := <-delChan
assert.False(t, ok)
//del old
s.deleteStream(peerName, delChan)
_, ok = s.streams[peerName]
assert.True(t, ok)
//del nil
s.deleteStream("", delChan)
//del exist
s.deleteStream(peerName, dataChan)
_, ok = s.streams[peerName]
assert.False(t, ok)
}
func Test_spaceLimitCache(t *testing.T) {
c := newSpaceLimitCache(3, 10)
assert.True(t, c.add(1, 1, 1))
assert.True(t, c.add(1, 1, 1))
assert.False(t, c.add(2, 2, 20))
assert.Nil(t, c.get(2))
assert.True(t, c.add(2, 1, 10))
c.add(3, 2, 2)
c.add(4, 2, 2)
c.add(5, 2, 2)
c.add(6, 2, 2)
assert.False(t, c.contains(2))
assert.Equal(t, 3, c.data.Len())
assert.True(t, c.add(7, 7, 10))
assert.True(t, c.contains(7))
assert.Equal(t, 1, c.data.Len())
_, exist := c.del(7)
assert.True(t, exist)
_, exist = c.del(6)
assert.False(t, exist)
}
func testChannelVersion(t *testing.T, channel int32) {
chanVer := calcChannelVersion(channel)
chann, ver := decodeChannelVersion(chanVer)
assert.True(t, chann == channel)
assert.True(t, ver == VERSION)
}
func Test_ChannelVersion(t *testing.T) {
testChannelVersion(t, 0)
testChannelVersion(t, 128)
}
func TestRandStr(t *testing.T) {
t.Log(P2pComm.RandStr(5))
}
func TestBytesToInt32(t *testing.T) {
t.Log(P2pComm.BytesToInt32([]byte{0xff}))
t.Log(P2pComm.Int32ToBytes(255))
}
func TestSortArr(t *testing.T) {
var Inventorys = make(Invs, 0)
for i := 100; i >= 0; i-- {
var inv types.Inventory
inv.Ty = 111
inv.Height = int64(i)
Inventorys = append(Inventorys, &inv)
}
sort.Sort(Inventorys)
}
func TestFilter(t *testing.T) {
filter := NewFilter(10)
go filter.ManageRecvFilter()
defer filter.Close()
filter.GetLock()
assert.Equal(t, true, filter.RegRecvData("key"))
assert.Equal(t, true, filter.QueryRecvData("key"))
filter.RemoveRecvData("key")
assert.Equal(t, false, filter.QueryRecvData("key"))
filter.ReleaseLock()
}
......@@ -4,12 +4,38 @@
package p2p
// VERSION number
var VERSION int32
//更新内容:
// 1.p2p 修改为在nat结束后,在启动peer的stream,ping,version 等功能
//2018-3-26 更新内容
// 1. p2p 过滤重复数据,改用blockhash 提换block height
// 2. 增加p2p私钥自动导入到钱包功能
//p2p版本区间 10020, 11000
//历史版本
const (
//p2p广播交易哈希而非完整区块数据
lightBroadCastVersion = 10030
)
// VERSION number
const VERSION = lightBroadCastVersion
// MainNet Channel = 0x0000
const (
defaultTestNetChannel = 256
versionMask = 0xFFFF
)
// channelVersion = channel << 16 + version
func calcChannelVersion(channel int32) int32 {
return channel<<16 + VERSION
}
func decodeChannelVersion(channelVersion int32) (channel int32, version int32) {
channel = channelVersion >> 16
version = channelVersion & versionMask
return
}
......@@ -168,7 +168,7 @@ func (client *client) isInClose() bool {
// Close 关闭client
func (client *client) Close() {
if atomic.LoadInt32(&client.isClosed) == 1 || client.topic == nil {
if atomic.LoadInt32(&client.isClosed) == 1 || atomic.LoadPointer(&client.topic) == nil {
return
}
topic := client.getTopic()
......
......@@ -130,8 +130,8 @@ func (g *Grpc) GetTransactionByHashes(ctx context.Context, in *pb.ReqHashes) (*p
}
// GetMemPool get mempool contents
func (g *Grpc) GetMemPool(ctx context.Context, in *pb.ReqNil) (*pb.ReplyTxList, error) {
return g.cli.GetMempool()
func (g *Grpc) GetMemPool(ctx context.Context, in *pb.ReqGetMempool) (*pb.ReplyTxList, error) {
return g.cli.GetMempool(in)
}
// GetAccounts get accounts
......
......@@ -163,7 +163,8 @@ func TestVersion(t *testing.T) {
//}
func testGetMemPoolOK(t *testing.T) {
qapi.On("GetMempool").Return(nil, nil)
var in *types.ReqGetMempool
qapi.On("GetMempool", in).Return(nil, nil)
data, err := g.GetMemPool(getOkCtx(), nil)
assert.Nil(t, err, "the error should be nil")
assert.Nil(t, data)
......
......@@ -5,6 +5,7 @@
package rpc
import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
......@@ -39,7 +40,26 @@ func (c *Chain33) CreateRawTransaction(in *rpctypes.CreateTx, result *interface{
if err != nil {
return err
}
*result = hex.EncodeToString(reply)
//add tx fee setting
tx := &types.Transaction{}
err = types.Decode(reply, tx)
if err != nil {
return err
}
tx.Fee = inpb.Fee
//set proper fee if zero fee
if tx.Fee <= 0 {
proper, err := c.cli.GetProperFee(nil)
if err != nil {
return err
}
fee, err := tx.GetRealFee(proper.ProperFee)
if err != nil {
return err
}
tx.Fee = fee
}
*result = hex.EncodeToString(types.Encode(tx))
return nil
}
......@@ -329,9 +349,8 @@ func fmtAsssets(assets []*types.Asset) []*rpctypes.Asset {
}
// GetMempool get mempool information
func (c *Chain33) GetMempool(in *types.ReqNil, result *interface{}) error {
reply, err := c.cli.GetMempool()
func (c *Chain33) GetMempool(in *types.ReqGetMempool, result *interface{}) error {
reply, err := c.cli.GetMempool(in)
if err != nil {
return err
}
......@@ -894,9 +913,14 @@ func (c *Chain33) IsNtpClockSync(in *types.ReqNil, result *interface{}) error {
// QueryTotalFee query total fee
func (c *Chain33) QueryTotalFee(in *types.LocalDBGet, result *interface{}) error {
if in == nil || len(in.Keys) > 1 {
if in == nil || len(in.Keys) != 1 {
return types.ErrInvalidParam
}
totalFeePrefix := []byte("TotalFeeKey:")
//add prefix if not exist
if !bytes.HasPrefix(in.Keys[0], totalFeePrefix) {
in.Keys[0] = append(totalFeePrefix, in.Keys[0]...)
}
reply, err := c.cli.LocalGet(in)
if err != nil {
return err
......
......@@ -5,6 +5,7 @@
package rpc
import (
"bytes"
"errors"
"testing"
......@@ -627,9 +628,9 @@ func TestChain33_GetMempool(t *testing.T) {
api := new(mocks.QueueProtocolAPI)
testChain33 := newTestChain33(api)
api.On("GetMempool").Return(&types.ReplyTxList{Txs: []*types.Transaction{{}}}, nil)
api.On("GetMempool", &types.ReqGetMempool{}).Return(&types.ReplyTxList{Txs: []*types.Transaction{{}}}, nil)
var testResult interface{}
data := &types.ReqNil{}
data := &types.ReqGetMempool{IsAll: false}
err := testChain33.GetMempool(data, &testResult)
t.Log(err)
assert.NotNil(t, testResult)
......@@ -1422,3 +1423,25 @@ func Test_fmtTxDetail(t *testing.T) {
assert.Equal(t, "to", tran.Fromaddr)
assert.Equal(t, "from", tx.To)
}
func queryTotalFee(client *Chain33, req *types.LocalDBGet, t *testing.T) int64 {
var testResult interface{}
err := client.QueryTotalFee(req, &testResult)
assert.NoError(t, err)
fee, _ := testResult.(types.TotalFee)
return fee.Fee
}
func TestChain33_QueryTotalFee(t *testing.T) {
api := new(mocks.QueueProtocolAPI)
client := newTestChain33(api)
total := &types.TotalFee{TxCount: 1, Fee: 10000}
api.On("LocalGet", mock.Anything).Return(&types.LocalReplyValue{Values: [][]byte{types.Encode(total)}}, nil)
req := &types.LocalDBGet{Keys: [][]byte{types.TotalFeeKey([]byte("testHash"))}}
req1 := &types.LocalDBGet{Keys: [][]byte{[]byte("testHash")}}
assert.Equal(t, total.Fee, queryTotalFee(client, req, t))
assert.Equal(t, total.Fee, queryTotalFee(client, req1, t))
assert.True(t, bytes.Equal(req.Keys[0], req1.Keys[0]))
}
......@@ -101,7 +101,7 @@ func TestJSONClient_Call(t *testing.T) {
var fee types.TotalFee
api.On("LocalGet", mock.Anything).Return(nil, errors.New("error value"))
err = jsonClient.Call("Chain33.QueryTotalFee", &types.ReqSignRawTx{}, &fee)
err = jsonClient.Call("Chain33.QueryTotalFee", &types.LocalDBGet{Keys: [][]byte{[]byte("test")}}, &fee)
assert.NotNil(t, err)
var retNtp bool
......@@ -109,11 +109,21 @@ func TestJSONClient_Call(t *testing.T) {
err = jsonClient.Call("Chain33.IsNtpClockSync", &types.ReqNil{}, &retNtp)
assert.Nil(t, err)
assert.True(t, retNtp)
api.On("GetProperFee", mock.Anything).Return(&types.ReplyProperFee{ProperFee: 2}, nil)
testCreateTxCoins(t, jsonClient)
server.Close()
mock.AssertExpectationsForObjects(t, api)
}
func testDecodeTxHex(t *testing.T, txHex string) *types.Transaction {
txbytes, err := hex.DecodeString(txHex)
assert.Nil(t, err)
var tx types.Transaction
err = types.Decode(txbytes, &tx)
assert.Nil(t, err)
return &tx
}
func testCreateTxCoins(t *testing.T, jsonClient *jsonclient.JSONClient) {
req := &rpctypes.CreateTx{
To: "184wj4nsgVxKyz2NhM3Yb5RK5Ap6AFRFq2",
......@@ -128,12 +138,15 @@ func testCreateTxCoins(t *testing.T, jsonClient *jsonclient.JSONClient) {
var res string
err := jsonClient.Call("Chain33.CreateRawTransaction", req, &res)
assert.Nil(t, err)
txbytes, err := hex.DecodeString(res)
assert.Nil(t, err)
var tx types.Transaction
err = types.Decode(txbytes, &tx)
assert.Nil(t, err)
tx := testDecodeTxHex(t, res)
assert.Equal(t, "184wj4nsgVxKyz2NhM3Yb5RK5Ap6AFRFq2", tx.To)
assert.Equal(t, int64(1), tx.Fee)
req.Fee = 0
err = jsonClient.Call("Chain33.CreateRawTransaction", req, &res)
assert.Nil(t, err)
tx = testDecodeTxHex(t, res)
fee, _ := tx.GetRealFee(2)
assert.Equal(t, fee, tx.Fee)
}
func TestGrpc_Call(t *testing.T) {
......
......@@ -8,6 +8,7 @@ import (
"github.com/33cn/chain33/rpc/jsonclient"
rpctypes "github.com/33cn/chain33/rpc/types"
"github.com/33cn/chain33/system/dapp/commands/types"
ctypes "github.com/33cn/chain33/types"
"github.com/spf13/cobra"
)
......@@ -35,13 +36,22 @@ func GetMempoolCmd() *cobra.Command {
Short: "List mempool txs",
Run: listMempoolTxs,
}
addGetMempoolFlags(cmd)
return cmd
}
func addGetMempoolFlags(cmd *cobra.Command) {
cmd.Flags().BoolP("all", "a", false, "show all tx in mempool")
}
func listMempoolTxs(cmd *cobra.Command, args []string) {
rpcLaddr, _ := cmd.Flags().GetString("rpc_laddr")
isAll, _ := cmd.Flags().GetBool("all")
params := &ctypes.ReqGetMempool{
IsAll: isAll,
}
var res rpctypes.ReplyTxList
ctx := jsonclient.NewRPCCtx(rpcLaddr, "Chain33.GetMempool", nil, &res)
ctx := jsonclient.NewRPCCtx(rpcLaddr, "Chain33.GetMempool", params, &res)
ctx.SetResultCb(parseListMempoolTxsRes)
ctx.Run()
}
......
......@@ -130,7 +130,14 @@ func totalCoins(cmd *cobra.Command, args []string) {
}
resp.TxCount = res2.TxCount
totalAmount = (317430000+30*height)*types.Coin - res2.Fee
var issueCoins int64
//只适用bty主网计算
if height < 2270000 {
issueCoins = 30 * height
} else { //挖矿产量降低30->8
issueCoins = 22*2269999 + height*8
}
totalAmount = (317430000+issueCoins)*types.Coin - res2.Fee
resp.TotalAmount = strconv.FormatFloat(float64(totalAmount)/float64(types.Coin), 'f', 4, 64)
} else {
var req types.ReqString
......
......@@ -122,10 +122,10 @@ func (mem *Mempool) getTxList(filterList *types.TxHashList) (txs []*types.Transa
for i := 0; i < len(filterList.GetHashes()); i++ {
dupMap[string(filterList.GetHashes()[i])] = true
}
return mem.filterTxList(count, dupMap)
return mem.filterTxList(count, dupMap, false)
}
func (mem *Mempool) filterTxList(count int64, dupMap map[string]bool) (txs []*types.Transaction) {
func (mem *Mempool) filterTxList(count int64, dupMap map[string]bool, isAll bool) (txs []*types.Transaction) {
height := mem.header.GetHeight()
blocktime := mem.header.GetBlockTime()
mem.cache.Walk(int(count), func(tx *Item) bool {
......@@ -134,7 +134,7 @@ func (mem *Mempool) filterTxList(count int64, dupMap map[string]bool) (txs []*ty
return true
}
}
if isExpired(tx, height, blocktime) {
if isExpired(tx, height, blocktime) && !isAll {
return true
}
txs = append(txs, tx.Value)
......@@ -289,14 +289,19 @@ func (mem *Mempool) GetProperFeeRate(req *types.ReqProperFee) int64 {
if req.TxSize == 0 {
req.TxSize = 10240
}
baseFeeRate := mem.cache.GetProperFee()
feeRate := mem.cache.GetProperFee()
if mem.cfg.IsLevelFee {
levelFeeRate := mem.getLevelFeeRate(mem.cfg.MinTxFee, req.TxCount, req.TxSize)
if levelFeeRate > baseFeeRate {
return levelFeeRate
if levelFeeRate > feeRate {
feeRate = levelFeeRate
}
}
return baseFeeRate
//控制精度
minFee := types.GInt("MinFee")
if minFee != 0 && feeRate%minFee > 0 {
feeRate = (feeRate/minFee + 1) * minFee
}
return feeRate
}
// getLevelFeeRate 获取合适的阶梯手续费率, 可以外部传入count, size进行前瞻性估计
......
......@@ -115,8 +115,14 @@ func (mem *Mempool) eventTx(msg *queue.Message) {
// EventGetMempool 获取Mempool内所有交易
func (mem *Mempool) eventGetMempool(msg *queue.Message) {
var isAll bool
if msg.GetData() == nil {
isAll = false
} else {
isAll = msg.GetData().(*types.ReqGetMempool).GetIsAll()
}
msg.Reply(mem.client.NewMessage("rpc", types.EventReplyTxList,
&types.ReplyTxList{Txs: mem.filterTxList(0, nil)}))
&types.ReplyTxList{Txs: mem.filterTxList(0, nil, isAll)}))
}
// EventDelTxList 获取Mempool中一定数量交易,并把这些交易从Mempool中删除
......
......@@ -143,7 +143,9 @@ type BlockChain struct {
IsParaChain bool `protobuf:"varint,12,opt,name=isParaChain" json:"isParaChain,omitempty"`
EnableTxQuickIndex bool `protobuf:"varint,13,opt,name=enableTxQuickIndex" json:"enableTxQuickIndex,omitempty"`
// 升级storedb是否重新执行localdb
EnableReExecLocal bool `protobuf:"varint,13,opt,name=enableReExecLocal" json:"enableReExecLocal,omitempty"`
EnableReExecLocal bool `protobuf:"varint,14,opt,name=enableReExecLocal" json:"enableReExecLocal,omitempty"`
// 区块回退
RollbackBlock int64 `protobuf:"varint,15,opt,name=rollbackBlock" json:"rollbackBlock,omitempty"`
}
// P2P 配置
......@@ -167,10 +169,6 @@ type P2P struct {
// 是否启动P2P服务
Enable bool `protobuf:"varint,9,opt,name=enable" json:"enable,omitempty"`
MsgCacheSize int32 `protobuf:"varint,10,opt,name=msgCacheSize" json:"msgCacheSize,omitempty"`
// 版本号
Version int32 `protobuf:"varint,11,opt,name=version" json:"version,omitempty"`
VerMin int32 `protobuf:"varint,12,opt,name=verMin" json:"verMin,omitempty"`
VerMax int32 `protobuf:"varint,13,opt,name=verMax" json:"verMax,omitempty"`
// 是否使用内置的种子节点
InnerSeedEnable bool `protobuf:"varint,14,opt,name=innerSeedEnable" json:"innerSeedEnable,omitempty"`
// 最多的接入节点个数
......@@ -179,6 +177,14 @@ type P2P struct {
UseGithub bool `protobuf:"varint,16,opt,name=useGithub" json:"useGithub,omitempty"`
//是否等待Pid
WaitPid bool `protobuf:"varint,17,opt,name=waitPid" json:"waitPid,omitempty"`
//交易开始采用哈希广播的ttl
LightTxTTL int32 `protobuf:"varint,18,opt,name=lightTxTTL" json:"lightTxTTL,omitempty"`
// 最大传播ttl, ttl达到该值将停止继续向外发送
MaxTTL int32 `protobuf:"varint,19,opt,name=maxTTL" json:"maxTTL,omitempty"`
// p2p网络频道,用于区分主网/测试网/其他网络
Channel int32 `protobuf:"varint,20,opt,name=channel" json:"channel,omitempty"`
//固定连接节点,只连接配置项seeds中的节点
FixedSeed bool `protobuf:"varint,21,opt,name=fixedSeed" json:"fixedSeed,omitempty"`
}
// RPC 配置
......
......@@ -141,6 +141,7 @@ var (
ErrVersion = errors.New("ErrVersionNoSupport")
ErrStreamPing = errors.New("ErrStreamPing")
ErrPeerStop = errors.New("ErrPeerStop")
ErrP2PChannel = errors.New("ErrIllegalP2PChannel")
ErrBlockSize = errors.New("ErrBlockSize")
ErrTxGroupIndex = errors.New("ErrTxGroupIndex")
......@@ -186,4 +187,6 @@ var (
ErrDisableWrite = errors.New("ErrDisableWrite")
ErrDisableRead = errors.New("ErrDisableRead")
ErrConsensusHashErr = errors.New("ErrConsensusHashErr")
)
......@@ -162,6 +162,9 @@ const (
EventReplyLastBlockMainSequence = 301
EventGetMainSeqByHash = 302
EventReplyMainSeqByHash = 303
//其他模块读写blockchain db事件
EventSetValueByKey = 304
EventGetValueByKey = 305
)
var eventName = map[int]string{
......@@ -313,4 +316,6 @@ var eventName = map[int]string{
EventReplyLastBlockMainSequence: "EventReplyLastBlockMainSequence",
EventGetMainSeqByHash: "EventGetMainSeqByHash",
EventReplyMainSeqByHash: "EventReplyMainSeqByHash",
EventSetValueByKey: "EventSetValueByKey",
EventGetValueByKey: "EventGetValueByKey",
}
......@@ -5,19 +5,21 @@
package types
import (
"bytes"
"fmt"
)
// 定义key值
var (
LocalPrefix = []byte("LODB")
FlagTxQuickIndex = []byte("FLAG:FlagTxQuickIndex")
FlagKeyMVCC = []byte("FLAG:keyMVCCFlag")
TxHashPerfix = []byte("TX:")
TxShortHashPerfix = []byte("STX:")
TxAddrHash = []byte("TxAddrHash:")
TxAddrDirHash = []byte("TxAddrDirHash:")
AddrTxsCount = []byte("AddrTxsCount:")
LocalPrefix = []byte("LODB")
FlagTxQuickIndex = []byte("FLAG:FlagTxQuickIndex")
FlagKeyMVCC = []byte("FLAG:keyMVCCFlag")
TxHashPerfix = []byte("TX:")
TxShortHashPerfix = []byte("STX:")
TxAddrHash = []byte("TxAddrHash:")
TxAddrDirHash = []byte("TxAddrDirHash:")
AddrTxsCount = []byte("AddrTxsCount:")
ConsensusParaTxsPrefix = []byte("LODBP:Consensus:Para:") //存贮para共识模块从主链拉取的平行链交易
)
// GetLocalDBKeyList 获取localdb的key列表
......@@ -89,3 +91,13 @@ func CalcRollbackKey(execer []byte, hash []byte) []byte {
key = append(key, hash...)
return key
}
//CalcConsensusParaTxsKey 平行链localdb中保存的平行链title对应的交易
func CalcConsensusParaTxsKey(key []byte) []byte {
return append(ConsensusParaTxsPrefix, key...)
}
//CheckConsensusParaTxsKey 检测para共识模块需要操作的平行链交易的key值
func CheckConsensusParaTxsKey(key []byte) bool {
return bytes.HasPrefix(key, ConsensusParaTxsPrefix)
}
......@@ -703,7 +703,7 @@ func (_m *Chain33Client) GetLastMemPool(ctx context.Context, in *types.ReqNil, o
}
// GetMemPool provides a mock function with given fields: ctx, in, opts
func (_m *Chain33Client) GetMemPool(ctx context.Context, in *types.ReqNil, opts ...grpc.CallOption) (*types.ReplyTxList, error) {
func (_m *Chain33Client) GetMemPool(ctx context.Context, in *types.ReqGetMempool, opts ...grpc.CallOption) (*types.ReplyTxList, error) {
_va := make([]interface{}, len(opts))
for _i := range opts {
_va[_i] = opts[_i]
......@@ -714,7 +714,7 @@ func (_m *Chain33Client) GetMemPool(ctx context.Context, in *types.ReqNil, opts
ret := _m.Called(_ca...)
var r0 *types.ReplyTxList
if rf, ok := ret.Get(0).(func(context.Context, *types.ReqNil, ...grpc.CallOption) *types.ReplyTxList); ok {
if rf, ok := ret.Get(0).(func(context.Context, *types.ReqGetMempool, ...grpc.CallOption) *types.ReplyTxList); ok {
r0 = rf(ctx, in, opts...)
} else {
if ret.Get(0) != nil {
......@@ -723,7 +723,7 @@ func (_m *Chain33Client) GetMemPool(ctx context.Context, in *types.ReqNil, opts
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *types.ReqNil, ...grpc.CallOption) error); ok {
if rf, ok := ret.Get(1).(func(context.Context, *types.ReqGetMempool, ...grpc.CallOption) error); ok {
r1 = rf(ctx, in, opts...)
} else {
r1 = ret.Error(1)
......
......@@ -203,11 +203,17 @@ message P2PGetData {
repeated Inventory invs = 2;
}
//
message P2PRoute {
int32 TTL = 1;
}
/**
* p2p 发送交易协议
*/
message P2PTx {
Transaction tx = 1;
Transaction tx = 1;
P2PRoute route = 2;
}
/**
......@@ -216,6 +222,52 @@ message P2PTx {
message P2PBlock {
Block block = 1;
}
/**
* p2p 轻量级区块, 广播交易短哈希列表
*/
message LightBlock {
int64 size = 1;
Header header = 2;
Transaction minerTx = 3;
repeated string sTxHashes = 4;
}
// 轻量级交易广播
message LightTx {
bytes txHash = 1;
P2PRoute route = 2;
}
// 请求完整交易数据
message P2PTxReq {
bytes txHash = 1;
}
// 请求区块内交易数据
message P2PBlockTxReq {
string blockHash = 1;
repeated int32 txIndices = 2;
}
// 区块交易数据返回
message P2PBlockTxReply {
string blockHash = 1;
repeated int32 txIndices = 2;
repeated Transaction txs = 3;
}
/* 节点收到区块或交易hash,
* 当在本地不存在时,需要请求重发完整交易或区块
* 采用统一结构减少消息类型
*/
message P2PQueryData {
oneof value {
P2PTxReq txReq = 1;
P2PBlockTxReq blockTxReq = 2;
}
}
/**
* p2p 协议和软件版本
*/
......@@ -230,10 +282,14 @@ message Versions {
*/
message BroadCastData {
oneof value {
P2PTx tx = 1;
P2PBlock block = 2;
P2PPing ping = 3;
Versions version = 4;
P2PTx tx = 1;
P2PBlock block = 2;
P2PPing ping = 3;
Versions version = 4;
LightTx ltTx = 5;
LightBlock ltBlock = 6;
P2PQueryData query = 7;
P2PBlockTxReply blockRep = 8;
}
}
......@@ -315,4 +371,4 @@ message PeersInfo {
int32 port = 3;
string softversion = 4;
int32 p2pversion = 5;
}
\ No newline at end of file
}
......@@ -32,7 +32,7 @@ service chain33 {
rpc GetTransactionByHashes(ReqHashes) returns (TransactionDetails) {}
//缓存接口
rpc GetMemPool(ReqNil) returns (ReplyTxList) {}
rpc GetMemPool(ReqGetMempool) returns (ReplyTxList) {}
//钱包接口
//获取钱包账户信息
......
......@@ -165,6 +165,10 @@ message ReplyTxList {
repeated Transaction txs = 1;
}
message ReqGetMempool {
bool isAll = 1;
}
message ReqProperFee {
int32 txCount = 1;
int32 txSize = 2;
......
......@@ -32,6 +32,40 @@ func CalcBitMap(bases, subs [][]byte, subData []*types.ReceiptData) []byte {
return rst.Bytes()
}
//CalcSingleBitMap calc bitmap to bases by data
func CalcSingleBitMap(bases [][]byte, data []*types.ReceiptData) []byte {
rst := big.NewInt(0)
for i := range bases {
if data[i].Ty == types.ExecOk {
rst.SetBit(rst, i, 1)
}
}
return rst.Bytes()
}
//CalcBitMapByBitMap bitmap align with subs
func CalcBitMapByBitMap(bases, subs [][]byte, bitmap []byte) []byte {
rst := big.NewInt(0)
bit := big.NewInt(0).SetBytes(bitmap)
subMap := make(map[string]bool)
for i, sub := range subs {
if bit.Bit(i) == uint(0x1) {
subMap[string(sub)] = true
}
}
for i, base := range bases {
if _, exist := subMap[string(base)]; exist {
rst.SetBit(rst, i, 1)
}
}
return rst.Bytes()
}
//BitMapBit :index begin from 0, find the index bit, 1 or 0
func BitMapBit(bitmap []byte, index uint32) bool {
rst := big.NewInt(0).SetBytes(bitmap)
......
......@@ -89,3 +89,46 @@ func TestDecodeByteBitMap(t *testing.T) {
ret = BitMapBit(rst, i)
assert.False(t, ret)
}
func TestCalcSingleBitMap(t *testing.T) {
ori := [][]byte{} //{0,1,2,3,4,5,6,7,8,9}
for i := 0; i < 10; i++ {
ori = append(ori, common.Sha256([]byte(string(i))))
}
d0 := &types.ReceiptData{Ty: types.ExecOk}
d1 := &types.ReceiptData{Ty: types.ExecPack}
d2 := &types.ReceiptData{Ty: types.ExecOk}
d3 := &types.ReceiptData{Ty: types.ExecPack}
d4 := &types.ReceiptData{Ty: types.ExecOk}
d5 := &types.ReceiptData{Ty: types.ExecOk}
d6 := &types.ReceiptData{Ty: types.ExecPack}
d7 := &types.ReceiptData{Ty: types.ExecOk}
d8 := &types.ReceiptData{Ty: types.ExecPack}
d9 := &types.ReceiptData{Ty: types.ExecPack}
data := []*types.ReceiptData{d0, d1, d2, d3, d4, d5, d6, d7, d8, d9}
rst := CalcSingleBitMap(ori, data)
//t.Log(rst)
check := []byte{0xb5}
assert.Equal(t, check, rst)
}
func TestCalcBitMapByBitMap(t *testing.T) {
ori := [][]byte{} //{0,1,2,3,4,5,6,7,8,9}
for i := 0; i < 10; i++ {
ori = append(ori, common.Sha256([]byte(string(i))))
}
sub := [][]byte{}
arry := []byte{0, 2, 4, 6, 7, 9}
for _, v := range arry {
sub = append(sub, common.Sha256([]byte(string(v))))
}
bitmap := []byte{0x97}
rst := CalcBitMapByBitMap(sub, ori, bitmap)
//t.Log(rst)
check := []byte{0x17}
assert.Equal(t, check, rst)
}
......@@ -50,6 +50,7 @@ var (
versionCmd = flag.Bool("v", false, "version")
fixtime = flag.Bool("fixtime", false, "fix time")
waitPid = flag.Bool("waitpid", false, "p2p stuck until seed save info wallet & wallet unlock")
rollback = flag.Int64("rollback", 0, "rollback block")
)
//RunChain33 : run Chain33
......@@ -153,6 +154,7 @@ func RunChain33(name string) {
exec.SetQueueClient(q.Client())
log.Info("loading blockchain module")
cfg.BlockChain.RollbackBlock = *rollback
chain := blockchain.New(cfg.BlockChain)
chain.SetQueueClient(q.Client())
......@@ -166,6 +168,16 @@ func RunChain33(name string) {
cs := consensus.New(cfg.Consensus, sub.Consensus)
cs.SetQueueClient(q.Client())
//jsonrpc, grpc, channel 三种模式
rpcapi := rpc.New(cfg.RPC)
rpcapi.SetQueueClient(q.Client())
log.Info("loading wallet module")
walletm := wallet.New(cfg.Wallet, sub.Wallet)
walletm.SetQueueClient(q.Client())
chain.Rollbackblock()
log.Info("loading p2p module")
var network queue.Module
if cfg.P2P.Enable && !types.IsPara() {
......@@ -175,14 +187,6 @@ func RunChain33(name string) {
}
network.SetQueueClient(q.Client())
//jsonrpc, grpc, channel 三种模式
rpcapi := rpc.New(cfg.RPC)
rpcapi.SetQueueClient(q.Client())
log.Info("loading wallet module")
walletm := wallet.New(cfg.Wallet, sub.Wallet)
walletm.SetQueueClient(q.Client())
health := util.NewHealthCheckServer(q.Client())
health.Start(cfg.Health)
defer func() {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment