Skip to content

Commit

Permalink
schedule scrub
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitry Sharshakov <[email protected]>
  • Loading branch information
dsseng committed Nov 30, 2024
1 parent ce8ea36 commit e178d4d
Showing 1 changed file with 38 additions and 41 deletions.
79 changes: 38 additions & 41 deletions internal/app/machined/pkg/controllers/runtime/fs_scrub.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,16 @@ import (
)

type scrubSchedule struct {
period time.Duration
upcoming time.Time
period time.Duration
timer *time.Timer
}

// FSScrubController watches v1alpha1.Config and schedules filesystem online check tasks.
type FSScrubController struct {
Runtime runtime.Runtime
schedule map[string]scrubSchedule
// When a mountpoint is scheduled to be scrubbed, its path is sent to this channel to be processed in the Run function.
c chan string
}

// Name implements controller.Controller interface.
Expand Down Expand Up @@ -63,38 +65,26 @@ func (ctrl *FSScrubController) Outputs() []controller.Output {

// Run implements controller.Controller interface.
func (ctrl *FSScrubController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
var (
ticker *time.Ticker
tickerC <-chan time.Time
)

tickerStop := func() {
if ticker == nil {
return
stopTimers := func() {
for _, task := range ctrl.schedule {
if task.timer != nil {
task.timer.Stop()
}
}

ticker.Stop()

ticker = nil
tickerC = nil
}

defer tickerStop()

tickerStop()

ticker = time.NewTicker(15 * time.Second)
tickerC = ticker.C
defer stopTimers()

ctrl.schedule = make(map[string]scrubSchedule)
ctrl.c = make(chan string)

for {
select {
case <-ctx.Done():
return nil
case <-tickerC:
if err := ctrl.scrub("/var", []string{}); err != nil {
return fmt.Errorf("error running filesystem scrub: %w", err)
case mountpoint := <-ctrl.c:
if err := ctrl.runScrub(mountpoint, []string{}); err != nil {
logger.Error("!!! scrub !!! error running filesystem scrub", zap.Error(err))
}

continue
Expand All @@ -113,50 +103,57 @@ func (ctrl *FSScrubController) updateSchedule(ctx context.Context, r controller.
return fmt.Errorf("error getting volume status: %w", err)
}

logger.Warn("reading volume status")
volumesStatus.ForEach(func(item *block.VolumeStatus) {
logger.Warn("!!! scrub !!! reading volume status")
for item := range volumesStatus.All() {
vol := item.TypedSpec()

logger.Warn("volume status", zap.Reflect("item", vol))
logger.Warn("!!! scrub !!! volume status", zap.Reflect("volume", vol))

if vol.Phase != block.VolumePhaseReady {
logger.Warn("vol.Phase != block.VolumePhaseReady", zap.Reflect("item", vol))
logger.Warn("!!! scrub !!! vol.Phase != block.VolumePhaseReady", zap.Reflect("item", vol))

return
continue
}

if vol.Filesystem != block.FilesystemTypeXFS {
logger.Warn("vol.Filesystem != block.FilesystemTypeXFS", zap.Reflect("item", vol))
logger.Warn("!!! scrub !!! vol.Filesystem != block.FilesystemTypeXFS", zap.Reflect("item", vol))

return
continue
}

volumeConfig, err := safe.ReaderGetByID[*block.VolumeConfig](ctx, r, item.Metadata().ID())
if err != nil {
logger.Warn("err", zap.Error(err))

return
return fmt.Errorf("!!! scrub !!! error getting volume config: %w", err)
}

mountpoint := volumeConfig.TypedSpec().Mount.TargetPath

if _, ok := ctrl.schedule[mountpoint]; !ok {
per := 10 * time.Second
seconds := rand.Int64N(int64(per.Seconds()))
firstTimeout := time.Duration(rand.Int64N(int64(per.Seconds()))) * time.Second
logger.Warn("!!! scrub !!! firstTimeout", zap.Duration("firstTimeout", firstTimeout))

// When scheduling the first scrub, we use a random time to avoid all scrubs running in a row.
// After the first scrub, we use the period defined in the config.
cb := func() {
logger.Warn("!!! scrub !!! ding", zap.String("path", mountpoint))
ctrl.c <- mountpoint
ctrl.schedule[mountpoint].timer.Reset(ctrl.schedule[mountpoint].period)
}

ctrl.schedule[mountpoint] = scrubSchedule{
period: per,
upcoming: time.Now().Add(time.Duration(seconds * int64(time.Second))),
period: per,
timer: time.AfterFunc(firstTimeout, cb),
}

logger.Warn("scheduled", zap.String("path", mountpoint), zap.Reflect("upcoming", ctrl.schedule[mountpoint].upcoming))
logger.Warn("!!! scrub !!! scheduled", zap.String("path", mountpoint))
}
})
}

return nil
return err
}

func (ctrl *FSScrubController) scrub(mountpoint string, opts []string) error {
func (ctrl *FSScrubController) runScrub(mountpoint string, opts []string) error {
args := []string{"/usr/sbin/xfs_scrub", "-T", "-v"}
args = append(args, opts...)
args = append(args, mountpoint)
Expand Down

0 comments on commit e178d4d

Please sign in to comment.