Commit fced6903 authored by ChenHongyu's avatar ChenHongyu

demo3新增自定义负载均衡策略, 通过 key 转到指定 server

parent b6316cd5
......@@ -8,15 +8,15 @@ require (
github.com/coreos/etcd v3.3.25+incompatible
github.com/golang/protobuf v1.5.2
github.com/inconshreveable/log15 v0.0.0-20201112154412-8562bdadbbac
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/pkg/errors v0.9.1
github.com/rs/zerolog v1.21.0
github.com/spaolacci/murmur3 v1.1.0
github.com/stretchr/testify v1.5.1
gitlab.33.cn/chat/dtalk v0.0.3
gitlab.33.cn/utils/go-kit v1.0.7
golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a // indirect
golang.org/x/net v0.0.0-20210510120150-4163338589ed // indirect
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect
google.golang.org/grpc v1.33.0
gopkg.in/yaml.v2 v2.4.0 // indirect
)
replace google.golang.org/grpc => google.golang.org/grpc v1.26.0
This diff is collapsed.
package consistenthash
import (
"github.com/rs/zerolog/log"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"example/lb_demo3/ketama"
)
const (
// Name 一致性hash平衡器的注册名称
Name = "consistent_hash"
// DefaultKey 默认的用于计算一致性hash的key
DefaultKey = "X-Consistent-Hash"
)
func init() {
balancer.Register(newBuilder(DefaultKey))
}
// RegisterBuilder 注册一致性hash构建器
func RegisterBuilder(chKey string) {
balancer.Register(newBuilder(chKey))
}
// newBuilder 新建一致性hash构建器
func newBuilder(chKey string) balancer.Builder {
if chKey == "" {
chKey = DefaultKey
}
return base.NewBalancerBuilderV2(Name,
&chPickerBuilder{chKey: chKey},
base.Config{HealthCheck: true},
)
}
// chPickerBuilder 一致性hash构建器
type chPickerBuilder struct {
chKey string
}
// Build 构建一致性hash选取器
func (b *chPickerBuilder) Build(buildInfo base.PickerBuildInfo) balancer.V2Picker {
if len(buildInfo.ReadySCs) == 0 {
return base.NewErrPickerV2(balancer.ErrNoSubConnAvailable)
}
// 构建chPicker
picker := &chPicker{
subConns: make(map[string]balancer.SubConn),
ch: ketama.New(), // 基于Ketama算法的一致性hash负载均衡器
chKey: b.chKey, // 用于计算一致性hash的key
}
for sc, conInfo := range buildInfo.ReadySCs {
node := conInfo.Address.Addr
picker.ch.Add(node)
picker.subConns[node] = sc
}
return picker
}
// chPicker 一致性hash选取器
type chPicker struct {
subConns map[string]balancer.SubConn
ch *ketama.Ketama
chKey string
}
// Pick 获取一致性hash选取结果
func (p *chPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
var ret balancer.PickResult
key, _ := info.Ctx.Value(p.chKey).(string)
if key == "" {
key = "randUtil.NewString(20)"
}
node, ok := p.ch.Get(key)
if ok {
ret.SubConn = p.subConns[node]
}
log.Debug().Str("key", key).Str("node", node).Msg("consistenthash Pick")
return ret, nil
}
package consistenthash
import (
"context"
"strconv"
"sync"
"testing"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/resolver"
)
func init() {
RegisterBuilder(DefaultKey)
}
func TestChPicker_PickErr(t *testing.T) {
builder := &chPickerBuilder{chKey: DefaultKey}
picker := builder.Build(base.PickerBuildInfo{})
_, err := picker.Pick(balancer.PickInfo{})
assert.EqualError(t, err, balancer.ErrNoSubConnAvailable.Error())
}
func TestChPicker_Pick(t *testing.T) {
cases := []struct {
name string
candidates int
}{
{
name: "single",
candidates: 1,
},
{
name: "two",
candidates: 2,
},
{
name: "multiple",
candidates: 10,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
const total = 10000
builder := &chPickerBuilder{chKey: DefaultKey}
buildInfo := base.PickerBuildInfo{ReadySCs: make(map[balancer.SubConn]base.SubConnInfo)}
for i := 0; i < c.candidates; i++ {
sc := &mockClientConn{address: strconv.Itoa(i)}
buildInfo.ReadySCs[sc] = base.SubConnInfo{
Address: resolver.Address{Addr: strconv.Itoa(i)},
}
}
picker := builder.Build(buildInfo)
var wg sync.WaitGroup
wg.Add(total)
for i := 0; i < total; i++ {
go func() {
_, _ = picker.Pick(balancer.PickInfo{
FullMethodName: "/",
Ctx: context.Background(),
})
wg.Done()
}()
}
wg.Wait()
ch := picker.(*chPicker).ch
t.Logf("%+v", ch)
})
}
}
type mockClientConn struct {
address string
}
func (m *mockClientConn) UpdateAddresses(addresses []resolver.Address) {
}
func (m *mockClientConn) Connect() {
}
package key
import (
"github.com/rs/zerolog/log"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/grpclog"
"sync"
)
const (
Name = "picker_key"
DefaultKey = "X-rand-string-balabala"
)
func init() {
balancer.Register(newBuilder(DefaultKey))
}
func newBuilder(key string) balancer.Builder {
return base.NewBalancerBuilderV2(
Name,
&keyPickerBuilder{key: key},
base.Config{HealthCheck: false},
)
}
type keyPickerBuilder struct {
key string
}
func (b *keyPickerBuilder) Build(info base.PickerBuildInfo) balancer.V2Picker {
grpclog.Infof("keyPickerBuilder: newPicker called with info: %v", info)
if len(info.ReadySCs) == 0 {
return base.NewErrPickerV2(balancer.ErrNoSubConnAvailable)
}
picker := &keyPicker{
addr2sc: make(map[string]balancer.SubConn),
key: b.key,
}
log.Debug().Int("len info.ReadySCs", len(info.ReadySCs)).Msg("")
for sc, conInfo := range info.ReadySCs {
addr := conInfo.Address.Addr
picker.setAddr2sc(addr, sc)
}
return picker
}
type keyPicker struct {
addr2sc map[string]balancer.SubConn
sync.RWMutex
key string
}
func (p *keyPicker) setAddr2sc(addr string, sc balancer.SubConn) {
p.Lock()
defer p.Unlock()
p.addr2sc[addr] = sc
log.Debug().Str("addr", addr).Msg("set addr 2 sc")
}
func (p *keyPicker) getAddr2sc(addr string) (balancer.SubConn, error) {
p.RLock()
defer p.RUnlock()
sc, ok := p.addr2sc[addr]
if ok {
return sc, nil
}
log.Warn().Str("addr", addr).Msg("server warn")
// TODO
for _, tsc := range p.addr2sc {
return tsc, nil
}
return nil, nil
}
func (p *keyPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
var res balancer.PickResult
addr, _ := info.Ctx.Value(p.key).(string)
sc, _ := p.getAddr2sc(addr)
res.SubConn = sc
return res, nil
}
......@@ -2,6 +2,7 @@ package common
import (
"context"
"example/lb_demo3/balancer/key"
"fmt"
"time"
......@@ -38,6 +39,8 @@ func NewGRPCConn(addr string, timeout time.Duration) (*grpc.ClientConn, error) {
}),
//grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, roundrobin.Name)),
// 自定义负载策略
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "weight")),
//grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "weight")),
//grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, consistenthash.Name)),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, key.Name)),
}...)
}
......@@ -13,7 +13,7 @@ import (
"example/lb_demo3/generator/config"
"example/lb_demo3/generator/server/grpc"
"example/lb_demo3/generator/service"
"example/lb_demo3/naming"
naming "example/pkg/naming/key"
"github.com/Terry-Mao/goim/pkg/ip"
"github.com/inconshreveable/log15"
)
......@@ -37,7 +37,7 @@ func main() {
// register server
_, port, _ := net.SplitHostPort(config.Conf.GRPCServer.Addr)
addr := fmt.Sprintf("%s:%s", ip.InternalIP(), port)
if err := naming.Register(config.Conf.Reg.RegAddrs, config.Conf.Reg.SrvName, addr, config.Conf.Reg.Schema, 15, config.Conf.Weight); err != nil {
if err := naming.Register(config.Conf.Reg.RegAddrs, config.Conf.Reg.SrvName, addr, config.Conf.Reg.Schema, 15); err != nil {
panic(err)
}
fmt.Println("register ok")
......
......@@ -4,7 +4,6 @@ import (
"context"
"fmt"
"log"
"strconv"
"strings"
"time"
......@@ -51,7 +50,7 @@ func withAlive(name, addr, schema string, ttl, weight int64) error {
}
log.Printf("key:%v, weight=%d\n", "/"+schema+"/"+name+"/"+addr, weight)
_, err = cli.Put(context.Background(), "/"+schema+"/"+name+"/"+addr, strconv.FormatInt(weight, 10), clientv3.WithLease(leaseResp.ID))
_, err = cli.Put(context.Background(), "/"+schema+"/"+name+"/"+addr, addr, clientv3.WithLease(leaseResp.ID))
if err != nil {
log.Printf("put etcd error:%s", err)
return err
......
package naming
//
//import (
// "context"
// "fmt"
// "log"
// "strconv"
// "strings"
// "time"
//
// "example/lb_demo3/balancer/weight"
// "github.com/coreos/etcd/clientv3"
// "github.com/coreos/etcd/mvcc/mvccpb"
// "google.golang.org/grpc/resolver"
//)
//
//var schema string
//var cli *clientv3.Client
//
//type etcdResolver struct {
// rawAddr string
// schema string
// cc resolver.ClientConn
//}
//
//func NewResolver(etcdAddr, schema string) resolver.Builder {
// return &etcdResolver{rawAddr: etcdAddr, schema: schema}
//}
//
//func (r *etcdResolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
// //fmt.Println("target:", target)
// var err error
// if cli == nil {
// cli, err = clientv3.New(clientv3.Config{
// Endpoints: strings.Split(r.rawAddr, ";"),
// DialTimeout: 15 * time.Second,
// })
// if err != nil {
// return nil, err
// }
// }
//
// r.cc = cc
//
// go r.watch("/" + target.Scheme + "/" + target.Endpoint + "/")
//
// return r, nil
//}
//
//func (r etcdResolver) Scheme() string {
// return r.schema
//}
//
//func (r etcdResolver) ResolveNow(rn resolver.ResolveNowOptions) {
// //log.Println("ResolveNow")
//}
//
//func (r etcdResolver) Close() {
// //log.Println("Close")
//}
//
//func (r *etcdResolver) watch(keyPrefix string) {
// var addrList []resolver.Address
//
// getResp, err := cli.Get(context.Background(), keyPrefix, clientv3.WithPrefix())
// if err != nil {
// log.Println(err)
// } else {
// for i := range getResp.Kvs {
// nodeWeight, err := strconv.Atoi(string(getResp.Kvs[i].Value))
// if err != nil {
// log.Println(getResp.Kvs[i].Key, getResp.Kvs[i].Value)
// continue
// }
// addr := resolver.Address{Addr: strings.TrimPrefix(string(getResp.Kvs[i].Key), keyPrefix)}
// addr = weight.SetAddrInfo(addr, weight.AddrInfo{Weight: nodeWeight})
//
// addrList = append(addrList, addr)
// }
// }
// fmt.Println(addrList)
// // 新版本etcd去除了NewAddress方法 以UpdateState代替
// r.cc.UpdateState(resolver.State{Addresses: addrList})
//
// rch := cli.Watch(context.Background(), keyPrefix, clientv3.WithPrefix())
// for n := range rch {
// for _, ev := range n.Events {
// nodeWeight, err := strconv.Atoi(string(ev.Kv.Value))
// if err != nil {
// log.Println(ev.Kv.Key, ev.Kv.Value)
// continue
// }
// addr := strings.TrimPrefix(string(ev.Kv.Key), keyPrefix)
// switch ev.Type {
// case mvccpb.PUT:
// if !exist(addrList, addr) {
// address := resolver.Address{Addr: addr}
// address = weight.SetAddrInfo(address, weight.AddrInfo{Weight: nodeWeight})
// addrList = append(addrList, address)
// r.cc.UpdateState(resolver.State{Addresses: addrList})
// }
// case mvccpb.DELETE:
// if s, ok := remove(addrList, addr); ok {
// addrList = s
// r.cc.UpdateState(resolver.State{Addresses: addrList})
// }
// }
// log.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
// }
// }
//}
//
//func exist(l []resolver.Address, addr string) bool {
// for i := range l {
// if l[i].Addr == addr {
// return true
// }
// }
// return false
//}
//
//func remove(s []resolver.Address, addr string) ([]resolver.Address, bool) {
// for i := range s {
// if s[i].Addr == addr {
// s[i] = s[len(s)-1]
// s[len(s)-1] = resolver.Address{}
// s = s[:len(s)-1]
// return s, true
// }
// }
// return nil, false
//}
......@@ -4,8 +4,8 @@ import (
"context"
"example/lb_demo3/common"
idgen "example/lb_demo3/generator/api"
"example/lb_demo3/naming"
"example/lb_demo3/taotie/config"
naming "example/pkg/naming/key"
"fmt"
"github.com/pkg/errors"
"github.com/rs/zerolog"
......
......@@ -2,11 +2,13 @@ package service
import (
"context"
"example/lb_demo3/taotie/config"
"flag"
"os"
"testing"
"time"
"example/lb_demo3/balancer/key"
"example/lb_demo3/taotie/config"
)
func TestMain(m *testing.M) {
......@@ -20,9 +22,12 @@ func TestMain(m *testing.M) {
func Benchmark_GetID(b *testing.B) {
srv := New(config.Conf)
b.ResetTimer()
b.N = 100
b.N = 10
time.Sleep(5 * time.Second)
addrList := []string{"192.168.22.238:30002", "192.168.22.238:30003", "192.168.22.238:30004"}
for i := 0; i < b.N; i++ {
id, err := srv.GetLogId(context.Background(), int64(i))
ctx := context.Background()
id, err := srv.GetLogId(context.WithValue(ctx, key.DefaultKey, addrList[i%len(addrList)]), int64(i))
if err != nil {
srv.log.Error().Err(err).Msg("")
}
......
package naming
import (
"context"
"fmt"
"log"
"strings"
"time"
"github.com/coreos/etcd/clientv3"
)
func Register(etcdAddr, name, addr, schema string, ttl int64) error {
var err error
if cli == nil {
cli, err = clientv3.New(clientv3.Config{
Endpoints: strings.Split(etcdAddr, ";"),
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Printf("connect to etcd err:%s", err)
return err
}
}
ticker := time.NewTicker(time.Second * time.Duration(ttl))
go func() {
for {
getResp, err := cli.Get(context.Background(), "/"+schema+"/"+name+"/"+addr)
if err != nil {
fmt.Println("etcd get err:", err)
} else if getResp.Count == 0 {
err = withAlive(name, addr, schema, ttl)
if err != nil {
log.Printf("keep alive:%s", err)
}
}
<-ticker.C
}
}()
return nil
}
func withAlive(name, addr, schema string, ttl int64) error {
leaseResp, err := cli.Grant(context.Background(), ttl)
if err != nil {
return err
}
log.Printf("key:%v\n", "/"+schema+"/"+name+"/"+addr)
_, err = cli.Put(context.Background(), "/"+schema+"/"+name+"/"+addr, addr, clientv3.WithLease(leaseResp.ID))
if err != nil {
log.Printf("put etcd error:%s", err)
return err
}
ch, err := cli.KeepAlive(context.Background(), leaseResp.ID)
if err != nil {
log.Printf("keep alive error:%s", err)
return err
}
go func() {
<-ch
//for leaseKeepResp := range ch {
// log.Println("续约成功", leaseKeepResp.ID)
//}
//
//log.Println("关闭续租")
}()
return nil
}
func UnRegister(name, addr, schema string) error {
if cli != nil {
//fmt.Println("unregister...")
_, err := cli.Delete(context.Background(), "/"+schema+"/"+name+"/"+addr)
return err
}
return nil
}
......@@ -4,11 +4,9 @@ import (
"context"
"fmt"
"log"
"strconv"
"strings"
"time"
"example/lb_demo3/balancer/weight"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
"google.golang.org/grpc/resolver"
......@@ -67,15 +65,7 @@ func (r *etcdResolver) watch(keyPrefix string) {
log.Println(err)
} else {
for i := range getResp.Kvs {
nodeWeight, err := strconv.Atoi(string(getResp.Kvs[i].Value))
if err != nil {
log.Println(getResp.Kvs[i].Key, getResp.Kvs[i].Value)
continue
}
addr := resolver.Address{Addr: strings.TrimPrefix(string(getResp.Kvs[i].Key), keyPrefix)}
addr = weight.SetAddrInfo(addr, weight.AddrInfo{Weight: nodeWeight})
addrList = append(addrList, addr)
addrList = append(addrList, resolver.Address{Addr: strings.TrimPrefix(string(getResp.Kvs[i].Key), keyPrefix)})
}
}
fmt.Println(addrList)
......@@ -85,18 +75,11 @@ func (r *etcdResolver) watch(keyPrefix string) {
rch := cli.Watch(context.Background(), keyPrefix, clientv3.WithPrefix())
for n := range rch {
for _, ev := range n.Events {
nodeWeight, err := strconv.Atoi(string(ev.Kv.Value))
if err != nil {
log.Println(ev.Kv.Key, ev.Kv.Value)
continue
}
addr := strings.TrimPrefix(string(ev.Kv.Key), keyPrefix)
switch ev.Type {
case mvccpb.PUT:
if !exist(addrList, addr) {
address := resolver.Address{Addr: addr}
address = weight.SetAddrInfo(address, weight.AddrInfo{Weight: nodeWeight})
addrList = append(addrList, address)
addrList = append(addrList, resolver.Address{Addr: addr})
r.cc.UpdateState(resolver.State{Addresses: addrList})
}
case mvccpb.DELETE:
......
package naming
import (
"context"
"fmt"
"log"
"strings"
"time"
"github.com/coreos/etcd/clientv3"
)
func Register(etcdAddr, name, addr, schema string, ttl int64) error {
var err error
if cli == nil {
cli, err = clientv3.New(clientv3.Config{
Endpoints: strings.Split(etcdAddr, ";"),
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Printf("connect to etcd err:%s", err)
return err
}
}
ticker := time.NewTicker(time.Second * time.Duration(ttl))
go func() {
for {
getResp, err := cli.Get(context.Background(), "/"+schema+"/"+name+"/"+addr)
if err != nil {
fmt.Println("etcd get err:", err)
} else if getResp.Count == 0 {
err = withAlive(name, addr, schema, ttl)
if err != nil {
log.Printf("keep alive:%s", err)
}
}
<-ticker.C
}
}()
return nil
}
func withAlive(name, addr, schema string, ttl int64) error {
leaseResp, err := cli.Grant(context.Background(), ttl)
if err != nil {
return err
}
log.Printf("key:%v\n", "/"+schema+"/"+name+"/"+addr)
_, err = cli.Put(context.Background(), "/"+schema+"/"+name+"/"+addr, addr, clientv3.WithLease(leaseResp.ID))
if err != nil {
log.Printf("put etcd error:%s", err)
return err
}
ch, err := cli.KeepAlive(context.Background(), leaseResp.ID)
if err != nil {
log.Printf("keep alive error:%s", err)
return err
}
go func() {
<-ch
//for leaseKeepResp := range ch {
// log.Println("续约成功", leaseKeepResp.ID)
//}
//
//log.Println("关闭续租")
}()
return nil
}
func UnRegister(name, addr, schema string) error {
if cli != nil {
//fmt.Println("unregister...")
_, err := cli.Delete(context.Background(), "/"+schema+"/"+name+"/"+addr)
return err
}
return nil
}
package naming
import (
"context"
"fmt"
"log"
"strings"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
"google.golang.org/grpc/resolver"
)
var schema string
var cli *clientv3.Client
type etcdResolver struct {
rawAddr string
schema string
cc resolver.ClientConn
}
func NewResolver(etcdAddr, schema string) resolver.Builder {
return &etcdResolver{rawAddr: etcdAddr, schema: schema}
}
func (r *etcdResolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
//fmt.Println("target:", target)
var err error
if cli == nil {
cli, err = clientv3.New(clientv3.Config{
Endpoints: strings.Split(r.rawAddr, ";"),
DialTimeout: 15 * time.Second,
})
if err != nil {
return nil, err
}
}
r.cc = cc
go r.watch("/" + target.Scheme + "/" + target.Endpoint + "/")
return r, nil
}
func (r etcdResolver) Scheme() string {
return r.schema
}
func (r etcdResolver) ResolveNow(rn resolver.ResolveNowOptions) {
//log.Println("ResolveNow")
}
func (r etcdResolver) Close() {
//log.Println("Close")
}
func (r *etcdResolver) watch(keyPrefix string) {
var addrList []resolver.Address
getResp, err := cli.Get(context.Background(), keyPrefix, clientv3.WithPrefix())
if err != nil {
log.Println(err)
} else {
for i := range getResp.Kvs {
addrList = append(addrList, resolver.Address{Addr: strings.TrimPrefix(string(getResp.Kvs[i].Key), keyPrefix)})
}
}
fmt.Println(addrList)
// 新版本etcd去除了NewAddress方法 以UpdateState代替
r.cc.UpdateState(resolver.State{Addresses: addrList})
rch := cli.Watch(context.Background(), keyPrefix, clientv3.WithPrefix())
for n := range rch {
for _, ev := range n.Events {
addr := strings.TrimPrefix(string(ev.Kv.Key), keyPrefix)
switch ev.Type {
case mvccpb.PUT:
if !exist(addrList, addr) {
addrList = append(addrList, resolver.Address{Addr: addr})
r.cc.UpdateState(resolver.State{Addresses: addrList})
}
case mvccpb.DELETE:
if s, ok := remove(addrList, addr); ok {
addrList = s
r.cc.UpdateState(resolver.State{Addresses: addrList})
}
}
log.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
}
}
}
func exist(l []resolver.Address, addr string) bool {
for i := range l {
if l[i].Addr == addr {
return true
}
}
return false
}
func remove(s []resolver.Address, addr string) ([]resolver.Address, bool) {
for i := range s {
if s[i].Addr == addr {
s[i] = s[len(s)-1]
s[len(s)-1] = resolver.Address{}
s = s[:len(s)-1]
return s, true
}
}
return nil, false
}
package naming
import (
"context"
"fmt"
"log"
"strings"
"time"
"github.com/coreos/etcd/clientv3"
)
func Register(etcdAddr, name, addr, schema string, ttl, weight int64) error {
var err error
if cli == nil {
cli, err = clientv3.New(clientv3.Config{
Endpoints: strings.Split(etcdAddr, ";"),
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Printf("connect to etcd err:%s", err)
return err
}
}
ticker := time.NewTicker(time.Second * time.Duration(ttl))
go func() {
for {
getResp, err := cli.Get(context.Background(), "/"+schema+"/"+name+"/"+addr)
if err != nil {
fmt.Println("etcd get err:", err)
} else if getResp.Count == 0 {
err = withAlive(name, addr, schema, ttl, weight)
if err != nil {
log.Printf("keep alive:%s", err)
}
}
<-ticker.C
}
}()
return nil
}
func withAlive(name, addr, schema string, ttl, weight int64) error {
leaseResp, err := cli.Grant(context.Background(), ttl)
if err != nil {
return err
}
log.Printf("key:%v, weight=%d\n", "/"+schema+"/"+name+"/"+addr, weight)
_, err = cli.Put(context.Background(), "/"+schema+"/"+name+"/"+addr, addr, clientv3.WithLease(leaseResp.ID))
if err != nil {
log.Printf("put etcd error:%s", err)
return err
}
ch, err := cli.KeepAlive(context.Background(), leaseResp.ID)
if err != nil {
log.Printf("keep alive error:%s", err)
return err
}
go func() {
<-ch
//for leaseKeepResp := range ch {
// log.Println("续约成功", leaseKeepResp.ID)
//}
//
//log.Println("关闭续租")
}()
return nil
}
func UnRegister(name, addr, schema string) error {
if cli != nil {
//fmt.Println("unregister...")
_, err := cli.Delete(context.Background(), "/"+schema+"/"+name+"/"+addr)
return err
}
return nil
}
package naming
import (
"context"
"fmt"
"log"
"strings"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
"google.golang.org/grpc/resolver"
)
var schema string
var cli *clientv3.Client
type etcdResolver struct {
rawAddr string
schema string
cc resolver.ClientConn
}
func NewResolver(etcdAddr, schema string) resolver.Builder {
return &etcdResolver{rawAddr: etcdAddr, schema: schema}
}
func (r *etcdResolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
//fmt.Println("target:", target)
var err error
if cli == nil {
cli, err = clientv3.New(clientv3.Config{
Endpoints: strings.Split(r.rawAddr, ";"),
DialTimeout: 15 * time.Second,
})
if err != nil {
return nil, err
}
}
r.cc = cc
go r.watch("/" + target.Scheme + "/" + target.Endpoint + "/")
return r, nil
}
func (r etcdResolver) Scheme() string {
return r.schema
}
func (r etcdResolver) ResolveNow(rn resolver.ResolveNowOptions) {
//log.Println("ResolveNow")
}
func (r etcdResolver) Close() {
//log.Println("Close")
}
func (r *etcdResolver) watch(keyPrefix string) {
var addrList []resolver.Address
getResp, err := cli.Get(context.Background(), keyPrefix, clientv3.WithPrefix())
if err != nil {
log.Println(err)
} else {
for i := range getResp.Kvs {
addrList = append(addrList, resolver.Address{Addr: strings.TrimPrefix(string(getResp.Kvs[i].Key), keyPrefix)})
}
}
fmt.Println(addrList)
// 新版本etcd去除了NewAddress方法 以UpdateState代替
r.cc.UpdateState(resolver.State{Addresses: addrList})
rch := cli.Watch(context.Background(), keyPrefix, clientv3.WithPrefix())
for n := range rch {
for _, ev := range n.Events {
addr := strings.TrimPrefix(string(ev.Kv.Key), keyPrefix)
switch ev.Type {
case mvccpb.PUT:
if !exist(addrList, addr) {
addrList = append(addrList, resolver.Address{Addr: addr})
r.cc.UpdateState(resolver.State{Addresses: addrList})
}
case mvccpb.DELETE:
if s, ok := remove(addrList, addr); ok {
addrList = s
r.cc.UpdateState(resolver.State{Addresses: addrList})
}
}
log.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
}
}
}
func exist(l []resolver.Address, addr string) bool {
for i := range l {
if l[i].Addr == addr {
return true
}
}
return false
}
func remove(s []resolver.Address, addr string) ([]resolver.Address, bool) {
for i := range s {
if s[i].Addr == addr {
s[i] = s[len(s)-1]
s[len(s)-1] = resolver.Address{}
s = s[:len(s)-1]
return s, true
}
}
return nil, false
}
package naming
//
//import (
// "context"
// "fmt"
// "log"
// "strconv"
// "strings"
// "time"
//
// "example/lb_demo3/balancer/weight"
// "github.com/coreos/etcd/clientv3"
// "github.com/coreos/etcd/mvcc/mvccpb"
// "google.golang.org/grpc/resolver"
//)
//
//var schema string
//var cli *clientv3.Client
//
//type etcdResolver struct {
// rawAddr string
// schema string
// cc resolver.ClientConn
//}
//
//func NewResolver(etcdAddr, schema string) resolver.Builder {
// return &etcdResolver{rawAddr: etcdAddr, schema: schema}
//}
//
//func (r *etcdResolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
// //fmt.Println("target:", target)
// var err error
// if cli == nil {
// cli, err = clientv3.New(clientv3.Config{
// Endpoints: strings.Split(r.rawAddr, ";"),
// DialTimeout: 15 * time.Second,
// })
// if err != nil {
// return nil, err
// }
// }
//
// r.cc = cc
//
// go r.watch("/" + target.Scheme + "/" + target.Endpoint + "/")
//
// return r, nil
//}
//
//func (r etcdResolver) Scheme() string {
// return r.schema
//}
//
//func (r etcdResolver) ResolveNow(rn resolver.ResolveNowOptions) {
// //log.Println("ResolveNow")
//}
//
//func (r etcdResolver) Close() {
// //log.Println("Close")
//}
//
//func (r *etcdResolver) watch(keyPrefix string) {
// var addrList []resolver.Address
//
// getResp, err := cli.Get(context.Background(), keyPrefix, clientv3.WithPrefix())
// if err != nil {
// log.Println(err)
// } else {
// for i := range getResp.Kvs {
// nodeWeight, err := strconv.Atoi(string(getResp.Kvs[i].Value))
// if err != nil {
// log.Println(getResp.Kvs[i].Key, getResp.Kvs[i].Value)
// continue
// }
// addr := resolver.Address{Addr: strings.TrimPrefix(string(getResp.Kvs[i].Key), keyPrefix)}
// addr = weight.SetAddrInfo(addr, weight.AddrInfo{Weight: nodeWeight})
//
// addrList = append(addrList, addr)
// }
// }
// fmt.Println(addrList)
// // 新版本etcd去除了NewAddress方法 以UpdateState代替
// r.cc.UpdateState(resolver.State{Addresses: addrList})
//
// rch := cli.Watch(context.Background(), keyPrefix, clientv3.WithPrefix())
// for n := range rch {
// for _, ev := range n.Events {
// nodeWeight, err := strconv.Atoi(string(ev.Kv.Value))
// if err != nil {
// log.Println(ev.Kv.Key, ev.Kv.Value)
// continue
// }
// addr := strings.TrimPrefix(string(ev.Kv.Key), keyPrefix)
// switch ev.Type {
// case mvccpb.PUT:
// if !exist(addrList, addr) {
// address := resolver.Address{Addr: addr}
// address = weight.SetAddrInfo(address, weight.AddrInfo{Weight: nodeWeight})
// addrList = append(addrList, address)
// r.cc.UpdateState(resolver.State{Addresses: addrList})
// }
// case mvccpb.DELETE:
// if s, ok := remove(addrList, addr); ok {
// addrList = s
// r.cc.UpdateState(resolver.State{Addresses: addrList})
// }
// }
// log.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
// }
// }
//}
//
//func exist(l []resolver.Address, addr string) bool {
// for i := range l {
// if l[i].Addr == addr {
// return true
// }
// }
// return false
//}
//
//func remove(s []resolver.Address, addr string) ([]resolver.Address, bool) {
// for i := range s {
// if s[i].Addr == addr {
// s[i] = s[len(s)-1]
// s[len(s)-1] = resolver.Address{}
// s = s[:len(s)-1]
// return s, true
// }
// }
// return nil, false
//}
package util
import (
"encoding/json"
"fmt"
"log"
"strconv"
)
func ToString(val interface{}) string {
if val == nil {
return ""
}
switch val.(type) {
case float64:
return strconv.FormatFloat(val.(float64), 'f', -1, 64)
case float32:
return strconv.FormatFloat(val.(float64), 'f', -1, 64)
case int64:
return strconv.FormatInt(val.(int64), 10)
}
return fmt.Sprintf("%v", val)
}
func ToInt(val interface{}) int {
return int(ToInt32(val))
}
func ToUInt32(val interface{}) uint32 {
return uint32(ToInt32(val))
}
func ToInt32(o interface{}) int32 {
if o == nil {
log.Fatal("nil value")
return 0
}
switch t := o.(type) {
case int:
return int32(t)
case int32:
return t
case int64:
return int32(t)
case float64:
return int32(t)
case string:
if o == "" {
log.Fatal("empty string")
return 0
}
temp, err := strconv.ParseInt(o.(string), 10, 32)
if err != nil {
log.Fatal("string parse int err", err)
return 0
}
return int32(temp)
default:
log.Fatal("unknown type", fmt.Sprintf("%T", o))
return 0
}
}
func ToInt64(val interface{}) int64 {
if val == nil {
log.Fatal("nil value")
return 0
}
switch val.(type) {
case int:
return int64(val.(int))
case string:
if val.(string) == "" {
log.Fatal("empty string")
return 0
}
ret, err := strconv.ParseInt(val.(string), 10, 64)
if err != nil {
log.Fatal("string parse int err", err)
return 0
}
return ret
case float64:
return int64(val.(float64))
case int64:
return val.(int64)
case json.Number:
v := val.(json.Number)
ret, err := v.Int64()
if err != nil {
log.Fatal("unknown json number")
return 0
}
return ret
default:
log.Fatal("unknown type", fmt.Sprintf("%T", val))
return 0
}
}
func ToFloat64(val interface{}) float64 {
if val == nil {
log.Fatal("nil value")
return 0
}
switch val.(type) {
case string:
ret, err := strconv.ParseFloat(val.(string), 64)
if err != nil {
log.Fatal("string parse float err", err)
}
return ret
default:
if v, ok := val.(float64); ok {
return v
}
log.Fatal("unknown type", fmt.Sprintf("%T", val))
return 0
}
}
func TypeToString(val interface{}) string {
if val == nil {
log.Fatal("nil value")
return ""
}
switch val.(type) {
case float64:
return strconv.FormatFloat(val.(float64), 'f', -1, 64)
case float32:
return strconv.FormatFloat(val.(float64), 'f', -1, 64)
case int64:
return strconv.FormatInt(val.(int64), 10)
}
return fmt.Sprintf("%v", val)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment