Commit a3a0d677 authored by mdj33's avatar mdj33 Committed by vipwzw

add timeout for multi server req

parent 24b3e317
......@@ -95,7 +95,7 @@ targetTimePerBlock = 16
[consensus.sub.para]
#主链节点的grpc服务器ip,当前可以支持多ip负载均衡,如“101.37.227.226:8802,39.97.20.242:8802,47.107.15.126:8802,jiedian2.bityuan.com,cloud.bityuan.com”
#ParaRemoteGrpcClient="101.37.227.226:8802,39.97.20.242:8802,47.107.15.126:8802,jiedian2.bityuan.com,cloud.bityuan.com"
#ParaRemoteGrpcClient="183.129.226.74:8802,183.129.226.75:8802,101.37.227.226:8802,39.97.20.242:8802,47.107.15.126:8802,jiedian2.bityuan.com,cloud.bityuan.com"
ParaRemoteGrpcClient="localhost:8802"
#主链指定高度的区块开始同步
startHeight=345850
......
......@@ -91,6 +91,7 @@ type subConfig struct {
MultiDownloadOpen int32 `json:"multiDownloadOpen,omitempty"`
MultiDownInvNumPerJob int64 `json:"multiDownInvNumPerJob,omitempty"`
MultiDownJobBuffNum uint32 `json:"multiDownJobBuffNum,omitempty"`
MultiDownServerRspTime uint32 `json:"multiDownServerRspTime,omitempty"`
}
// New function to init paracross env
......@@ -199,9 +200,10 @@ func New(cfg *types.Consensus, sub []byte) queue.Module {
}
para.multiDldCli = &multiDldClient{
paraClient: para,
invNumPerJob: defaultInvNumPerJob,
jobBufferNum: defaultJobBufferNum,
paraClient: para,
invNumPerJob: defaultInvNumPerJob,
jobBufferNum: defaultJobBufferNum,
serverTimeout: maxServerRspTimeout,
}
if subcfg.MultiDownInvNumPerJob > 0 {
para.multiDldCli.invNumPerJob = subcfg.MultiDownInvNumPerJob
......@@ -213,6 +215,10 @@ func New(cfg *types.Consensus, sub []byte) queue.Module {
para.multiDldCli.multiDldOpen = true
}
if subcfg.MultiDownServerRspTime > 0 {
para.multiDldCli.serverTimeout = subcfg.MultiDownServerRspTime
}
c.SetChild(para)
return para
}
......
......@@ -507,6 +507,8 @@ func (client *client) procLocalBlocks(mainBlocks *types.ParaTxDetails) error {
}
func (client *client) CreateBlock() {
defer client.wg.Done()
client.multiDldCli.tryMultiServerDownload()
lastSeq, lastSeqMainHash, err := client.getLastLocalBlockSeq()
......@@ -563,5 +565,4 @@ out:
}
plog.Info("para CreateBlock quit")
client.wg.Done()
}
......@@ -8,10 +8,10 @@ import (
"context"
"sync"
"strings"
"time"
"strings"
"github.com/33cn/chain33/rpc/grpcclient"
"github.com/33cn/chain33/types"
)
......@@ -23,12 +23,14 @@ const (
maxBlockSize = 20000000 // 单次1000block size累积超过20M 需保存到localdb
downTimesFastThreshold = 450 // 单个server 下载超过450次,平均20次用20s来计算,下载7分钟左右检查有没有差别比较大的
downTimesSlowThreshold = 30 // 慢的server小于30次,则小于快server的15倍,需要剔除
maxServerRspTimeout = 15
)
type connectCli struct {
ip string
conn types.Chain33Client
downTimes int64
timeout uint32
}
//invertory 是每次请求的最小单位,每次请求最多MaxBlockCountPerTime
......@@ -59,10 +61,12 @@ type multiDldClient struct {
paraClient *client
jobBufferNum uint32
invNumPerJob int64
serverTimeout uint32
conns []*connectCli
connsCheckDone bool
multiDldOpen bool
wg sync.WaitGroup
mtx sync.Mutex
}
func (m *multiDldClient) getInvs(startHeight, endHeight int64) []*inventory {
......@@ -84,25 +88,78 @@ func (m *multiDldClient) getInvs(startHeight, endHeight int64) []*inventory {
return invs
}
func (m *multiDldClient) tryMultiServerDownload() {
if !m.multiDldOpen {
func (m *multiDldClient) testConn(conn *connectCli, inv *inventory) {
defer m.wg.Done()
recv := make(chan bool, 1)
testInv := &inventory{start: inv.start, end: inv.end, curHeight: inv.start, connCli: conn}
go func() {
_, err := requestMainBlocks(testInv)
if err != nil {
plog.Info("multiServerDownload.testconn ip error", "ip", conn.ip, "err", err.Error())
recv <- false
return
}
recv <- true
}()
t := time.NewTimer(time.Second * time.Duration(conn.timeout))
select {
case <-t.C:
plog.Info("multiServerDownload.testconn ip timeout", "ip", conn.ip)
return
case ret := <-recv:
if ret {
m.mtx.Lock()
m.conns = append(m.conns, conn)
m.mtx.Unlock()
}
t.Stop()
return
}
}
func (m *multiDldClient) getConns(inv *inventory) error {
paraRemoteGrpcIps := types.Conf("config.consensus.sub.para").GStr("ParaRemoteGrpcClient")
ips := strings.Split(paraRemoteGrpcIps, ",")
var conns []*connectCli
for _, ip := range ips {
conn, err := grpcclient.NewMainChainClient(ip)
if err == nil {
conns = append(conns, &connectCli{conn: conn, ip: ip})
conns = append(conns, &connectCli{conn: conn, ip: ip, timeout: m.serverTimeout})
}
}
if len(conns) == 0 {
plog.Info("tryMultiServerDownload no valid ips")
return types.ErrNotFound
}
plog.Info("tryMultiServerDownload test connects, wait 15s...")
for _, conn := range conns {
m.wg.Add(1)
go m.testConn(conn, inv)
}
m.wg.Wait()
if len(m.conns) == 0 {
plog.Info("tryMultiServerDownload not valid ips")
return types.ErrNotFound
}
plog.Info("multiServerDownload test connects done")
for _, conn := range m.conns {
plog.Info("multiServerDownload ok ip", "ip", conn.ip)
}
return nil
}
func (m *multiDldClient) tryMultiServerDownload() {
if !m.multiDldOpen {
return
}
m.conns = conns
curMainHeight, err := m.paraClient.GetLastHeightOnMainChain()
if err != nil {
......@@ -125,6 +182,12 @@ func (m *multiDldClient) tryMultiServerDownload() {
return
}
//获取可用IP 链接
err = m.getConns(totalInvs[0])
if err != nil {
return
}
plog.Info("tryMultiServerDownload", "start", localBlock.MainHeight+1, "end", curMainHeight-maxRollbackHeight, "totalInvs", totalInvsNum)
jobsCh := make(chan *downloadJob, m.jobBufferNum)
......@@ -213,14 +276,15 @@ func (d *downloadJob) process() {
}
}
d.mDldCli.paraClient.blockSyncClient.handleLocalChangedMsg()
continue
}
//block需要严格顺序执行,数据库错误,panic 重新来过
err := d.mDldCli.paraClient.procLocalBlocks(inv.txs)
if err != nil {
panic(err)
} else {
//block需要严格顺序执行,数据库错误,panic 重新来过
err := d.mDldCli.paraClient.procLocalBlocks(inv.txs)
if err != nil {
panic(err)
}
}
//release memory
inv.txs = nil
}
}
......@@ -375,7 +439,7 @@ func (d *downloadJob) checkDownLoadRate() {
}
func (d *downloadJob) requestMainBlocks(inv *inventory) (*types.ParaTxDetails, error) {
func requestMainBlocks(inv *inventory) (*types.ParaTxDetails, error) {
req := &types.ReqParaTxByTitle{IsSeq: false, Start: inv.curHeight, End: inv.end, Title: types.GetTitle()}
txs, err := inv.connCli.conn.GetParaTxByTitle(context.Background(), req)
if err != nil {
......@@ -393,6 +457,33 @@ func (d *downloadJob) requestMainBlocks(inv *inventory) (*types.ParaTxDetails, e
return validMainBlocks(txs), nil
}
func requestMainBlockWithTime(inv *inventory) *types.ParaTxDetails {
retCh := make(chan *types.ParaTxDetails, 1)
go func() {
tx, err := requestMainBlocks(inv)
if err != nil {
plog.Error("requestMainBlockWithTime err", "start", inv.start, "end", inv.end, "ip", inv.connCli.ip, "err", err.Error())
close(retCh)
return
}
retCh <- tx
}()
t := time.NewTimer(time.Second * time.Duration(inv.connCli.timeout))
select {
case <-t.C:
plog.Debug("requestMainBlockWithTime timeout", "start", inv.start, "end", inv.end, "ip", inv.connCli.ip)
return nil
case ret, ok := <-retCh:
if !ok {
t.Stop()
return nil
}
t.Stop()
return ret
}
}
func (d *downloadJob) getInvBlocks(inv *inventory, connPool chan *connectCli) {
start := time.Now()
defer func() {
......@@ -401,16 +492,16 @@ func (d *downloadJob) getInvBlocks(inv *inventory, connPool chan *connectCli) {
}()
inv.curHeight = inv.start
plog.Debug("getInvBlocks begin", "start", inv.start, "end", inv.end)
plog.Debug("getInvBlocks begin", "start", inv.start, "end", inv.end, "ip", inv.connCli.ip)
for {
txs, err := d.requestMainBlocks(inv)
if err != nil || len(txs.Items) == 0 {
txs := requestMainBlockWithTime(inv)
if txs == nil || len(txs.Items) == 0 {
d.resetInv(inv)
plog.Error("getInvBlocks reqMainBlock error", "err", err, "ip", inv.connCli.ip)
plog.Error("getInvBlocks reqMainBlock nil", "ip", inv.connCli.ip)
return
}
err = d.verifyDownloadBlock(inv, txs)
err := d.verifyDownloadBlock(inv, txs)
if err != nil {
d.resetInv(inv)
return
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment