Commit 4666b86b authored by jiangpeng's avatar jiangpeng Committed by vipwzw

p2p:fix wait group data race

parent 4f9789cb
...@@ -81,6 +81,7 @@ type P2p struct { ...@@ -81,6 +81,7 @@ type P2p struct {
subCfg *subConfig subCfg *subConfig
mgr *p2p.Manager mgr *p2p.Manager
subChan chan interface{} subChan chan interface{}
lock sync.Mutex
} }
// New produce a p2p object // New produce a p2p object
...@@ -147,6 +148,8 @@ func (network *P2p) isRestart() bool { ...@@ -147,6 +148,8 @@ func (network *P2p) isRestart() bool {
//CloseP2P Close network client //CloseP2P Close network client
func (network *P2p) CloseP2P() { func (network *P2p) CloseP2P() {
network.lock.Lock()
defer network.lock.Unlock()
log.Info("p2p network start shutdown") log.Info("p2p network start shutdown")
atomic.StoreInt32(&network.closed, 1) atomic.StoreInt32(&network.closed, 1)
//等待业务协程停止 //等待业务协程停止
...@@ -321,6 +324,8 @@ func (network *P2p) genAirDropKeyFromWallet() error { ...@@ -321,6 +324,8 @@ func (network *P2p) genAirDropKeyFromWallet() error {
//ReStart p2p //ReStart p2p
func (network *P2p) ReStart() { func (network *P2p) ReStart() {
network.lock.Lock()
defer network.lock.Unlock()
//避免重复 //避免重复
if !atomic.CompareAndSwapInt32(&network.restart, 0, 1) { if !atomic.CompareAndSwapInt32(&network.restart, 0, 1) {
return return
...@@ -400,6 +405,11 @@ func (network *P2p) subP2pMsg() { ...@@ -400,6 +405,11 @@ func (network *P2p) subP2pMsg() {
func (network *P2p) processEvent(msg *queue.Message, taskIdx int64, eventFunc p2pEventFunc) { func (network *P2p) processEvent(msg *queue.Message, taskIdx int64, eventFunc p2pEventFunc) {
network.lock.Lock()
defer network.lock.Unlock()
if network.isClose() {
return
}
//检测重启标志,停止分发事件,需要等待重启 //检测重启标志,停止分发事件,需要等待重启
if network.isRestart() { if network.isRestart() {
log.Info("wait for p2p restart....") log.Info("wait for p2p restart....")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment