Commit a9e3544c authored by vipwzw's avatar vipwzw Committed by 33cn

fix queue blocked bug

parent 318bd0ed
......@@ -176,6 +176,9 @@ func (client *client) Close() {
client.wg.Wait()
atomic.StoreInt32(&client.isClosed, 1)
close(client.Recv())
for msg := range client.Recv() {
msg.ReplyErr("client.close", types.ErrChannelClosed)
}
}
// CloseQueue 关闭消息队列
......
......@@ -423,3 +423,50 @@ func BenchmarkChanSubCallback2(b *testing.B) {
client.Reply(msg)
}
}
func TestChannelClose(t *testing.T) {
//send timeout and recv timeout
q := New("channel")
//mempool
done := make(chan struct{}, 1)
go func() {
client := q.Client()
client.Sub("mempool")
for {
select {
case msg := <-client.Recv():
if msg == nil {
return
}
if msg.Ty == types.EventTx {
msg.Reply(client.NewMessage("mempool", types.EventReply, types.Reply{IsOk: true, Msg: []byte("word")}))
}
case <-done:
client.Close()
return
}
}
}()
client := q.Client()
go q.Start()
//rpc 模块 会向其他模块发送消息,自己本身不需要订阅消息
go func() {
done <- struct{}{}
}()
for i := 0; i < 10000; i++ {
msg := client.NewMessage("mempool", types.EventTx, "hello")
err := client.SendTimeout(msg, true, 0)
if err == types.ErrChannelClosed {
return
}
if err != nil {
t.Error(err)
return
}
_, err = client.Wait(msg)
if err != nil {
t.Error(err)
}
}
}
......@@ -96,6 +96,9 @@ func (client *Client) CheckBlock(parent *types.Block, current *types.BlockDetail
func (client *Client) CreateBlock() {
issleep := true
for {
if client.IsClosed() {
break
}
if !client.IsMining() || !client.IsCaughtUp() {
time.Sleep(client.sleepTime)
continue
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment