Skip to content

Commit

Permalink
Change discovery url to endpoints
Browse files Browse the repository at this point in the history
Currently the discovery url is just one endpoint. But actually it
should be the same as the etcdctl, which means that it should be
a list of endpoints. When one endpoint is down, the clientv3 can
fail over to the next endpoint automatically.
  • Loading branch information
ahrtr committed Feb 23, 2022
1 parent 6af7601 commit f07b911
Show file tree
Hide file tree
Showing 11 changed files with 127 additions and 90 deletions.
11 changes: 6 additions & 5 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,9 @@ import (
type ServerConfig struct {
Name string

EnableV2Discovery bool
DiscoveryURL string
DiscoveryProxy string
DiscoveryCfg v3discovery.DiscoveryConfig
DiscoveryURL string
DiscoveryProxy string
DiscoveryCfg v3discovery.DiscoveryConfig

ClientURLs types.URLs
PeerURLs types.URLs
Expand Down Expand Up @@ -309,7 +308,9 @@ func (c *ServerConfig) WALDir() string {

func (c *ServerConfig) SnapDir() string { return filepath.Join(c.MemberDir(), "snap") }

func (c *ServerConfig) ShouldDiscover() bool { return c.DiscoveryURL != "" }
func (c *ServerConfig) ShouldDiscover() bool {
return c.DiscoveryURL != "" || len(c.DiscoveryCfg.Endpoints) > 0
}

// ReqTimeout returns timeout for request to finish.
func (c *ServerConfig) ReqTimeout() time.Duration {
Expand Down
45 changes: 27 additions & 18 deletions server/embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,6 @@ const (
// It's enabled by default.
DefaultStrictReconfigCheck = true

DefaultEnableV2Discovery = true

// maxElectionMs specifies the maximum value of election timeout.
// More details are listed in ../Documentation/tuning.md#time-parameters.
maxElectionMs = 50000
Expand All @@ -106,7 +104,7 @@ const (

var (
ErrConflictBootstrapFlags = fmt.Errorf("multiple discovery or bootstrap flags are set. " +
"Choose one of \"initial-cluster\", \"discovery\" or \"discovery-srv\"")
"Choose one of \"initial-cluster\", \"discovery\", \"discovery-endpoints\" or \"discovery-srv\"")
ErrUnsetAdvertiseClientURLsFlag = fmt.Errorf("--advertise-client-urls is required when --listen-client-urls is set explicitly")
ErrLogRotationInvalidLogOutput = fmt.Errorf("--log-outputs requires a single file path when --log-rotate-config-json is defined")

Expand Down Expand Up @@ -227,9 +225,8 @@ type Config struct {
DNSClusterServiceName string `json:"discovery-srv-name"`
Dproxy string `json:"discovery-proxy"`

EnableV2Discovery bool `json:"enable-v2-discovery"`
Durl string `json:"discovery"`
DiscoveryCfg v3discovery.DiscoveryConfig `json:"discovery-config"`
Durl string `json:"discovery"`
DiscoveryCfg v3discovery.DiscoveryConfig `json:"discovery-config"`

InitialCluster string `json:"initial-cluster"`
InitialClusterToken string `json:"initial-cluster-token"`
Expand Down Expand Up @@ -518,7 +515,6 @@ func NewConfig() *Config {

V2Deprecation: config.V2_DEPR_DEFAULT,

EnableV2Discovery: DefaultEnableV2Discovery,
DiscoveryCfg: v3discovery.DiscoveryConfig{
DialTimeout: DefaultDiscoveryDialTimeout,
RequestTimeOut: DefaultDiscoveryRequestTimeOut,
Expand Down Expand Up @@ -606,8 +602,8 @@ func (cfg *configYAML) configFromFile(path string) error {
cfg.HostWhitelist = uv.Values
}

// If a discovery flag is set, clear default initial cluster set by InitialClusterFromName
if (cfg.Durl != "" || cfg.DNSCluster != "") && cfg.InitialCluster == defaultInitialCluster {
// If a discovery or discovery-endpoints flag is set, clear default initial cluster set by InitialClusterFromName
if (cfg.Durl != "" || cfg.DNSCluster != "" || len(cfg.DiscoveryCfg.Endpoints) > 0) && cfg.InitialCluster == defaultInitialCluster {
cfg.InitialCluster = ""
}
if cfg.ClusterState == "" {
Expand Down Expand Up @@ -674,7 +670,7 @@ func (cfg *Config) Validate() error {
}
// Check if conflicting flags are passed.
nSet := 0
for _, v := range []bool{cfg.Durl != "", cfg.InitialCluster != "", cfg.DNSCluster != ""} {
for _, v := range []bool{cfg.Durl != "", cfg.InitialCluster != "", cfg.DNSCluster != "", len(cfg.DiscoveryCfg.Endpoints) > 0} {
if v {
nSet++
}
Expand All @@ -690,18 +686,24 @@ func (cfg *Config) Validate() error {

// Check if both v2 discovery and v3 discovery flags are passed.
v2discoveryFlagsExist := cfg.Dproxy != ""
v3discoveryFlagsExist := cfg.DiscoveryCfg.CertFile != "" ||
v3discoveryFlagsExist := len(cfg.DiscoveryCfg.Endpoints) > 0 ||
cfg.DiscoveryCfg.Token != "" ||
cfg.DiscoveryCfg.CertFile != "" ||
cfg.DiscoveryCfg.KeyFile != "" ||
cfg.DiscoveryCfg.TrustedCAFile != "" ||
cfg.DiscoveryCfg.User != "" ||
cfg.DiscoveryCfg.Password != ""
if cfg.EnableV2Discovery && v3discoveryFlagsExist {
return errors.New("v2 discovery is enabled, but some v3 discovery " +
"settings (discovery-cert, discovery-key, discovery-cacert, " +
"discovery-user, discovery-password) are set")

if v2discoveryFlagsExist && v3discoveryFlagsExist {
return errors.New("both v2 discovery settings (discovery, discovery-proxy) " +
"and v3 discovery settings (discovery-token, discovery-endpoints, discovery-cert, " +
"discovery-key, discovery-cacert, discovery-user, discovery-password) are set")
}
if !cfg.EnableV2Discovery && v2discoveryFlagsExist {
return errors.New("v3 discovery is enabled, but --discovery-proxy is set")

// If one of `discovery-token` and `discovery-endpoints` is provided,
// then the other one must be provided as well.
if (cfg.DiscoveryCfg.Token != "") != (len(cfg.DiscoveryCfg.Endpoints) > 0) {
return errors.New("both --discovery-token and --discovery-endpoints must be set")
}

if cfg.TickMs == 0 {
Expand Down Expand Up @@ -753,11 +755,18 @@ func (cfg *Config) PeerURLsMapAndToken(which string) (urlsmap types.URLsMap, tok
switch {
case cfg.Durl != "":
urlsmap = types.URLsMap{}
// If using discovery, generate a temporary cluster based on
// If using v2 discovery, generate a temporary cluster based on
// self's advertised peer URLs
urlsmap[cfg.Name] = cfg.APUrls
token = cfg.Durl

case len(cfg.DiscoveryCfg.Endpoints) > 0:
urlsmap = types.URLsMap{}
// If using v3 discovery, generate a temporary cluster based on
// self's advertised peer URLs
urlsmap[cfg.Name] = cfg.APUrls
token = cfg.DiscoveryCfg.Token

case cfg.DNSCluster != "":
clusterStrs, cerr := cfg.GetDNSClusterNames()
lg := cfg.logger
Expand Down
4 changes: 3 additions & 1 deletion server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"runtime"
"sort"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -175,7 +176,6 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
MaxWALFiles: cfg.MaxWalFiles,
InitialPeerURLsMap: urlsmap,
InitialClusterToken: token,
EnableV2Discovery: cfg.EnableV2Discovery,
DiscoveryURL: cfg.Durl,
DiscoveryProxy: cfg.Dproxy,
DiscoveryCfg: cfg.DiscoveryCfg,
Expand Down Expand Up @@ -348,6 +348,8 @@ func print(lg *zap.Logger, ec Config, sc config.ServerConfig, memberInitialized
zap.String("discovery-url", sc.DiscoveryURL),
zap.String("discovery-proxy", sc.DiscoveryProxy),

zap.String("discovery-token", sc.DiscoveryCfg.Token),
zap.String("discovery-endpoints", strings.Join(sc.DiscoveryCfg.Endpoints, ",")),
zap.String("discovery-dial-timeout", sc.DiscoveryCfg.DialTimeout.String()),
zap.String("discovery-request-timeout", sc.DiscoveryCfg.RequestTimeOut.String()),
zap.String("discovery-keepalive-time", sc.DiscoveryCfg.KeepAliveTime.String()),
Expand Down
14 changes: 11 additions & 3 deletions server/etcdmain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,16 @@ func newConfig() *config {
"advertise-client-urls",
"List of this member's client URLs to advertise to the public.",
)
fs.BoolVar(&cfg.ec.EnableV2Discovery, "enable-v2-discovery", cfg.ec.EnableV2Discovery, "Enable to bootstrap the cluster using v2 discovery. Will be deprecated in v3.7, and be decommissioned in v3.8.")
fs.StringVar(&cfg.ec.Durl, "discovery", cfg.ec.Durl, "Discovery URL used to bootstrap the cluster.")

fs.StringVar(&cfg.ec.Durl, "discovery", cfg.ec.Durl, "Discovery URL used to bootstrap the cluster for v2 discovery. Will be deprecated in v3.7, and be decommissioned in v3.8.")
fs.Var(cfg.cf.fallback, "discovery-fallback", fmt.Sprintf("Valid values include %q", cfg.cf.fallback.Valids()))

fs.Var(
flags.NewUniqueStringsValue(""),
"discovery-endpoints",
"V3 discovery: List of gRPC endpoints of the discovery service.",
)
fs.StringVar(&cfg.ec.DiscoveryCfg.Token, "discovery-token", "", "V3 discovery: discovery token for the etcd cluster to be bootstrapped.")
fs.DurationVar(&cfg.ec.DiscoveryCfg.DialTimeout, "discovery-dial-timeout", cfg.ec.DiscoveryCfg.DialTimeout, "V3 discovery: dial timeout for client connections.")
fs.DurationVar(&cfg.ec.DiscoveryCfg.RequestTimeOut, "discovery-request-timeout", cfg.ec.DiscoveryCfg.RequestTimeOut, "V3 discovery: timeout for discovery requests (excluding dial timeout).")
fs.DurationVar(&cfg.ec.DiscoveryCfg.KeepAliveTime, "discovery-keepalive-time", cfg.ec.DiscoveryCfg.KeepAliveTime, "V3 discovery: keepalive time for client connections.")
Expand Down Expand Up @@ -408,6 +414,8 @@ func (cfg *config) configFromCmdLine() error {
cfg.ec.ACUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "advertise-client-urls")
cfg.ec.ListenMetricsUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "listen-metrics-urls")

cfg.ec.DiscoveryCfg.Endpoints = flags.UniqueStringsFromFlag(cfg.cf.flagSet, "discovery-endpoints")

cfg.ec.CORS = flags.UniqueURLsMapFromFlag(cfg.cf.flagSet, "cors")
cfg.ec.HostWhitelist = flags.UniqueStringsMapFromFlag(cfg.cf.flagSet, "host-whitelist")

Expand All @@ -428,7 +436,7 @@ func (cfg *config) configFromCmdLine() error {
}

// disable default initial-cluster if discovery is set
if (cfg.ec.Durl != "" || cfg.ec.DNSCluster != "" || cfg.ec.DNSClusterServiceName != "") && !flags.IsSet(cfg.cf.flagSet, "initial-cluster") {
if (cfg.ec.Durl != "" || cfg.ec.DNSCluster != "" || cfg.ec.DNSClusterServiceName != "" || len(cfg.ec.DiscoveryCfg.Endpoints) > 0) && !flags.IsSet(cfg.cf.flagSet, "initial-cluster") {
cfg.ec.InitialCluster = ""
}

Expand Down
27 changes: 14 additions & 13 deletions server/etcdmain/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,8 @@ func startEtcdOrProxyV2(args []string) {
if types.URLs(cfg.ec.APUrls).String() == embed.DefaultInitialAdvertisePeerURLs {
lg.Warn("forgot to set --initial-advertise-peer-urls?")
}
if cfg.ec.InitialCluster == cfg.ec.InitialClusterFromName(cfg.ec.Name) && len(cfg.ec.Durl) == 0 {
lg.Warn("--discovery flag is not set")
if cfg.ec.InitialCluster == cfg.ec.InitialClusterFromName(cfg.ec.Name) && len(cfg.ec.Durl) == 0 && len(cfg.ec.DiscoveryCfg.Endpoints) == 0 {
lg.Warn("V2 discovery settings (i.e., --discovery) or v3 discovery settings (i.e., --discovery-token, --discovery-endpoints) are not set")
}
os.Exit(1)
}
Expand Down Expand Up @@ -287,7 +287,7 @@ func startProxy(cfg *config) error {
b, err := os.ReadFile(clusterfile)
switch {
case err == nil:
if cfg.ec.Durl != "" {
if cfg.ec.Durl != "" || len(cfg.ec.DiscoveryCfg.Endpoints) > 0 {
lg.Warn(
"discovery token ignored since the proxy has already been initialized; valid cluster file found",
zap.String("cluster-file", clusterfile),
Expand Down Expand Up @@ -318,21 +318,22 @@ func startProxy(cfg *config) error {
return fmt.Errorf("error setting up initial cluster: %v", err)
}

var s string
if cfg.ec.Durl != "" {
var s string
if cfg.ec.EnableV2Discovery {
lg.Warn("V2 discovery is deprecated!")
s, err = v2discovery.GetCluster(lg, cfg.ec.Durl, cfg.ec.Dproxy)
} else {
s, err = v3discovery.GetCluster(lg, cfg.ec.Durl, &cfg.ec.DiscoveryCfg)
}
if err != nil {
return err
}
lg.Warn("V2 discovery is deprecated!")
s, err = v2discovery.GetCluster(lg, cfg.ec.Durl, cfg.ec.Dproxy)
} else if len(cfg.ec.DiscoveryCfg.Endpoints) > 0 {
s, err = v3discovery.GetCluster(lg, &cfg.ec.DiscoveryCfg)
}
if err != nil {
return err
}
if s != "" {
if urlsmap, err = types.NewURLsMap(s); err != nil {
return err
}
}

peerURLs = urlsmap.URLs()
lg.Info("proxy using peer URLS", zap.Strings("peer-urls", peerURLs))

Expand Down
8 changes: 5 additions & 3 deletions server/etcdmain/help.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,12 @@ Clustering:
--advertise-client-urls 'http://localhost:2379'
List of this member's client URLs to advertise to the public.
The client URLs advertised should be accessible to machines that talk to etcd cluster. etcd client libraries parse these URLs to connect to the cluster.
--enable-v2-discovery 'true'
Enable to bootstrap the cluster using v2 discovery. Will be deprecated in v3.7, and be decommissioned in v3.8.
--discovery ''
Discovery URL used to bootstrap the cluster.
Discovery URL used to bootstrap the cluster for v2 discovery. Will be deprecated in v3.7, and be decommissioned in v3.8.
--discovery-token ''
V3 discovery: discovery token for the etcd cluster to be bootstrapped.
--discovery-endpoints ''
V3 discovery: List of gRPC endpoints of the discovery service.
--discovery-dial-timeout '2s'
V3 discovery: dial timeout for client connections.
--discovery-request-timeout '5s'
Expand Down
36 changes: 14 additions & 22 deletions server/etcdserver/api/v3discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"errors"

"math"
"net/url"
"path"
"sort"
"strconv"
Expand Down Expand Up @@ -56,7 +55,8 @@ var (
)

type DiscoveryConfig struct {
Url string `json:"discovery"`
Token string `json:"discovery-token"`
Endpoints []string `json:"discovery-endpoints"`

DialTimeout time.Duration `json:"discovery-dial-timeout"`
RequestTimeOut time.Duration `json:"discovery-request-timeout"`
Expand Down Expand Up @@ -110,10 +110,10 @@ func getMemberKey(cluster, memberId string) string {
return path.Join(getMemberKeyPrefix(cluster), memberId)
}

// GetCluster will connect to the discovery service at the given url and
// GetCluster will connect to the discovery service at the given endpoints and
// retrieve a string describing the cluster
func GetCluster(lg *zap.Logger, dUrl string, cfg *DiscoveryConfig) (cs string, rerr error) {
d, err := newDiscovery(lg, dUrl, cfg, 0)
func GetCluster(lg *zap.Logger, cfg *DiscoveryConfig) (cs string, rerr error) {
d, err := newDiscovery(lg, cfg, 0)
if err != nil {
return "", err
}
Expand All @@ -137,15 +137,15 @@ func GetCluster(lg *zap.Logger, dUrl string, cfg *DiscoveryConfig) (cs string, r
return d.getCluster()
}

// JoinCluster will connect to the discovery service at the given url, and
// JoinCluster will connect to the discovery service at the endpoints, and
// register the server represented by the given id and config to the cluster.
// The parameter `config` is supposed to be in the format "memberName=peerURLs",
// such as "member1=http://127.0.0.1:2380".
//
// The final returned string has the same format as "--initial-cluster", such as
// "infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380".
func JoinCluster(lg *zap.Logger, durl string, cfg *DiscoveryConfig, id types.ID, config string) (cs string, rerr error) {
d, err := newDiscovery(lg, durl, cfg, id)
func JoinCluster(lg *zap.Logger, cfg *DiscoveryConfig, id types.ID, config string) (cs string, rerr error) {
d, err := newDiscovery(lg, cfg, id)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -175,26 +175,19 @@ type discovery struct {
memberId types.ID
c *clientv3.Client
retries uint
durl string

cfg *DiscoveryConfig

clock clockwork.Clock
}

func newDiscovery(lg *zap.Logger, durl string, dcfg *DiscoveryConfig, id types.ID) (*discovery, error) {
func newDiscovery(lg *zap.Logger, dcfg *DiscoveryConfig, id types.ID) (*discovery, error) {
if lg == nil {
lg = zap.NewNop()
}
u, err := url.Parse(durl)
if err != nil {
return nil, err
}
token := u.Path
u.Path = ""

lg = lg.With(zap.String("discovery-url", durl))
cfg, err := newClientCfg(dcfg, u.String(), lg)
lg = lg.With(zap.String("discovery-token", dcfg.Token), zap.String("discovery-endpoints", strings.Join(dcfg.Endpoints, ",")))
cfg, err := newClientCfg(dcfg, lg)
if err != nil {
return nil, err
}
Expand All @@ -205,18 +198,17 @@ func newDiscovery(lg *zap.Logger, durl string, dcfg *DiscoveryConfig, id types.I
}
return &discovery{
lg: lg,
clusterToken: token,
clusterToken: dcfg.Token,
memberId: id,
c: c,
durl: u.String(),
cfg: dcfg,
clock: clockwork.NewRealClock(),
}, nil
}

// The following function follows the same logic as etcdctl, refer to
// https://github.com/etcd-io/etcd/blob/f9a8c49c695b098d66a07948666664ea10d01a82/etcdctl/ctlv3/command/global.go#L191-L250
func newClientCfg(dcfg *DiscoveryConfig, dUrl string, lg *zap.Logger) (*clientv3.Config, error) {
func newClientCfg(dcfg *DiscoveryConfig, lg *zap.Logger) (*clientv3.Config, error) {
var cfgtls *transport.TLSInfo

if dcfg.CertFile != "" || dcfg.KeyFile != "" || dcfg.TrustedCAFile != "" {
Expand All @@ -229,7 +221,7 @@ func newClientCfg(dcfg *DiscoveryConfig, dUrl string, lg *zap.Logger) (*clientv3
}

cfg := &clientv3.Config{
Endpoints: []string{dUrl},
Endpoints: dcfg.Endpoints,
DialTimeout: dcfg.DialTimeout,
DialKeepAliveTime: dcfg.KeepAliveTime,
DialKeepAliveTimeout: dcfg.KeepAliveTimeout,
Expand Down
6 changes: 4 additions & 2 deletions server/etcdserver/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,10 +329,12 @@ func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper) (*
}
if cfg.ShouldDiscover() {
var str string
if cfg.EnableV2Discovery {
if cfg.DiscoveryURL != "" {
cfg.Logger.Warn("V2 discovery is deprecated!")
str, err = v2discovery.JoinCluster(cfg.Logger, cfg.DiscoveryURL, cfg.DiscoveryProxy, m.ID, cfg.InitialPeerURLsMap.String())
} else {
str, err = v3discovery.JoinCluster(cfg.Logger, cfg.DiscoveryURL, &cfg.DiscoveryCfg, m.ID, cfg.InitialPeerURLsMap.String())
cfg.Logger.Info("Bootstrapping cluster using v3 discovery.")
str, err = v3discovery.JoinCluster(cfg.Logger, &cfg.DiscoveryCfg, m.ID, cfg.InitialPeerURLsMap.String())
}
if err != nil {
return nil, &DiscoveryError{Op: "join", Err: err}
Expand Down
Loading

0 comments on commit f07b911

Please sign in to comment.