Commit 8850975d authored by caopingcp, committed by 33cn

fix qbft unit test getting stuck

parent 3bb3c030
@@ -207,7 +207,7 @@ func (node *Node) Start() {
 			randomSleep(0)
 			err := node.DialPeerWithAddress(addr)
 			if err != nil {
-				qbftlog.Debug("Error dialing peer", "err", err)
+				qbftlog.Error("Error dialing peer", "err", err)
 			}
 		}(i)
 	}
@@ -366,8 +366,19 @@ func (node *Node) StopPeerForError(peer Peer, reason interface{}) {
 	}
 }
 
+func (node *Node) isSeed(addr string) bool {
+	ip, _ := splitHostPort(addr)
+	for _, seed := range node.seeds {
+		host, _ := splitHostPort(seed)
+		if host == ip {
+			return true
+		}
+	}
+	return false
+}
+
 func (node *Node) addInboundPeer(conn net.Conn) error {
-	peerConn, err := newInboundPeerConn(conn, node.privKey, node.StopPeerForError, node.state)
+	peerConn, err := newInboundPeerConn(conn, node.isSeed(conn.RemoteAddr().String()), node.privKey, node.StopPeerForError, node.state)
 	if err != nil {
 		if er := conn.Close(); er != nil {
 			qbftlog.Error("addInboundPeer close conn failed", "er", er)
@@ -458,7 +469,7 @@ func (node *Node) addPeer(pc *peerConn) error {
 		return err
 	}
 
-	qbftlog.Info("Added peer", "peer", pc.ip)
+	qbftlog.Info("Added peer", "ip", pc.ip, "peer", pc)
 	rs := node.state.GetRoundState()
 	stateMsg := MsgInfo{TypeID: ttypes.NewRoundStepID, Msg: rs.RoundStateMessage(), PeerID: pc.id, PeerIP: pc.ip.String()}
 	pc.Send(stateMsg)
@@ -685,6 +696,7 @@ func newOutboundPeerConn(addr string, ourNodePrivKey crypto.PrivKey, onPeerError
 func newInboundPeerConn(
 	conn net.Conn,
+	persistent bool,
 	ourNodePrivKey crypto.PrivKey,
 	onPeerError func(Peer, interface{}),
 	state *ConsensusState,
@@ -692,7 +704,7 @@ func newInboundPeerConn(
 	// TODO: issue PoW challenge
-	return newPeerConn(conn, false, false, ourNodePrivKey, onPeerError, state)
+	return newPeerConn(conn, false, persistent, ourNodePrivKey, onPeerError, state)
 }
 
 func newPeerConn(
...
@@ -140,8 +140,9 @@ func (ps *PeerSet) Add(peer Peer) error {
 // peerKey, otherwise false.
 func (ps *PeerSet) Has(peerKey ID) bool {
 	ps.mtx.Lock()
+	defer ps.mtx.Unlock()
 	_, ok := ps.lookup[peerKey]
-	ps.mtx.Unlock()
 	return ok
 }
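Replacing the explicit Unlock with defer ps.mtx.Unlock() guarantees the mutex is released on every exit path from Has, including a panic between Lock and return; a goroutine that dies while holding the lock would otherwise block every later caller and can hang a test. A minimal sketch of the pattern, assuming a generic, hypothetical string-keyed set rather than the real PeerSet:

```go
package main

import (
	"fmt"
	"sync"
)

// peerSet is a minimal stand-in for PeerSet: a mutex-guarded lookup map.
type peerSet struct {
	mtx    sync.Mutex
	lookup map[string]bool
}

// Has releases the lock via defer, so every return path (and any panic
// raised while the map is accessed) still unlocks the mutex.
func (ps *peerSet) Has(key string) bool {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()
	_, ok := ps.lookup[key]
	return ok
}

func main() {
	ps := &peerSet{lookup: map[string]bool{"peer-1": true}}
	fmt.Println(ps.Has("peer-1")) // true
	fmt.Println(ps.Has("peer-2")) // false
}
```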
@@ -435,7 +436,7 @@ func (pc *peerConn) IsRunning() bool {
 func (pc *peerConn) Start() error {
 	if atomic.CompareAndSwapUint32(&pc.started, 0, 1) {
 		if atomic.LoadUint32(&pc.stopped) == 1 {
-			qbftlog.Error("peerConn already stoped can not start", "peerIP", pc.ip.String())
+			qbftlog.Error("peerConn already stopped", "peerIP", pc.ip.String())
 			return nil
 		}
 		pc.bufReader = bufio.NewReaderSize(pc.conn, minReadBufferSize)
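Start guards itself with a compare-and-swap on pc.started so the connection's I/O goroutines launch at most once, and it refuses to start again once pc.stopped is set (now logged with a clearer message). A standalone sketch of that start-once / no-restart-after-stop pattern, using illustrative names rather than the package's:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// lifecycle mimics the started/stopped flags used by peerConn.
type lifecycle struct {
	started uint32
	stopped uint32
}

// Start runs f only on the first call, and never after Stop has been called.
func (l *lifecycle) Start(f func()) bool {
	if !atomic.CompareAndSwapUint32(&l.started, 0, 1) {
		return false // already started
	}
	if atomic.LoadUint32(&l.stopped) == 1 {
		return false // already stopped, do not restart
	}
	f()
	return true
}

// Stop marks the lifecycle as stopped exactly once.
func (l *lifecycle) Stop() bool {
	return atomic.CompareAndSwapUint32(&l.stopped, 0, 1)
}

func main() {
	var l lifecycle
	fmt.Println(l.Start(func() { fmt.Println("routines launched") })) // true
	fmt.Println(l.Start(func() {}))                                   // false: second Start is a no-op
	fmt.Println(l.Stop())                                             // true
}
```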
@@ -525,13 +526,13 @@ FOR_LOOP:
 			raw := encodeMsg(msg.Msg, msg.TypeID)
 			_, err := pc.bufWriter.Write(raw)
 			if err != nil {
-				qbftlog.Error("peerConn sendroutine write data failed", "error", err)
+				qbftlog.Error("sendRoutine buffer write fail", "peer", pc, "err", err)
 				pc.stopForError(err)
 				break FOR_LOOP
 			}
 			err = pc.bufWriter.Flush()
 			if err != nil {
-				qbftlog.Error("peerConn sendroutine flush buffer failed", "error", err)
+				qbftlog.Error("sendRoutine buffer flush fail", "peer", pc, "err", err)
 				pc.stopForError(err)
 				break FOR_LOOP
 			}
@@ -547,7 +548,7 @@ FOR_LOOP:
 		var buf [6]byte
 		_, err := io.ReadFull(pc.bufReader, buf[:])
 		if err != nil {
-			qbftlog.Error("recvRoutine read byte fail", "conn", pc, "err", err)
+			qbftlog.Error("recvRoutine read byte fail", "peer", pc, "err", err)
 			pc.stopForError(err)
 			break FOR_LOOP
 		}
...
@@ -41,7 +41,7 @@ func TestQbft(t *testing.T) {
 	mock33.Listen()
 	t.Log(mock33.GetGenesisAddress())
 	go startNode(t)
-	time.Sleep(2 * time.Second)
+	time.Sleep(3 * time.Second)
 	configTx := configManagerTx()
 	_, err := mock33.GetAPI().SendTx(configTx)
@@ -79,6 +79,8 @@ func startNode(t *testing.T) {
 	assert.Nil(t, err)
 	sub.Consensus["qbft"] = qcfg
 	mock33_2 := testnode.NewWithConfig(cfg2, nil)
+	mock33_2.Listen()
+	time.Sleep(3 * time.Second)
 	defer clearQbftData("datadir2")
 	defer mock33_2.Close()
...
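On the test side, the fix lengthens the fixed sleeps to 3 seconds and has the second mock node call Listen() before the test continues, giving both nodes time to come up and connect. Fixed sleeps trade flakiness for runtime; where a readiness condition can actually be observed (peer count, block height), a bounded polling wait is usually more robust. A hypothetical waitFor helper, not part of this change, sketching that approach:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// waitFor polls cond every interval until it returns true or timeout elapses.
func waitFor(timeout, interval time.Duration, cond func() bool) error {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if cond() {
			return nil
		}
		time.Sleep(interval)
	}
	return errors.New("condition not met before timeout")
}

func main() {
	start := time.Now()
	// Example condition: "ready" after 500ms; a real test would check
	// something observable, e.g. that a peer connected or a block was produced.
	err := waitFor(5*time.Second, 100*time.Millisecond, func() bool {
		return time.Since(start) > 500*time.Millisecond
	})
	fmt.Println("ready:", err == nil)
}
```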