Commit 3d6bab63 authored by liuyuhang's avatar liuyuhang Committed by vipwzw

add judge para blackwhite exec addr

1、将chain33中的 fix IsDriverAddress interface #526修改加入 2、放开blackwhite 的rpc test 注释 3、修改运行平行链时候mempool中配置为para modify linter err and enough amount address modify bw in para bug modify bug modify bug add new vrf
parent 768082ad
......@@ -8,7 +8,7 @@ function dapp_test_rpc() {
echo "============ # dapp rpc test begin ============="
if [ -d dapptest ]; then
cd dapptest || return
dir=$(find . -maxdepth 1 -type d ! -name dapptest ! -name blackwhite ! -name . | sed 's/^\.\///')
dir=$(find . -maxdepth 1 -type d ! -name dapptest ! -name . | sed 's/^\.\///')
for app in $dir; do
echo "=========== # $app rpc test ============="
./"$app/${RPC_TESTFILE}" "$ip"
......
......@@ -24,6 +24,19 @@ glAddr=""
gameAddr1=""
gameAddr2=""
gameAddr3=""
bwExecAddr=""
init() {
ispara=$(echo '"'"${MAIN_HTTP}"'"' | jq '.|contains("8901")')
echo "ipara=$ispara"
if [ "$ispara" == true ]; then
bwExecAddr=$(curl -ksd '{"method":"Chain33.ConvertExectoAddr","params":[{"execname":"user.p.para.blackwhite"}]}' ${MAIN_HTTP} | jq -r ".result")
else
bwExecAddr=$(curl -ksd '{"method":"Chain33.ConvertExectoAddr","params":[{"execname":"blackwhite"}]}' ${MAIN_HTTP} | jq -r ".result")
fi
echo "bwExecAddr=$bwExecAddr"
}
chain33_NewAccount() {
label=$1
......@@ -58,8 +71,9 @@ chain33_SendToAddress() {
from=$1
to=$2
amount=$3
http=$4
note="test"
resp=$(curl -ksd '{"jsonrpc":"2.0","id":2,"method":"Chain33.SendToAddress","params":[{"from":"'"$from"'","to":"'"$to"'","amount":'"$amount"',"note":"'"$note"'"}]}' -H 'content-type:text/plain;' ${MAIN_HTTP})
resp=$(curl -ksd '{"jsonrpc":"2.0","id":2,"method":"Chain33.SendToAddress","params":[{"from":"'"$from"'","to":"'"$to"'","amount":'"$amount"',"note":"'"$note"'"}]}' -H 'content-type:text/plain;' "${http}")
ok=$(jq '(.error|not)' <<<"$resp")
[ "$ok" == true ]
rst=$?
......@@ -195,17 +209,24 @@ function run_testcases() {
#给每个账户分别转帐
origAddr="12qyocayNF7Lv6C9qW4avxs2E7U41fKSfv"
chain33_SendToAddress "${origAddr}" "${gameAddr1}" 1000000000
chain33_SendToAddress "${origAddr}" "${gameAddr2}" 1000000000
chain33_SendToAddress "${origAddr}" "${gameAddr3}" 1000000000
#主链中相应账户需要转帐
M_HTTP=${MAIN_HTTP//8901/8801}
chain33_SendToAddress "${origAddr}" "${gameAddr1}" 1000000000 "${M_HTTP}"
chain33_SendToAddress "${origAddr}" "${gameAddr2}" 1000000000 "${M_HTTP}"
chain33_SendToAddress "${origAddr}" "${gameAddr3}" 1000000000 "${M_HTTP}"
#平行链相应账户需要转帐
chain33_SendToAddress "${origAddr}" "${gameAddr1}" 1000000000 "${MAIN_HTTP}"
chain33_SendToAddress "${origAddr}" "${gameAddr2}" 1000000000 "${MAIN_HTTP}"
chain33_SendToAddress "${origAddr}" "${gameAddr3}" 1000000000 "${MAIN_HTTP}"
block_wait 1
#给游戏合约中转帐
bwExecAddr="146wei89zoX5TNQKATBJmduNPEtSKTXi1z"
chain33_SendToAddress "${gameAddr1}" "${bwExecAddr}" 500000000
chain33_SendToAddress "${gameAddr2}" "${bwExecAddr}" 500000000
chain33_SendToAddress "${gameAddr3}" "${bwExecAddr}" 500000000
chain33_SendToAddress "${gameAddr1}" "${bwExecAddr}" 500000000 "${MAIN_HTTP}"
chain33_SendToAddress "${gameAddr2}" "${bwExecAddr}" 500000000 "${MAIN_HTTP}"
chain33_SendToAddress "${gameAddr3}" "${bwExecAddr}" 500000000 "${MAIN_HTTP}"
block_wait 1
blackwhite_BlackwhiteCreateTx "${gameAddr1}"
......@@ -233,6 +254,7 @@ function main() {
MAIN_HTTP="$1"
echo "main_ip=$MAIN_HTTP"
init
run_testcases
if [ -n "$CASE_ERR" ]; then
......
......@@ -5,7 +5,7 @@
package rpc
import (
context "golang.org/x/net/context"
"golang.org/x/net/context"
"github.com/33cn/chain33/types"
bw "github.com/33cn/plugin/plugin/dapp/blackwhite/types"
......@@ -19,7 +19,7 @@ func (c *channelClient) Create(ctx context.Context, head *bw.BlackwhiteCreate) (
tx := &types.Transaction{
Payload: types.Encode(val),
}
data, err := types.FormatTxEncode(string(bw.ExecerBlackwhite), tx)
data, err := types.FormatTxEncode(types.ExecName(string(bw.ExecerBlackwhite)), tx)
if err != nil {
return nil, err
}
......@@ -34,7 +34,7 @@ func (c *channelClient) Show(ctx context.Context, head *bw.BlackwhiteShow) (*typ
tx := &types.Transaction{
Payload: types.Encode(val),
}
data, err := types.FormatTxEncode(string(bw.ExecerBlackwhite), tx)
data, err := types.FormatTxEncode(types.ExecName(string(bw.ExecerBlackwhite)), tx)
if err != nil {
return nil, err
}
......@@ -49,7 +49,7 @@ func (c *channelClient) Play(ctx context.Context, head *bw.BlackwhitePlay) (*typ
tx := &types.Transaction{
Payload: types.Encode(val),
}
data, err := types.FormatTxEncode(string(bw.ExecerBlackwhite), tx)
data, err := types.FormatTxEncode(types.ExecName(string(bw.ExecerBlackwhite)), tx)
if err != nil {
return nil, err
}
......@@ -64,7 +64,7 @@ func (c *channelClient) TimeoutDone(ctx context.Context, head *bw.BlackwhiteTime
tx := &types.Transaction{
Payload: types.Encode(val),
}
data, err := types.FormatTxEncode(string(bw.ExecerBlackwhite), tx)
data, err := types.FormatTxEncode(types.ExecName(string(bw.ExecerBlackwhite)), tx)
if err != nil {
return nil, err
}
......
......@@ -250,10 +250,10 @@ func (a *AddrBook) loadDb() bool {
if err != nil {
panic(err)
}
return false
}
a.setKey(string(privkey), a.genPubkey(string(privkey)))
} else {
a.setKey(string(privkey), a.genPubkey(string(privkey)))
}
iteror := a.bookDb.Iterator(nil, nil, false)
for iteror.Next() {
......@@ -391,6 +391,18 @@ func (a *AddrBook) setKey(privkey, pubkey string) {
}
//ResetPeerkey reset priv,pub key
func (a *AddrBook) ResetPeerkey(privkey, pubkey string) {
a.keymtx.Lock()
defer a.keymtx.Unlock()
a.privkey = privkey
a.pubkey = pubkey
err := a.bookDb.Set([]byte(privKeyTag), []byte(privkey))
if err != nil {
panic(err)
}
}
// GetPrivPubKey return privkey and pubkey
func (a *AddrBook) GetPrivPubKey() (string, string) {
a.keymtx.Lock()
......
......@@ -26,6 +26,10 @@ func (n *Node) destroyPeer(peer *Peer) {
func (n *Node) monitorErrPeer() {
for {
peer := <-n.nodeInfo.monitorChan
if peer == nil {
log.Info("monitorChan close")
return
}
if !peer.version.IsSupport() {
//如果版本不支持,直接删除节点
log.Info("VersoinMonitor", "NotSupport,addr", peer.Addr())
......@@ -260,6 +264,10 @@ func (n *Node) nodeReBalance() {
defer ticker.Stop()
for {
if n.isClose() {
log.Debug("nodeReBalance", "loop", "done")
return
}
<-ticker.C
log.Info("nodeReBalance", "cacheSize", n.CacheBoundsSize())
......@@ -362,7 +370,10 @@ func (n *Node) monitorPeers() {
defer ticker.Stop()
_, selfName := n.nodeInfo.addrBook.GetPrivPubKey()
for {
if n.isClose() {
log.Debug("monitorPeers", "loop", "done")
return
}
<-ticker.C
localBlockHeight, err := p2pcli.GetBlockHeight(n.nodeInfo)
if err != nil {
......
......@@ -158,6 +158,8 @@ func (na *NetAddress) DialTimeout(version int32) (*grpc.ClientConn, error) {
log.Debug("grpc DialCon", "did not connect", err, "addr", na.String())
return nil, err
}
//p2p version check 通过版本协议,获取通信session
//判断是否对方是否支持压缩
cli := pb.NewP2PgserviceClient(conn)
_, err = cli.GetHeaders(context.Background(), &pb.P2PGetHeaders{StartHeight: 0, EndHeight: 0, Version: version}, grpc.FailFast(true))
......@@ -186,7 +188,5 @@ func (na *NetAddress) DialTimeout(version int32) (*grpc.ClientConn, error) {
return nil, err
}
//p2p version check
return conn, nil
}
......@@ -33,6 +33,7 @@ func (n *Node) Start() {
}
n.detectNodeAddr()
n.monitor()
atomic.StoreInt32(&n.closed, 0)
go n.doNat()
}
......@@ -45,12 +46,14 @@ func (n *Node) Close() {
}
log.Debug("stop", "listen", "closed")
n.nodeInfo.addrBook.Close()
n.nodeInfo.monitorChan <- nil
log.Debug("stop", "addrBook", "closed")
n.removeAll()
if Filter != nil {
Filter.Close()
}
n.deleteNatMapPort()
log.Info("stop", "PeerRemoeAll", "closed")
}
......
......@@ -7,9 +7,11 @@ package p2p
import (
"fmt"
"math/rand"
"sync/atomic"
"time"
"github.com/33cn/chain33/client"
l "github.com/33cn/chain33/common/log/log15"
"github.com/33cn/chain33/queue"
"github.com/33cn/chain33/types"
......@@ -24,13 +26,17 @@ var (
// P2p interface
type P2p struct {
api client.QueueProtocolAPI
client queue.Client
node *Node
p2pCli EventInterface
txCapcity int32
txFactory chan struct{}
otherFactory chan struct{}
waitRestart chan struct{}
closed int32
restart int32
cfg *types.P2P
}
// New produce a p2p object
......@@ -71,7 +77,9 @@ func New(cfg *types.P2P) *P2p {
p2p.p2pCli = NewP2PCli(p2p)
p2p.txFactory = make(chan struct{}, 1000) // 1000 task
p2p.otherFactory = make(chan struct{}, 1000) //other task 1000
p2p.waitRestart = make(chan struct{}, 1)
p2p.txCapcity = 1000
p2p.cfg = cfg
return p2p
}
......@@ -82,6 +90,10 @@ func (network *P2p) isClose() bool {
return atomic.LoadInt32(&network.closed) == 1
}
func (network *P2p) isRestart() bool {
return atomic.LoadInt32(&network.restart) == 1
}
// Close network client
func (network *P2p) Close() {
atomic.StoreInt32(&network.closed, 1)
......@@ -89,25 +101,48 @@ func (network *P2p) Close() {
network.node.Close()
log.Debug("close", "node", "done")
if network.client != nil {
network.client.Close()
if !network.isRestart() {
network.client.Close()
}
}
network.node.pubsub.Shutdown()
}
// SetQueueClient set the queue
func (network *P2p) SetQueueClient(client queue.Client) {
network.client = client
network.node.SetQueueClient(client)
go func() {
log.Info("p2p", "setqueuecliet", "ok")
network.node.Start()
network.subP2pMsg()
err := network.loadP2PPrivKeyToWallet()
if err != nil {
return
func (network *P2p) SetQueueClient(cli queue.Client) {
var err error
if network.client == nil {
network.client = cli
}
network.api, err = client.New(cli, nil)
if err != nil {
panic("SetQueueClient client.New err")
}
network.node.SetQueueClient(cli)
go func(p2p *P2p) {
p2p.node.Start()
if p2p.isRestart() {
//reset
atomic.StoreInt32(&p2p.closed, 0)
atomic.StoreInt32(&p2p.restart, 0)
network.waitRestart <- struct{}{}
} else {
p2p.subP2pMsg()
go p2p.loadP2PPrivKeyToWallet()
go p2p.genAirDropKeyFromWallet()
}
}()
log.Debug("SetQueueClient gorountine ret")
}(network)
}
func (network *P2p) showTaskCapcity() {
......@@ -124,6 +159,93 @@ func (network *P2p) showTaskCapcity() {
}
}
func (network *P2p) genAirDropKeyFromWallet() error {
for {
if network.isClose() {
return nil
}
msg := network.client.NewMessage("wallet", types.EventGetWalletStatus, nil)
err := network.client.SendTimeout(msg, true, time.Minute)
if err != nil {
log.Error("genAirDropKeyFromWallet", "Error", err.Error())
time.Sleep(time.Second)
continue
}
resp, err := network.client.WaitTimeout(msg, time.Minute)
if err != nil {
time.Sleep(time.Second)
continue
}
if resp.GetData().(*types.WalletStatus).GetIsWalletLock() { //上锁
time.Sleep(time.Second)
continue
}
if !resp.GetData().(*types.WalletStatus).GetIsHasSeed() { //无种子
time.Sleep(time.Second)
continue
}
break
}
r := rand.New(rand.NewSource(types.Now().Unix()))
var minIndex int32 = 100000000
randIndex := minIndex + r.Int31n(1000000)
reqIndex := &types.Int32{Data: randIndex}
msg, err := network.api.ExecWalletFunc("wallet", "NewAccountByIndex", reqIndex)
if err != nil {
log.Error("genAirDropKeyFromWallet", "err", err)
return err
}
var hexPrivkey string
if reply, ok := msg.(*types.ReplyString); !ok {
log.Error("genAirDropKeyFromWallet", "wrong format data", "")
panic(err)
} else {
hexPrivkey = reply.GetData()
}
log.Info("genAirDropKeyFromWallet", "hexprivkey", hexPrivkey)
if hexPrivkey[:2] == "0x" {
hexPrivkey = hexPrivkey[2:]
}
hexPubkey, err := P2pComm.Pubkey(hexPrivkey)
if err != nil {
log.Error("genAirDropKeyFromWallet", "gen pub error", err)
panic(err)
}
log.Info("genAirDropKeyFromWallet", "pubkey", hexPubkey)
_, pub := network.node.nodeInfo.addrBook.GetPrivPubKey()
if pub == hexPubkey {
return nil
}
//覆盖addrbook 中的公私钥对
network.node.nodeInfo.addrBook.ResetPeerkey(hexPrivkey, hexPubkey)
//重启p2p模块
log.Info("genAirDropKeyFromWallet", "p2p will Restart....")
network.ReStart()
return nil
}
// ReStart p2p
func (network *P2p) ReStart() {
atomic.StoreInt32(&network.restart, 1)
network.Close()
node, err := NewNode(network.cfg) //创建新的node节点
if err != nil {
panic(err.Error())
}
network.node = node
network.SetQueueClient(network.client)
}
func (network *P2p) loadP2PPrivKeyToWallet() error {
for {
......@@ -193,15 +315,23 @@ func (network *P2p) subP2pMsg() {
go network.showTaskCapcity()
go func() {
defer func() {
close(network.otherFactory)
close(network.txFactory)
}()
var taskIndex int64
network.client.Sub("p2p")
for msg := range network.client.Recv() {
if network.isRestart() {
//wait for restart
log.Info("waitp2p restart....")
<-network.waitRestart
log.Info("p2p restart ok....")
}
if network.isClose() {
log.Debug("subP2pMsg", "loop", "done")
close(network.otherFactory)
close(network.txFactory)
return
}
taskIndex++
......
......@@ -13,6 +13,7 @@ import (
"github.com/33cn/chain33/queue"
"github.com/33cn/chain33/types"
"github.com/33cn/chain33/wallet"
//"github.com/33cn/chain33/util/testnode"
"github.com/stretchr/testify/assert"
......@@ -33,6 +34,36 @@ func init() {
p2pModule = initP2p(33802, dataDir)
p2pModule.Wait()
go func() {
cfg, sub := types.InitCfg("../cmd/chain33/chain33.test.toml")
wcli := wallet.New(cfg.Wallet, sub.Wallet)
client := q.Client()
wcli.SetQueueClient(client)
//导入种子,解锁钱包
password := "a12345678"
seed := "cushion canal bitter result harvest sentence ability time steel basket useful ask depth sorry area course purpose search exile chapter mountain project ranch buffalo"
saveSeedByPw := &types.SaveSeedByPw{Seed: seed, Passwd: password}
msgSaveEmpty := client.NewMessage("wallet", types.EventSaveSeed, saveSeedByPw)
client.Send(msgSaveEmpty, true)
_, err := client.Wait(msgSaveEmpty)
if err != nil {
return
}
walletUnLock := &types.WalletUnLock{
Passwd: password,
Timeout: 0,
WalletOrTicket: false,
}
msgUnlock := client.NewMessage("wallet", types.EventWalletUnLock, walletUnLock)
client.Send(msgUnlock, true)
_, err = client.Wait(msgUnlock)
if err != nil {
return
}
}()
go func() {
blockchainKey := "blockchain"
client := q.Client()
client.Sub(blockchainKey)
......@@ -163,7 +194,7 @@ func TestPeer(t *testing.T) {
assert.IsType(t, "string", peer.GetPeerName())
localP2P.node.AddCachePeer(peer)
//
peer.GetRunning()
localP2P.node.natOk()
localP2P.node.flushNodePort(43803, 43802)
p2pcli := NewNormalP2PCli()
......@@ -193,6 +224,8 @@ func TestPeer(t *testing.T) {
_, err = p2pcli.GetAddr(peer)
assert.Nil(t, err)
localP2P.node.pubsub.FIFOPub(&types.P2PTx{Tx: &types.Transaction{}}, "tx")
localP2P.node.pubsub.FIFOPub(&types.P2PBlock{Block: &types.Block{}}, "block")
// //测试获取高度
height, err := p2pcli.GetBlockHeight(localP2P.node.nodeInfo)
assert.Nil(t, err)
......@@ -204,7 +237,12 @@ func TestPeer(t *testing.T) {
job.GetFreePeer(1)
var ins []*types.Inventory
var inv types.Inventory
inv.Ty = msgBlock
inv.Height = 2
ins = append(ins, &inv)
var bChan = make(chan *types.BlockPid, 256)
job.syncDownloadBlock(peer, ins[0], bChan)
respIns := job.DownloadBlock(ins, bChan)
t.Log(respIns)
job.ResetDownloadPeers([]*Peer{peer})
......@@ -277,6 +315,7 @@ func TestGrpcStreamConns(t *testing.T) {
_, err = cli.ServerStreamSend(context.Background(), ping)
assert.Nil(t, err)
_, err = cli.ServerStreamRead(context.Background())
assert.Nil(t, err)
var emptyBlock types.P2PBlock
......@@ -359,6 +398,7 @@ func TestAddrBook(t *testing.T) {
assert.Equal(t, addrBook.genPubkey(hex.EncodeToString(prv)), pubstr)
addrBook.Save()
addrBook.GetAddrs()
addrBook.ResetPeerkey(hex.EncodeToString(prv), pubstr)
}
func TestBytesToInt32(t *testing.T) {
......@@ -393,8 +433,15 @@ func TestP2pListen(t *testing.T) {
listen1.Close()
listen2.Close()
}
func TestP2pRestart(t *testing.T) {
assert.Equal(t, false, p2pModule.isClose())
assert.Equal(t, false, p2pModule.isRestart())
p2pModule.ReStart()
}
func TestP2pClose(t *testing.T) {
p2pModule.Wait()
p2pModule.Close()
os.RemoveAll(dataDir)
}
......@@ -364,6 +364,7 @@ func (m *Cli) GetHeaders(msg *queue.Message, taskindex int64) {
msg.Reply(m.network.client.NewMessage("blockchain", pb.EventReply, pb.Reply{Msg: []byte("no pid")}))
return
}
msg.Reply(m.network.client.NewMessage("blockchain", pb.EventReply, pb.Reply{IsOk: true, Msg: []byte("ok")}))
peers, infos := m.network.node.GetActivePeers()
for paddr, info := range infos {
......@@ -411,6 +412,7 @@ func (m *Cli) GetBlocks(msg *queue.Message, taskindex int64) {
req := msg.GetData().(*pb.ReqBlocks)
log.Info("GetBlocks", "start", req.GetStart(), "end", req.GetEnd())
pids := req.GetPid()
log.Info("GetBlocks", "pids", pids)
var Inventorys = make([]*pb.Inventory, 0)
for i := req.GetStart(); i <= req.GetEnd(); i++ {
var inv pb.Inventory
......@@ -424,7 +426,7 @@ func (m *Cli) GetBlocks(msg *queue.Message, taskindex int64) {
var downloadPeers []*Peer
peers, infos := m.network.node.GetActivePeers()
if len(pids) > 0 && pids[0] != "" { //指定Pid 下载数据
log.Info("fetch from peer in pids")
log.Info("fetch from peer in pids", "pids", pids)
var pidmap = make(map[string]bool)
for _, pid := range pids {
pidmap[pid] = true
......
......@@ -89,7 +89,6 @@ func (s *P2pserver) Ping(ctx context.Context, in *pb.P2PPing) (*pb.P2PPong, erro
}
}
log.Debug("Send Pong", "Nonce", in.GetNonce())
return &pb.P2PPong{Nonce: in.GetNonce()}, nil
......@@ -413,11 +412,28 @@ func (s *P2pserver) ServerStreamSend(in *pb.P2PPing, stream pb.P2Pgservice_Serve
log.Debug("ServerStreamSend")
peername := hex.EncodeToString(in.GetSign().GetPubkey())
defer s.deleteInBoundPeerInfo(peername)
defer func() { s.deleteSChan <- stream }()
dataChain := s.addStreamHandler(stream)
for data := range dataChain {
if s.IsClose() {
return fmt.Errorf("node close")
}
innerpeer := s.getInBoundPeerInfo(peername)
if innerpeer != nil {
if !s.checkVersion(innerpeer.p2pversion) {
log.Error("ServerStreamSend CheckVersion", "version", innerpeer.p2pversion)
if innerpeer.p2pversion == 0 {
return fmt.Errorf("version empty")
}
return pb.ErrVersion
}
} else {
return fmt.Errorf("no peer info")
}
p2pdata := new(pb.BroadCastData)
if block, ok := data.(*pb.P2PBlock); ok {
if block.GetBlock() != nil {
......@@ -441,8 +457,8 @@ func (s *P2pserver) ServerStreamSend(in *pb.P2PPing, stream pb.P2Pgservice_Serve
err := stream.Send(p2pdata)
if err != nil {
s.deleteSChan <- stream
s.deleteInBoundPeerInfo(peername)
//s.deleteSChan <- stream
//s.deleteInBoundPeerInfo(peername)
return err
}
}
......@@ -465,12 +481,15 @@ func (s *P2pserver) ServerStreamRead(stream pb.P2Pgservice_ServerStreamReadServe
return fmt.Errorf("ctx.Addr format err")
}
} else {
return fmt.Errorf("getctx err")
}
var hash [64]byte
var peeraddr, peername string
defer s.deleteInBoundPeerInfo(peername)
defer stream.SendAndClose(&pb.ReqNil{})
var in = new(pb.BroadCastData)
for {
......@@ -484,6 +503,17 @@ func (s *P2pserver) ServerStreamRead(stream pb.P2Pgservice_ServerStreamReadServe
}
if block := in.GetBlock(); block != nil {
innerpeer := s.getInBoundPeerInfo(peername)
if innerpeer != nil {
log.Error("ServerStreamRead CheckVersion", "version", innerpeer.p2pversion, "ip", remoteIP)
if !s.checkVersion(innerpeer.p2pversion) {
return pb.ErrVersion
}
} else {
log.Error("ServerStreamRead", "no peer info", "")
return fmt.Errorf("no peer info")
}
hex.Encode(hash[:], block.GetBlock().Hash())
blockhash := string(hash[:])
......@@ -507,6 +537,15 @@ func (s *P2pserver) ServerStreamRead(stream pb.P2Pgservice_ServerStreamReadServe
}
} else if tx := in.GetTx(); tx != nil {
innerpeer := s.getInBoundPeerInfo(peername)
if innerpeer != nil {
if !s.checkVersion(innerpeer.p2pversion) {
return pb.ErrVersion
}
} else {
return fmt.Errorf("no peer info")
}
hex.Encode(hash[:], tx.GetTx().Hash())
txhash := string(hash[:])
log.Debug("ServerStreamRead", "txhash:", txhash)
......
......@@ -181,7 +181,7 @@ func (p *Peer) sendStream() {
//Stream Send data
for {
if !p.GetRunning() {
log.Info("sendStream peer is not running")
log.Info("sendStream peer connect closed", "peerid", p.GetPeerName())
return
}
ctx, cancel := context.WithCancel(context.Background())
......@@ -239,6 +239,9 @@ func (p *Peer) sendStream() {
SEND_LOOP:
for {
if !p.GetRunning() {
return
}
select {
case task := <-p.taskChan:
if !p.GetRunning() {
......@@ -247,7 +250,7 @@ func (p *Peer) sendStream() {
log.Error("CloseSend", "err", errs)
}
cancel()
log.Error("sendStream peer is not running")
log.Error("sendStream peer connect closed", "peerName", p.GetPeerName())
return
}
p2pdata := new(pb.BroadCastData)
......@@ -263,6 +266,7 @@ func (p *Peer) sendStream() {
log.Debug("sendStream", "find peer height>this broadblock ,send process", "break")
continue
}
}
p2pdata.Value = &pb.BroadCastData_Block{Block: block}
......@@ -344,6 +348,7 @@ func (p *Peer) readStream() {
}
return
}
data, err := resp.Recv()
P2pComm.CollectPeerStat(err, p)
if err != nil {
......@@ -425,6 +430,10 @@ func (p *Peer) readStream() {
// GetRunning get running ok or not
func (p *Peer) GetRunning() bool {
if p.node.isClose() {
return false
}
return atomic.LoadInt32(&p.isclose) != 1
}
......
......@@ -5,6 +5,8 @@
package store
import (
"sync"
dbm "github.com/33cn/chain33/common/db"
clog "github.com/33cn/chain33/common/log"
log "github.com/33cn/chain33/common/log/log15"
......@@ -61,6 +63,7 @@ type BaseStore struct {
qclient queue.Client
done chan struct{}
child SubStore
wg sync.WaitGroup
}
// NewBaseStore new base store struct
......@@ -94,7 +97,9 @@ func (store *BaseStore) Wait() {}
func (store *BaseStore) processMessage(msg *queue.Message) {
client := store.qclient
if msg.Ty == types.EventStoreSet {
store.wg.Add(1)
go func() {
defer store.wg.Done()
datas := msg.GetData().(*types.StoreSetWithSync)
hash, err := store.child.Set(datas.Storeset, datas.Sync)
if err != nil {
......@@ -104,13 +109,17 @@ func (store *BaseStore) processMessage(msg *queue.Message) {
msg.Reply(client.NewMessage("", types.EventStoreSetReply, &types.ReplyHash{Hash: hash}))
}()
} else if msg.Ty == types.EventStoreGet {
store.wg.Add(1)
go func() {
defer store.wg.Done()
datas := msg.GetData().(*types.StoreGet)
values := store.child.Get(datas)
msg.Reply(client.NewMessage("", types.EventStoreGetReply, &types.StoreReplyValue{Values: values}))
}()
} else if msg.Ty == types.EventStoreMemSet { //只是在内存中set 一下,并不改变状态
store.wg.Add(1)
go func() {
defer store.wg.Done()
datas := msg.GetData().(*types.StoreSetWithSync)
var hash []byte
var err error
......@@ -126,7 +135,9 @@ func (store *BaseStore) processMessage(msg *queue.Message) {
msg.Reply(client.NewMessage("", types.EventStoreSetReply, &types.ReplyHash{Hash: hash}))
}()
} else if msg.Ty == types.EventStoreCommit { //把内存中set 的交易 commit
store.wg.Add(1)
go func() {
defer store.wg.Done()
req := msg.GetData().(*types.ReqHash)
var hash []byte
var err error
......@@ -145,7 +156,9 @@ func (store *BaseStore) processMessage(msg *queue.Message) {
}
}()
} else if msg.Ty == types.EventStoreRollback {
store.wg.Add(1)
go func() {
defer store.wg.Done()
req := msg.GetData().(*types.ReqHash)
hash, err := store.child.Rollback(req)
if err != nil {
......@@ -155,7 +168,9 @@ func (store *BaseStore) processMessage(msg *queue.Message) {
}
}()
} else if msg.Ty == types.EventStoreGetTotalCoins {
store.wg.Add(1)
go func() {
defer store.wg.Done()
req := msg.GetData().(*types.IterateRangeByStateHash)
resp := &types.ReplyGetTotalCoins{}
resp.Count = req.Count
......@@ -163,7 +178,9 @@ func (store *BaseStore) processMessage(msg *queue.Message) {
msg.Reply(client.NewMessage("", types.EventGetTotalCoinsReply, resp))
}()
} else if msg.Ty == types.EventStoreDel {
store.wg.Add(1)
go func() {
defer store.wg.Done()
req := msg.GetData().(*types.StoreDel)
hash, err := store.child.Del(req)
if err != nil {
......@@ -173,13 +190,19 @@ func (store *BaseStore) processMessage(msg *queue.Message) {
}
}()
} else if msg.Ty == types.EventStoreList {
store.wg.Add(1)
go func() {
defer store.wg.Done()
req := msg.GetData().(*types.StoreList)
query := NewStoreListQuery(store.child, req)
msg.Reply(client.NewMessage("", types.EventStoreListReply, query.Run()))
}()
} else {
go store.child.ProcEvent(msg)
store.wg.Add(1)
go func() {
defer store.wg.Done()
store.child.ProcEvent(msg)
}()
}
}
......@@ -193,6 +216,7 @@ func (store *BaseStore) Close() {
if store.qclient != nil {
store.qclient.Close()
<-store.done
store.wg.Wait()
}
store.db.Close()
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment