Commit 518ee23c authored by libangzhu's avatar libangzhu

update

parents cc1ed4e6 8ac5afa9
...@@ -113,13 +113,13 @@ func (c Comm) dialPeerWithAddress(addr *NetAddress, persistent bool, node *Node) ...@@ -113,13 +113,13 @@ func (c Comm) dialPeerWithAddress(addr *NetAddress, persistent bool, node *Node)
conn, err := addr.DialTimeout(node.nodeInfo.channelVersion, node.nodeInfo.cliCreds, node.nodeInfo.blacklist) conn, err := addr.DialTimeout(node.nodeInfo.channelVersion, node.nodeInfo.cliCreds, node.nodeInfo.blacklist)
if err != nil { if err != nil {
log.Error("dialPeerWithAddress","DialTimeoutErr",err.Error()) log.Error("dialPeerWithAddress", "DialTimeoutErr", err.Error())
return nil, err return nil, err
} }
peer, err := c.newPeerFromConn(conn, addr, node) peer, err := c.newPeerFromConn(conn, addr, node)
if err != nil { if err != nil {
log.Error("dialPeerWithAddress","newPeerFromConn",err) log.Error("dialPeerWithAddress", "newPeerFromConn", err)
err = conn.Close() err = conn.Close()
return nil, err return nil, err
......
...@@ -92,13 +92,13 @@ Retry: ...@@ -92,13 +92,13 @@ Retry:
if err != nil { if err != nil {
return nil, err return nil, err
} }
if serialNum,ok:= latestSerials.Load(ip);ok{ if serialNum, ok := latestSerials.Load(ip); ok {
bn,_:=big.NewInt(1).SetString(serialNum.(string),10) bn, _ := big.NewInt(1).SetString(serialNum.(string), 10)
if isRevoke(bn){//证书被吊销 拒绝接口请求 if isRevoke(bn) { //证书被吊销 拒绝接口请求
return nil, fmt.Errorf("cert %v revoked", serialNum.(string)) return nil, fmt.Errorf("cert %v revoked", serialNum.(string))
} }
} }
if pServer.node.nodeInfo.blacklist.Has(ip) { if pServer.node.nodeInfo.blacklist.Has(ip) {
return nil, fmt.Errorf("blacklist %v no authorized", ip) return nil, fmt.Errorf("blacklist %v no authorized", ip)
} }
...@@ -123,10 +123,10 @@ Retry: ...@@ -123,10 +123,10 @@ Retry:
if err != nil { if err != nil {
return err return err
} }
if serialNum,ok:= latestSerials.Load(ip);ok{ if serialNum, ok := latestSerials.Load(ip); ok {
bn,_:=big.NewInt(1).SetString(serialNum.(string),10) bn, _ := big.NewInt(1).SetString(serialNum.(string), 10)
if isRevoke(bn){//证书被吊销 拒绝接口请求 if isRevoke(bn) { //证书被吊销 拒绝接口请求
return fmt.Errorf("cert %v revoked", serialNum.(string)) return fmt.Errorf("cert %v revoked", serialNum.(string))
} }
} }
......
...@@ -6,13 +6,14 @@ package gossip ...@@ -6,13 +6,14 @@ package gossip
import ( import (
"bytes" "bytes"
"github.com/33cn/chain33/rpc/jsonclient"
"io" "io"
"math/big" "math/big"
"net/http" "net/http"
"strings" "strings"
"time" "time"
"github.com/33cn/chain33/rpc/jsonclient"
"github.com/33cn/chain33/p2p/utils" "github.com/33cn/chain33/p2p/utils"
"github.com/33cn/chain33/types" "github.com/33cn/chain33/types"
) )
...@@ -607,33 +608,30 @@ func (n *Node) monitorCerts() { ...@@ -607,33 +608,30 @@ func (n *Node) monitorCerts() {
} }
ticker := time.NewTicker(CheckCfgCertInterVal) ticker := time.NewTicker(CheckCfgCertInterVal)
defer ticker.Stop() defer ticker.Stop()
jcli, err := jsonclient.New("chain33-ca-server",n.nodeInfo.caServer , false) jcli, err := jsonclient.New("chain33-ca-server", n.nodeInfo.caServer, false)
if err != nil { if err != nil {
log.Error("monitorCerts", "rpc call err", err) log.Error("monitorCerts", "rpc call err", err)
return return
} }
//delayT:=time.Now().Add(time.Minute*2)
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
//check serialNum //check serialNum
// if !time.Now().After(delayT){
// continue
// }
var resp []string var resp []string
var s Serial var s Serial
s.Serials =getSerialNums() s.Serials = getSerialNums()
if len(s.Serials) == 0 { if len(s.Serials) == 0 {
continue continue
} }
log.Debug("check cert serialNum++++++","certNum.",len(s.Serials )) log.Debug("check cert serialNum++++++", "certNum.", len(s.Serials))
err = jcli.Call("Validate", s, &resp) err = jcli.Call("Validate", s, &resp)
if err != nil { if err != nil {
log.Error("monitorCerts", "rpc call err", err) log.Error("monitorCerts", "rpc call err", err)
continue continue
} }
log.Debug("monitorCerts","resp", resp) log.Debug("monitorCerts", "resp", resp)
tempCerts := getSerials() tempCerts := getSerials()
for _, serialNum := range resp { for _, serialNum := range resp {
...@@ -656,18 +654,18 @@ func (n *Node) monitorCerts() { ...@@ -656,18 +654,18 @@ func (n *Node) monitorCerts() {
// } // }
//log.Info("monitorCerts","add blacklist",certinfo.ip) //log.Info("monitorCerts","add blacklist",certinfo.ip)
//n.nodeInfo.blacklist.Add(certinfo.ip, 60) //n.nodeInfo.blacklist.Add(certinfo.ip, 60)
for pname,peer:=range n.nodeInfo.peerInfos.GetPeerInfos(){ for pname, peer := range n.nodeInfo.peerInfos.GetPeerInfos() {
if peer.GetAddr()==certinfo.ip { if peer.GetAddr() == certinfo.ip {
v,ok:= latestSerials.Load(certinfo.ip) v, ok := latestSerials.Load(certinfo.ip)
if ok && v.(string)==serialNum{ if ok && v.(string) == serialNum {
n.remove(pname)//断开已经连接的节点 n.remove(pname) //断开已经连接的节点
} }
} }
} }
} }
} }
log.Debug("monitorCert","tempCerts",tempCerts) log.Debug("monitorCert", "tempCerts", tempCerts)
//处理解除吊销的节点 //处理解除吊销的节点
for serialNum, info := range tempCerts { for serialNum, info := range tempCerts {
if info.revoke { if info.revoke {
...@@ -676,11 +674,7 @@ func (n *Node) monitorCerts() { ...@@ -676,11 +674,7 @@ func (n *Node) monitorCerts() {
sNum, _ = sNum.SetString(serialNum, 10) sNum, _ = sNum.SetString(serialNum, 10)
updateCertSerial(sNum, !info.revoke) updateCertSerial(sNum, !info.revoke)
} }
/*
//拉入黑名单的节点 恢复正常
if n.nodeInfo.blacklist.Has(info.ip) {
n.nodeInfo.blacklist.Delete(info.ip)
}*/
} }
} }
......
...@@ -7,12 +7,13 @@ package gossip ...@@ -7,12 +7,13 @@ package gossip
import ( import (
"context" "context"
"fmt" "fmt"
pr "google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
"net" "net"
"strconv" "strconv"
"time" "time"
pr "google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
pb "github.com/33cn/chain33/types" pb "github.com/33cn/chain33/types"
...@@ -172,7 +173,7 @@ func (na *NetAddress) DialTimeout(version int32, creds credentials.TransportCred ...@@ -172,7 +173,7 @@ func (na *NetAddress) DialTimeout(version int32, creds credentials.TransportCred
if err != nil { if err != nil {
return err return err
} }
log.Info("interceptor client","remoteAddr",na.String()) log.Info("interceptor client", "remoteAddr", na.String())
if bList != nil && bList.Has(ip) { if bList != nil && bList.Has(ip) {
return fmt.Errorf("blacklist peer %v no authorized", ip) return fmt.Errorf("blacklist peer %v no authorized", ip)
...@@ -188,7 +189,7 @@ func (na *NetAddress) DialTimeout(version int32, creds credentials.TransportCred ...@@ -188,7 +189,7 @@ func (na *NetAddress) DialTimeout(version int32, creds credentials.TransportCred
if err != nil { if err != nil {
return nil, err return nil, err
} }
log.Info("interceptorStream client","remoteAddr",na.String()) log.Info("interceptorStream client", "remoteAddr", na.String())
if bList.Has(ip) { if bList.Has(ip) {
return nil, fmt.Errorf("blacklist peer %v no authorized", ip) return nil, fmt.Errorf("blacklist peer %v no authorized", ip)
} }
...@@ -197,18 +198,18 @@ func (na *NetAddress) DialTimeout(version int32, creds credentials.TransportCred ...@@ -197,18 +198,18 @@ func (na *NetAddress) DialTimeout(version int32, creds credentials.TransportCred
} }
//grpc.WithPerRPCCredentials //grpc.WithPerRPCCredentials
tcpAddr,err:= net.ResolveTCPAddr("tcp",na.String()) tcpAddr, err := net.ResolveTCPAddr("tcp", na.String())
if err!=nil{ if err != nil {
return nil, err return nil, err
} }
peer := &pr.Peer{ peer := &pr.Peer{
Addr:tcpAddr, Addr: tcpAddr,
AuthInfo: nil, AuthInfo: nil,
} }
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel() defer cancel()
ctxV:= pr.NewContext(ctx,peer) ctxV := pr.NewContext(ctx, peer)
conn, err := grpc.DialContext(ctxV, na.String(), conn, err := grpc.DialContext(ctxV, na.String(),
grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip")), grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip")),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)),
......
...@@ -159,7 +159,7 @@ func NewNode(mgr *p2p.Manager, mcfg *subConfig) (*Node, error) { ...@@ -159,7 +159,7 @@ func NewNode(mgr *p2p.Manager, mcfg *subConfig) (*Node, error) {
ServerName: "", ServerName: "",
RootCAs: certPool, RootCAs: certPool,
}) })
node.nodeInfo.caServer=mcfg.CaServer node.nodeInfo.caServer = mcfg.CaServer
} }
if mcfg.ServerStart { if mcfg.ServerStart {
......
...@@ -36,7 +36,7 @@ type NodeInfo struct { ...@@ -36,7 +36,7 @@ type NodeInfo struct {
channelVersion int32 channelVersion int32
cliCreds credentials.TransportCredentials cliCreds credentials.TransportCredentials
servCreds credentials.TransportCredentials servCreds credentials.TransportCredentials
caServer string caServer string
} }
// NewNodeInfo new a node object // NewNodeInfo new a node object
......
...@@ -62,11 +62,10 @@ type subConfig struct { ...@@ -62,11 +62,10 @@ type subConfig struct {
//是否使用证书进行节点之间的通信,true 使用证书通信,读取rpc配置项下的证书文件 //是否使用证书进行节点之间的通信,true 使用证书通信,读取rpc配置项下的证书文件
EnableTls bool `protobuf:"varint,13,opt,name=enableTls" json:"enableTls,omitempty"` EnableTls bool `protobuf:"varint,13,opt,name=enableTls" json:"enableTls,omitempty"`
CaCert string `json:"caCert,omitempty"` CaCert string `json:"caCert,omitempty"`
CaServer string `json:"caServer,omitempty"` CaServer string `json:"caServer,omitempty"`
CertFile string `json:"certFile,omitempty"` CertFile string `json:"certFile,omitempty"`
// 私钥文件 // 私钥文件
KeyFile string `json:"keyFile,omitempty"` KeyFile string `json:"keyFile,omitempty"`
} }
// P2p interface // P2p interface
......
...@@ -55,7 +55,7 @@ type Peer struct { ...@@ -55,7 +55,7 @@ type Peer struct {
taskChan chan interface{} //tx block taskChan chan interface{} //tx block
inBounds int32 //连接此节点的客户端节点数量 inBounds int32 //连接此节点的客户端节点数量
IsMaxInbouds bool IsMaxInbouds bool
serialNnum string serialNnum string
} }
// NewPeer produce a peer object // NewPeer produce a peer object
......
...@@ -6,13 +6,14 @@ import ( ...@@ -6,13 +6,14 @@ import (
"crypto/x509" "crypto/x509"
"errors" "errors"
"fmt" "fmt"
"google.golang.org/grpc/credentials"
"math/big" "math/big"
"net" "net"
"net/url" "net/url"
"strings" "strings"
"sync" "sync"
"syscall" "syscall"
"google.golang.org/grpc/credentials"
) )
var serials = make(map[string]*certInfo) var serials = make(map[string]*certInfo)
...@@ -30,22 +31,22 @@ type certInfo struct { ...@@ -30,22 +31,22 @@ type certInfo struct {
type Serial struct { type Serial struct {
Serials []string `json:"serials,omitempty"` Serials []string `json:"serials,omitempty"`
} }
//serialNum -->ip //serialNum -->ip
func addCertSerial(serial *big.Int, ip string) { func addCertSerial(serial *big.Int, ip string) {
revokeLock.Lock() revokeLock.Lock()
defer revokeLock.Unlock() defer revokeLock.Unlock()
serials[serial.String()] = &certInfo{false, ip,serial.String()} serials[serial.String()] = &certInfo{false, ip, serial.String()}
} }
func updateCertSerial(serial *big.Int, revoke bool) *certInfo { func updateCertSerial(serial *big.Int, revoke bool) *certInfo {
revokeLock.Lock() revokeLock.Lock()
defer revokeLock.Unlock() defer revokeLock.Unlock()
v, ok := serials[serial.String()] v, ok := serials[serial.String()]
if ok { if ok {
v.revoke = revoke v.revoke = revoke
}else{ } else {
return nil return nil
} }
serials[serial.String()] = v serials[serial.String()] = v
...@@ -53,7 +54,7 @@ func updateCertSerial(serial *big.Int, revoke bool) *certInfo { ...@@ -53,7 +54,7 @@ func updateCertSerial(serial *big.Int, revoke bool) *certInfo {
return v return v
} }
func isRevoke(serial *big.Int) bool { func isRevoke(serial *big.Int) bool {
revokeLock.Lock() revokeLock.Lock()
defer revokeLock.Unlock() defer revokeLock.Unlock()
if r, ok := serials[serial.String()]; ok { if r, ok := serials[serial.String()]; ok {
...@@ -62,12 +63,12 @@ func isRevoke(serial *big.Int) bool { ...@@ -62,12 +63,12 @@ func isRevoke(serial *big.Int) bool {
return false return false
} }
func removeCertSerial(serial *big.Int) { func removeCertSerial(serial *big.Int) {
revokeLock.Lock() revokeLock.Lock()
defer revokeLock.Unlock() defer revokeLock.Unlock()
delete(serials, serial.String()) delete(serials, serial.String())
} }
func getSerialNums() []string { func getSerialNums() []string {
revokeLock.Lock() revokeLock.Lock()
defer revokeLock.Unlock() defer revokeLock.Unlock()
var certs []string var certs []string
...@@ -77,7 +78,7 @@ func getSerialNums() []string { ...@@ -77,7 +78,7 @@ func getSerialNums() []string {
return certs return certs
} }
func getSerials() map[string]*certInfo { func getSerials() map[string]*certInfo {
revokeLock.Lock() revokeLock.Lock()
defer revokeLock.Unlock() defer revokeLock.Unlock()
var certs = make(map[string]*certInfo) var certs = make(map[string]*certInfo)
...@@ -144,14 +145,14 @@ func (c *Tls) ClientHandshake(ctx context.Context, authority string, rawConn net ...@@ -144,14 +145,14 @@ func (c *Tls) ClientHandshake(ctx context.Context, authority string, rawConn net
log.Debug("ClientHandshake", "peerSerialNum", peerSerialNum, "certificate Num", certNum, "remoteAddr", rawConn.RemoteAddr(), "tlsInfo", tlsInfo) log.Debug("ClientHandshake", "peerSerialNum", peerSerialNum, "certificate Num", certNum, "remoteAddr", rawConn.RemoteAddr(), "tlsInfo", tlsInfo)
addrSplites := strings.Split(rawConn.RemoteAddr().String(), ":") addrSplites := strings.Split(rawConn.RemoteAddr().String(), ":")
//检查证书是否被吊销 //检查证书是否被吊销
if isRevoke(peerSerialNum){ if isRevoke(peerSerialNum) {
conn.Close() conn.Close()
return nil,nil,errors.New(fmt.Sprintf("tls ClientHandshake %v revoked",peerSerialNum.String())) return nil, nil, errors.New(fmt.Sprintf("tls ClientHandshake %v revoked", peerSerialNum.String()))
} }
if len(addrSplites) > 0 { //服务端证书的序列号,已经其IP地址 if len(addrSplites) > 0 { //服务端证书的序列号,已经其IP地址
addCertSerial(peerSerialNum, addrSplites[0]) addCertSerial(peerSerialNum, addrSplites[0])
latestSerials.Store(addrSplites[0],peerSerialNum.String())//ip --->serialNum latestSerials.Store(addrSplites[0], peerSerialNum.String()) //ip --->serialNum
} }
} }
...@@ -181,16 +182,16 @@ func (c *Tls) ServerHandshake(rawConn net.Conn) (net.Conn, credentials.AuthInfo, ...@@ -181,16 +182,16 @@ func (c *Tls) ServerHandshake(rawConn net.Conn) (net.Conn, credentials.AuthInfo,
if certNum != 0 { if certNum != 0 {
peerSerialNum := peerCert[0].SerialNumber peerSerialNum := peerCert[0].SerialNumber
//log.Info("ServerHandshake","certinfo",string(tlsInfo.State.PeerCertificates[0].Raw)) //log.Info("ServerHandshake","certinfo",string(tlsInfo.State.PeerCertificates[0].Raw))
log.Debug("ServerHandshake", "peerSerialNum", peerSerialNum, "certificate Num", certNum, "remoteAddr", rawConn.RemoteAddr(), "tlsinfo", tlsInfo,"remoteAddr",conn.RemoteAddr()) log.Debug("ServerHandshake", "peerSerialNum", peerSerialNum, "certificate Num", certNum, "remoteAddr", rawConn.RemoteAddr(), "tlsinfo", tlsInfo, "remoteAddr", conn.RemoteAddr())
if isRevoke(peerSerialNum) { if isRevoke(peerSerialNum) {
rawConn.Close() rawConn.Close()
return nil, nil, errors.New(fmt.Sprintf( "tls ServerHandshake %s revoked", peerSerialNum.String())) return nil, nil, errors.New(fmt.Sprintf("tls ServerHandshake %s revoked", peerSerialNum.String()))
} }
addrSplites := strings.Split(rawConn.RemoteAddr().String(), ":") addrSplites := strings.Split(rawConn.RemoteAddr().String(), ":")
if len(addrSplites) > 0 { if len(addrSplites) > 0 {
addCertSerial(peerSerialNum, addrSplites[0]) addCertSerial(peerSerialNum, addrSplites[0])
latestSerials.Store(addrSplites[0],peerSerialNum.String())//ip --->serialNum latestSerials.Store(addrSplites[0], peerSerialNum.String()) //ip --->serialNum
} }
} else { } else {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment