From a3a0d677256a8c9389dd396504c0120f8f7e41f8 Mon Sep 17 00:00:00 2001 From: mdj33 Date: Fri, 16 Aug 2019 22:53:55 +0800 Subject: [PATCH] add timeout for multi server req --- chain33.para.toml | 2 +- plugin/consensus/para/para.go | 12 +- plugin/consensus/para/paracreate.go | 3 +- plugin/consensus/para/paramultidownload.go | 129 ++++++++++++++++++--- 4 files changed, 122 insertions(+), 24 deletions(-) diff --git a/chain33.para.toml b/chain33.para.toml index a6d3bf0d..2913af08 100644 --- a/chain33.para.toml +++ b/chain33.para.toml @@ -95,7 +95,7 @@ targetTimePerBlock = 16 [consensus.sub.para] #主链节点的grpc服务器ip,当前可以支持多ip负载均衡,如“101.37.227.226:8802,39.97.20.242:8802,47.107.15.126:8802,jiedian2.bityuan.com,cloud.bityuan.com” -#ParaRemoteGrpcClient="101.37.227.226:8802,39.97.20.242:8802,47.107.15.126:8802,jiedian2.bityuan.com,cloud.bityuan.com" +#ParaRemoteGrpcClient="183.129.226.74:8802,183.129.226.75:8802,101.37.227.226:8802,39.97.20.242:8802,47.107.15.126:8802,jiedian2.bityuan.com,cloud.bityuan.com" ParaRemoteGrpcClient="localhost:8802" #主链指定高度的区块开始同步 startHeight=345850 diff --git a/plugin/consensus/para/para.go b/plugin/consensus/para/para.go index 08265d8d..38f13139 100644 --- a/plugin/consensus/para/para.go +++ b/plugin/consensus/para/para.go @@ -91,6 +91,7 @@ type subConfig struct { MultiDownloadOpen int32 `json:"multiDownloadOpen,omitempty"` MultiDownInvNumPerJob int64 `json:"multiDownInvNumPerJob,omitempty"` MultiDownJobBuffNum uint32 `json:"multiDownJobBuffNum,omitempty"` + MultiDownServerRspTime uint32 `json:"multiDownServerRspTime,omitempty"` } // New function to init paracross env @@ -199,9 +200,10 @@ func New(cfg *types.Consensus, sub []byte) queue.Module { } para.multiDldCli = &multiDldClient{ - paraClient: para, - invNumPerJob: defaultInvNumPerJob, - jobBufferNum: defaultJobBufferNum, + paraClient: para, + invNumPerJob: defaultInvNumPerJob, + jobBufferNum: defaultJobBufferNum, + serverTimeout: maxServerRspTimeout, } if subcfg.MultiDownInvNumPerJob > 0 { para.multiDldCli.invNumPerJob = subcfg.MultiDownInvNumPerJob @@ -213,6 +215,10 @@ func New(cfg *types.Consensus, sub []byte) queue.Module { para.multiDldCli.multiDldOpen = true } + if subcfg.MultiDownServerRspTime > 0 { + para.multiDldCli.serverTimeout = subcfg.MultiDownServerRspTime + } + c.SetChild(para) return para } diff --git a/plugin/consensus/para/paracreate.go b/plugin/consensus/para/paracreate.go index 0f22f428..03018cab 100644 --- a/plugin/consensus/para/paracreate.go +++ b/plugin/consensus/para/paracreate.go @@ -507,6 +507,8 @@ func (client *client) procLocalBlocks(mainBlocks *types.ParaTxDetails) error { } func (client *client) CreateBlock() { + defer client.wg.Done() + client.multiDldCli.tryMultiServerDownload() lastSeq, lastSeqMainHash, err := client.getLastLocalBlockSeq() @@ -563,5 +565,4 @@ out: } plog.Info("para CreateBlock quit") - client.wg.Done() } diff --git a/plugin/consensus/para/paramultidownload.go b/plugin/consensus/para/paramultidownload.go index a360beed..c9668d23 100644 --- a/plugin/consensus/para/paramultidownload.go +++ b/plugin/consensus/para/paramultidownload.go @@ -8,10 +8,10 @@ import ( "context" "sync" - "strings" - "time" + "strings" + "github.com/33cn/chain33/rpc/grpcclient" "github.com/33cn/chain33/types" ) @@ -23,12 +23,14 @@ const ( maxBlockSize = 20000000 // 单次1000block size累积超过20M 需保存到localdb downTimesFastThreshold = 450 // 单个server 下载超过450次,平均20次用20s来计算,下载7分钟左右检查有没有差别比较大的 downTimesSlowThreshold = 30 // 慢的server小于30次,则小于快server的15倍,需要剔除 + maxServerRspTimeout = 15 ) type connectCli struct { ip string conn types.Chain33Client downTimes int64 + timeout uint32 } //invertory 是每次请求的最小单位,每次请求最多MaxBlockCountPerTime @@ -59,10 +61,12 @@ type multiDldClient struct { paraClient *client jobBufferNum uint32 invNumPerJob int64 + serverTimeout uint32 conns []*connectCli connsCheckDone bool multiDldOpen bool wg sync.WaitGroup + mtx sync.Mutex } func (m *multiDldClient) getInvs(startHeight, endHeight int64) []*inventory { @@ -84,25 +88,78 @@ func (m *multiDldClient) getInvs(startHeight, endHeight int64) []*inventory { return invs } -func (m *multiDldClient) tryMultiServerDownload() { - if !m.multiDldOpen { +func (m *multiDldClient) testConn(conn *connectCli, inv *inventory) { + defer m.wg.Done() + + recv := make(chan bool, 1) + testInv := &inventory{start: inv.start, end: inv.end, curHeight: inv.start, connCli: conn} + + go func() { + _, err := requestMainBlocks(testInv) + if err != nil { + plog.Info("multiServerDownload.testconn ip error", "ip", conn.ip, "err", err.Error()) + recv <- false + return + } + recv <- true + }() + + t := time.NewTimer(time.Second * time.Duration(conn.timeout)) + select { + case <-t.C: + plog.Info("multiServerDownload.testconn ip timeout", "ip", conn.ip) + return + case ret := <-recv: + if ret { + m.mtx.Lock() + m.conns = append(m.conns, conn) + m.mtx.Unlock() + } + t.Stop() return } +} +func (m *multiDldClient) getConns(inv *inventory) error { paraRemoteGrpcIps := types.Conf("config.consensus.sub.para").GStr("ParaRemoteGrpcClient") ips := strings.Split(paraRemoteGrpcIps, ",") var conns []*connectCli for _, ip := range ips { conn, err := grpcclient.NewMainChainClient(ip) if err == nil { - conns = append(conns, &connectCli{conn: conn, ip: ip}) + conns = append(conns, &connectCli{conn: conn, ip: ip, timeout: m.serverTimeout}) } } + if len(conns) == 0 { + plog.Info("tryMultiServerDownload no valid ips") + return types.ErrNotFound + } + + plog.Info("tryMultiServerDownload test connects, wait 15s...") + for _, conn := range conns { + m.wg.Add(1) + go m.testConn(conn, inv) + } + m.wg.Wait() + + if len(m.conns) == 0 { plog.Info("tryMultiServerDownload not valid ips") + return types.ErrNotFound + } + + plog.Info("multiServerDownload test connects done") + for _, conn := range m.conns { + plog.Info("multiServerDownload ok ip", "ip", conn.ip) + } + + return nil +} + +func (m *multiDldClient) tryMultiServerDownload() { + if !m.multiDldOpen { return } - m.conns = conns curMainHeight, err := m.paraClient.GetLastHeightOnMainChain() if err != nil { @@ -125,6 +182,12 @@ func (m *multiDldClient) tryMultiServerDownload() { return } + //获取可用IP 链接 + err = m.getConns(totalInvs[0]) + if err != nil { + return + } + plog.Info("tryMultiServerDownload", "start", localBlock.MainHeight+1, "end", curMainHeight-maxRollbackHeight, "totalInvs", totalInvsNum) jobsCh := make(chan *downloadJob, m.jobBufferNum) @@ -213,14 +276,15 @@ func (d *downloadJob) process() { } } d.mDldCli.paraClient.blockSyncClient.handleLocalChangedMsg() - continue - } - - //block需要严格顺序执行,数据库错误,panic 重新来过 - err := d.mDldCli.paraClient.procLocalBlocks(inv.txs) - if err != nil { - panic(err) + } else { + //block需要严格顺序执行,数据库错误,panic 重新来过 + err := d.mDldCli.paraClient.procLocalBlocks(inv.txs) + if err != nil { + panic(err) + } } + //release memory + inv.txs = nil } } @@ -375,7 +439,7 @@ func (d *downloadJob) checkDownLoadRate() { } -func (d *downloadJob) requestMainBlocks(inv *inventory) (*types.ParaTxDetails, error) { +func requestMainBlocks(inv *inventory) (*types.ParaTxDetails, error) { req := &types.ReqParaTxByTitle{IsSeq: false, Start: inv.curHeight, End: inv.end, Title: types.GetTitle()} txs, err := inv.connCli.conn.GetParaTxByTitle(context.Background(), req) if err != nil { @@ -393,6 +457,33 @@ func (d *downloadJob) requestMainBlocks(inv *inventory) (*types.ParaTxDetails, e return validMainBlocks(txs), nil } +func requestMainBlockWithTime(inv *inventory) *types.ParaTxDetails { + retCh := make(chan *types.ParaTxDetails, 1) + go func() { + tx, err := requestMainBlocks(inv) + if err != nil { + plog.Error("requestMainBlockWithTime err", "start", inv.start, "end", inv.end, "ip", inv.connCli.ip, "err", err.Error()) + close(retCh) + return + } + retCh <- tx + }() + + t := time.NewTimer(time.Second * time.Duration(inv.connCli.timeout)) + select { + case <-t.C: + plog.Debug("requestMainBlockWithTime timeout", "start", inv.start, "end", inv.end, "ip", inv.connCli.ip) + return nil + case ret, ok := <-retCh: + if !ok { + t.Stop() + return nil + } + t.Stop() + return ret + } +} + func (d *downloadJob) getInvBlocks(inv *inventory, connPool chan *connectCli) { start := time.Now() defer func() { @@ -401,16 +492,16 @@ func (d *downloadJob) getInvBlocks(inv *inventory, connPool chan *connectCli) { }() inv.curHeight = inv.start - plog.Debug("getInvBlocks begin", "start", inv.start, "end", inv.end) + plog.Debug("getInvBlocks begin", "start", inv.start, "end", inv.end, "ip", inv.connCli.ip) for { - txs, err := d.requestMainBlocks(inv) - if err != nil || len(txs.Items) == 0 { + txs := requestMainBlockWithTime(inv) + if txs == nil || len(txs.Items) == 0 { d.resetInv(inv) - plog.Error("getInvBlocks reqMainBlock error", "err", err, "ip", inv.connCli.ip) + plog.Error("getInvBlocks reqMainBlock nil", "ip", inv.connCli.ip) return } - err = d.verifyDownloadBlock(inv, txs) + err := d.verifyDownloadBlock(inv, txs) if err != nil { d.resetInv(inv) return -- 2.17.1