Skip to content

Commit

Permalink
Merge pull request #2198 from josephschorr/gc-background-ctx
Browse files Browse the repository at this point in the history
Move unlock call to a background context in GC
  • Loading branch information
josephschorr authored Jan 9, 2025
2 parents 31b08ba + e06fd9f commit 7b9f388
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 8 deletions.
25 changes: 23 additions & 2 deletions internal/datastore/common/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,17 +80,38 @@ func RegisterGCMetrics() error {
// GarbageCollector represents any datastore that supports external garbage
// collection.
type GarbageCollector interface {
// HasGCRun returns true if a garbage collection run has been completed.
HasGCRun() bool

// MarkGCCompleted marks that a garbage collection run has been completed.
MarkGCCompleted()

// ResetGCCompleted resets the state of the garbage collection run.
ResetGCCompleted()

// LockForGCRun attempts to acquire a lock for garbage collection. This lock
// is typically done at the datastore level, to ensure that no other nodes are
// running garbage collection at the same time.
LockForGCRun(ctx context.Context) (bool, error)
UnlockAfterGCRun(ctx context.Context) error

// UnlockAfterGCRun releases the lock after a garbage collection run.
// NOTE: this method does not take a context, as the context used for the
// reset of the GC run can be canceled/timed out and the unlock will still need to happen.
UnlockAfterGCRun() error

// ReadyState returns the current state of the datastore.
ReadyState(context.Context) (datastore.ReadyState, error)

// Now returns the current time from the datastore.
Now(context.Context) (time.Time, error)

// TxIDBefore returns the highest transaction ID before the provided time.
TxIDBefore(context.Context, time.Time) (datastore.Revision, error)

// DeleteBeforeTx deletes all data before the provided transaction ID.
DeleteBeforeTx(ctx context.Context, txID datastore.Revision) (DeletionCounts, error)

// DeleteExpiredRels deletes all relationships that have expired.
DeleteExpiredRels(ctx context.Context) (int64, error)
}

Expand Down Expand Up @@ -196,7 +217,7 @@ func RunGarbageCollection(gc GarbageCollector, window, timeout time.Duration) er
}

defer func() {
err := gc.UnlockAfterGCRun(ctx)
err := gc.UnlockAfterGCRun()
if err != nil {
log.Error().
Err(err).
Expand Down
38 changes: 36 additions & 2 deletions internal/datastore/common/gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type fakeGC struct {
deleter gcDeleter
metrics gcMetrics
lock sync.RWMutex
wasLocked bool
wasUnlocked bool
}

type gcMetrics struct {
Expand All @@ -39,11 +41,19 @@ func newFakeGC(deleter gcDeleter) fakeGC {
}
}

func (*fakeGC) LockForGCRun(ctx context.Context) (bool, error) {
func (gc *fakeGC) LockForGCRun(ctx context.Context) (bool, error) {
gc.lock.Lock()
defer gc.lock.Unlock()

gc.wasLocked = true
return true, nil
}

func (*fakeGC) UnlockAfterGCRun(ctx context.Context) error {
func (gc *fakeGC) UnlockAfterGCRun() error {
gc.lock.Lock()
defer gc.lock.Unlock()

gc.wasUnlocked = true
return nil
}

Expand Down Expand Up @@ -227,3 +237,27 @@ func TestGCFailureBackoffReset(t *testing.T) {
// the GC enough time to run.
require.Greater(t, gc.GetMetrics().markedCompleteCount, 20, "Next interval was not reset with backoff")
}

func TestGCUnlockOnTimeout(t *testing.T) {
gc := newFakeGC(alwaysErrorDeleter{})

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go func() {
interval := 10 * time.Millisecond
window := 10 * time.Second
timeout := 1 * time.Millisecond

require.Error(t, StartGarbageCollector(ctx, &gc, interval, window, timeout))
}()

time.Sleep(30 * time.Millisecond)
require.False(t, gc.HasGCRun(), "GC should not have run")

gc.lock.Lock()
defer gc.lock.Unlock()

require.True(t, gc.wasLocked, "GC should have been locked")
require.True(t, gc.wasUnlocked, "GC should have been unlocked")
}
4 changes: 2 additions & 2 deletions internal/datastore/mysql/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ func (mds *Datastore) LockForGCRun(ctx context.Context) (bool, error) {
return mds.tryAcquireLock(ctx, gcRunLock)
}

func (mds *Datastore) UnlockAfterGCRun(ctx context.Context) error {
return mds.releaseLock(ctx, gcRunLock)
func (mds *Datastore) UnlockAfterGCRun() error {
return mds.releaseLock(context.Background(), gcRunLock)
}

func (mds *Datastore) Now(ctx context.Context) (time.Time, error) {
Expand Down
4 changes: 2 additions & 2 deletions internal/datastore/postgres/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ func (pgd *pgDatastore) LockForGCRun(ctx context.Context) (bool, error) {
return pgd.tryAcquireLock(ctx, gcRunLock)
}

func (pgd *pgDatastore) UnlockAfterGCRun(ctx context.Context) error {
return pgd.releaseLock(ctx, gcRunLock)
func (pgd *pgDatastore) UnlockAfterGCRun() error {
return pgd.releaseLock(context.Background(), gcRunLock)
}

func (pgd *pgDatastore) HasGCRun() bool {
Expand Down

0 comments on commit 7b9f388

Please sign in to comment.