Commit bdbcd763 authored by vipwzw's avatar vipwzw Committed by 33cn

update chain33 07/26 2019

parent 1120f58f
......@@ -18,8 +18,18 @@ func (m *mockStore) SetQueueClient(q queue.Queue) {
client.Sub("store")
for msg := range client.Recv() {
switch msg.Ty {
case types.EventStoreSet:
msg.Reply(client.NewMessage("store", types.EventStoreSetReply, &types.ReplyHash{}))
case types.EventStoreGet:
msg.Reply(client.NewMessage("store", types.EventStoreGetReply, &types.StoreReplyValue{}))
case types.EventStoreMemSet:
msg.Reply(client.NewMessage("store", types.EventStoreSetReply, &types.ReplyHash{}))
case types.EventStoreCommit:
msg.Reply(client.NewMessage("store", types.EventStoreCommit, &types.ReplyHash{}))
case types.EventStoreRollback:
msg.Reply(client.NewMessage("store", types.EventStoreRollback, &types.ReplyHash{}))
case types.EventStoreDel:
msg.Reply(client.NewMessage("store", types.EventStoreDel, &types.ReplyHash{}))
case types.EventStoreGetTotalCoins:
if req, ok := msg.GetData().(*types.IterateRangeByStateHash); ok {
if req.Count == 10 {
......
......@@ -1183,6 +1183,52 @@ func (_m *QueueProtocolAPI) SignRawTx(param *types.ReqSignRawTx) (*types.ReplySi
return r0, r1
}
// StoreCommit provides a mock function with given fields: param
func (_m *QueueProtocolAPI) StoreCommit(param *types.ReqHash) (*types.ReplyHash, error) {
ret := _m.Called(param)
var r0 *types.ReplyHash
if rf, ok := ret.Get(0).(func(*types.ReqHash) *types.ReplyHash); ok {
r0 = rf(param)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.ReplyHash)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(*types.ReqHash) error); ok {
r1 = rf(param)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// StoreDel provides a mock function with given fields: param
func (_m *QueueProtocolAPI) StoreDel(param *types.StoreDel) (*types.ReplyHash, error) {
ret := _m.Called(param)
var r0 *types.ReplyHash
if rf, ok := ret.Get(0).(func(*types.StoreDel) *types.ReplyHash); ok {
r0 = rf(param)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.ReplyHash)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(*types.StoreDel) error); ok {
r1 = rf(param)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// StoreGet provides a mock function with given fields: _a0
func (_m *QueueProtocolAPI) StoreGet(_a0 *types.StoreGet) (*types.StoreReplyValue, error) {
ret := _m.Called(_a0)
......@@ -1252,6 +1298,75 @@ func (_m *QueueProtocolAPI) StoreList(param *types.StoreList) (*types.StoreListR
return r0, r1
}
// StoreMemSet provides a mock function with given fields: param
func (_m *QueueProtocolAPI) StoreMemSet(param *types.StoreSetWithSync) (*types.ReplyHash, error) {
ret := _m.Called(param)
var r0 *types.ReplyHash
if rf, ok := ret.Get(0).(func(*types.StoreSetWithSync) *types.ReplyHash); ok {
r0 = rf(param)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.ReplyHash)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(*types.StoreSetWithSync) error); ok {
r1 = rf(param)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// StoreRollback provides a mock function with given fields: param
func (_m *QueueProtocolAPI) StoreRollback(param *types.ReqHash) (*types.ReplyHash, error) {
ret := _m.Called(param)
var r0 *types.ReplyHash
if rf, ok := ret.Get(0).(func(*types.ReqHash) *types.ReplyHash); ok {
r0 = rf(param)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.ReplyHash)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(*types.ReqHash) error); ok {
r1 = rf(param)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// StoreSet provides a mock function with given fields: param
func (_m *QueueProtocolAPI) StoreSet(param *types.StoreSetWithSync) (*types.ReplyHash, error) {
ret := _m.Called(param)
var r0 *types.ReplyHash
if rf, ok := ret.Get(0).(func(*types.StoreSetWithSync) *types.ReplyHash); ok {
r0 = rf(param)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.ReplyHash)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(*types.StoreSetWithSync) error); ok {
r1 = rf(param)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Version provides a mock function with given fields:
func (_m *QueueProtocolAPI) Version() (*types.VersionInfo, error) {
ret := _m.Called()
......
......@@ -95,6 +95,12 @@ func TestQueueProtocol(t *testing.T) {
testLocalList(t, api)
testGetLastHeader(t, api)
testSignRawTx(t, api)
testStoreSet(t, api)
testStoreGet(t, api)
testStoreMemSet(t, api)
testStoreCommit(t, api)
testStoreRollback(t, api)
testStoreDel(t, api)
testStoreGetTotalCoins(t, api)
testStoreList(t, api)
testBlockChainQuery(t, api)
......@@ -121,6 +127,78 @@ func testBlockChainQuery(t *testing.T, api client.QueueProtocolAPI) {
}
}
func testStoreSet(t *testing.T, api client.QueueProtocolAPI) {
_, err := api.StoreSet(&types.StoreSetWithSync{})
if err != nil {
t.Error("Call StoreSet Failed.", err)
}
_, err = api.StoreSet(nil)
if err == nil {
t.Error("StoreSet(nil) need return error.")
}
}
func testStoreGet(t *testing.T, api client.QueueProtocolAPI) {
_, err := api.StoreGet(&types.StoreGet{})
if err != nil {
t.Error("Call StoreGet Failed.", err)
}
_, err = api.StoreGet(nil)
if err == nil {
t.Error("StoreGet(nil) need return error.")
}
}
func testStoreMemSet(t *testing.T, api client.QueueProtocolAPI) {
_, err := api.StoreMemSet(&types.StoreSetWithSync{})
if err != nil {
t.Error("Call StoreMemSet Failed.", err)
}
_, err = api.StoreMemSet(nil)
if err == nil {
t.Error("StoreMemSet(nil) need return error.")
}
}
func testStoreCommit(t *testing.T, api client.QueueProtocolAPI) {
_, err := api.StoreCommit(&types.ReqHash{})
if err != nil {
t.Error("Call StoreCommit Failed.", err)
}
_, err = api.StoreCommit(nil)
if err == nil {
t.Error("StoreCommit(nil) need return error.")
}
}
func testStoreRollback(t *testing.T, api client.QueueProtocolAPI) {
_, err := api.StoreRollback(&types.ReqHash{})
if err != nil {
t.Error("Call StoreRollback Failed.", err)
}
_, err = api.StoreRollback(nil)
if err == nil {
t.Error("StoreRollback(nil) need return error.")
}
}
func testStoreDel(t *testing.T, api client.QueueProtocolAPI) {
_, err := api.StoreDel(&types.StoreDel{})
if err != nil {
t.Error("Call StoreDel Failed.", err)
}
_, err = api.StoreDel(nil)
if err == nil {
t.Error("StoreDel(nil) need return error.")
}
}
func testStoreGetTotalCoins(t *testing.T, api client.QueueProtocolAPI) {
_, err := api.StoreGetTotalCoins(&types.IterateRangeByStateHash{})
if err != nil {
......
......@@ -137,7 +137,12 @@ type QueueProtocolAPI interface {
// --------------- blockchain interfaces end
// +++++++++++++++ store interfaces begin
StoreSet(param *types.StoreSetWithSync) (*types.ReplyHash, error)
StoreGet(*types.StoreGet) (*types.StoreReplyValue, error)
StoreMemSet(param *types.StoreSetWithSync) (*types.ReplyHash, error)
StoreCommit(param *types.ReqHash) (*types.ReplyHash, error)
StoreRollback(param *types.ReqHash) (*types.ReplyHash, error)
StoreDel(param *types.StoreDel) (*types.ReplyHash, error)
StoreGetTotalCoins(*types.IterateRangeByStateHash) (*types.ReplyGetTotalCoins, error)
StoreList(param *types.StoreList) (*types.StoreListReply, error)
// --------------- store interfaces end
......
......@@ -242,6 +242,20 @@ func (e *executor) execDelLocal(tx *types.Transaction, r *types.ReceiptData, ind
}
func (e *executor) loadDriver(tx *types.Transaction, index int) (c drivers.Driver) {
if types.IsFork(e.height, "ForkCacheDriver") {
return e.loadDriverNoCache(tx, index)
}
return e.loadDriverWithCache(tx, index)
}
func (e *executor) loadDriverNoCache(tx *types.Transaction, index int) (c drivers.Driver) {
exec := drivers.LoadDriverAllow(tx, index, e.height)
e.setEnv(exec)
return exec
}
// cache exec name bug: 部分执行是否可以执行依赖于合约里的具体操作, 而不是只依赖执行器名字
func (e *executor) loadDriverWithCache(tx *types.Transaction, index int) (c drivers.Driver) {
ename := string(tx.Execer)
exec, ok := e.execCache[ename]
if ok {
......
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package executor
import (
"testing"
"time"
_ "github.com/33cn/chain33/system"
"github.com/33cn/chain33/types"
"github.com/33cn/chain33/util"
"github.com/stretchr/testify/assert"
)
func TestLoadDriverFork(t *testing.T) {
execInit(nil)
var txs []*types.Transaction
addr, _ := util.Genaddress()
genkey := util.TestPrivkeyList[0]
tx := util.CreateCoinsTx(genkey, addr, types.Coin)
txs = append(txs, tx)
// local fork值 为0, 测试不出fork前的情况
types.SetTitleOnlyForTest("chain33")
t.Log("get fork value", types.GetFork("ForkCacheDriver"), types.GetTitle())
cases := []struct {
height int64
cacheSize int
}{
{types.GetFork("ForkCacheDriver") - 1, 1},
{types.GetFork("ForkCacheDriver"), 0},
}
for _, c := range cases {
ctx := &executorCtx{
stateHash: nil,
height: c.height,
blocktime: time.Now().Unix(),
difficulty: 1,
mainHash: nil,
parentHash: nil,
}
execute := newExecutor(ctx, &Executor{}, nil, txs, nil)
_ = execute.loadDriver(tx, 0)
assert.Equal(t, c.cacheSize, len(execute.execCache))
}
}
......@@ -65,7 +65,8 @@ func TestTxGroup(t *testing.T) {
txs = append(txs, util.CreateCoinsTx(priv2, addr3, types.Coin))
txs = append(txs, util.CreateCoinsTx(priv3, addr4, types.Coin))
//执行三笔交易: 全部正确
txgroup, err := types.CreateTxGroup(txs)
feeRate := types.GInt("MinFee")
txgroup, err := types.CreateTxGroup(txs, feeRate)
assert.Nil(t, err)
//重新签名
txgroup.SignN(0, types.SECP256K1, genkey)
......@@ -85,7 +86,7 @@ func TestTxGroup(t *testing.T) {
txs = append(txs, util.CreateCoinsTx(genkey, addr4, types.Coin))
txs = append(txs, util.CreateCoinsTx(genkey, addr4, types.Coin))
txgroup, err = types.CreateTxGroup(txs)
txgroup, err = types.CreateTxGroup(txs, feeRate)
assert.Nil(t, err)
//重新签名
txgroup.SignN(0, types.SECP256K1, priv2)
......@@ -100,7 +101,7 @@ func TestTxGroup(t *testing.T) {
txs = append(txs, util.CreateCoinsTx(priv2, addr4, 2*types.Coin))
txs = append(txs, util.CreateCoinsTx(genkey, addr4, types.Coin))
txgroup, err = types.CreateTxGroup(txs)
txgroup, err = types.CreateTxGroup(txs, feeRate)
assert.Nil(t, err)
//重新签名
txgroup.SignN(0, types.SECP256K1, genkey)
......@@ -115,7 +116,7 @@ func TestTxGroup(t *testing.T) {
txs = append(txs, util.CreateCoinsTx(genkey, addr4, types.Coin))
txs = append(txs, util.CreateCoinsTx(priv2, addr4, 10*types.Coin))
txgroup, err = types.CreateTxGroup(txs)
txgroup, err = types.CreateTxGroup(txs, feeRate)
assert.Nil(t, err)
//重新签名
txgroup.SignN(0, types.SECP256K1, genkey)
......@@ -131,7 +132,7 @@ func TestTxGroup(t *testing.T) {
txs = append(txs, util.CreateCoinsTx(priv2, addr4, 10*types.Coin))
txs[2].Execer = []byte("user.xxx")
txs[2].To = address.ExecAddress("user.xxx")
txgroup, err = types.CreateTxGroup(txs)
txgroup, err = types.CreateTxGroup(txs, feeRate)
assert.Nil(t, err)
//重新签名
txgroup.SignN(0, types.SECP256K1, genkey)
......
......@@ -41,7 +41,7 @@ func TestExecutorGetTxGroup(t *testing.T) {
txs = append(txs, util.CreateCoinsTx(priv2, addr3, types.Coin))
txs = append(txs, util.CreateCoinsTx(priv3, addr4, types.Coin))
//执行三笔交易: 全部正确
txgroup, err := types.CreateTxGroup(txs)
txgroup, err := types.CreateTxGroup(txs, types.GInt("MinFee"))
if err != nil {
t.Error(err)
return
......
......@@ -35,7 +35,7 @@ const (
)
const (
defalutNatPort = 23802
//defalutNatPort = 23802
maxOutBoundNum = 25
stableBoundNum = 15
maxAttemps = 5
......
......@@ -5,7 +5,6 @@
package p2p
import (
"container/list"
"fmt"
"io"
"sort"
......@@ -41,13 +40,13 @@ func (i Invs) Swap(a, b int) {
// DownloadJob defines download job type
type DownloadJob struct {
wg sync.WaitGroup
retryList *list.List
p2pcli *Cli
canceljob int32
mtx sync.Mutex
busyPeer map[string]*peerJob
downloadPeers []*Peer
MaxJob int32
retryItems Invs
}
type peerJob struct {
......@@ -57,11 +56,10 @@ type peerJob struct {
// NewDownloadJob create a downloadjob object
func NewDownloadJob(p2pcli *Cli, peers []*Peer) *DownloadJob {
job := new(DownloadJob)
job.retryList = list.New()
job.p2pcli = p2pcli
job.busyPeer = make(map[string]*peerJob)
job.downloadPeers = peers
job.retryItems = make([]*pb.Inventory, 0)
job.MaxJob = 5
if len(peers) < 5 {
job.MaxJob = 10
......@@ -141,35 +139,30 @@ func (d *DownloadJob) setFreePeer(pid string) {
}
}
//加入到重试数组
func (d *DownloadJob) appendRetryItem(item *pb.Inventory) {
d.mtx.Lock()
defer d.mtx.Unlock()
d.retryItems = append(d.retryItems, item)
}
// GetFreePeer get free peer ,return peer
func (d *DownloadJob) GetFreePeer(blockHeight int64) *Peer {
_, infos := d.p2pcli.network.node.GetActivePeers()
var jobNum int32 = 10
infos := d.p2pcli.network.node.nodeInfo.peerInfos.GetPeerInfos()
var minJobNum int32 = 10
var bestPeer *Peer
for _, peer := range d.downloadPeers {
pbpeer, ok := infos[peer.Addr()]
if ok {
if len(peer.GetPeerName()) == 0 {
peer.SetPeerName(pbpeer.GetName())
}
if pbpeer.GetHeader().GetHeight() >= blockHeight {
if d.isBusyPeer(pbpeer.GetName()) {
continue
}
peerJopNum := d.getJobNum(pbpeer.GetName())
if jobNum > peerJopNum {
jobNum = peerJopNum
bestPeer = peer
}
}
peerName := peer.GetPeerName()
if d.isBusyPeer(peerName) {
continue
}
}
if bestPeer != nil {
d.setBusyPeer(bestPeer.GetPeerName())
if jobNum := d.getJobNum(peerName); jobNum < minJobNum &&
infos[peerName].GetHeader().GetHeight() >= blockHeight {
minJobNum = jobNum
bestPeer = peer
}
}
return bestPeer
}
......@@ -186,20 +179,20 @@ func (d *DownloadJob) isCancel() bool {
// DownloadBlock download the block
func (d *DownloadJob) DownloadBlock(invs []*pb.Inventory,
bchan chan *pb.BlockPid) []*pb.Inventory {
var errinvs []*pb.Inventory
if d.isCancel() {
return errinvs
return nil
}
for _, inv := range invs { //让一个节点一次下载一个区块,下载失败区块,交给下一轮下载
REGET:
freePeer := d.GetFreePeer(inv.GetHeight()) //获取当前任务数最少的节点,相当于 下载速度最快的节点
if freePeer == nil {
//获取当前任务数最少的节点,相当于 下载速度最快的节点
freePeer := d.GetFreePeer(inv.GetHeight())
for freePeer == nil {
log.Debug("no free peer")
time.Sleep(time.Millisecond * 100)
goto REGET
freePeer = d.GetFreePeer(inv.GetHeight())
}
d.setBusyPeer(freePeer.GetPeerName())
d.wg.Add(1)
go func(peer *Peer, inv *pb.Inventory) {
defer d.wg.Done()
......@@ -207,7 +200,7 @@ func (d *DownloadJob) DownloadBlock(invs []*pb.Inventory,
if err != nil {
d.removePeer(peer.GetPeerName())
log.Error("DownloadBlock:syncDownloadBlock", "height", inv.GetHeight(), "peer", peer.GetPeerName(), "err", err)
d.retryList.PushFront(inv) //失败的下载,放在下一轮ReDownload进行下载
d.appendRetryItem(inv) //失败的下载,放在下一轮ReDownload进行下载
} else {
d.setFreePeer(peer.GetPeerName())
......@@ -217,35 +210,15 @@ func (d *DownloadJob) DownloadBlock(invs []*pb.Inventory,
}
return d.restOfInvs(bchan)
}
func (d *DownloadJob) restOfInvs(bchan chan *pb.BlockPid) []*pb.Inventory {
var errinvs []*pb.Inventory
if d.isCancel() {
return errinvs
}
//等待下载任务
d.wg.Wait()
if d.retryList.Len() == 0 {
return errinvs
}
var invsArr Invs
for e := d.retryList.Front(); e != nil; {
if e.Value == nil {
continue
}
log.Debug("resetofInvs", "inv", e.Value.(*pb.Inventory).GetHeight())
invsArr = append(invsArr, e.Value.(*pb.Inventory)) //把下载遗漏的区块,重新组合进行下载
next := e.Next()
d.retryList.Remove(e)
e = next
retryInvs := d.retryItems
//存在重试项
if retryInvs.Len() > 0 {
d.retryItems = make([]*pb.Inventory, 0)
sort.Sort(retryInvs)
}
//Sort
sort.Sort(invsArr)
//log.Info("resetOfInvs", "sorted:", invs)
return invsArr
return retryInvs
}
func (d *DownloadJob) syncDownloadBlock(peer *Peer, inv *pb.Inventory, bchan chan *pb.BlockPid) error {
......@@ -270,23 +243,19 @@ func (d *DownloadJob) syncDownloadBlock(peer *Peer, inv *pb.Inventory, bchan cha
defer func() {
log.Debug("download", "frompeer", peer.Addr(), "blockheight", inv.GetHeight(), "downloadcost", pb.Since(beg))
}()
defer resp.CloseSend()
for {
invdatas, err := resp.Recv()
if err != nil {
if err == io.EOF {
if invdatas == nil {
return nil
}
goto RECV
}
log.Error("download", "resp,Recv err", err.Error(), "download from", peer.Addr())
return err
}
RECV:
for _, item := range invdatas.Items {
bchan <- &pb.BlockPid{Pid: peer.GetPeerName(), Block: item.GetBlock()} //下载完成后插入bchan
log.Debug("download", "frompeer", peer.Addr(), "blockheight", inv.GetHeight(), "Blocksize", item.GetBlock().Size())
}
invData, err := resp.Recv()
if err != nil && err != io.EOF {
log.Error("syncDownloadBlock", "RecvData err", err.Error())
return err
}
//返回单个数据条目
if invData == nil || len(invData.Items) != 1 {
return fmt.Errorf("InvalidRecvData")
}
block := invData.Items[0].GetBlock()
bchan <- &pb.BlockPid{Pid: peer.GetPeerName(), Block: block} //加入到输出通道
log.Debug("download", "frompeer", peer.Addr(), "blockheight", inv.GetHeight(), "blockSize", block.Size())
return nil
}
......@@ -21,18 +21,18 @@ import (
// Listener the actions
type Listener interface {
Close()
Close1()
Start()
}
// Start listener start
// Start server start
func (l *listener) Start() {
l.p2pserver.Start()
go l.server.Serve(l.netlistener)
}
// Close listener close
// Close server close
func (l *listener) Close() {
err := l.netlistener.Close()
if err != nil {
......@@ -40,7 +40,7 @@ func (l *listener) Close() {
}
go l.server.Stop()
l.p2pserver.Close()
log.Info("stop", "listener", "close")
log.Info("stop", "server", "close")
}
......@@ -52,10 +52,10 @@ type listener struct {
netlistener net.Listener
}
// NewListener produce a listener object
func NewListener(protocol string, node *Node) Listener {
// newListener produce a server object
func newListener(protocol string, node *Node) *listener {
Retry:
log.Info("NewListener", "localPort", node.listenPort)
log.Info("newListener", "localPort", node.listenPort)
l, err := net.Listen(protocol, fmt.Sprintf(":%v", node.listenPort))
if err != nil {
log.Error("Failed to listen", "Error", err.Error())
......@@ -131,7 +131,7 @@ Retry:
var opts []grpc.ServerOption
opts = append(opts, grpc.UnaryInterceptor(interceptor), grpc.StreamInterceptor(interceptorStream))
//区块最多10M
msgRecvOp := grpc.MaxMsgSize(11 * 1024 * 1024) //设置最大接收数据大小位11M
msgRecvOp := grpc.MaxRecvMsgSize(11 * 1024 * 1024) //设置最大接收数据大小位11M
msgSendOp := grpc.MaxSendMsgSize(11 * 1024 * 1024) //设置最大发送数据大小为11M
var keepparm keepalive.ServerParameters
keepparm.Time = 5 * time.Minute
......
......@@ -381,11 +381,12 @@ func (n *Node) monitorPeers() {
}
peers, infos := n.GetActivePeers()
for paddr, pinfo := range infos {
for name, pinfo := range infos {
peerheight := pinfo.GetHeader().GetHeight()
if pinfo.GetName() == selfName && !pinfo.GetSelf() { //发现连接到自己,立即删除
paddr := pinfo.GetAddr()
if name == selfName && !pinfo.GetSelf() { //发现连接到自己,立即删除
//删除节点数过低的节点
n.remove(paddr)
n.remove(pinfo.GetAddr())
n.nodeInfo.addrBook.RemoveAddr(paddr)
n.nodeInfo.blacklist.Add(paddr, 0)
}
......@@ -462,7 +463,7 @@ func (n *Node) monitorDialPeers() {
continue
}
//不对已经连接上的地址或者黑名单地址发起连接
//不对已经连接上的地址或者黑名单地址发起连接 TODO:连接足够时,对于连入的地址也不再去重复连接(客户端服务端只维护一条连接, 后续优化)
if n.Has(netAddr.String()) || n.nodeInfo.blacklist.Has(netAddr.String()) || n.HasCacheBound(netAddr.String()) {
log.Debug("DialPeers", "find hash", netAddr.String())
continue
......@@ -594,7 +595,7 @@ func (n *Node) monitorCfgSeeds() {
peers, _ := n.GetActivePeers()
//选出当前连接的节点中,负载最大的节点
var MaxInBounds int32
var MaxInBoundPeer *Peer
MaxInBoundPeer := &Peer{}
for _, peer := range peers {
if peer.GetInBouns() > MaxInBounds {
MaxInBounds = peer.GetInBouns()
......
......@@ -65,7 +65,7 @@ func (n *upnp) AddMapping(protocol string, extport, intport int, desc string, li
if err != nil {
return nil
}
fmt.Println("internalAddress:", ip)
//fmt.Println("internalAddress:", ip)
protocol = strings.ToUpper(protocol)
lifetimeS := uint32(lifetime / time.Second)
err = n.DeleteMapping(protocol, extport, intport)
......
......@@ -34,9 +34,9 @@ func TestGetLocalAddr(t *testing.T) {
func TestP2pListen(t *testing.T) {
var node Node
node.listenPort = 3333
listen1 := NewListener("tcp", &node)
listen1 := newListener("tcp", &node)
assert.Equal(t, true, listen1 != nil)
listen2 := NewListener("tcp", &node)
listen2 := newListener("tcp", &node)
assert.Equal(t, true, listen2 != nil)
listen1.Close()
......
......@@ -26,10 +26,10 @@ import (
// 3.启动端口映射
// 4.启动监控模块,进行节点管理
// Start Node listener
// Start Node server
func (n *Node) Start() {
if n.listener != nil {
n.listener.Start()
if n.server != nil {
n.server.Start()
}
n.detectNodeAddr()
n.monitor()
......@@ -38,14 +38,14 @@ func (n *Node) Start() {
}
// Close node listener
// Close node server
func (n *Node) Close() {
//避免重复
if !atomic.CompareAndSwapInt32(&n.closed, 0, 1) {
return
}
if n.listener != nil {
n.listener.Close()
if n.server != nil {
n.server.Close()
}
log.Debug("stop", "listen", "closed")
n.nodeInfo.addrBook.Close()
......@@ -72,7 +72,7 @@ type Node struct {
cmtx sync.Mutex
cacheBound map[string]*Peer
outBound map[string]*Peer
listener Listener
server *listener
listenPort int
innerSeeds sync.Map
cfgSeeds sync.Map
......@@ -115,7 +115,7 @@ func NewNode(cfg *types.P2P) (*Node, error) {
}
node.nodeInfo = NewNodeInfo(cfg)
if cfg.ServerStart {
node.listener = NewListener(protocol, node)
node.server = newListener(protocol, node)
}
return node, nil
}
......@@ -172,6 +172,7 @@ func (n *Node) doNat() {
if n.Size() > 0 {
break
}
time.Sleep(time.Millisecond * 100)
}
p2pcli := NewNormalP2PCli()
......@@ -306,9 +307,10 @@ func (n *Node) GetActivePeers() (map[string]*Peer, map[string]*types.Peer) {
var peers = make(map[string]*Peer)
for _, peer := range regPeers {
if _, ok := infos[peer.Addr()]; ok {
name := peer.GetPeerName()
if _, ok := infos[name]; ok {
peers[peer.Addr()] = peer
peers[name] = peer
}
}
return peers, infos
......@@ -390,7 +392,7 @@ func (n *Node) detectNodeAddr() {
if len(exportBytes) != 0 {
externalPort = int(P2pComm.BytesToInt32(exportBytes))
} else {
externalPort = defalutNatPort
externalPort = n.listenPort
}
if err != nil {
log.Error("bookDb Get", "nodePort", n.listenPort, "externalPortTag fail err:", err)
......@@ -444,7 +446,7 @@ func (n *Node) natMapPort() {
err = nat.Any().AddMapping("TCP", int(n.nodeInfo.GetExternalAddr().Port), n.listenPort, nodename[:8], time.Hour*48)
if err != nil {
if i > tryMapPortTimes/2 { //如果连续失败次数超过最大限制次数的二分之一则切换为随机端口映射
log.Error("NatMapPort", "err", err.Error())
log.Warn("TryNatMapPortFailed", "tryTimes", i, "err", err.Error())
n.flushNodePort(uint16(n.listenPort), uint16(rand.Intn(64512)+1023))
}
......@@ -507,3 +509,14 @@ func (n *Node) natNotice() {
func (n *Node) verifyP2PChannel(channel int32) bool {
return channel == n.nodeInfo.cfg.Channel
}
//检测该节点地址是否作为客户端连入, 此时需要维护双向连接, 增加了节点间的连接冗余
func (n *Node) isInBoundPeer(peerAddr string) (bool, *innerpeer) {
if n.server == nil || n.server.p2pserver == nil {
return false, nil
}
//查询连入的客户端
info := n.server.p2pserver.getInBoundPeerInfo(peerAddr)
return info == nil, info
}
......@@ -5,7 +5,6 @@
package p2p
import (
"fmt"
"sync"
"sync/atomic"
......@@ -51,7 +50,8 @@ func NewNodeInfo(cfg *types.P2P) *NodeInfo {
// PeerInfos encapsulation peer information
type PeerInfos struct {
mtx sync.Mutex
mtx sync.Mutex
//key:peerName
infos map[string]*types.Peer
}
......@@ -72,7 +72,7 @@ func (p *PeerInfos) FlushPeerInfos(in []*types.Peer) {
}
for _, peer := range in {
p.infos[fmt.Sprintf("%v:%v", peer.GetAddr(), peer.GetPort())] = peer
p.infos[peer.GetName()] = peer
}
}
......@@ -91,16 +91,18 @@ func (p *PeerInfos) GetPeerInfos() map[string]*types.Peer {
func (p *PeerInfos) SetPeerInfo(peer *types.Peer) {
p.mtx.Lock()
defer p.mtx.Unlock()
key := fmt.Sprintf("%v:%v", peer.GetAddr(), peer.GetPort())
p.infos[key] = peer
if peer.GetName() == "" {
return
}
p.infos[peer.GetName()] = peer
}
// GetPeerInfo return a infos by key
func (p *PeerInfos) GetPeerInfo(key string) *types.Peer {
func (p *PeerInfos) GetPeerInfo(peerName string) *types.Peer {
p.mtx.Lock()
defer p.mtx.Unlock()
if _, ok := p.infos[key]; ok {
return p.infos[key]
if peer, ok := p.infos[peerName]; ok {
return peer
}
return nil
}
......@@ -130,27 +132,24 @@ func (nf *NodeInfo) latestPeerInfo(n *Node) map[string]*types.Peer {
log.Debug("latestPeerInfo", "register peer num", len(peers))
for _, peer := range peers {
if peer.Addr() == n.nodeInfo.GetExternalAddr().String() { //fmt.Sprintf("%v:%v", ExternalIp, m.network.node.GetExterPort())
if !peer.GetRunning() || peer.Addr() == n.nodeInfo.GetExternalAddr().String() {
n.remove(peer.Addr())
continue
}
peerinfo, err := peer.GetPeerInfo()
if err != nil {
if err == types.ErrVersion {
peer.version.SetSupport(false)
P2pComm.CollectPeerStat(err, peer)
log.Error("latestPeerInfo", "Err", err.Error(), "peer", peer.Addr())
}
peerinfo, err := peer.GetPeerInfo()
if err != nil || peerinfo.GetName() == "" {
P2pComm.CollectPeerStat(err, peer)
log.Error("latestPeerInfo", "Err", err, "peer", peer.Addr())
continue
}
P2pComm.CollectPeerStat(err, peer)
var pr types.Peer
pr.Addr = peerinfo.GetAddr()
pr.Port = peerinfo.GetPort()
pr.Name = peerinfo.GetName()
pr.MempoolSize = peerinfo.GetMempoolSize()
pr.Header = peerinfo.GetHeader()
peerlist[fmt.Sprintf("%v:%v", peerinfo.Addr, peerinfo.Port)] = &pr
peerlist[pr.Name] = &pr
}
return peerlist
}
......
......@@ -3,11 +3,11 @@ package p2p
import (
"encoding/hex"
"sync/atomic"
"time"
"os"
"strings"
"testing"
"time"
l "github.com/33cn/chain33/common/log"
......@@ -197,16 +197,36 @@ func testPeer(t *testing.T, p2p *P2p, q queue.Queue) {
defer peer.Close()
peer.MakePersistent()
localP2P.node.addPeer(peer)
time.Sleep(time.Second * 5)
t.Log(peer.GetInBouns())
t.Log(peer.version.GetVersion())
assert.IsType(t, "string", peer.GetPeerName())
var info *innerpeer
t.Log("WaitRegisterPeerStart...")
for peer.GetPeerName() == "" ||
info == nil || info.p2pversion == 0 {
time.Sleep(time.Millisecond * 10)
info = p2p.node.server.p2pserver.getInBoundPeerInfo("127.0.0.1:43802")
}
t.Log("WaitRegisterPeerStop...")
p2pcli := NewNormalP2PCli()
num, err := p2pcli.GetInPeersNum(peer)
assert.Equal(t, 1, num)
assert.Nil(t, err)
tx1 := &types.Transaction{Execer: []byte("testTx1")}
tx2 := &types.Transaction{Execer: []byte("testTx2")}
localP2P.node.pubToPeer(&types.P2PTx{Tx: tx1}, peer.GetPeerName())
p2p.node.server.p2pserver.pubToStream(&types.P2PTx{Tx: tx2}, info.name)
t.Log("WaitRegisterTxFilterStart...")
for !(txHashFilter.QueryRecvData(hex.EncodeToString(tx1.Hash())) &&
txHashFilter.QueryRecvData(hex.EncodeToString(tx1.Hash()))) {
time.Sleep(time.Millisecond * 10)
}
t.Log("WaitRegisterTxFilterStop")
localP2P.node.AddCachePeer(peer)
peer.GetRunning()
localP2P.node.natOk()
localP2P.node.nodeInfo.FetchPeerInfo(localP2P.node)
peers, infos := localP2P.node.GetActivePeers()
assert.Equal(t, len(peers), len(infos))
localP2P.node.flushNodePort(43803, 43802)
p2pcli := NewNormalP2PCli()
localP2P.node.nodeInfo.peerInfos.SetPeerInfo(nil)
localP2P.node.nodeInfo.peerInfos.GetPeerInfo("1222")
t.Log(p2p.node.GetRegisterPeer("localhost:43802"))
......@@ -260,6 +280,9 @@ func testPeer(t *testing.T, p2p *P2p, q queue.Queue) {
job.setFreePeer(peer.GetPeerName())
job.removePeer(peer.GetPeerName())
job.CancelJob()
peer.Close()
localP2P.node.remove(peer.peerAddr.String())
}
//测试grpc 多连接
......
......@@ -380,39 +380,32 @@ func (m *Cli) GetHeaders(msg *queue.Message, taskindex int64) {
msg.Reply(m.network.client.NewMessage("blockchain", pb.EventReply, pb.Reply{IsOk: true, Msg: []byte("ok")}))
peers, infos := m.network.node.GetActivePeers()
var pidIsActivePeer bool
for paddr, info := range infos {
if info.GetName() == pid[0] { //匹配成功
peer, ok := peers[paddr]
if ok && peer != nil {
var err error
pidIsActivePeer = true
headers, err := peer.mconn.gcli.GetHeaders(context.Background(), &pb.P2PGetHeaders{StartHeight: req.GetStart(), EndHeight: req.GetEnd(),
Version: m.network.node.nodeInfo.channelVersion}, grpc.FailFast(true))
P2pComm.CollectPeerStat(err, peer)
if err != nil {
log.Error("GetBlocks", "Err", err.Error())
if err == pb.ErrVersion {
peer.version.SetSupport(false)
P2pComm.CollectPeerStat(err, peer) //把no support 消息传递过去
}
return
}
client := m.network.node.nodeInfo.client
msg := client.NewMessage("blockchain", pb.EventAddBlockHeaders, &pb.HeadersPid{Pid: pid[0], Headers: &pb.Headers{Items: headers.GetHeaders()}})
err = client.Send(msg, false)
if err != nil {
log.Error("send", "to blockchain EventAddBlockHeaders msg Err", err.Error())
}
if peer, ok := peers[pid[0]]; ok && peer != nil {
var err error
headers, err := peer.mconn.gcli.GetHeaders(context.Background(), &pb.P2PGetHeaders{StartHeight: req.GetStart(), EndHeight: req.GetEnd(),
Version: m.network.node.nodeInfo.channelVersion}, grpc.FailFast(true))
P2pComm.CollectPeerStat(err, peer)
if err != nil {
log.Error("GetBlocks", "Err", err.Error())
if err == pb.ErrVersion {
peer.version.SetSupport(false)
P2pComm.CollectPeerStat(err, peer) //把no support 消息传递过去
}
return
}
}
//当请求的pid不是ActivePeer时需要打印日志方便问题定位
if !pidIsActivePeer {
client := m.network.node.nodeInfo.client
msg := client.NewMessage("blockchain", pb.EventAddBlockHeaders, &pb.HeadersPid{Pid: pid[0], Headers: &pb.Headers{Items: headers.GetHeaders()}})
err = client.Send(msg, false)
if err != nil {
log.Error("send", "to blockchain EventAddBlockHeaders msg Err", err.Error())
}
} else {
//当请求的pid不是ActivePeer时需要打印日志方便问题定位
log.Debug("GetHeaders", "pid", pid[0], "ActivePeers", peers, "infos", infos)
}
}
// GetBlocks get blocks information
......@@ -446,32 +439,19 @@ func (m *Cli) GetBlocks(msg *queue.Message, taskindex int64) {
peers, infos := m.network.node.GetActivePeers()
if len(pids) > 0 && pids[0] != "" { //指定Pid 下载数据
log.Debug("fetch from peer in pids", "pids", pids)
var pidmap = make(map[string]bool)
for _, pid := range pids {
pidmap[pid] = true
}
for paddr, info := range infos {
if _, ok := pidmap[info.GetName()]; ok { //匹配成功
peer, ok := peers[paddr]
if ok && peer != nil {
downloadPeers = append(downloadPeers, peer)
}
if peer, ok := peers[pid]; ok && peer != nil {
downloadPeers = append(downloadPeers, peer)
}
}
} else {
log.Debug("fetch from all peers in pids")
for _, peer := range peers {
peerinfo, ok := infos[peer.Addr()]
if !ok {
continue
}
if peerinfo.GetHeader().GetHeight() < req.GetStart() { //高度不符合要求
for name, peer := range peers {
info, ok := infos[name]
if !ok || info.GetHeader().GetHeight() < req.GetStart() { //高度不符合要求
continue
}
downloadPeers = append(downloadPeers, peer)
}
}
......@@ -563,7 +543,7 @@ func (m *Cli) GetNetInfo(msg *queue.Message, taskindex int64) {
netinfo.Localaddr = m.network.node.nodeInfo.GetListenAddr().String()
netinfo.Service = m.network.node.nodeInfo.IsOutService()
netinfo.Outbounds = int32(m.network.node.Size())
netinfo.Inbounds = int32(len(m.network.node.listener.(interface{}).(*listener).p2pserver.getInBoundPeers()))
netinfo.Inbounds = int32(len(m.network.node.server.p2pserver.getInBoundPeers()))
msg.Reply(m.network.client.NewMessage("rpc", pb.EventReplyNetInfo, &netinfo))
}
......
......@@ -69,17 +69,14 @@ func (s *P2pserver) Ping(ctx context.Context, in *pb.P2PPing) (*pb.P2PPong, erro
log.Error("Ping", "p2p server", "check sig err")
return nil, pb.ErrPing
}
var peerip string
var err error
getctx, ok := pr.FromContext(ctx)
if ok {
peerip, _, err = net.SplitHostPort(getctx.Addr.String())
if err != nil {
return nil, fmt.Errorf("ctx.Addr format err")
}
peerIP, _, err := resolveClientNetAddr(ctx)
if err != nil {
log.Error("Ping", "get grpc peer addr err", err)
return nil, fmt.Errorf("get grpc peer addr err:%s", err.Error())
}
peeraddr := fmt.Sprintf("%s:%v", peerip, in.Port)
peeraddr := fmt.Sprintf("%s:%v", peerIP, in.Port)
remoteNetwork, err := NewNetAddressString(peeraddr)
if err == nil {
if !s.node.nodeInfo.blacklist.Has(peeraddr) {
......@@ -129,15 +126,6 @@ func (s *P2pserver) Version2(ctx context.Context, in *pb.P2PVersion) (*pb.P2PVer
channel, ver := decodeChannelVersion(in.GetVersion())
log.Debug("p2pServer Version2", "p2pChannel", channel, "p2p version", ver)
var peerip string
var err error
getctx, ok := pr.FromContext(ctx)
if ok {
peerip, _, err = net.SplitHostPort(getctx.Addr.String())
if err != nil {
return nil, fmt.Errorf("ctx.Addr format err")
}
}
if !s.node.verifyP2PChannel(channel) {
return nil, pb.ErrP2PChannel
......@@ -146,21 +134,29 @@ func (s *P2pserver) Version2(ctx context.Context, in *pb.P2PVersion) (*pb.P2PVer
log.Debug("Version2", "before", "GetPrivPubKey")
_, pub := s.node.nodeInfo.addrBook.GetPrivPubKey()
log.Debug("Version2", "after", "GetPrivPubKey")
//addrFrom:表示自己的外网地址,addrRecv:表示对方的外网地址
peerIP, _, err := resolveClientNetAddr(ctx)
if err != nil {
log.Error("Version2", "get grpc peer addr err", err)
return nil, fmt.Errorf("get grpc peer addr err:%s", err.Error())
}
//addrFrom:表示发送方外网地址,addrRecv:表示接收方外网地址
_, port, err := net.SplitHostPort(in.AddrFrom)
if err != nil {
return nil, fmt.Errorf("AddrFrom format err")
}
remoteNetwork, err := NewNetAddressString(fmt.Sprintf("%v:%v", peerip, port))
peerAddr := fmt.Sprintf("%v:%v", peerIP, port)
remoteNetwork, err := NewNetAddressString(peerAddr)
if err == nil {
if !s.node.nodeInfo.blacklist.Has(remoteNetwork.String()) {
s.node.nodeInfo.addrBook.AddAddress(remoteNetwork, nil)
}
}
return &pb.P2PVersion{Version: s.node.nodeInfo.channelVersion, Service: int64(s.node.nodeInfo.ServiceTy()), Nonce: in.Nonce,
AddrFrom: in.AddrRecv, AddrRecv: fmt.Sprintf("%v:%v", peerip, port), UserAgent: pub}, nil
return &pb.P2PVersion{Version: s.node.nodeInfo.channelVersion,
Service: int64(s.node.nodeInfo.ServiceTy()), Nonce: in.Nonce,
AddrFrom: in.AddrRecv, AddrRecv: fmt.Sprintf("%v:%v", peerIP, port), UserAgent: pub}, nil
}
// SoftVersion software version
......@@ -286,7 +282,7 @@ func (s *P2pserver) GetData(in *pb.P2PGetData, stream pb.P2Pgservice_GetDataServ
resp, err := client.WaitTimeout(msg, time.Second*20)
if err != nil {
log.Error("GetBlocks Err", "Err", err.Error())
continue
return err
}
blocks := resp.Data.(*pb.BlockDetails)
......@@ -416,6 +412,17 @@ func (s *P2pserver) ServerStreamSend(in *pb.P2PPing, stream pb.P2Pgservice_Serve
return fmt.Errorf("beyound max inbound num")
}
peerIP, _, err := resolveClientNetAddr(stream.Context())
if err != nil {
log.Error("ServerStreamSend", "get grpc peer addr err", err)
return fmt.Errorf("get grpc peer addr err:%s", err.Error())
}
peerAddr := fmt.Sprintf("%s:%v", peerIP, in.GetPort())
//等待ReadStream接收节点version信息
var peerInfo *innerpeer
for ; peerInfo == nil || peerInfo.p2pversion == 0; peerInfo = s.getInBoundPeerInfo(peerAddr) {
time.Sleep(time.Second)
}
log.Debug("ServerStreamSend")
peername := hex.EncodeToString(in.GetSign().GetPubkey())
dataChain := s.addStreamHandler(peername)
......@@ -424,18 +431,6 @@ func (s *P2pserver) ServerStreamSend(in *pb.P2PPing, stream pb.P2Pgservice_Serve
if s.IsClose() {
return fmt.Errorf("node close")
}
peerInfo := s.getInBoundPeerInfo(peername)
if peerInfo != nil {
if peerInfo.p2pversion == 0 {
return fmt.Errorf("version empty")
}
//增加过滤,如果自己连接了远程节点,则不需要通过stream send 重复发送数据给这个节点
if s.node.Has(peerInfo.addr) {
continue
}
} else {
return fmt.Errorf("no peer info")
}
sendData, doSend := s.node.processSendP2P(data, peerInfo.p2pversion, peerInfo.addr)
if !doSend {
continue
......@@ -454,21 +449,15 @@ func (s *P2pserver) ServerStreamRead(stream pb.P2Pgservice_ServerStreamReadServe
return fmt.Errorf("beyound max inbound num:%v>%v", len(s.getInBoundPeers()), int(s.node.nodeInfo.cfg.InnerBounds))
}
log.Debug("StreamRead")
var remoteIP string
var err error
getctx, ok := pr.FromContext(stream.Context())
if ok {
remoteIP, _, err = net.SplitHostPort(getctx.Addr.String())
if err != nil {
return fmt.Errorf("ctx.Addr format err")
}
} else {
return fmt.Errorf("getctx err")
peerIP, _, err := resolveClientNetAddr(stream.Context())
if err != nil {
log.Error("ServerStreamRead", "get grpc peer addr err", err)
return fmt.Errorf("get grpc peer addr err:%s", err.Error())
}
var peeraddr, peername string
defer s.deleteInBoundPeerInfo(peername)
//此处delete是defer调用, 提前绑定变量,需要传入指针, peeraddr的值才能被获取
defer s.deleteInBoundPeerInfo(&peeraddr)
defer stream.SendAndClose(&pb.ReqNil{})
for {
......@@ -487,7 +476,7 @@ func (s *P2pserver) ServerStreamRead(stream pb.P2Pgservice_ServerStreamReadServe
//接收版本信息
peername = ver.GetPeername()
softversion := ver.GetSoftversion()
innerpeer := s.getInBoundPeerInfo(peername)
innerpeer := s.getInBoundPeerInfo(peeraddr)
channel, p2pVersion := decodeChannelVersion(ver.GetP2Pversion())
if !s.node.verifyP2PChannel(channel) {
return pb.ErrP2PChannel
......@@ -497,9 +486,9 @@ func (s *P2pserver) ServerStreamRead(stream pb.P2Pgservice_ServerStreamReadServe
info := *innerpeer
info.p2pversion = p2pVersion
info.softversion = softversion
s.addInBoundPeerInfo(peername, info)
s.addInBoundPeerInfo(innerpeer.addr, info)
} else {
//没有获取到peername 的信息,说明没有获取ping的消息包
//没有获取到peer 的信息,说明没有获取ping的消息包
return pb.ErrStreamPing
}
......@@ -512,13 +501,13 @@ func (s *P2pserver) ServerStreamRead(stream pb.P2Pgservice_ServerStreamReadServe
if s.node.Size() > 0 {
if remoteIP != s.node.nodeInfo.GetListenAddr().IP.String() && remoteIP != s.node.nodeInfo.GetExternalAddr().IP.String() {
if peerIP != s.node.nodeInfo.GetListenAddr().IP.String() && peerIP != s.node.nodeInfo.GetExternalAddr().IP.String() {
s.node.nodeInfo.SetServiceTy(Service)
}
}
peername = hex.EncodeToString(ping.GetSign().GetPubkey())
peeraddr = fmt.Sprintf("%s:%v", remoteIP, in.GetPing().GetPort())
s.addInBoundPeerInfo(peername, innerpeer{addr: peeraddr, name: peername, timestamp: pb.Now().Unix()})
peeraddr = fmt.Sprintf("%s:%v", peerIP, ping.GetPort())
s.addInBoundPeerInfo(peeraddr, innerpeer{addr: peeraddr, name: peername, timestamp: pb.Now().Unix()})
}
}
}
......@@ -675,22 +664,22 @@ func (s *P2pserver) deleteStream(peerName string, delChan chan interface{}) {
}
}
func (s *P2pserver) addInBoundPeerInfo(peername string, info innerpeer) {
func (s *P2pserver) addInBoundPeerInfo(peerAddr string, info innerpeer) {
s.imtx.Lock()
defer s.imtx.Unlock()
s.inboundpeers[peername] = &info
s.inboundpeers[peerAddr] = &info
}
func (s *P2pserver) deleteInBoundPeerInfo(peername string) {
func (s *P2pserver) deleteInBoundPeerInfo(peerAddr *string) {
s.imtx.Lock()
defer s.imtx.Unlock()
delete(s.inboundpeers, peername)
delete(s.inboundpeers, *peerAddr)
}
func (s *P2pserver) getInBoundPeerInfo(peername string) *innerpeer {
func (s *P2pserver) getInBoundPeerInfo(peerAddr string) *innerpeer {
s.imtx.Lock()
defer s.imtx.Unlock()
if key, ok := s.inboundpeers[peername]; ok {
if key, ok := s.inboundpeers[peerAddr]; ok {
return key
}
......@@ -706,3 +695,13 @@ func (s *P2pserver) getInBoundPeers() []*innerpeer {
}
return peers
}
func resolveClientNetAddr(ctx context.Context) (host, port string, err error) {
grpcPeer, ok := pr.FromContext(ctx)
if ok {
return net.SplitHostPort(grpcPeer.Addr.String())
}
return "", "", fmt.Errorf("get grpc peer from ctx err")
}
......@@ -25,7 +25,10 @@ func (p *Peer) Start() {
// Close peer close
func (p *Peer) Close() {
atomic.StoreInt32(&p.isclose, 1)
//避免重复关闭
if !atomic.CompareAndSwapInt32(&p.isclose, 0, 1) {
return
}
p.mconn.Close()
p.node.pubsub.Unsub(p.taskChan, "block", "tx")
log.Info("Peer", "closed", p.Addr())
......@@ -133,18 +136,19 @@ func (p *Peer) heartBeat() {
}
peername, err := pcli.SendVersion(p, p.node.nodeInfo)
P2pComm.CollectPeerStat(err, p)
if err == nil || peername == "" {
log.Debug("sendVersion", "peer name", peername)
p.SetPeerName(peername) //设置连接的远程节点的节点名称
p.taskChan = p.node.pubsub.Sub("block", "tx", peername)
go p.sendStream()
go p.readStream()
break
} else {
if err != nil || peername == "" {
//版本不对,直接关掉
log.Error("PeerHeartBeatSendVersion", "peerName", peername, "err", err)
p.Close()
return
}
log.Debug("sendVersion", "peer name", peername)
p.SetPeerName(peername) //设置连接的远程节点的节点名称
p.taskChan = p.node.pubsub.Sub("block", "tx", peername)
go p.sendStream()
go p.readStream()
break
}
ticker := time.NewTicker(PingTimeout)
......@@ -153,16 +157,15 @@ func (p *Peer) heartBeat() {
if !p.GetRunning() {
return
}
<-ticker.C
err := pcli.SendPing(p, p.node.nodeInfo)
P2pComm.CollectPeerStat(err, p)
peernum, err := pcli.GetInPeersNum(p)
P2pComm.CollectPeerStat(err, p)
peerNum, err := pcli.GetInPeersNum(p)
if err == nil {
atomic.StoreInt32(&p.inBounds, int32(peernum))
atomic.StoreInt32(&p.inBounds, int32(peerNum))
}
err = pcli.SendPing(p, p.node.nodeInfo)
if err != nil {
log.Error("SendPeerPing", "peer", p.Addr(), "err", err)
}
}
}
......
......@@ -122,18 +122,19 @@ func (n *Node) sendQueryReply(rep *types.P2PBlockTxReply, p2pData *types.BroadCa
func (n *Node) sendTx(tx *types.P2PTx, p2pData *types.BroadCastData, peerVersion int32) (doSend bool) {
log.Debug("P2PSendStream", "will send tx", hex.EncodeToString(tx.Tx.Hash()), "ttl", tx.Route.TTL)
ttl := tx.GetRoute().GetTTL()
log.Debug("P2PSendStream", "will send tx", hex.EncodeToString(tx.Tx.Hash()), "ttl", ttl)
//超过最大的ttl, 不再发送
if tx.Route.TTL > n.nodeInfo.cfg.MaxTTL {
if ttl > n.nodeInfo.cfg.MaxTTL {
return false
}
//新版本且ttl达到设定值
if peerVersion >= lightBroadCastVersion && tx.Route.TTL >= n.nodeInfo.cfg.LightTxTTL {
if peerVersion >= lightBroadCastVersion && ttl >= n.nodeInfo.cfg.LightTxTTL {
p2pData.Value = &types.BroadCastData_LtTx{
LtTx: &types.LightTx{
TxHash: tx.Tx.Hash(),
Route: tx.Route,
Route: tx.GetRoute(),
},
}
} else {
......@@ -150,7 +151,7 @@ func (n *Node) recvTx(tx *types.P2PTx) {
if n.checkAndRegFilterAtomic(txHashFilter, txHash) {
return
}
log.Debug("recvTx", "tx", txHash)
log.Debug("recvTx", "tx", txHash, "ttl", tx.GetRoute().GetTTL())
txHashFilter.Add(txHash, tx.Route)
msg := n.nodeInfo.client.NewMessage("mempool", types.EventTx, tx.GetTx())
......@@ -164,7 +165,7 @@ func (n *Node) recvTx(tx *types.P2PTx) {
func (n *Node) recvLtTx(tx *types.LightTx, pid string, pubPeerFunc pubFuncType) {
txHash := hex.EncodeToString(tx.TxHash)
log.Debug("recvLtTx", "peerID", pid, "txHash", txHash)
log.Debug("recvLtTx", "peerID", pid, "txHash", txHash, "ttl", tx.GetRoute().GetTTL())
//本地不存在, 需要向对端节点发起完整交易请求. 如果存在则表示本地已经接收过此交易, 不做任何操作
if !txHashFilter.QueryRecvData(txHash) {
......@@ -206,7 +207,7 @@ func (n *Node) recvLtBlock(ltBlock *types.LightBlock, pid string, pubPeerFunc pu
if n.checkAndRegFilterAtomic(blockHashFilter, blockHash) {
return
}
log.Debug("recvLtBlock", "blockHash", blockHash)
log.Debug("recvLtBlock", "blockHash", blockHash, "blockHeight", ltBlock.GetHeader().GetHeight())
//组装block
block := &types.Block{}
block.TxHash = ltBlock.Header.TxHash
......@@ -220,17 +221,20 @@ func (n *Node) recvLtBlock(ltBlock *types.LightBlock, pid string, pubPeerFunc pu
//add miner tx
block.Txs = append(block.Txs, ltBlock.MinerTx)
txList := &types.ReplyTxList{}
ok := false
//get tx list from mempool
resp, err := n.queryMempool(types.EventTxListByHash, &types.ReqTxHashList{Hashes: ltBlock.STxHashes, IsShortHash: true})
if err != nil {
log.Error("queryMempoolTxWithHash", "err", err)
return
}
if len(ltBlock.STxHashes) > 0 {
resp, err := n.queryMempool(types.EventTxListByHash, &types.ReqTxHashList{Hashes: ltBlock.STxHashes, IsShortHash: true})
if err != nil {
log.Error("recvLtBlock", "queryTxListByHashErr", err)
return
}
txList, ok := resp.(*types.ReplyTxList)
if !ok {
log.Error("recvLtBlock", "queryMemPool", "nilReplyTxList")
txList = &types.ReplyTxList{}
txList, ok = resp.(*types.ReplyTxList)
if !ok {
log.Error("recvLtBlock", "queryMemPool", "nilReplyTxList")
}
}
nilTxIndices := make([]int32, 0)
for i := 0; ok && i < len(txList.Txs); i++ {
......@@ -238,7 +242,8 @@ func (n *Node) recvLtBlock(ltBlock *types.LightBlock, pid string, pubPeerFunc pu
if tx == nil {
//tx not exist in mempool
nilTxIndices = append(nilTxIndices, int32(i+1))
} else if tx.GetGroupCount() > 0 {
tx = &types.Transaction{}
} else if count := tx.GetGroupCount(); count > 0 {
group, err := tx.GetTxGroup()
if err != nil {
......@@ -255,21 +260,21 @@ func (n *Node) recvLtBlock(ltBlock *types.LightBlock, pid string, pubPeerFunc pu
block.Txs = append(block.Txs, tx)
}
nilTxLen := len(nilTxIndices)
//需要比较交易根哈希是否一致, 不一致需要请求区块内所有的交易
if len(block.Txs) == int(ltBlock.Header.TxCount) && bytes.Equal(block.TxHash, merkle.CalcMerkleRoot(block.Txs)) {
if nilTxLen == 0 && len(block.Txs) == int(ltBlock.Header.TxCount) &&
bytes.Equal(block.TxHash, merkle.CalcMerkleRoot(block.Txs)) {
log.Info("recvBlock", "block==+======+====+=>Height", block.GetHeight(), "fromPeer", pid,
log.Info("recvLtBlock", "block==+======+====+=>Height", block.GetHeight(), "fromPeer", pid,
"block size(KB)", float32(ltBlock.Size)/1024, "blockHash", blockHash)
//发送至blockchain执行
if err = n.postBlockChain(block, pid); err != nil {
log.Error("recvBlock", "send block to blockchain Error", err.Error())
if err := n.postBlockChain(block, pid); err != nil {
log.Error("recvLtBlock", "send block to blockchain Error", err.Error())
}
return
}
nilTxLen := len(nilTxIndices)
log.Debug("recvLtBlockQueryBlock", "Hash", blockHash, "height", ltBlock.GetHeader().GetHeight(), "queryTxs", nilTxIndices, "pid", pid)
// 缺失的交易个数大于总数1/3 或者缺失数据大小大于2/3, 触发请求区块所有交易数据
if nilTxLen > 0 && (float32(nilTxLen) > float32(ltBlock.Header.TxCount)/3 ||
float32(block.Size()) < float32(ltBlock.Size)/3) {
......@@ -341,6 +346,8 @@ func (n *Node) recvQueryData(query *types.P2PQueryData, pid string, pubPeerFunc
}
func (n *Node) recvQueryReply(rep *types.P2PBlockTxReply, pid string, pubPeerFunc pubFuncType) {
log.Debug("recvQueryReplyBlock", "Hash", rep.GetBlockHash(), "queryTxs", rep.GetTxIndices(), "pid", pid)
val, exist := ltBlockCache.del(rep.BlockHash)
block, _ := val.(*types.Block)
//not exist in cache or nil block
......@@ -359,11 +366,11 @@ func (n *Node) recvQueryReply(rep *types.P2PBlockTxReply, pid string, pubPeerFun
//计算的root hash是否一致
if bytes.Equal(block.TxHash, merkle.CalcMerkleRoot(block.Txs)) {
log.Info("recvBlock", "block==+======+====+=>Height", block.GetHeight(), "fromPeer", pid,
log.Info("recvQueryReplyBlock", "block==+======+====+=>Height", block.GetHeight(), "fromPeer", pid,
"block size(KB)", float32(block.Size())/1024, "blockHash", rep.BlockHash)
//发送至blockchain执行
if err := n.postBlockChain(block, pid); err != nil {
log.Error("recvBlock", "send block to blockchain Error", err.Error())
log.Error("recvQueryReplyBlock", "send block to blockchain Error", err.Error())
}
} else if len(rep.TxIndices) != 0 {
//不一致尝试请求整个区块的交易, 且判定是否已经请求过完整交易
......
......@@ -36,7 +36,7 @@ func Test_processP2P(t *testing.T) {
tx := &types.Transaction{Execer: []byte("coins"), Payload: payload, Fee: 4600, Expire: 2}
tx1 := &types.Transaction{Execer: []byte("coins"), Payload: payload, Fee: 460000000, Expire: 0}
tx2 := &types.Transaction{Execer: []byte("coins"), Payload: payload, Fee: 100, Expire: 1}
txGroup, _ := types.CreateTxGroup([]*types.Transaction{tx1, tx2})
txGroup, _ := types.CreateTxGroup([]*types.Transaction{tx1, tx2}, types.GInt("MinFee"))
gtx := txGroup.Tx()
txList := append([]*types.Transaction{}, minerTx, tx, tx1, tx2)
memTxList := append([]*types.Transaction{}, tx, gtx)
......
......@@ -60,7 +60,32 @@ func (c *channelClient) CreateRawTransaction(param *types.CreateTx) ([]byte, err
if param.Execer != "" {
execer = param.Execer
}
return types.CallCreateTx(execer, "", param)
reply, err := types.CallCreateTx(execer, "", param)
if err != nil {
return nil, err
}
//add tx fee setting
tx := &types.Transaction{}
err = types.Decode(reply, tx)
if err != nil {
return nil, err
}
tx.Fee = param.Fee
//set proper fee if zero fee
if tx.Fee <= 0 {
proper, err := c.GetProperFee(nil)
if err != nil {
return nil, err
}
fee, err := tx.GetRealFee(proper.GetProperFee())
if err != nil {
return nil, err
}
tx.Fee = fee
}
return types.Encode(tx), nil
}
func (c *channelClient) ReWriteRawTx(param *types.ReWriteRawTx) ([]byte, error) {
......@@ -151,7 +176,17 @@ func (c *channelClient) CreateRawTxGroup(param *types.CreateTransactionGroup) ([
}
transactions = append(transactions, &transaction)
}
group, err := types.CreateTxGroup(transactions)
feeRate := types.GInt("MinFee")
//get proper fee rate
proper, err := c.GetProperFee(nil)
if err != nil {
log.Error("CreateNoBalance", "GetProperFeeErr", err)
return nil, err
}
if proper.GetProperFee() > feeRate {
feeRate = proper.ProperFee
}
group, err := types.CreateTxGroup(transactions, feeRate)
if err != nil {
return nil, err
}
......@@ -174,14 +209,26 @@ func (c *channelClient) CreateNoBalanceTransaction(in *types.NoBalanceTx) (*type
return nil, err
}
transactions := []*types.Transaction{txNone, tx}
group, err := types.CreateTxGroup(transactions)
feeRate := types.GInt("MinFee")
//get proper fee rate
proper, err := c.GetProperFee(nil)
if err != nil {
log.Error("CreateNoBalance", "GetProperFeeErr", err)
return nil, err
}
err = group.Check(0, types.GInt("MinFee"), types.GInt("MaxFee"))
if proper.GetProperFee() > feeRate {
feeRate = proper.ProperFee
}
group, err := types.CreateTxGroup(transactions, feeRate)
if err != nil {
return nil, err
}
err = group.Check(0, feeRate, types.GInt("MaxFee"))
if err != nil {
return nil, err
}
newtx := group.Tx()
//如果可能要做签名
if in.PayAddr != "" || in.Privkey != "" {
......
......@@ -63,7 +63,7 @@ func testCreateRawTransactionAmoutErr(t *testing.T) {
func testCreateRawTransactionTo(t *testing.T) {
name := types.ExecName(cty.CoinsX)
tx := types.CreateTx{ExecName: name, Amount: 1, To: "1MY4pMgjpS2vWiaSDZasRhN47pcwEire32"}
tx := types.CreateTx{ExecName: name, Amount: 1, To: "1MY4pMgjpS2vWiaSDZasRhN47pcwEire32", Fee: 1}
client := newTestChannelClient()
rawtx, err := client.CreateRawTransaction(&tx)
......@@ -92,6 +92,7 @@ func testCreateRawTransactionCoinTransfer(t *testing.T) {
IsWithdraw: false,
To: "1JkbMq5yNMZHtokjg5XxkC3RZbqjoPJm84",
Note: []byte("note"),
Fee: 1,
}
client := newTestChannelClient()
......@@ -115,6 +116,7 @@ func testCreateRawTransactionCoinTransferExec(t *testing.T) {
IsWithdraw: false,
To: "1JkbMq5yNMZHtokjg5XxkC3RZbqjoPJm84",
Note: []byte("note"),
Fee: 1,
}
client := newTestChannelClient()
......@@ -142,6 +144,7 @@ func testCreateRawTransactionCoinWithdraw(t *testing.T) {
IsWithdraw: true,
To: "1JkbMq5yNMZHtokjg5XxkC3RZbqjoPJm84",
Note: []byte("note"),
Fee: 1,
}
client := newTestChannelClient()
......@@ -324,8 +327,15 @@ func TestChannelClient_GetTotalCoins(t *testing.T) {
func TestChannelClient_CreateNoBalanceTransaction(t *testing.T) {
client := new(channelClient)
api := new(mocks.QueueProtocolAPI)
client.Init(&qmock.Client{}, api)
fee := types.GInt("MinFee") * 2
api.On("GetProperFee", mock.Anything).Return(&types.ReplyProperFee{ProperFee: fee}, nil)
in := &types.NoBalanceTx{}
_, err := client.CreateNoBalanceTransaction(in)
tx, err := client.CreateNoBalanceTransaction(in)
assert.NoError(t, err)
gtx, _ := tx.GetTxGroup()
assert.NoError(t, gtx.Check(0, fee, types.GInt("MaxFee")))
assert.NoError(t, err)
}
......
......@@ -205,10 +205,10 @@ func TestGetLastMemPool(t *testing.T) {
func testGetProperFeeOK(t *testing.T) {
var in *types.ReqProperFee
qapi.On("GetProperFee", in).Return(nil, nil)
qapi.On("GetProperFee", in).Return(&types.ReplyProperFee{ProperFee: 1000000}, nil)
data, err := g.GetProperFee(getOkCtx(), in)
assert.Nil(t, err, "the error should be nil")
assert.Nil(t, data)
assert.Equal(t, int64(1000000), data.ProperFee)
}
func TestGetProperFee(t *testing.T) {
......
......@@ -40,26 +40,7 @@ func (c *Chain33) CreateRawTransaction(in *rpctypes.CreateTx, result *interface{
if err != nil {
return err
}
//add tx fee setting
tx := &types.Transaction{}
err = types.Decode(reply, tx)
if err != nil {
return err
}
tx.Fee = inpb.Fee
//set proper fee if zero fee
if tx.Fee <= 0 {
proper, err := c.cli.GetProperFee(nil)
if err != nil {
return err
}
fee, err := tx.GetRealFee(proper.ProperFee)
if err != nil {
return err
}
tx.Fee = fee
}
*result = hex.EncodeToString(types.Encode(tx))
*result = hex.EncodeToString(reply)
return nil
}
......
......@@ -438,6 +438,7 @@ func TestChain33_CreateTxGroup(t *testing.T) {
api := new(mocks.QueueProtocolAPI)
testChain33 := newTestChain33(api)
var testResult interface{}
api.On("GetProperFee", mock.Anything).Return(nil, nil)
err := testChain33.CreateRawTxGroup(nil, &testResult)
assert.Nil(t, testResult)
assert.NotNil(t, err)
......@@ -1300,6 +1301,7 @@ func TestChain33_GetBalance(t *testing.T) {
func TestChain33_CreateNoBalanceTransaction(t *testing.T) {
api := new(mocks.QueueProtocolAPI)
chain33 := newTestChain33(api)
api.On("GetProperFee", mock.Anything).Return(&types.ReplyProperFee{ProperFee: 1000000}, nil)
var result string
err := chain33.CreateNoBalanceTransaction(&types.NoBalanceTx{TxHex: "0a05636f696e73122c18010a281080c2d72f222131477444795771577233553637656a7663776d333867396e7a6e7a434b58434b7120a08d0630a696c0b3f78dd9ec083a2131477444795771577233553637656a7663776d333867396e7a6e7a434b58434b71"}, &result)
assert.NoError(t, err)
......
......@@ -210,7 +210,7 @@ func createTxGroup(cmd *cobra.Command, args []string) {
types.Decode(txByte, &transaction)
transactions = append(transactions, &transaction)
}
group, err := types.CreateTxGroup(transactions)
group, err := types.CreateTxGroup(transactions, types.GInt("MinFee"))
if err != nil {
fmt.Fprintln(os.Stderr, err)
return
......
......@@ -869,7 +869,7 @@ func TestAddTxGroup(t *testing.T) {
crouptx3 := types.Transaction{Execer: []byte("coins"), Payload: types.Encode(transfer), Fee: 100000000, Expire: 0, To: toAddr}
crouptx4 := types.Transaction{Execer: []byte("user.write"), Payload: types.Encode(transfer), Fee: 100000000, Expire: 0, To: toAddr}
txGroup, _ := types.CreateTxGroup([]*types.Transaction{&crouptx1, &crouptx2, &crouptx3, &crouptx4})
txGroup, _ := types.CreateTxGroup([]*types.Transaction{&crouptx1, &crouptx2, &crouptx3, &crouptx4}, types.GInt("MinFee"))
for i := range txGroup.Txs {
err := txGroup.SignN(i, types.SECP256K1, mainPriv)
......@@ -952,7 +952,7 @@ func TestLevelFeeBigByte(t *testing.T) {
}
//test group high fee , feeRate = 10 * minfee
txGroup, err := types.CreateTxGroup([]*types.Transaction{bigTx4, bigTx5, bigTx6, bigTx7, bigTx8, bigTx9, bigTx10, bigTx11})
txGroup, err := types.CreateTxGroup([]*types.Transaction{bigTx4, bigTx5, bigTx6, bigTx7, bigTx8, bigTx9, bigTx10, bigTx11}, types.GInt("MinFee"))
if err != nil {
t.Error("CreateTxGroup err ", err.Error())
}
......
......@@ -216,7 +216,7 @@ func SetTestNetFork() {
systemFork.SetFork("chain33", "ForkBase58AddressCheck", 1800000)
//这个fork只影响平行链,注册类似user.p.x.exec的driver,新开的平行链设为0即可,老的平行链要设置新的高度
systemFork.SetFork("chain33", "ForkEnableParaRegExec", 0)
systemFork.SetFork("chain33", "ForkCacheDriver", 2580000)
}
func setLocalFork() {
......
......@@ -178,6 +178,7 @@ ForkBlockCheck=1725000
ForkLocalDBAccess=1
ForkBase58AddressCheck=1800000
ForkEnableParaRegExec=0
ForkCacheDriver=0
[fork.sub.coins]
Enable=0
......
......@@ -198,6 +198,7 @@ ForkBlockCheck=1
ForkLocalDBAccess=0
ForkBase58AddressCheck=1800000
ForkEnableParaRegExec=0
ForkCacheDriver=0
[fork.sub.coins]
Enable=0
......@@ -206,4 +207,4 @@ Enable=0
ForkManageExec=100000
[fork.sub.store-kvmvccmavl]
ForkKvmvccmavl=1
\ No newline at end of file
ForkKvmvccmavl=1
......@@ -53,8 +53,8 @@ func TxCacheSet(tx *Transaction, txc *TransactionCache) {
txCache.Add(tx, txc)
}
// CreateTxGroup 创建组交易
func CreateTxGroup(txs []*Transaction) (*Transactions, error) {
// CreateTxGroup 创建组交易, feeRate传入交易费率, 建议通过系统GetProperFee获取
func CreateTxGroup(txs []*Transaction, feeRate int64) (*Transactions, error) {
if len(txs) < 2 {
return nil, ErrTxGroupCountLessThanTwo
}
......@@ -75,7 +75,7 @@ func CreateTxGroup(txs []*Transaction) (*Transactions, error) {
} else {
txs[i].Fee = 0
}
realfee, err := txs[i].GetRealFee(GInt("MinFee"))
realfee, err := txs[i].GetRealFee(feeRate)
if err != nil {
return nil, err
}
......
......@@ -30,7 +30,7 @@ func TestCreateGroupTx(t *testing.T) {
var tx32 Transaction
Decode(tx31, &tx32)
group, err := CreateTxGroup([]*Transaction{&tx12, &tx22, &tx32})
group, err := CreateTxGroup([]*Transaction{&tx12, &tx22, &tx32}, GInt("MinFee"))
if err != nil {
t.Error(err)
return
......@@ -69,8 +69,9 @@ func TestCreateParaGroupTx(t *testing.T) {
tx22.Execer = []byte("token")
tx32.Execer = []byte("user.p.test.ticket")
feeRate := GInt("MinFee")
//SetFork("", "ForkTxGroupPara", 0)
group, err := CreateTxGroup([]*Transaction{&tx12, &tx22, &tx32})
group, err := CreateTxGroup([]*Transaction{&tx12, &tx22, &tx32}, feeRate)
if err != nil {
t.Error(err)
return
......@@ -86,7 +87,7 @@ func TestCreateParaGroupTx(t *testing.T) {
assert.Equal(t, ErrTxGroupParaMainMixed, err)
tx22.Execer = []byte("user.p.para.token")
group, err = CreateTxGroup([]*Transaction{&tx12, &tx22, &tx32})
group, err = CreateTxGroup([]*Transaction{&tx12, &tx22, &tx32}, feeRate)
if err != nil {
t.Error(err)
return
......@@ -95,7 +96,7 @@ func TestCreateParaGroupTx(t *testing.T) {
assert.Equal(t, ErrTxGroupParaCount, err)
tx22.Execer = []byte("user.p.test.paracross")
group, err = CreateTxGroup([]*Transaction{&tx12, &tx22, &tx32})
group, err = CreateTxGroup([]*Transaction{&tx12, &tx22, &tx32}, feeRate)
if err != nil {
t.Error(err)
return
......@@ -135,7 +136,7 @@ func TestCreateGroupTxWithSize(t *testing.T) {
var tx32 Transaction
Decode(tx31, &tx32)
group, err := CreateTxGroup([]*Transaction{&tx12, &tx22, &tx32})
group, err := CreateTxGroup([]*Transaction{&tx12, &tx22, &tx32}, GInt("MinFee"))
if err != nil {
t.Error(err)
return
......
......@@ -83,11 +83,14 @@ func TestBase58(t *testing.T) {
b, err := basen.Base58.DecodeString(s)
assert.NoError(t, err)
assert.True(t, len(b) == 10)
assert.True(t, len(b) <= 10)
s = basen.Base58.EncodeToString([]byte{0, 1, 2, 3, 4, 5})
b, err = basen.Base58.DecodeString(s)
assert.NoError(t, err)
assert.Equal(t, 5, len(b))
b, err = basen.Base58.DecodeStringN(s, 12)
assert.NoError(t, err)
assert.True(t, len(b) == 12)
assert.Equal(t, 12, len(b))
assert.True(t, basen.Base58.Base() == 58)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment