Commit cd7b6290 authored by jiangpeng's avatar jiangpeng Committed by vipwzw

p2p:update gossip broadcast

* add dial grpc call option, max msg size * update light block broadcast config
parent e950a553
...@@ -64,7 +64,8 @@ const ( ...@@ -64,7 +64,8 @@ const (
const ( const (
DefaultLtTxBroadCastTTL = 3 DefaultLtTxBroadCastTTL = 3
DefaultMaxTxBroadCastTTL = 25 DefaultMaxTxBroadCastTTL = 25
DefaultMinLtBlockTxNum = 5 // 100KB
defaultMinLtBlockSize = 100
) )
// P2pCacheTxSize p2pcache size of transaction // P2pCacheTxSize p2pcache size of transaction
......
...@@ -152,8 +152,12 @@ func (na *NetAddress) DialTimeout(version int32) (*grpc.ClientConn, error) { ...@@ -152,8 +152,12 @@ func (na *NetAddress) DialTimeout(version int32) (*grpc.ClientConn, error) {
keepaliveOp := grpc.WithKeepaliveParams(cliparm) keepaliveOp := grpc.WithKeepaliveParams(cliparm)
timeoutOp := grpc.WithTimeout(time.Second * 3) timeoutOp := grpc.WithTimeout(time.Second * 3)
log.Debug("NetAddress", "Dial", na.String()) log.Debug("NetAddress", "Dial", na.String())
maxMsgSize := pb.MaxBlockSize + 1024*1024
conn, err := grpc.Dial(na.String(), grpc.WithInsecure(), conn, err := grpc.Dial(na.String(), grpc.WithInsecure(),
grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip")), grpc.WithServiceConfig(ch), keepaliveOp, timeoutOp) grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip")),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(maxMsgSize)),
grpc.WithServiceConfig(ch), keepaliveOp, timeoutOp)
if err != nil { if err != nil {
log.Debug("grpc DialCon", "did not connect", err, "addr", na.String()) log.Debug("grpc DialCon", "did not connect", err, "addr", na.String())
return nil, err return nil, err
......
...@@ -57,8 +57,8 @@ type subConfig struct { ...@@ -57,8 +57,8 @@ type subConfig struct {
MaxTTL int32 `protobuf:"varint,10,opt,name=maxTTL" json:"maxTTL,omitempty"` MaxTTL int32 `protobuf:"varint,10,opt,name=maxTTL" json:"maxTTL,omitempty"`
// p2p网络频道,用于区分主网/测试网/其他网络 // p2p网络频道,用于区分主网/测试网/其他网络
Channel int32 `protobuf:"varint,11,opt,name=channel" json:"channel,omitempty"` Channel int32 `protobuf:"varint,11,opt,name=channel" json:"channel,omitempty"`
//区块轻广播的最低打包交易数, 大于该值时区块内交易采用短哈希广播 //触发区块轻广播最小大小, KB
MinLtBlockTxNum int32 `protobuf:"varint,12,opt,name=minLtBlockTxNum" json:"minLtBlockTxNum,omitempty"` MinLtBlockSize int32 `protobuf:"varint,12,opt,name=minLtBlockSize" json:"minLtBlockSize,omitempty"`
//指定p2p类型, 支持gossip, dht //指定p2p类型, 支持gossip, dht
} }
...@@ -100,8 +100,8 @@ func New(mgr *p2p.Manager, subCfg []byte) p2p.IP2P { ...@@ -100,8 +100,8 @@ func New(mgr *p2p.Manager, subCfg []byte) p2p.IP2P {
mcfg.MaxTTL = DefaultMaxTxBroadCastTTL mcfg.MaxTTL = DefaultMaxTxBroadCastTTL
} }
if mcfg.MinLtBlockTxNum <= 0 { if mcfg.MinLtBlockSize <= 0 {
mcfg.MinLtBlockTxNum = DefaultMinLtBlockTxNum mcfg.MinLtBlockSize = defaultMinLtBlockSize
} }
log.Info("p2p", "Channel", mcfg.Channel, "Version", VERSION, "IsTest", cfg.IsTestNet()) log.Info("p2p", "Channel", mcfg.Channel, "Version", VERSION, "IsTest", cfg.IsTestNet())
......
...@@ -136,7 +136,6 @@ func newP2p(cfg *types.Chain33Config, port int32, dbpath string, q queue.Queue) ...@@ -136,7 +136,6 @@ func newP2p(cfg *types.Chain33Config, port int32, dbpath string, q queue.Queue)
pcfg.Port = port pcfg.Port = port
pcfg.Channel = testChannel pcfg.Channel = testChannel
pcfg.ServerStart = true pcfg.ServerStart = true
pcfg.MinLtBlockTxNum = 1
subCfgBytes, _ := json.Marshal(pcfg) subCfgBytes, _ := json.Marshal(pcfg)
p2pcli := New(p2pMgr, subCfgBytes).(*P2p) p2pcli := New(p2pMgr, subCfgBytes).(*P2p)
......
...@@ -114,7 +114,7 @@ func (n *Node) sendBlock(block *types.P2PBlock, p2pData *types.BroadCastData, pe ...@@ -114,7 +114,7 @@ func (n *Node) sendBlock(block *types.P2PBlock, p2pData *types.BroadCastData, pe
return false return false
} }
if peerVersion >= lightBroadCastVersion && len(block.Block.Txs) >= int(n.nodeInfo.cfg.MinLtBlockTxNum) { if peerVersion >= lightBroadCastVersion && types.Size(block.GetBlock()) >= int(n.nodeInfo.cfg.MinLtBlockSize*1024) {
ltBlock := &types.LightBlock{} ltBlock := &types.LightBlock{}
ltBlock.Size = int64(types.Size(block.Block)) ltBlock.Size = int64(types.Size(block.Block))
...@@ -156,26 +156,20 @@ func (n *Node) sendTx(tx *types.P2PTx, p2pData *types.BroadCastData, peerVersion ...@@ -156,26 +156,20 @@ func (n *Node) sendTx(tx *types.P2PTx, p2pData *types.BroadCastData, peerVersion
txHash := hex.EncodeToString(tx.Tx.Hash()) txHash := hex.EncodeToString(tx.Tx.Hash())
ttl := tx.GetRoute().GetTTL() ttl := tx.GetRoute().GetTTL()
isLightSend := peerVersion >= lightBroadCastVersion && ttl >= n.nodeInfo.cfg.LightTxTTL
//检测冗余发送
ignoreSend := false
//短哈希广播不记录发送过滤
if !isLightSend {
ignoreSend = n.addIgnoreSendPeerAtomic(txSendFilter, txHash, pid)
}
log.Debug("P2PSendTx", "txHash", txHash, "ttl", ttl, "isLightSend", isLightSend,
"peerAddr", peerAddr, "ignoreSend", ignoreSend)
if ignoreSend {
return false
}
//超过最大的ttl, 不再发送 //超过最大的ttl, 不再发送
if ttl > n.nodeInfo.cfg.MaxTTL { if ttl > n.nodeInfo.cfg.MaxTTL {
return false return false
} }
isLightSend := peerVersion >= lightBroadCastVersion && ttl >= n.nodeInfo.cfg.LightTxTTL
//检测冗余发送
if n.addIgnoreSendPeerAtomic(txSendFilter, txHash, pid) {
return false
}
//log.Debug("P2PSendTx", "txHash", txHash, "ttl", ttl, "isLightSend", isLightSend, "peerAddr", peerAddr, "ignoreSend", ignoreSend)
//新版本且ttl达到设定值 //新版本且ttl达到设定值
if isLightSend { if isLightSend {
p2pData.Value = &types.BroadCastData_LtTx{ //超过最大的ttl, 不再发送 p2pData.Value = &types.BroadCastData_LtTx{ //超过最大的ttl, 不再发送
...@@ -199,7 +193,7 @@ func (n *Node) recvTx(tx *types.P2PTx, pid, peerAddr string) { ...@@ -199,7 +193,7 @@ func (n *Node) recvTx(tx *types.P2PTx, pid, peerAddr string) {
n.addIgnoreSendPeerAtomic(txSendFilter, txHash, pid) n.addIgnoreSendPeerAtomic(txSendFilter, txHash, pid)
//重复接收 //重复接收
isDuplicate := txHashFilter.AddWithCheckAtomic(txHash, true) isDuplicate := txHashFilter.AddWithCheckAtomic(txHash, true)
log.Debug("recvTx", "tx", txHash, "ttl", tx.GetRoute().GetTTL(), "peerAddr", peerAddr, "duplicateTx", isDuplicate) //log.Debug("recvTx", "tx", txHash, "ttl", tx.GetRoute().GetTTL(), "peerAddr", peerAddr, "duplicateTx", isDuplicate)
if isDuplicate { if isDuplicate {
return return
} }
...@@ -222,7 +216,7 @@ func (n *Node) recvLtTx(tx *types.LightTx, pid, peerAddr string, pubPeerFunc pub ...@@ -222,7 +216,7 @@ func (n *Node) recvLtTx(tx *types.LightTx, pid, peerAddr string, pubPeerFunc pub
//将节点id添加到发送过滤, 避免冗余发送 //将节点id添加到发送过滤, 避免冗余发送
n.addIgnoreSendPeerAtomic(txSendFilter, txHash, pid) n.addIgnoreSendPeerAtomic(txSendFilter, txHash, pid)
exist := txHashFilter.Contains(txHash) exist := txHashFilter.Contains(txHash)
log.Debug("recvLtTx", "txHash", txHash, "ttl", tx.GetRoute().GetTTL(), "peerAddr", peerAddr, "exist", exist) //log.Debug("recvLtTx", "txHash", txHash, "ttl", tx.GetRoute().GetTTL(), "peerAddr", peerAddr, "exist", exist)
//本地不存在, 需要向对端节点发起完整交易请求. 如果存在则表示本地已经接收过此交易, 不做任何操作 //本地不存在, 需要向对端节点发起完整交易请求. 如果存在则表示本地已经接收过此交易, 不做任何操作
if !exist { if !exist {
...@@ -354,10 +348,10 @@ func (n *Node) recvLtBlock(ltBlock *types.LightBlock, pid, peerAddr string, pubP ...@@ -354,10 +348,10 @@ func (n *Node) recvLtBlock(ltBlock *types.LightBlock, pid, peerAddr string, pubP
}, },
}, },
} }
//pub to specified peer
pubPeerFunc(query, pid)
//需要将不完整的block预存 //需要将不完整的block预存
ltBlockCache.Add(blockHash, block, block.Size()) ltBlockCache.Add(blockHash, block, block.Size())
//pub to specified peer
pubPeerFunc(query, pid)
} }
func (n *Node) recvQueryData(query *types.P2PQueryData, pid, peerAddr string, pubPeerFunc pubFuncType) { func (n *Node) recvQueryData(query *types.P2PQueryData, pid, peerAddr string, pubPeerFunc pubFuncType) {
...@@ -382,6 +376,7 @@ func (n *Node) recvQueryData(query *types.P2PQueryData, pid, peerAddr string, pu ...@@ -382,6 +376,7 @@ func (n *Node) recvQueryData(query *types.P2PQueryData, pid, peerAddr string, pu
p2pTx := &types.P2PTx{Tx: txList.Txs[0]} p2pTx := &types.P2PTx{Tx: txList.Txs[0]}
//再次发送完整交易至节点, ttl重设为1 //再次发送完整交易至节点, ttl重设为1
p2pTx.Route = &types.P2PRoute{TTL: 1} p2pTx.Route = &types.P2PRoute{TTL: 1}
n.removeIgnoreSendPeerAtomic(txSendFilter, txHash, pid)
pubPeerFunc(p2pTx, pid) pubPeerFunc(p2pTx, pid)
} else if blcReq := query.GetBlockTxReq(); blcReq != nil { } else if blcReq := query.GetBlockTxReq(); blcReq != nil {
...@@ -442,10 +437,10 @@ func (n *Node) recvQueryReply(rep *types.P2PBlockTxReply, pid, peerAddr string, ...@@ -442,10 +437,10 @@ func (n *Node) recvQueryReply(rep *types.P2PBlockTxReply, pid, peerAddr string,
}, },
}, },
} }
//pub to specified peer
pubPeerFunc(query, pid)
block.Txs = nil block.Txs = nil
ltBlockCache.Add(rep.BlockHash, block, block.Size()) ltBlockCache.Add(rep.BlockHash, block, block.Size())
//pub to specified peer
pubPeerFunc(query, pid)
} }
} }
...@@ -490,3 +485,15 @@ func (n *Node) addIgnoreSendPeerAtomic(filter *utils.Filterdata, key, pid string ...@@ -490,3 +485,15 @@ func (n *Node) addIgnoreSendPeerAtomic(filter *utils.Filterdata, key, pid string
info.ignoreSendPeers[pid] = true info.ignoreSendPeers[pid] = true
return exist return exist
} }
// 删除发送过滤器记录
func (n *Node) removeIgnoreSendPeerAtomic(filter *utils.Filterdata, key, pid string) {
filter.GetAtomicLock()
defer filter.ReleaseAtomicLock()
if filter.Contains(key) {
data, _ := filter.Get(key)
info := data.(*sendFilterInfo)
delete(info.ignoreSendPeers, pid)
}
}
...@@ -27,6 +27,7 @@ func Test_processP2P(t *testing.T) { ...@@ -27,6 +27,7 @@ func Test_processP2P(t *testing.T) {
q.SetConfig(cfg) q.SetConfig(cfg)
go q.Start() go q.Start()
p2p := newP2p(cfg, 12345, "testProcessP2p", q) p2p := newP2p(cfg, 12345, "testProcessP2p", q)
p2p.subCfg.MinLtBlockSize = 0
defer freeP2p(p2p) defer freeP2p(p2p)
defer q.Close() defer q.Close()
node := p2p.node node := p2p.node
...@@ -34,7 +35,6 @@ func Test_processP2P(t *testing.T) { ...@@ -34,7 +35,6 @@ func Test_processP2P(t *testing.T) {
pid := "testPid" pid := "testPid"
sendChan := make(chan interface{}, 1) sendChan := make(chan interface{}, 1)
recvChan := make(chan *types.BroadCastData, 1) recvChan := make(chan *types.BroadCastData, 1)
testDone := make(chan struct{})
payload := []byte("testpayload") payload := []byte("testpayload")
minerTx := &types.Transaction{Execer: []byte("coins"), Payload: payload, Fee: 14600, Expire: 200} minerTx := &types.Transaction{Execer: []byte("coins"), Payload: payload, Fee: 14600, Expire: 200}
...@@ -106,73 +106,75 @@ func Test_processP2P(t *testing.T) { ...@@ -106,73 +106,75 @@ func Test_processP2P(t *testing.T) {
}() }()
//data test //data test
go func() { subChan := node.pubsub.Sub(pid)
subChan := node.pubsub.Sub(pid) //全数据广播
//normal sendChan <- &versionData{peerName: pid + "1", rawData: &types.P2PTx{Tx: tx, Route: &types.P2PRoute{}}, version: lightBroadCastVersion - 1}
sendChan <- &versionData{peerName: pid + "1", rawData: &types.P2PTx{Tx: tx, Route: &types.P2PRoute{}}, version: lightBroadCastVersion - 1} p2p.mgr.PubSub.Pub(client.NewMessage("p2p", types.EventTxBroadcast, tx), P2PTypeName)
p2p.mgr.PubSub.Pub(client.NewMessage("p2p", types.EventTxBroadcast, tx), P2PTypeName) sendChan <- &versionData{peerName: pid + "1", rawData: &types.P2PBlock{Block: block}, version: lightBroadCastVersion - 1}
sendChan <- &versionData{peerName: pid + "1", rawData: &types.P2PBlock{Block: block}, version: lightBroadCastVersion - 1} //交易发送过滤
//light broadcast txHashFilter.Add(hex.EncodeToString(tx1.Hash()), &types.P2PRoute{TTL: DefaultLtTxBroadCastTTL})
txHashFilter.Add(hex.EncodeToString(tx1.Hash()), &types.P2PRoute{TTL: DefaultLtTxBroadCastTTL}) p2p.mgr.PubSub.Pub(client.NewMessage("p2p", types.EventTxBroadcast, tx1), P2PTypeName)
p2p.mgr.PubSub.Pub(client.NewMessage("p2p", types.EventTxBroadcast, tx1), P2PTypeName) //交易短哈希广播
sendChan <- &versionData{peerName: pid + "2", rawData: &types.P2PTx{Tx: tx, Route: &types.P2PRoute{TTL: DefaultLtTxBroadCastTTL}}, version: lightBroadCastVersion} sendChan <- &versionData{peerName: pid + "2", rawData: &types.P2PTx{Tx: tx, Route: &types.P2PRoute{TTL: DefaultLtTxBroadCastTTL}}, version: lightBroadCastVersion}
<-subChan //query tx recvWithTimeout(t, subChan, "case 1") //缺失交易,从对端获取
sendChan <- &versionData{peerName: pid + "2", rawData: &types.P2PBlock{Block: block}, version: lightBroadCastVersion} //区块短哈希广播
<-subChan //query block sendChan <- &versionData{peerName: pid + "2", rawData: &types.P2PBlock{Block: block}, version: lightBroadCastVersion}
for !ltBlockCache.Contains(blockHash) { recvWithTimeout(t, subChan, "case 2")
} assert.True(t, ltBlockCache.Contains(blockHash))
cpBlock := *ltBlockCache.Get(blockHash).(*types.Block)
assert.True(t, bytes.Equal(rootHash, merkle.CalcMerkleRoot(cfg, cpBlock.Height, cpBlock.Txs)))
cpBlock := *ltBlockCache.Get(blockHash).(*types.Block) //query tx
assert.True(t, bytes.Equal(rootHash, merkle.CalcMerkleRoot(cfg, cpBlock.Height, cpBlock.Txs))) sendChan <- &versionData{rawData: &types.P2PQueryData{Value: &types.P2PQueryData_TxReq{TxReq: &types.P2PTxReq{TxHash: tx.Hash()}}}}
data := recvWithTimeout(t, subChan, "case 3")
_, ok := data.(*types.P2PTx)
assert.True(t, ok)
sendChan <- &versionData{rawData: &types.P2PQueryData{Value: &types.P2PQueryData_BlockTxReq{BlockTxReq: &types.P2PBlockTxReq{
BlockHash: blockHash,
TxIndices: []int32{1, 2},
}}}}
data = recvWithTimeout(t, subChan, "case 4")
rep, ok := data.(*types.P2PBlockTxReply)
assert.True(t, ok)
assert.Equal(t, 2, int(rep.TxIndices[1]))
sendChan <- &versionData{rawData: &types.P2PQueryData{Value: &types.P2PQueryData_BlockTxReq{BlockTxReq: &types.P2PBlockTxReq{
BlockHash: blockHash,
TxIndices: nil,
}}}}
data = recvWithTimeout(t, subChan, "case 5")
rep, ok = data.(*types.P2PBlockTxReply)
assert.True(t, ok)
assert.Nil(t, rep.TxIndices)
//query tx //query reply
sendChan <- &versionData{rawData: &types.P2PQueryData{Value: &types.P2PQueryData_TxReq{TxReq: &types.P2PTxReq{TxHash: tx.Hash()}}}} sendChan <- &versionData{rawData: &types.P2PBlockTxReply{
_, ok := (<-subChan).(*types.P2PTx) BlockHash: blockHash,
assert.True(t, ok) TxIndices: []int32{1},
sendChan <- &versionData{rawData: &types.P2PQueryData{Value: &types.P2PQueryData_BlockTxReq{BlockTxReq: &types.P2PBlockTxReq{ Txs: txList[1:2],
BlockHash: blockHash, }}
TxIndices: []int32{1, 2}, rep1, ok := recvWithTimeout(t, subChan, "case 6").(*types.P2PQueryData)
}}}} assert.True(t, ok)
rep, ok := (<-subChan).(*types.P2PBlockTxReply) assert.Nil(t, rep1.GetBlockTxReq().GetTxIndices())
assert.True(t, ok) sendChan <- &versionData{rawData: &types.P2PBlockTxReply{
assert.Equal(t, 2, int(rep.TxIndices[1])) BlockHash: blockHash,
sendChan <- &versionData{rawData: &types.P2PQueryData{Value: &types.P2PQueryData_BlockTxReq{BlockTxReq: &types.P2PBlockTxReq{ Txs: txList[0:],
BlockHash: blockHash, }}
TxIndices: nil, for ltBlockCache.Contains(blockHash) {
}}}} time.Sleep(time.Millisecond)
rep, ok = (<-subChan).(*types.P2PBlockTxReply) }
assert.True(t, ok) //send tx with max ttl
assert.Nil(t, rep.TxIndices) _, doSend := node.processSendP2P(&types.P2PTx{Tx: tx, Route: &types.P2PRoute{TTL: node.nodeInfo.cfg.MaxTTL + 1}}, lightBroadCastVersion, pid+"5", "testIP:port")
assert.False(t, doSend)
}
//query reply // 等待接收channel数据,超时报错
sendChan <- &versionData{rawData: &types.P2PBlockTxReply{ func recvWithTimeout(t *testing.T, ch chan interface{}, testCase string) interface{} {
BlockHash: blockHash, select {
TxIndices: []int32{1}, case data := <-ch:
Txs: txList[1:2], return data
}} case <-time.After(time.Second * 10):
rep1, ok := (<-subChan).(*types.P2PQueryData) t.Error(testCase, "waitChanTimeout")
assert.True(t, ok) t.FailNow()
assert.Nil(t, rep1.GetBlockTxReq().GetTxIndices())
sendChan <- &versionData{rawData: &types.P2PBlockTxReply{
BlockHash: blockHash,
Txs: txList[0:],
}}
for ltBlockCache.Contains(blockHash) {
}
//max ttl
_, doSend := node.processSendP2P(&types.P2PTx{Tx: tx, Route: &types.P2PRoute{TTL: node.nodeInfo.cfg.MaxTTL + 1}}, lightBroadCastVersion, pid+"5", "testIP:port")
assert.False(t, doSend)
close(testDone)
}()
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
select {
case <-testDone:
return
case <-ticker.C:
t.Error("TestP2PProcessTimeout")
return
}
} }
return nil
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment