From 54cb3647be6ffd7282ce6fa676f74ab713d00cc9 Mon Sep 17 00:00:00 2001 From: ahrtr Date: Sat, 22 Jan 2022 08:35:36 +0800 Subject: [PATCH] support v3 discovery to bootstrap a new etcd cluster --- CHANGELOG-3.6.md | 1 + client/pkg/types/urlsmap.go | 2 +- server/config/config.go | 17 +- server/embed/config.go | 28 +- server/embed/etcd.go | 15 + server/etcdmain/config.go | 16 +- server/etcdmain/etcd.go | 8 +- server/etcdmain/help.go | 28 +- .../etcdserver/api/v3discovery/discovery.go | 569 ++++++++++++++++++ .../api/v3discovery/discovery_test.go | 1 + server/etcdserver/bootstrap.go | 7 +- tests/framework/integration/cluster.go | 2 + 12 files changed, 679 insertions(+), 15 deletions(-) create mode 100644 server/etcdserver/api/v3discovery/discovery.go create mode 100644 server/etcdserver/api/v3discovery/discovery_test.go diff --git a/CHANGELOG-3.6.md b/CHANGELOG-3.6.md index 8d0591cfd6ca..50470522fbcb 100644 --- a/CHANGELOG-3.6.md +++ b/CHANGELOG-3.6.md @@ -35,6 +35,7 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.5.0...v3.6.0). - Add [`etcd --experimental-max-learners`](https://github.com/etcd-io/etcd/pull/13377) flag to allow configuration of learner max membership. - Add [`etcd --experimental-enable-lease-checkpoint-persist`](https://github.com/etcd-io/etcd/pull/13508) flag to handle upgrade from v3.5.2 clusters with this feature enabled. - Add [`etcdctl make-mirror --rev`](https://github.com/etcd-io/etcd/pull/13519) flag to support incremental mirror. +- Add [v3 discovery](https://github.com/etcd-io/etcd/pull/13635) to bootstrap a new etcd cluster. - Fix [non mutating requests pass through quotaKVServer when NOSPACE](https://github.com/etcd-io/etcd/pull/13435) - Fix [exclude the same alarm type activated by multiple peers](https://github.com/etcd-io/etcd/pull/13467). 
- Fix [Provide a better liveness probe for when etcd runs as a Kubernetes pod](https://github.com/etcd-io/etcd/pull/13399) diff --git a/client/pkg/types/urlsmap.go b/client/pkg/types/urlsmap.go index 47690cc381a3..6cada961bb86 100644 --- a/client/pkg/types/urlsmap.go +++ b/client/pkg/types/urlsmap.go @@ -25,7 +25,7 @@ type URLsMap map[string]URLs // NewURLsMap returns a URLsMap instantiated from the given string, // which consists of discovery-formatted names-to-URLs, like: -// mach0=http://1.1.1.1:2380,mach0=http://2.2.2.2::2380,mach1=http://3.3.3.3:2380,mach2=http://4.4.4.4:2380 +// mach0=http://1.1.1.1:2380,mach1=http://2.2.2.2:2380,mach2=http://3.3.3.3:2380 func NewURLsMap(s string) (URLsMap, error) { m := parse(s) diff --git a/server/config/config.go b/server/config/config.go index 43ecab7ecd5f..dce16792ac2f 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -25,6 +25,7 @@ import ( "go.etcd.io/etcd/client/pkg/v3/transport" "go.etcd.io/etcd/client/pkg/v3/types" "go.etcd.io/etcd/pkg/v3/netutil" + "go.etcd.io/etcd/server/v3/etcdserver/api/v3discovery" "go.etcd.io/etcd/server/v3/storage/datadir" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" @@ -34,12 +35,16 @@ import ( // ServerConfig holds the configuration of etcd as taken from the command line or discovery. type ServerConfig struct { - Name string - DiscoveryURL string - DiscoveryProxy string - ClientURLs types.URLs - PeerURLs types.URLs - DataDir string + Name string + + EnableV2Discovery bool + DiscoveryURL string + DiscoveryProxy string + DiscoveryCfg v3discovery.DiscoveryConfig + + ClientURLs types.URLs + PeerURLs types.URLs + DataDir string // DedicatedWALDir config will make the etcd to write the WAL to the WALDir // rather than the dataDir/member/wal. 
DedicatedWALDir string diff --git a/server/embed/config.go b/server/embed/config.go index 46d9df0add6a..c15b2ab8cf43 100644 --- a/server/embed/config.go +++ b/server/embed/config.go @@ -36,6 +36,7 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver" "go.etcd.io/etcd/server/v3/etcdserver/api/membership" "go.etcd.io/etcd/server/v3/etcdserver/api/v3compactor" + "go.etcd.io/etcd/server/v3/etcdserver/api/v3discovery" bolt "go.etcd.io/bbolt" "go.uber.org/multierr" @@ -61,6 +62,11 @@ const ( DefaultGRPCKeepAliveTimeout = 20 * time.Second DefaultDowngradeCheckTime = 5 * time.Second + DefaultDiscoveryDialTimeout = 2 * time.Second + DefaultDiscoveryRequestTimeOut = 5 * time.Second + DefaultDiscoveryKeepAliveTime = 2 * time.Second + DefaultDiscoveryKeepAliveTimeOut = 6 * time.Second + DefaultListenPeerURLs = "http://localhost:2380" DefaultListenClientURLs = "http://localhost:2379" @@ -87,6 +93,8 @@ const ( // It's enabled by default. DefaultStrictReconfigCheck = true + DefaultEnableV2Discovery = true + // maxElectionMs specifies the maximum value of election timeout. // More details are listed in ../Documentation/tuning.md#time-parameters. maxElectionMs = 50000 @@ -216,10 +224,14 @@ type Config struct { DNSCluster string `json:"discovery-srv"` DNSClusterServiceName string `json:"discovery-srv-name"` Dproxy string `json:"discovery-proxy"` - Durl string `json:"discovery"` - InitialCluster string `json:"initial-cluster"` - InitialClusterToken string `json:"initial-cluster-token"` - StrictReconfigCheck bool `json:"strict-reconfig-check"` + + EnableV2Discovery bool `json:"enable-v2-discovery"` + Durl string `json:"discovery"` + DiscoveryCfg v3discovery.DiscoveryConfig + + InitialCluster string `json:"initial-cluster"` + InitialClusterToken string `json:"initial-cluster-token"` + StrictReconfigCheck bool `json:"strict-reconfig-check"` // AutoCompactionMode is either 'periodic' or 'revision'. 
AutoCompactionMode string `json:"auto-compaction-mode"` @@ -501,6 +513,14 @@ func NewConfig() *Config { ExperimentalMaxLearners: membership.DefaultMaxLearners, V2Deprecation: config.V2_DEPR_DEFAULT, + + EnableV2Discovery: DefaultEnableV2Discovery, + DiscoveryCfg: v3discovery.DiscoveryConfig{ + DialTimeout: DefaultDiscoveryDialTimeout, + RequestTimeOut: DefaultDiscoveryRequestTimeOut, + KeepAliveTime: DefaultDiscoveryKeepAliveTime, + KeepAliveTimeout: DefaultDiscoveryKeepAliveTimeOut, + }, } cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name) return cfg diff --git a/server/embed/etcd.go b/server/embed/etcd.go index 70484277016f..01933724dbbe 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -175,8 +175,10 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { MaxWALFiles: cfg.MaxWalFiles, InitialPeerURLsMap: urlsmap, InitialClusterToken: token, + EnableV2Discovery: cfg.EnableV2Discovery, DiscoveryURL: cfg.Durl, DiscoveryProxy: cfg.Dproxy, + DiscoveryCfg: cfg.DiscoveryCfg, NewCluster: cfg.IsNewCluster(), PeerTLSInfo: cfg.PeerTLSInfo, TickMs: cfg.TickMs, @@ -343,6 +345,19 @@ func print(lg *zap.Logger, ec Config, sc config.ServerConfig, memberInitialized zap.String("auto-compaction-interval", sc.AutoCompactionRetention.String()), zap.String("discovery-url", sc.DiscoveryURL), zap.String("discovery-proxy", sc.DiscoveryProxy), + + zap.String("discovery-dial-timeout", sc.DiscoveryCfg.DialTimeout.String()), + zap.String("discovery-request-timeout", sc.DiscoveryCfg.RequestTimeOut.String()), + zap.String("discovery-keepalive-time", sc.DiscoveryCfg.KeepAliveTime.String()), + zap.String("discovery-keepalive-timeout", sc.DiscoveryCfg.KeepAliveTimeout.String()), + zap.Bool("discovery-insecure-transport", sc.DiscoveryCfg.InsecureTransport), + zap.Bool("discovery-insecure-skip-tls-verify", sc.DiscoveryCfg.InsecureSkipVerify), + zap.String("discovery-cert", sc.DiscoveryCfg.CertFile), + zap.String("discovery-key", sc.DiscoveryCfg.KeyFile), + 
zap.String("discovery-cacert", sc.DiscoveryCfg.TrustedCAFile), + zap.String("discovery-user", sc.DiscoveryCfg.User), + zap.Bool("discovery-insecure-discovery", sc.DiscoveryCfg.InsecureDiscovery), + zap.String("downgrade-check-interval", sc.DowngradeCheckTime.String()), zap.Int("max-learners", sc.ExperimentalMaxLearners), ) diff --git a/server/etcdmain/config.go b/server/etcdmain/config.go index 4257f1ba081b..5476622b65df 100644 --- a/server/etcdmain/config.go +++ b/server/etcdmain/config.go @@ -186,11 +186,25 @@ func newConfig() *config { "advertise-client-urls", "List of this member's client URLs to advertise to the public.", ) + fs.BoolVar(&cfg.ec.EnableV2Discovery, "enable-v2-discovery", cfg.ec.EnableV2Discovery, "Enable to bootstrap the cluster using v2 discovery. Will be deprecated in v3.7, and be decommissioned in v3.8.") fs.StringVar(&cfg.ec.Durl, "discovery", cfg.ec.Durl, "Discovery URL used to bootstrap the cluster.") fs.Var(cfg.cf.fallback, "discovery-fallback", fmt.Sprintf("Valid values include %q", cfg.cf.fallback.Valids())) + fs.DurationVar(&cfg.ec.DiscoveryCfg.DialTimeout, "discovery-dial-timeout", cfg.ec.DiscoveryCfg.DialTimeout, "V3 discovery: dial timeout for client connections.") + fs.DurationVar(&cfg.ec.DiscoveryCfg.RequestTimeOut, "discovery-request-timeout", cfg.ec.DiscoveryCfg.RequestTimeOut, "V3 discovery: timeout for discovery requests (excluding dial timeout).") + fs.DurationVar(&cfg.ec.DiscoveryCfg.KeepAliveTime, "discovery-keepalive-time", cfg.ec.DiscoveryCfg.KeepAliveTime, "V3 discovery: keepalive time for client connections.") + fs.DurationVar(&cfg.ec.DiscoveryCfg.KeepAliveTimeout, "discovery-keepalive-timeout", cfg.ec.DiscoveryCfg.KeepAliveTimeout, "V3 discovery: keepalive timeout for client connections.") + fs.BoolVar(&cfg.ec.DiscoveryCfg.InsecureTransport, "discovery-insecure-transport", false, "V3 discovery: disable transport security for client connections.") + fs.BoolVar(&cfg.ec.DiscoveryCfg.InsecureDiscovery, 
"discovery-insecure-discovery", true, "V3 discovery: accept insecure SRV records describing discovery service endpoint.") + fs.BoolVar(&cfg.ec.DiscoveryCfg.InsecureSkipVerify, "discovery-insecure-skip-tls-verify", false, "V3 discovery: skip server certificate verification (CAUTION: this option should be enabled only for testing purposes).") + fs.StringVar(&cfg.ec.DiscoveryCfg.CertFile, "discovery-cert", "", "V3 discovery: identify secure client using this TLS certificate file.") + fs.StringVar(&cfg.ec.DiscoveryCfg.KeyFile, "discovery-key", "", "V3 discovery: identify secure client using this TLS key file.") + fs.StringVar(&cfg.ec.DiscoveryCfg.TrustedCAFile, "discovery-cacert", "", "V3 discovery: verify certificates of TLS-enabled secure servers using this CA bundle.") + fs.StringVar(&cfg.ec.DiscoveryCfg.User, "discovery-user", "", "V3 discovery: username[:password] for authentication (prompt if password is not supplied).") + fs.StringVar(&cfg.ec.DiscoveryCfg.Password, "discovery-password", "", "V3 discovery: password for authentication (if this option is used, --user option shouldn't include password).") + fs.StringVar(&cfg.ec.Dproxy, "discovery-proxy", cfg.ec.Dproxy, "HTTP proxy to use for traffic to discovery service.") - fs.StringVar(&cfg.ec.DNSCluster, "discovery-srv", cfg.ec.DNSCluster, "DNS domain used to bootstrap initial cluster.") + fs.StringVar(&cfg.ec.DNSCluster, "discovery-srv", cfg.ec.DNSCluster, "DNS domain used to bootstrap initial cluster or the domain name to query for SRV records describing v3 discovery endpoint.") fs.StringVar(&cfg.ec.DNSClusterServiceName, "discovery-srv-name", cfg.ec.DNSClusterServiceName, "Service name to query when using DNS discovery.") fs.StringVar(&cfg.ec.InitialCluster, "initial-cluster", cfg.ec.InitialCluster, "Initial cluster configuration for bootstrapping.") fs.StringVar(&cfg.ec.InitialClusterToken, "initial-cluster-token", cfg.ec.InitialClusterToken, "Initial cluster token for the etcd cluster during bootstrap.") 
diff --git a/server/etcdmain/etcd.go b/server/etcdmain/etcd.go index 190364e2781b..255e5f42e4ec 100644 --- a/server/etcdmain/etcd.go +++ b/server/etcdmain/etcd.go @@ -17,6 +17,7 @@ package etcdmain import ( "encoding/json" "fmt" + "net/http" "os" "path/filepath" @@ -34,6 +35,7 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver" "go.etcd.io/etcd/server/v3/etcdserver/api/etcdhttp" "go.etcd.io/etcd/server/v3/etcdserver/api/v2discovery" + "go.etcd.io/etcd/server/v3/etcdserver/api/v3discovery" "go.etcd.io/etcd/server/v3/proxy/httpproxy" "go.uber.org/zap" @@ -318,7 +320,11 @@ func startProxy(cfg *config) error { if cfg.ec.Durl != "" { var s string - s, err = v2discovery.GetCluster(lg, cfg.ec.Durl, cfg.ec.Dproxy) + if cfg.ec.EnableV2Discovery { + s, err = v2discovery.GetCluster(lg, cfg.ec.Durl, cfg.ec.Dproxy) + } else { + s, err = v3discovery.GetCluster(lg, cfg.ec.Durl, &cfg.ec.DiscoveryCfg) + } if err != nil { return err } diff --git a/server/etcdmain/help.go b/server/etcdmain/help.go index b9dea2e3c4e2..339587b6600b 100644 --- a/server/etcdmain/help.go +++ b/server/etcdmain/help.go @@ -104,15 +104,41 @@ Clustering: --advertise-client-urls 'http://localhost:2379' List of this member's client URLs to advertise to the public. The client URLs advertised should be accessible to machines that talk to etcd cluster. etcd client libraries parse these URLs to connect to the cluster. + --enable-v2-discovery 'true' + Enable to bootstrap the cluster using v2 discovery. Will be deprecated in v3.7, and be decommissioned in v3.8. --discovery '' Discovery URL used to bootstrap the cluster. + --discovery-dial-timeout + V3 discovery: dial timeout for client connections. + --discovery-request-timeout + V3 discovery: timeout for discovery requests (excluding dial timeout). + --discovery-keepalive-time + V3 discovery: keepalive time for client connections. + --discovery-keepalive-timeout + V3 discovery: keepalive timeout for client connections. 
+ --discovery-insecure-transport 'false' + V3 discovery: disable transport security for client connections. + --discovery-insecure-discovery + V3 discovery: accept insecure SRV records describing discovery service endpoint. + --discovery-insecure-skip-tls-verify + V3 discovery: skip server certificate verification (CAUTION: this option should be enabled only for testing purposes). + --discovery-cert + V3 discovery: identify secure client using this TLS certificate file. + --discovery-key + V3 discovery: identify secure client using this TLS key file. + --discovery-cacert + V3 discovery: verify certificates of TLS-enabled secure servers using this CA bundle. + --discovery-user + V3 discovery: username[:password] for authentication (prompt if password is not supplied). + --discovery-password + V3 discovery: password for authentication (if this option is used, --user option shouldn't include password). --discovery-fallback 'proxy' Expected behavior ('exit' or 'proxy') when discovery services fails. "proxy" supports v2 API only. --discovery-proxy '' HTTP proxy to use for traffic to discovery service. --discovery-srv '' - DNS srv domain used to bootstrap the cluster. + DNS srv domain used to bootstrap the cluster or the domain name to query for SRV records describing the v3 discovery endpoint. --discovery-srv-name '' Suffix to the dns srv name queried when bootstrapping. --strict-reconfig-check '` + strconv.FormatBool(embed.DefaultStrictReconfigCheck) + `' diff --git a/server/etcdserver/api/v3discovery/discovery.go b/server/etcdserver/api/v3discovery/discovery.go new file mode 100644 index 000000000000..9879d1008767 --- /dev/null +++ b/server/etcdserver/api/v3discovery/discovery.go @@ -0,0 +1,569 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package v3discovery provides an implementation of the cluster discovery that +// is used by etcd with v3 client. +package v3discovery + +import ( + "context" + "crypto/tls" + "errors" + + "math" + "net/url" + "path" + "sort" + "strconv" + "strings" + "time" + + "go.etcd.io/etcd/client/pkg/v3/transport" + "go.etcd.io/etcd/client/pkg/v3/types" + "go.etcd.io/etcd/client/v3" + + "github.com/jonboulle/clockwork" + "go.uber.org/zap" +) + +const ( + discoveryPrefix = "/_etcd/registry" +) + +var ( + ErrInvalidURL = errors.New("discovery: invalid peer URL") + ErrBadSizeKey = errors.New("discovery: size key is bad") + ErrSizeNotFound = errors.New("discovery: size key not found") + ErrFullCluster = errors.New("discovery: cluster is full") + ErrTooManyRetries = errors.New("discovery: too many retries") +) + +var ( + // Number of retries discovery will attempt before giving up and error out. 
+ nRetries = uint(math.MaxUint32) + maxExpoentialRetries = uint(8) +) + +type DiscoveryConfig struct { + Url string // --discovery + + ServiceName string // --discovery-srv + InsecureDiscovery bool //--discovery-insecure-discovery + + DialTimeout time.Duration //--discovery-dial-timeout + RequestTimeOut time.Duration //--discovery-request-timeout + KeepAliveTime time.Duration //--discovery-keepalive-time + KeepAliveTimeout time.Duration //--discovery-keepalive-timeout + + InsecureTransport bool //--discovery-insecure-transport + InsecureSkipVerify bool //--discovery-insecure-skip-tls-verify + CertFile string //--discovery-cert + KeyFile string //--discovery-key + TrustedCAFile string //--discovery-cacert + + User string //--discovery-user + Password string //--discovery-password +} + +type memberInfo struct { + peerId string + peerURL string + createRev int64 +} + +type clusterInfo struct { + clusterId string + members []memberInfo +} + +// key prefix for each cluster: "/_etcd/registry/<ClusterToken>". +func geClusterKeyPrefix(cluster string) string { + return path.Join(discoveryPrefix, cluster) +} + +// key format for cluster size: "/_etcd/registry/<ClusterToken>/_config/size". +func geClusterSizeKey(cluster string) string { + return path.Join(geClusterKeyPrefix(cluster), "_config/size") +} + +// key prefix for each member: "/_etcd/registry/<ClusterToken>/members". +func getMemberKeyPrefix(cluster string) string { + return path.Join(geClusterKeyPrefix(cluster), "members") +} + +// key format for each member: "/_etcd/registry/<ClusterToken>/members/<memberId>". 
+func getMemberKey(cluster, memberId string) string { + return path.Join(getMemberKeyPrefix(cluster), memberId) +} + +// GetCluster will connect to the discovery service at the given url and +// retrieve a string describing the cluster +func GetCluster(lg *zap.Logger, dUrl string, cfg *DiscoveryConfig) (cs string, rerr error) { + d, err := newDiscovery(lg, dUrl, cfg, 0) + if err != nil { + return "", err + } + + defer d.close() + defer func() { + if rerr != nil { + d.lg.Error( + "discovery failed to get cluster", + zap.String("cluster", cs), + zap.Error(rerr), + ) + } else { + d.lg.Info( + "discovery got cluster successfully", + zap.String("cluster", cs), + ) + } + }() + + return d.getCluster() +} + +// JoinCluster will connect to the discovery service at the given url, and +// register the server represented by the given id and config to the cluster +func JoinCluster(lg *zap.Logger, durl string, cfg *DiscoveryConfig, id types.ID, config string) (cs string, rerr error) { + d, err := newDiscovery(lg, durl, cfg, id) + if err != nil { + return "", err + } + + defer d.close() + defer func() { + if rerr != nil { + d.lg.Error( + "discovery failed to join cluster", + zap.String("cluster", cs), + zap.Error(rerr), + ) + } else { + d.lg.Info( + "discovery joined cluster successfully", + zap.String("cluster", cs), + ) + } + }() + + return d.joinCluster(config) +} + +type discovery struct { + lg *zap.Logger + cluster string + id types.ID + c *clientv3.Client + retries uint + durl string + + cfg *DiscoveryConfig + + clock clockwork.Clock +} + +func newDiscovery(lg *zap.Logger, durl string, dcfg *DiscoveryConfig, id types.ID) (*discovery, error) { + if lg == nil { + lg = zap.NewNop() + } + u, err := url.Parse(durl) + if err != nil { + return nil, err + } + token := u.Path + u.Path = "" + + lg = lg.With(zap.String("discovery-url", durl)) + cfg, err := newClientCfg(dcfg, u.String(), lg) + if err != nil { + return nil, err + } + + c, err := clientv3.New(*cfg) + if err != nil { + 
return nil, err + } + return &discovery{ + lg: lg, + cluster: token, + id: id, + c: c, + durl: u.String(), + cfg: dcfg, + clock: clockwork.NewRealClock(), + }, nil +} + +// The following function follows the same logic as etcdctl, refer to +// https://github.com/etcd-io/etcd/blob/f9a8c49c695b098d66a07948666664ea10d01a82/etcdctl/ctlv3/command/global.go#L191-L250 +func newClientCfg(dcfg *DiscoveryConfig, dUrl string, lg *zap.Logger) (*clientv3.Config, error) { + var cfgtls *transport.TLSInfo + + if dcfg.CertFile != "" || dcfg.KeyFile != "" || dcfg.TrustedCAFile != "" || dcfg.ServiceName != "" { + cfgtls = &transport.TLSInfo{ + CertFile: dcfg.CertFile, + KeyFile: dcfg.KeyFile, + TrustedCAFile: dcfg.TrustedCAFile, + ServerName: dcfg.ServiceName, + Logger: lg, + } + if dcfg.InsecureDiscovery { + cfgtls.ServerName = "" + } + } + + cfg := &clientv3.Config{ + Endpoints: []string{dUrl}, + DialTimeout: dcfg.DialTimeout, + DialKeepAliveTime: dcfg.KeepAliveTime, + DialKeepAliveTimeout: dcfg.KeepAliveTimeout, + Username: dcfg.User, + Password: dcfg.Password, + } + + if cfgtls != nil { + if clientTLS, err := cfgtls.ClientConfig(); err == nil { + cfg.TLS = clientTLS + } else { + return nil, err + } + } + + // if key/cert is not given but user wants secure connection, we + // should still setup an empty tls configuration for gRPC to setup + // secure connection. + if cfg.TLS == nil && !dcfg.InsecureTransport { + cfg.TLS = &tls.Config{} + } + + // If the user wants to skip TLS verification then we should set + // the InsecureSkipVerify flag in tls configuration. 
+ if cfg.TLS != nil && dcfg.InsecureSkipVerify { + cfg.TLS.InsecureSkipVerify = true + } + + return cfg, nil +} + +func (d *discovery) getCluster() (string, error) { + cls, clusterSize, rev, err := d.checkCluster() + if err != nil { + if err == ErrFullCluster { + return cls.nodesToCluster(clusterSize) + } + return "", err + } + + for cls.Len() < clusterSize { + d.waitNodes(cls, clusterSize, rev) + } + + return cls.nodesToCluster(clusterSize) +} + +func (d *discovery) joinCluster(config string) (string, error) { + _, _, _, err := d.checkCluster() + if err != nil { + return "", err + } + + if err := d.registerSelf(config); err != nil { + return "", err + } + + cls, clusterSize, rev, err := d.checkCluster() + if err != nil { + return "", err + } + + for cls.Len() < clusterSize { + d.waitNodes(cls, clusterSize, rev) + } + + return cls.nodesToCluster(clusterSize) +} + +func (d *discovery) getClusterSize() (int, error) { + configKey := geClusterSizeKey(d.cluster) + ctx, cancel := context.WithTimeout(context.Background(), d.cfg.RequestTimeOut) + defer cancel() + + resp, err := d.c.Get(ctx, configKey) + if err != nil { + d.lg.Warn( + "failed to get cluster size from discovery service", + zap.String("clusterSizeKey", configKey), + zap.Error(err), + ) + return 0, err + } + + if len(resp.Kvs) == 0 { + return 0, ErrSizeNotFound + } + + clusterSize, err := strconv.ParseInt(string(resp.Kvs[0].Value), 10, 0) + if err != nil || clusterSize <= 0 { + return 0, ErrBadSizeKey + } + + return int(clusterSize), nil +} + +func (d *discovery) getClusterMembers() (*clusterInfo, int64, error) { + membersKeyPrefix := getMemberKeyPrefix(d.cluster) + ctx, cancel := context.WithTimeout(context.Background(), d.cfg.RequestTimeOut) + defer cancel() + + resp, err := d.c.Get(ctx, membersKeyPrefix, clientv3.WithPrefix()) + if err != nil { + d.lg.Warn( + "failed to get cluster members from discovery service", + zap.String("membersKeyPrefix", membersKeyPrefix), + zap.Error(err), + ) + return nil, 0, err 
+ } + + cls := &clusterInfo{clusterId: d.cluster} + for _, kv := range resp.Kvs { + mKey := strings.TrimSpace(string(kv.Key)) + mValue := strings.TrimSpace(string(kv.Value)) + + if err := cls.add(mKey, mValue, kv.CreateRevision); err != nil { + d.lg.Warn( + err.Error(), + zap.String("memberKey", mKey), + zap.String("memberInfo", mValue), + ) + } else { + d.lg.Info( + "found peer from discovery service", + zap.String("memberKey", mKey), + zap.String("memberInfo", mValue), + ) + } + } + + return cls, resp.Header.Revision, nil +} + +func (d *discovery) checkClusterRetry() (*clusterInfo, int, int64, error) { + if d.retries < nRetries { + d.logAndBackoffForRetry("cluster status check") + return d.checkCluster() + } + return nil, 0, 0, ErrTooManyRetries +} + +func (d *discovery) checkCluster() (*clusterInfo, int, int64, error) { + clusterSize, err := d.getClusterSize() + if err != nil { + if err == ErrSizeNotFound || err == ErrBadSizeKey { + return nil, 0, 0, err + } + + return d.checkClusterRetry() + } + + cls, rev, err := d.getClusterMembers() + if err != nil { + return d.checkClusterRetry() + } + + // find self position + memberSelfId := getMemberKey(d.cluster, d.id.String()) + idx := 0 + for _, m := range cls.members { + if m.peerId == memberSelfId { + break + } + if idx >= clusterSize-1 { + return cls, clusterSize, rev, ErrFullCluster + } + idx++ + } + return cls, clusterSize, rev, nil +} + +func (d *discovery) registerSelfRetry(contents string) error { + if d.retries < nRetries { + d.logAndBackoffForRetry("register member itself") + return d.registerSelf(contents) + } + return ErrTooManyRetries +} + +func (d *discovery) registerSelf(contents string) error { + ctx, cancel := context.WithTimeout(context.Background(), d.cfg.RequestTimeOut) + memberKey := getMemberKey(d.cluster, d.id.String()) + _, err := d.c.Put(ctx, memberKey, contents) + cancel() + + if err != nil { + d.lg.Warn( + "failed to register members itself to the discovery service", + 
zap.String("memberKey", memberKey), + zap.Error(err), + ) + return d.registerSelfRetry(contents) + } + + d.lg.Info( + "register member itself successfully", + zap.String("memberKey", memberKey), + zap.String("memberInfo", contents), + ) + + return nil +} + +func (d *discovery) waitNodes(cls *clusterInfo, clusterSize int, rev int64) { + // watch from the next revision + membersKeyPrefix := getMemberKeyPrefix(d.cluster) + w := d.c.Watch(context.Background(), membersKeyPrefix, clientv3.WithPrefix(), clientv3.WithRev(rev+1)) + + d.lg.Info( + "waiting for peers from discovery service", + zap.Int("clusterSize", clusterSize), + zap.Int("found-peers", cls.Len()), + ) + + // waiting for peers until all needed peers are returned + for wresp := range w { + for _, ev := range wresp.Events { + mKey := strings.TrimSpace(string(ev.Kv.Key)) + mValue := strings.TrimSpace(string(ev.Kv.Value)) + + if err := cls.add(mKey, mValue, ev.Kv.CreateRevision); err != nil { + d.lg.Warn( + err.Error(), + zap.String("memberKey", mKey), + zap.String("memberInfo", mValue), + ) + } else { + d.lg.Info( + "found peer from discovery service", + zap.String("memberKey", mKey), + zap.String("memberInfo", mValue), + ) + } + } + + if cls.Len() >= clusterSize { + break + } + } + + d.lg.Info( + "found all needed peers from discovery service", + zap.Int("clusterSize", clusterSize), + zap.Int("found-peers", cls.Len()), + ) +} + +func (d *discovery) logAndBackoffForRetry(step string) { + d.retries++ + // logAndBackoffForRetry stops exponential backoff when the retries are + // more than maxExpoentialRetries and is set to a constant backoff afterward. 
+ retries := d.retries + if retries > maxExpoentialRetries { + retries = maxExpoentialRetries + } + retryTimeInSecond := time.Duration(0x1< clusterSize { + peerURLs = peerURLs[:clusterSize] + } + + us := strings.Join(peerURLs, ",") + _, err := types.NewURLsMap(us) + if err != nil { + return us, ErrInvalidURL + } + + return us, nil +} + +func (cls *clusterInfo) nodesToPeerURLs() []string { + var peerURLs []string + for _, peer := range cls.members { + peerURLs = append(peerURLs, peer.peerURL) + } + return peerURLs +} diff --git a/server/etcdserver/api/v3discovery/discovery_test.go b/server/etcdserver/api/v3discovery/discovery_test.go new file mode 100644 index 000000000000..6ae0bc11ee58 --- /dev/null +++ b/server/etcdserver/api/v3discovery/discovery_test.go @@ -0,0 +1 @@ +package v3discovery diff --git a/server/etcdserver/bootstrap.go b/server/etcdserver/bootstrap.go index 0c1297afee50..76b888ec3e76 100644 --- a/server/etcdserver/bootstrap.go +++ b/server/etcdserver/bootstrap.go @@ -40,6 +40,7 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver/api/snap" "go.etcd.io/etcd/server/v3/etcdserver/api/v2discovery" "go.etcd.io/etcd/server/v3/etcdserver/api/v2store" + "go.etcd.io/etcd/server/v3/etcdserver/api/v3discovery" "go.etcd.io/etcd/server/v3/etcdserver/cindex" serverstorage "go.etcd.io/etcd/server/v3/storage" "go.etcd.io/etcd/server/v3/storage/backend" @@ -328,7 +329,11 @@ func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper) (* } if cfg.ShouldDiscover() { var str string - str, err = v2discovery.JoinCluster(cfg.Logger, cfg.DiscoveryURL, cfg.DiscoveryProxy, m.ID, cfg.InitialPeerURLsMap.String()) + if cfg.EnableV2Discovery { + str, err = v2discovery.JoinCluster(cfg.Logger, cfg.DiscoveryURL, cfg.DiscoveryProxy, m.ID, cfg.InitialPeerURLsMap.String()) + } else { + str, err = v3discovery.JoinCluster(cfg.Logger, cfg.DiscoveryURL, &cfg.DiscoveryCfg, m.ID, cfg.InitialPeerURLsMap.String()) + } if err != nil { return nil, &DiscoveryError{Op: "join", Err: 
err} } diff --git a/tests/framework/integration/cluster.go b/tests/framework/integration/cluster.go index f7fc08677f8f..c97e79d41aae 100644 --- a/tests/framework/integration/cluster.go +++ b/tests/framework/integration/cluster.go @@ -609,6 +609,8 @@ func MustNewMember(t testutil.TB, mcfg MemberConfig) *Member { peerScheme := SchemeFromTLSInfo(mcfg.PeerTLS) clientScheme := SchemeFromTLSInfo(mcfg.ClientTLS) + m.EnableV2Discovery = embed.DefaultEnableV2Discovery + pln := newLocalListener(t) m.PeerListeners = []net.Listener{pln} m.PeerURLs, err = types.NewURLs([]string{peerScheme + "://" + pln.Addr().String()})