Commit 10c93e40 authored by libangzhu's avatar libangzhu

update code

parent 81bf3dbb
...@@ -7,7 +7,6 @@ package gossip ...@@ -7,7 +7,6 @@ package gossip
import ( import (
"bytes" "bytes"
"io" "io"
"math/big"
"net/http" "net/http"
"strings" "strings"
"time" "time"
...@@ -635,41 +634,26 @@ func (n *Node) monitorCerts() { ...@@ -635,41 +634,26 @@ func (n *Node) monitorCerts() {
log.Debug("monitorCerts", "resp", resp) log.Debug("monitorCerts", "resp", resp)
tempCerts := getSerials() tempCerts := getSerials()
for _, serialNum := range resp { for _, serialNum := range resp {
//被吊销的证书序列号
var ok bool
sNum := big.NewInt(1)
sNum, ok = sNum.SetString(serialNum, 10)
if !ok {
log.Error("monitorCerts", "big.Int Setstring err", serialNum)
continue
}
//设置证书序列号状态 //设置证书序列号状态
certinfo := updateCertSerial(sNum, true) certinfo := updateCertSerial(serialNum, true)
delete(tempCerts, sNum.String()) delete(tempCerts, serialNum)
if certinfo != nil { for pname, peer := range n.nodeInfo.peerInfos.GetPeerInfos() {
for pname, peer := range n.nodeInfo.peerInfos.GetPeerInfos() { if peer.GetAddr() == certinfo.ip {
if peer.GetAddr() == certinfo.ip { v, ok := latestSerials.Load(certinfo.ip)
v, ok := latestSerials.Load(certinfo.ip) if ok && v.(string) == serialNum {
if ok && v.(string) == serialNum { n.remove(pname) //断开已经连接的节点
n.remove(pname) //断开已经连接的节点
}
} }
} }
} }
} }
log.Debug("monitorCert", "tempCerts", tempCerts) log.Debug("monitorCert", "tempCerts", tempCerts)
//处理解除吊销的节点 //处理解除吊销的节点
for serialNum, info := range tempCerts { for serialNum, info := range tempCerts {
if info.revoke { if info.revoke {
// 被撤销的证书恢复正常 // 被撤销的证书恢复正常
sNum := big.NewInt(1) updateCertSerial(serialNum, !info.revoke)
sNum, _ = sNum.SetString(serialNum, 10)
updateCertSerial(sNum, !info.revoke)
} }
} }
} }
......
...@@ -212,7 +212,7 @@ func (na *NetAddress) DialTimeout(version int32, creds credentials.TransportCred ...@@ -212,7 +212,7 @@ func (na *NetAddress) DialTimeout(version int32, creds credentials.TransportCred
//判断是否对方是否支持压缩 //判断是否对方是否支持压缩
cli := pb.NewP2PgserviceClient(conn) cli := pb.NewP2PgserviceClient(conn)
_, err = cli.GetHeaders(context.Background(), &pb.P2PGetHeaders{StartHeight: 0, EndHeight: 0, Version: version}, grpc.WaitForReady(true)) _, err = cli.GetHeaders(context.Background(), &pb.P2PGetHeaders{StartHeight: 0, EndHeight: 0, Version: version}, grpc.WaitForReady(false))
if err != nil && !isCompressSupport(err) { if err != nil && !isCompressSupport(err) {
//compress not support //compress not support
log.Error("compress not supprot , rollback to uncompress version", "addr", na.String()) log.Error("compress not supprot , rollback to uncompress version", "addr", na.String())
......
...@@ -134,24 +134,24 @@ func NewNode(mgr *p2p.Manager, mcfg *subConfig) (*Node, error) { ...@@ -134,24 +134,24 @@ func NewNode(mgr *p2p.Manager, mcfg *subConfig) (*Node, error) {
//不需要CA //不需要CA
node.nodeInfo.cliCreds, err = credentials.NewClientTLSFromFile(mcfg.CertFile, "") node.nodeInfo.cliCreds, err = credentials.NewClientTLSFromFile(mcfg.CertFile, "")
if err != nil { if err != nil {
panic(err) panic(fmt.Sprintf("NewClientTLSFromFile panic:%v", err.Error()))
} }
node.nodeInfo.servCreds, err = credentials.NewServerTLSFromFile(mcfg.CertFile, mcfg.KeyFile) node.nodeInfo.servCreds, err = credentials.NewServerTLSFromFile(mcfg.CertFile, mcfg.KeyFile)
if err != nil { if err != nil {
panic(err) panic(fmt.Sprintf("NewServerTLSFromFile panic:%v", err.Error()))
} }
} else { } else {
//CA //CA
cert, err := tls.LoadX509KeyPair(mcfg.CertFile, mcfg.KeyFile) cert, err := tls.LoadX509KeyPair(mcfg.CertFile, mcfg.KeyFile)
if err != nil { if err != nil {
panic(err) panic(fmt.Sprintf("LoadX509KeyPair panic:%v", err.Error()))
} }
certPool := x509.NewCertPool() certPool := x509.NewCertPool()
//添加CA校验 //添加CA校验
//把CA证书读进去,动态更新CA中的吊销列表 //把CA证书读进去,动态更新CA中的吊销列表
ca, err := ioutil.ReadFile(mcfg.CaCert) ca, err := ioutil.ReadFile(mcfg.CaCert)
if err != nil { if err != nil {
panic(err) panic(fmt.Sprintf("readFile ca panic:%v", err.Error()))
} }
if ok := certPool.AppendCertsFromPEM(ca); !ok { if ok := certPool.AppendCertsFromPEM(ca); !ok {
......
...@@ -110,7 +110,7 @@ func (m *Cli) GetMemPool(msg *queue.Message, taskindex int64) { ...@@ -110,7 +110,7 @@ func (m *Cli) GetMemPool(msg *queue.Message, taskindex int64) {
for _, peer := range peers { for _, peer := range peers {
//获取远程 peer invs //获取远程 peer invs
resp, err := peer.mconn.gcli.GetMemPool(context.Background(), resp, err := peer.mconn.gcli.GetMemPool(context.Background(),
&pb.P2PGetMempool{Version: m.network.node.nodeInfo.channelVersion}, grpc.WaitForReady(true)) &pb.P2PGetMempool{Version: m.network.node.nodeInfo.channelVersion}, grpc.WaitForReady(false))
P2pComm.CollectPeerStat(err, peer) P2pComm.CollectPeerStat(err, peer)
if err != nil { if err != nil {
if err == pb.ErrVersion { if err == pb.ErrVersion {
...@@ -142,7 +142,7 @@ func (m *Cli) GetMemPool(msg *queue.Message, taskindex int64) { ...@@ -142,7 +142,7 @@ func (m *Cli) GetMemPool(msg *queue.Message, taskindex int64) {
} }
//获取真正的交易Tx call GetData //获取真正的交易Tx call GetData
datacli, dataerr := peer.mconn.gcli.GetData(context.Background(), datacli, dataerr := peer.mconn.gcli.GetData(context.Background(),
&pb.P2PGetData{Invs: ableInv, Version: m.network.node.nodeInfo.channelVersion}, grpc.WaitForReady(true)) &pb.P2PGetData{Invs: ableInv, Version: m.network.node.nodeInfo.channelVersion}, grpc.WaitForReady(false))
P2pComm.CollectPeerStat(dataerr, peer) P2pComm.CollectPeerStat(dataerr, peer)
if dataerr != nil { if dataerr != nil {
continue continue
...@@ -174,7 +174,7 @@ func (m *Cli) GetMemPool(msg *queue.Message, taskindex int64) { ...@@ -174,7 +174,7 @@ func (m *Cli) GetMemPool(msg *queue.Message, taskindex int64) {
func (m *Cli) GetAddr(peer *Peer) ([]string, error) { func (m *Cli) GetAddr(peer *Peer) ([]string, error) {
resp, err := peer.mconn.gcli.GetAddr(context.Background(), &pb.P2PGetAddr{Nonce: int64(rand.Int31n(102040))}, resp, err := peer.mconn.gcli.GetAddr(context.Background(), &pb.P2PGetAddr{Nonce: int64(rand.Int31n(102040))},
grpc.WaitForReady(true)) grpc.WaitForReady(false))
P2pComm.CollectPeerStat(err, peer) P2pComm.CollectPeerStat(err, peer)
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -192,7 +192,7 @@ func (m *Cli) GetInPeersNum(peer *Peer) (int, error) { ...@@ -192,7 +192,7 @@ func (m *Cli) GetInPeersNum(peer *Peer) (int, error) {
} }
resp, err := peer.mconn.gcli.CollectInPeers(context.Background(), ping, resp, err := peer.mconn.gcli.CollectInPeers(context.Background(), ping,
grpc.WaitForReady(true)) grpc.WaitForReady(false))
P2pComm.CollectPeerStat(err, peer) P2pComm.CollectPeerStat(err, peer)
if err != nil { if err != nil {
...@@ -210,7 +210,7 @@ func (m *Cli) GetAddrList(peer *Peer) (map[string]*pb.P2PPeerInfo, error) { ...@@ -210,7 +210,7 @@ func (m *Cli) GetAddrList(peer *Peer) (map[string]*pb.P2PPeerInfo, error) {
return addrlist, fmt.Errorf("pointer is nil") return addrlist, fmt.Errorf("pointer is nil")
} }
resp, err := peer.mconn.gcli.GetAddrList(context.Background(), &pb.P2PGetAddr{Nonce: int64(rand.Int31n(102040))}, resp, err := peer.mconn.gcli.GetAddrList(context.Background(), &pb.P2PGetAddr{Nonce: int64(rand.Int31n(102040))},
grpc.WaitForReady(true)) grpc.WaitForReady(false))
P2pComm.CollectPeerStat(err, peer) P2pComm.CollectPeerStat(err, peer)
if err != nil { if err != nil {
...@@ -272,7 +272,7 @@ func (m *Cli) SendVersion(peer *Peer, nodeinfo *NodeInfo) (string, error) { ...@@ -272,7 +272,7 @@ func (m *Cli) SendVersion(peer *Peer, nodeinfo *NodeInfo) (string, error) {
resp, err := peer.mconn.gcli.Version2(context.Background(), &pb.P2PVersion{Version: nodeinfo.channelVersion, Service: int64(nodeinfo.ServiceTy()), Timestamp: pb.Now().Unix(), resp, err := peer.mconn.gcli.Version2(context.Background(), &pb.P2PVersion{Version: nodeinfo.channelVersion, Service: int64(nodeinfo.ServiceTy()), Timestamp: pb.Now().Unix(),
AddrRecv: peer.Addr(), AddrFrom: addrfrom, Nonce: int64(rand.Int31n(102040)), AddrRecv: peer.Addr(), AddrFrom: addrfrom, Nonce: int64(rand.Int31n(102040)),
UserAgent: hex.EncodeToString(in.Sign.GetPubkey()), StartHeight: blockheight}, grpc.WaitForReady(true)) UserAgent: hex.EncodeToString(in.Sign.GetPubkey()), StartHeight: blockheight}, grpc.WaitForReady(false))
log.Debug("SendVersion", "resp", resp, "from", addrfrom, "to", peer.Addr()) log.Debug("SendVersion", "resp", resp, "from", addrfrom, "to", peer.Addr())
if err != nil { if err != nil {
log.Error("SendVersion", "Verson", err.Error(), "peer", peer.Addr()) log.Error("SendVersion", "Verson", err.Error(), "peer", peer.Addr())
...@@ -317,7 +317,7 @@ func (m *Cli) SendPing(peer *Peer, nodeinfo *NodeInfo) error { ...@@ -317,7 +317,7 @@ func (m *Cli) SendPing(peer *Peer, nodeinfo *NodeInfo) error {
return err return err
} }
r, err := peer.mconn.gcli.Ping(context.Background(), ping, grpc.WaitForReady(true)) r, err := peer.mconn.gcli.Ping(context.Background(), ping, grpc.WaitForReady(false))
P2pComm.CollectPeerStat(err, peer) P2pComm.CollectPeerStat(err, peer)
if err != nil { if err != nil {
return err return err
...@@ -395,7 +395,7 @@ func (m *Cli) GetHeaders(msg *queue.Message, taskindex int64) { ...@@ -395,7 +395,7 @@ func (m *Cli) GetHeaders(msg *queue.Message, taskindex int64) {
if peer, ok := peers[pid[0]]; ok && peer != nil { if peer, ok := peers[pid[0]]; ok && peer != nil {
var err error var err error
headers, err := peer.mconn.gcli.GetHeaders(context.Background(), &pb.P2PGetHeaders{StartHeight: req.GetStart(), EndHeight: req.GetEnd(), headers, err := peer.mconn.gcli.GetHeaders(context.Background(), &pb.P2PGetHeaders{StartHeight: req.GetStart(), EndHeight: req.GetEnd(),
Version: m.network.node.nodeInfo.channelVersion}, grpc.WaitForReady(true)) Version: m.network.node.nodeInfo.channelVersion}, grpc.WaitForReady(false))
P2pComm.CollectPeerStat(err, peer) P2pComm.CollectPeerStat(err, peer)
if err != nil { if err != nil {
log.Error("GetBlocks", "Err", err.Error()) log.Error("GetBlocks", "Err", err.Error())
...@@ -587,7 +587,7 @@ func (m *Cli) CheckSelf(addr string, nodeinfo *NodeInfo) bool { ...@@ -587,7 +587,7 @@ func (m *Cli) CheckSelf(addr string, nodeinfo *NodeInfo) bool {
cli := pb.NewP2PgserviceClient(conn) cli := pb.NewP2PgserviceClient(conn)
resp, err := cli.GetPeerInfo(context.Background(), resp, err := cli.GetPeerInfo(context.Background(),
&pb.P2PGetPeerInfo{Version: nodeinfo.channelVersion}, grpc.WaitForReady(true)) &pb.P2PGetPeerInfo{Version: nodeinfo.channelVersion}, grpc.WaitForReady(false))
if err != nil { if err != nil {
return false return false
} }
......
...@@ -55,7 +55,6 @@ type Peer struct { ...@@ -55,7 +55,6 @@ type Peer struct {
taskChan chan interface{} //tx block taskChan chan interface{} //tx block
inBounds int32 //连接此节点的客户端节点数量 inBounds int32 //连接此节点的客户端节点数量
IsMaxInbouds bool IsMaxInbouds bool
serialNnum string
} }
// NewPeer produce a peer object // NewPeer produce a peer object
...@@ -171,7 +170,7 @@ func (p *Peer) GetInBouns() int32 { ...@@ -171,7 +170,7 @@ func (p *Peer) GetInBouns() int32 {
// GetPeerInfo get peer information of peer // GetPeerInfo get peer information of peer
func (p *Peer) GetPeerInfo() (*pb.P2PPeerInfo, error) { func (p *Peer) GetPeerInfo() (*pb.P2PPeerInfo, error) {
return p.mconn.gcli.GetPeerInfo(context.Background(), &pb.P2PGetPeerInfo{Version: p.node.nodeInfo.channelVersion}, grpc.FailFast(true)) return p.mconn.gcli.GetPeerInfo(context.Background(), &pb.P2PGetPeerInfo{Version: p.node.nodeInfo.channelVersion}, grpc.WaitForReady(false))
} }
func (p *Peer) sendStream() { func (p *Peer) sendStream() {
...@@ -300,7 +299,7 @@ func (p *Peer) readStream() { ...@@ -300,7 +299,7 @@ func (p *Peer) readStream() {
log.Error("readStream", "err:", err.Error(), "peerIp", p.Addr()) log.Error("readStream", "err:", err.Error(), "peerIp", p.Addr())
continue continue
} }
resp, err := p.mconn.gcli.ServerStreamSend(context.Background(), ping, grpc.WaitForReady(true)) resp, err := p.mconn.gcli.ServerStreamSend(context.Background(), ping, grpc.WaitForReady(false))
P2pComm.CollectPeerStat(err, p) P2pComm.CollectPeerStat(err, p)
if err != nil { if err != nil {
log.Error("readStream", "serverstreamsend,err:", err, "peer", p.Addr()) log.Error("readStream", "serverstreamsend,err:", err, "peer", p.Addr())
......
...@@ -41,19 +41,17 @@ func addCertSerial(serial *big.Int, ip string) { ...@@ -41,19 +41,17 @@ func addCertSerial(serial *big.Int, ip string) {
serials[serial.String()] = &certInfo{false, ip, serial.String()} serials[serial.String()] = &certInfo{false, ip, serial.String()}
} }
func updateCertSerial(serial *big.Int, revoke bool) *certInfo { func updateCertSerial(serial string, revoke bool) certInfo {
revokeLock.Lock() revokeLock.Lock()
defer revokeLock.Unlock() defer revokeLock.Unlock()
v, ok := serials[serial.String()] v, ok := serials[serial]
if ok { if ok {
v.revoke = revoke v.revoke = revoke
} else { return *v
return nil
} }
serials[serial.String()] = v
return v //serials[serial.String()] = v
return certInfo{}
} }
func isRevoke(serial *big.Int) bool { func isRevoke(serial *big.Int) bool {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment