Skip to content

Commit

Permalink
Merge pull request #19196 from gangli113/main
Browse files Browse the repository at this point in the history
migrate flag experimental-compaction-batch-limit to use compaction-batch-limit
  • Loading branch information
ahrtr authored Jan 17, 2025
2 parents ab819b5 + 1ef973c commit 9da01a8
Show file tree
Hide file tree
Showing 9 changed files with 132 additions and 41 deletions.
11 changes: 9 additions & 2 deletions server/embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ var (
experimentalNonBoolFlagMigrationMap = map[string]string{
"experimental-compact-hash-check-time": "compact-hash-check-time",
"experimental-corrupt-check-time": "corrupt-check-time",
"experimental-compaction-batch-limit": "compaction-batch-limit",
}
)

Expand Down Expand Up @@ -387,7 +388,11 @@ type Config struct {
// Deprecated in v3.6.
// TODO: Delete in v3.7
ExperimentalEnableLeaseCheckpointPersist bool `json:"experimental-enable-lease-checkpoint-persist"`
ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"`
// ExperimentalCompactionBatchLimit Sets the maximum revisions deleted in each compaction batch.
// Deprecated in v3.6 and will be decommissioned in v3.7.
// TODO: Delete in v3.7
ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"`
CompactionBatchLimit int `json:"compaction-batch-limit"`
// ExperimentalCompactionSleepInterval is the sleep interval between every etcd compaction loop.
ExperimentalCompactionSleepInterval time.Duration `json:"experimental-compaction-sleep-interval"`
ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"`
Expand Down Expand Up @@ -788,7 +793,9 @@ func (cfg *Config) AddFlags(fs *flag.FlagSet) {
fs.BoolVar(&cfg.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.")
// TODO: delete in v3.7
fs.BoolVar(&cfg.ExperimentalEnableLeaseCheckpointPersist, "experimental-enable-lease-checkpoint-persist", false, "Enable persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled. Requires experimental-enable-lease-checkpoint to be enabled.")
fs.IntVar(&cfg.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.")
// TODO: delete in v3.7
fs.IntVar(&cfg.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch. Deprecated in v3.6 and will be decommissioned in v3.7. Use --compaction-batch-limit instead.")
fs.IntVar(&cfg.CompactionBatchLimit, "compaction-batch-limit", cfg.CompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.")
fs.DurationVar(&cfg.ExperimentalCompactionSleepInterval, "experimental-compaction-sleep-interval", cfg.ExperimentalCompactionSleepInterval, "Sets the sleep interval between each compaction batch.")
fs.DurationVar(&cfg.ExperimentalWatchProgressNotifyInterval, "experimental-watch-progress-notify-interval", cfg.ExperimentalWatchProgressNotifyInterval, "Duration of periodic watch progress notifications.")
fs.DurationVar(&cfg.ExperimentalDowngradeCheckTime, "experimental-downgrade-check-time", cfg.ExperimentalDowngradeCheckTime, "Duration of time between two downgrade status checks.")
Expand Down
2 changes: 1 addition & 1 deletion server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
UnsafeNoFsync: cfg.UnsafeNoFsync,
EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint,
LeaseCheckpointPersist: cfg.ExperimentalEnableLeaseCheckpointPersist,
CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit,
CompactionBatchLimit: cfg.CompactionBatchLimit,
CompactionSleepInterval: cfg.ExperimentalCompactionSleepInterval,
WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval,
DowngradeCheckTime: cfg.ExperimentalDowngradeCheckTime,
Expand Down
5 changes: 5 additions & 0 deletions server/etcdmain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ var (
"experimental-compact-hash-check-time": "--experimental-compact-hash-check-time is deprecated in 3.6 and will be decommissioned in 3.7. Use '--compact-hash-check-time' instead.",
"experimental-txn-mode-write-with-shared-buffer": "--experimental-txn-mode-write-with-shared-buffer is deprecated in v3.6 and will be decommissioned in v3.7. Use '--feature-gates=TxnModeWriteWithSharedBuffer=true' instead.",
"experimental-corrupt-check-time": "--experimental-corrupt-check-time is deprecated in v3.6 and will be decommissioned in v3.7. Use '--corrupt-check-time' instead.",
"experimental-compaction-batch-limit": "--experimental-compaction-batch-limit is deprecated in v3.6 and will be decommissioned in v3.7. Use '--compaction-batch-limit' instead.",
}
)

Expand Down Expand Up @@ -179,6 +180,10 @@ func (cfg *config) parse(arguments []string) error {
cfg.ec.CorruptCheckTime = cfg.ec.ExperimentalCorruptCheckTime
}

if cfg.ec.FlagsExplicitlySet["experimental-compaction-batch-limit"] {
cfg.ec.CompactionBatchLimit = cfg.ec.ExperimentalCompactionBatchLimit
}

// `V2Deprecation` (--v2-deprecation) is deprecated and scheduled for removal in v3.8. The default value is enforced, ignoring user input.
cfg.ec.V2Deprecation = cconfig.V2DeprDefault

Expand Down
123 changes: 93 additions & 30 deletions server/etcdmain/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,6 @@ func TestCompactHashCheckTimeFlagMigration(t *testing.T) {
name string
compactHashCheckTime string
experimentalCompactHashCheckTime string
useConfigFile bool
expectErr bool
expectedCompactHashCheckTime time.Duration
}{
Expand Down Expand Up @@ -536,19 +535,7 @@ func TestCompactHashCheckTimeFlagMigration(t *testing.T) {
yc.ExperimentalCompactHashCheckTime = experimentalCompactHashCheckTime
}

b, err := yaml.Marshal(&yc)
if err != nil {
t.Fatal(err)
}

tmpfile := mustCreateCfgFile(t, b)
defer os.Remove(tmpfile.Name())

cfgFromCmdLine := newConfig()
errFromCmdLine := cfgFromCmdLine.parse(cmdLineArgs)

cfgFromFile := newConfig()
errFromFile := cfgFromFile.parse([]string{fmt.Sprintf("--config-file=%s", tmpfile.Name())})
cfgFromCmdLine, errFromCmdLine, cfgFromFile, errFromFile := generateCfgsFromFileAndCmdLine(t, yc, cmdLineArgs)

if tc.expectErr {
if errFromCmdLine == nil || errFromFile == nil {
Expand All @@ -557,7 +544,7 @@ func TestCompactHashCheckTimeFlagMigration(t *testing.T) {
return
}
if errFromCmdLine != nil || errFromFile != nil {
t.Fatal(err)
t.Fatal("error parsing config")
}

if cfgFromCmdLine.ec.CompactHashCheckTime != tc.expectedCompactHashCheckTime {
Expand All @@ -578,7 +565,6 @@ func TestCorruptCheckTimeFlagMigration(t *testing.T) {
name string
corruptCheckTime string
experimentalCorruptCheckTime string
useConfigFile bool
expectErr bool
expectedCorruptCheckTime time.Duration
}{
Expand Down Expand Up @@ -625,19 +611,7 @@ func TestCorruptCheckTimeFlagMigration(t *testing.T) {
yc.ExperimentalCorruptCheckTime = experimentalCorruptCheckTime
}

b, err := yaml.Marshal(&yc)
if err != nil {
t.Fatal(err)
}

tmpfile := mustCreateCfgFile(t, b)
defer os.Remove(tmpfile.Name())

cfgFromCmdLine := newConfig()
errFromCmdLine := cfgFromCmdLine.parse(cmdLineArgs)

cfgFromFile := newConfig()
errFromFile := cfgFromFile.parse([]string{fmt.Sprintf("--config-file=%s", tmpfile.Name())})
cfgFromCmdLine, errFromCmdLine, cfgFromFile, errFromFile := generateCfgsFromFileAndCmdLine(t, yc, cmdLineArgs)

if tc.expectErr {
if errFromCmdLine == nil || errFromFile == nil {
Expand All @@ -646,7 +620,7 @@ func TestCorruptCheckTimeFlagMigration(t *testing.T) {
return
}
if errFromCmdLine != nil || errFromFile != nil {
t.Fatal(err)
t.Fatal("error parsing config")
}

if cfgFromCmdLine.ec.CorruptCheckTime != tc.expectedCorruptCheckTime {
Expand All @@ -659,6 +633,92 @@ func TestCorruptCheckTimeFlagMigration(t *testing.T) {
}
}

// TestCompactionBatchLimitFlagMigration tests the migration from
// --experimental-compaction-batch-limit to --compaction-batch-limit
// TODO: delete in v3.7
func TestCompactionBatchLimitFlagMigration(t *testing.T) {
testCases := []struct {
name string
compactionBatchLimit int
experimentalCompactionBatchLimit int
expectErr bool
expectedCompactionBatchLimit int
}{
{
name: "cannot set both experimental flag and non experimental flag",
compactionBatchLimit: 1,
experimentalCompactionBatchLimit: 2,
expectErr: true,
},
{
name: "can set experimental flag",
experimentalCompactionBatchLimit: 2,
expectedCompactionBatchLimit: 2,
},
{
name: "can set non experimental flag",
compactionBatchLimit: 1,
expectedCompactionBatchLimit: 1,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
cmdLineArgs := []string{}
yc := struct {
ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit,omitempty"`
CompactionBatchLimit int `json:"compaction-batch-limit,omitempty"`
}{}

if tc.compactionBatchLimit != 0 {
cmdLineArgs = append(cmdLineArgs, fmt.Sprintf("--compaction-batch-limit=%d", tc.compactionBatchLimit))
yc.CompactionBatchLimit = tc.compactionBatchLimit
}

if tc.experimentalCompactionBatchLimit != 0 {
cmdLineArgs = append(cmdLineArgs, fmt.Sprintf("--experimental-compaction-batch-limit=%d", tc.experimentalCompactionBatchLimit))
yc.ExperimentalCompactionBatchLimit = tc.experimentalCompactionBatchLimit
}

cfgFromCmdLine, errFromCmdLine, cfgFromFile, errFromFile := generateCfgsFromFileAndCmdLine(t, yc, cmdLineArgs)

if tc.expectErr {
if errFromCmdLine == nil || errFromFile == nil {
t.Fatal("expect parse error")
}
return
}
if errFromCmdLine != nil || errFromFile != nil {
t.Fatal("error parsing config")
}

if cfgFromCmdLine.ec.CompactionBatchLimit != tc.expectedCompactionBatchLimit {
t.Errorf("expected CorruptCheckTime=%v, got %v", tc.expectedCompactionBatchLimit, cfgFromCmdLine.ec.CompactionBatchLimit)
}
if cfgFromFile.ec.CompactionBatchLimit != tc.expectedCompactionBatchLimit {
t.Errorf("expected CorruptCheckTime=%v, got %v", tc.expectedCompactionBatchLimit, cfgFromFile.ec.CompactionBatchLimit)
}
})
}
}

// TODO delete in v3.7
func generateCfgsFromFileAndCmdLine(t *testing.T, yc any, cmdLineArgs []string) (*config, error, *config, error) {
b, err := yaml.Marshal(&yc)
if err != nil {
t.Fatal(err)
}

tmpfile := mustCreateCfgFile(t, b)
defer os.Remove(tmpfile.Name())

cfgFromCmdLine := newConfig()
errFromCmdLine := cfgFromCmdLine.parse(cmdLineArgs)

cfgFromFile := newConfig()
errFromFile := cfgFromFile.parse([]string{fmt.Sprintf("--config-file=%s", tmpfile.Name())})
return cfgFromCmdLine, errFromCmdLine, cfgFromFile, errFromFile
}

func mustCreateCfgFile(t *testing.T, b []byte) *os.File {
tmpfile, err := os.CreateTemp("", "servercfg")
if err != nil {
Expand Down Expand Up @@ -753,6 +813,7 @@ func TestConfigFileDeprecatedOptions(t *testing.T) {
ExperimentalCompactHashCheckTime time.Duration `json:"experimental-compact-hash-check-time,omitempty"`
ExperimentalWarningUnaryRequestDuration time.Duration `json:"experimental-warning-unary-request-duration,omitempty"`
ExperimentalCorruptCheckTime time.Duration `json:"experimental-corrupt-check-time,omitempty"`
ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit,omitempty"`
}

testCases := []struct {
Expand All @@ -772,11 +833,13 @@ func TestConfigFileDeprecatedOptions(t *testing.T) {
ExperimentalCompactHashCheckTime: 2 * time.Minute,
ExperimentalWarningUnaryRequestDuration: time.Second,
ExperimentalCorruptCheckTime: time.Minute,
ExperimentalCompactionBatchLimit: 1,
},
expectedFlags: map[string]struct{}{
"experimental-compact-hash-check-enabled": {},
"experimental-compact-hash-check-time": {},
"experimental-corrupt-check-time": {},
"experimental-compaction-batch-limit": {},
},
},
{
Expand Down
4 changes: 3 additions & 1 deletion server/etcdmain/help.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,9 @@ Experimental feature:
--experimental-enable-lease-checkpoint 'false'
ExperimentalEnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases.
--experimental-compaction-batch-limit 1000
ExperimentalCompactionBatchLimit sets the maximum revisions deleted in each compaction batch.
ExperimentalCompactionBatchLimit sets the maximum revisions deleted in each compaction batch. Deprecated in v3.6 and will be decommissioned in v3.7. Use 'compaction-batch-limit' instead.
--compaction-batch-limit 1000
CompactionBatchLimit sets the maximum revisions deleted in each compaction batch.
--experimental-peer-skip-client-san-verification 'false'
Skip verification of SAN field in client certificate for peer connections.
--experimental-watch-progress-notify-interval '10m'
Expand Down
18 changes: 14 additions & 4 deletions tests/e2e/corrupt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,14 +294,18 @@ func testCompactHashCheckDetectCorruption(t *testing.T, useFeatureGate bool) {
}

func TestCompactHashCheckDetectCorruptionInterrupt(t *testing.T) {
testCompactHashCheckDetectCorruptionInterrupt(t, false)
testCompactHashCheckDetectCorruptionInterrupt(t, false, false)
}

func TestCompactHashCheckDetectCorruptionInterruptWithFeatureGate(t *testing.T) {
testCompactHashCheckDetectCorruptionInterrupt(t, true)
testCompactHashCheckDetectCorruptionInterrupt(t, true, false)
}

func testCompactHashCheckDetectCorruptionInterrupt(t *testing.T, useFeatureGate bool) {
func TestCompactHashCheckDetectCorruptionInterruptWithExperimentalFlag(t *testing.T) {
testCompactHashCheckDetectCorruptionInterrupt(t, true, true)
}

func testCompactHashCheckDetectCorruptionInterrupt(t *testing.T, useFeatureGate bool, useExperimentalFlag bool) {
checkTime := time.Second
e2e.BeforeTest(t)
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
Expand All @@ -325,14 +329,20 @@ func testCompactHashCheckDetectCorruptionInterrupt(t *testing.T, useFeatureGate
} else {
opts = append(opts, e2e.WithCompactHashCheckEnabled(true))
}
var compactionBatchLimit e2e.EPClusterOption
if useExperimentalFlag {
compactionBatchLimit = e2e.WithExperimentalCompactionBatchLimit(1)
} else {
compactionBatchLimit = e2e.WithCompactionBatchLimit(1)
}

cfg := e2e.NewConfig(opts...)
epc, err := e2e.InitEtcdProcessCluster(t, cfg)
require.NoError(t, err)

// Assign a node a very slow compaction speed, so that its compaction can be interrupted.
err = epc.UpdateProcOptions(slowCompactionNodeIndex, t,
e2e.WithCompactionBatchLimit(1),
compactionBatchLimit,
e2e.WithCompactionSleepInterval(1*time.Hour),
)
require.NoError(t, err)
Expand Down
4 changes: 4 additions & 0 deletions tests/framework/e2e/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,10 @@ func WithServerFeatureGate(featureName string, val bool) EPClusterOption {
}

func WithCompactionBatchLimit(limit int) EPClusterOption {
return func(c *EtcdProcessClusterConfig) { c.ServerConfig.CompactionBatchLimit = limit }
}

func WithExperimentalCompactionBatchLimit(limit int) EPClusterOption {
return func(c *EtcdProcessClusterConfig) { c.ServerConfig.ExperimentalCompactionBatchLimit = limit }
}

Expand Down
4 changes: 2 additions & 2 deletions tests/robustness/failpoint/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (t triggerCompact) Trigger(ctx context.Context, _ *testing.T, member e2e.Et
}
rev = resp.Header.Revision

if !t.multiBatchCompaction || rev > int64(clus.Cfg.ServerConfig.ExperimentalCompactionBatchLimit) {
if !t.multiBatchCompaction || rev > int64(clus.Cfg.ServerConfig.CompactionBatchLimit) {
break
}
time.Sleep(50 * time.Millisecond)
Expand All @@ -99,7 +99,7 @@ func (t triggerCompact) Available(config e2e.EtcdProcessClusterConfig, _ e2e.Etc
// For multiBatchCompaction we need to guarantee that there are enough revisions between two compaction requests.
// With addition of compaction requests to traffic this might be hard if experimental-compaction-batch-limit is too high.
if t.multiBatchCompaction {
return config.ServerConfig.ExperimentalCompactionBatchLimit <= 10
return config.ServerConfig.CompactionBatchLimit <= 10
}
return true
}
Expand Down
2 changes: 1 addition & 1 deletion tests/robustness/options/server_config_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func WithSnapshotCount(input ...uint64) e2e.EPClusterOption {

func WithCompactionBatchLimit(input ...int) e2e.EPClusterOption {
return func(c *e2e.EtcdProcessClusterConfig) {
c.ServerConfig.ExperimentalCompactionBatchLimit = input[internalRand.Intn(len(input))]
c.ServerConfig.CompactionBatchLimit = input[internalRand.Intn(len(input))]
}
}

Expand Down

0 comments on commit 9da01a8

Please sign in to comment.