Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid racing in closing Etcd.errc channel #19206

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 38 additions & 5 deletions server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,18 @@
errc chan error

closeOnce sync.Once
wg sync.WaitGroup

// Etcd.errHandler uses a fan-in pattern to send errors to Etcd.errc.
// To prevent panics, ensure all writes to Etcd.errc complete before closing it.
// sync.WaitGroup (Etcd.errcWg) is used to track inflight writes to Etcd.errc.
// However, the usage pattern of Etcd.errHandler can lead to race conditions when
// increasing the counter happens after Etcd.errcWg.Wait() is unblocked.
// To address this, WaitGroup updates are gated by the Etcd.closingErrc flag within
// Etcd.errHandler using a critical section. Etcd.closingErrc is set to true
// before Etcd.errcWg.Wait(), preventing further WaitGroup updates.
errcWg sync.WaitGroup
errcMu sync.RWMutex
closingErrc bool
}

type peerListener struct {
Expand Down Expand Up @@ -255,6 +266,15 @@

// buffer channel so goroutines on closed connections won't wait forever
e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs))
e.closingErrc = false
// The WaitGroup must be initialized with a value of 1 to avoid a race condition.
// This race condition can occur if the first e.errcWg.Add() call in a goroutine happens
// after the main function calls e.errcWg.Wait(). By initializing the WaitGroup to 1,
// we guarantee that the main function will wait for at least one operation to complete
// before continuing.

// See https://pkg.go.dev/sync#WaitGroup.Add for more details on WaitGroup.
e.errcWg.Add(1)

// newly started member ("memberInitialized==false")
// does not need corruption check
Expand Down Expand Up @@ -457,7 +477,13 @@
}
}
if e.errc != nil {
e.wg.Wait()
e.errcMu.Lock()
e.closingErrc = true
e.errcMu.Unlock()

// e.errcWg's initial counter is 1, so a matching Done() must be called here
e.errcWg.Done()
e.errcWg.Wait()
close(e.errc)
}
}
Expand Down Expand Up @@ -872,9 +898,6 @@
}

func (e *Etcd) errHandler(err error) {
e.wg.Add(1)
defer e.wg.Done()

if err != nil {
e.GetLogger().Error("setting up serving from embedded etcd failed.", zap.Error(err))
}
Expand All @@ -883,6 +906,16 @@
return
default:
}

e.errcMu.RLock()
if e.closingErrc {
return

Check warning on line 912 in server/embed/etcd.go

View check run for this annotation

Codecov / codecov/patch

server/embed/etcd.go#L910-L912

Added lines #L910 - L912 were not covered by tests
}
e.errcWg.Add(1)
e.errcMu.RUnlock()

Check warning on line 915 in server/embed/etcd.go

View check run for this annotation

Codecov / codecov/patch

server/embed/etcd.go#L914-L915

Added lines #L914 - L915 were not covered by tests

defer e.errcWg.Done()

Check warning on line 917 in server/embed/etcd.go

View check run for this annotation

Codecov / codecov/patch

server/embed/etcd.go#L917

Added line #L917 was not covered by tests

select {
case <-e.stopc:
case e.errc <- err:
Expand Down
Loading