Skip to content

Commit

Permalink
Fix storing compaction hash with multiple ongoing compaction requests
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <[email protected]>
  • Loading branch information
serathius committed Jun 1, 2024
1 parent 2f216dc commit c87d197
Showing 1 changed file with 11 additions and 7 deletions.
18 changes: 11 additions & 7 deletions server/storage/mvcc/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,16 +224,22 @@ func (s *store) checkPrevCompactionCompleted() bool {
defer tx.RUnlock()
scheduledCompact, scheduledCompactFound := UnsafeReadScheduledCompact(tx)
finishedCompact, finishedCompactFound := UnsafeReadFinishedCompact(tx)
return scheduledCompact == finishedCompact && scheduledCompactFound == finishedCompactFound
completed := scheduledCompact == finishedCompact && scheduledCompactFound == finishedCompactFound
s.lg.Info("check prev compaction completed", zap.Bool("completed", completed), zap.Int64("scheduled-compact", scheduledCompact), zap.Int64("finished-compaction", finishedCompact))
return completed
}

func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64, prevCompactionCompleted bool) <-chan struct{} {
func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) <-chan struct{} {
ch := make(chan struct{})
j := schedule.NewJob("kvstore_compact", func(ctx context.Context) {
if ctx.Err() != nil {
s.compactBarrier(ctx, ch)
return
}
tx := s.b.ReadTx()
tx.RLock()
finishedCompact, _ := UnsafeReadFinishedCompact(tx)
tx.RUnlock()
hash, err := s.scheduleCompaction(rev, prevCompactRev)
if err != nil {
s.lg.Warn("Failed compaction", zap.Error(err))
Expand All @@ -242,7 +248,7 @@ func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64, prevC
}
// Only store the hash value if the previous hash is completed, i.e. this compaction
// hashes every revision from last compaction. For more details, see #15919.
if prevCompactionCompleted {
if finishedCompact == prevCompactRev {
s.hashes.Store(hash)
} else {
s.lg.Info("previous compaction was interrupted, skip storing compaction hash value")
Expand All @@ -256,18 +262,16 @@ func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64, prevC
}

func (s *store) compactLockfree(rev int64) (<-chan struct{}, error) {
prevCompactionCompleted := s.checkPrevCompactionCompleted()
ch, prevCompactRev, err := s.updateCompactRev(rev)
if err != nil {
return ch, err
}

return s.compact(traceutil.TODO(), rev, prevCompactRev, prevCompactionCompleted), nil
return s.compact(traceutil.TODO(), rev, prevCompactRev), nil
}

func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) {
s.mu.Lock()
prevCompactionCompleted := s.checkPrevCompactionCompleted()
ch, prevCompactRev, err := s.updateCompactRev(rev)
trace.Step("check and update compact revision")
if err != nil {
Expand All @@ -276,7 +280,7 @@ func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, err
}
s.mu.Unlock()

return s.compact(trace, rev, prevCompactRev, prevCompactionCompleted), nil
return s.compact(trace, rev, prevCompactRev), nil
}

func (s *store) Commit() {
Expand Down

0 comments on commit c87d197

Please sign in to comment.