Commit 7667a9dc authored by lilinleeli1234's avatar lilinleeli1234 Committed by vipwzw

update close func

parent 039dbcb5
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"context" "context"
"fmt" "fmt"
"sync/atomic"
"sync" "sync"
...@@ -21,7 +22,9 @@ var topic = "mempool" ...@@ -21,7 +22,9 @@ var topic = "mempool"
type Mempool struct { type Mempool struct {
key string key string
wg sync.WaitGroup wg sync.WaitGroup
client queue.Client
mainGrpcCli types.Chain33Client mainGrpcCli types.Chain33Client
isclose int32
} }
//NewMempool 新建mempool 实例 //NewMempool 新建mempool 实例
...@@ -41,9 +44,11 @@ func NewMempool(cfg *types.Mempool) *Mempool { ...@@ -41,9 +44,11 @@ func NewMempool(cfg *types.Mempool) *Mempool {
//SetQueueClient 初始化mempool模块 //SetQueueClient 初始化mempool模块
func (mem *Mempool) SetQueueClient(client queue.Client) { func (mem *Mempool) SetQueueClient(client queue.Client) {
mem.client = client
mem.client.Sub(mem.key)
mem.wg.Add(1) mem.wg.Add(1)
go func() { go func() {
client.Sub(mem.key)
for msg := range client.Recv() { for msg := range client.Recv() {
switch msg.Ty { switch msg.Ty {
case types.EventTx: case types.EventTx:
...@@ -76,6 +81,19 @@ func (mem *Mempool) Wait() {} ...@@ -76,6 +81,19 @@ func (mem *Mempool) Wait() {}
// Close method // Close method
func (mem *Mempool) Close() { func (mem *Mempool) Close() {
if mem.isClose() {
return
}
atomic.StoreInt32(&mem.isclose, 1)
if mem.client != nil {
mem.client.Close()
}
// wait for cycle quit // wait for cycle quit
mlog.Info("para mempool module closing")
mem.wg.Wait() mem.wg.Wait()
mlog.Info("para mempool module closed")
}
func (mem *Mempool) isClose() bool {
return atomic.LoadInt32(&mem.isclose) == 1
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment