Skip to content

Commit

Permalink
have flightGroups take some time to allow reuse
Browse files Browse the repository at this point in the history
  • Loading branch information
moshegood committed Feb 5, 2024
1 parent 21866d0 commit d22388d
Showing 1 changed file with 16 additions and 0 deletions.
16 changes: 16 additions & 0 deletions internal/streams/stream_provider_server_side.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package streams

import (
"net/http"
"os"
"strconv"
"sync"
"time"

"github.com/launchdarkly/ld-relay/v8/internal/sdkauth"

Expand Down Expand Up @@ -109,6 +112,7 @@ func (r *serverSideEnvStreamRepository) Replay(channel, id string) chan eventsou
// getReplayEvent will return a ServerSidePutEvent with all the data needed for a Replay.
func (r *serverSideEnvStreamRepository) getReplayEvent() (eventsource.Event, error) {
data, err, _ := r.flightGroup.Do("getReplayEvent", func() (interface{}, error) {
start := time.Now()
flags, err := r.store.GetAll(ldstoreimpl.Features())

if err != nil {
Expand All @@ -126,7 +130,19 @@ func (r *serverSideEnvStreamRepository) getReplayEvent() (eventsource.Event, err
{Kind: ldstoreimpl.Segments(), Items: removeDeleted(segments)},
}

// This call uses a lot of system resources (RAM in particular).
event := MakeServerSidePutEvent(allData)
// So we sleep for a bit to allow a bunch of concurrent calls to
// all make use of this same flightGroup.
delayS := os.Getenv("LD_STREAMING_DELAY_SECONDS")
if delay, err := strconv.Atoi(delayS); err == nil {
if delay > 0 && delay <= 60 {
time.Sleep(time(delay)*time.Second - time.Since(start))

Check failure on line 140 in internal/streams/stream_provider_server_side.go

View workflow job for this annotation

GitHub Actions / Go 1.20.13 / Unit Tests

use of package time not in selector (typecheck)

Check failure on line 140 in internal/streams/stream_provider_server_side.go

View workflow job for this annotation

GitHub Actions / Go 1.20.13 / Unit Tests

use of package time not in selector) (typecheck)

Check failure on line 140 in internal/streams/stream_provider_server_side.go

View workflow job for this annotation

GitHub Actions / Go 1.20.13 / Unit Tests

use of package time not in selector) (typecheck)

Check failure on line 140 in internal/streams/stream_provider_server_side.go

View workflow job for this annotation

GitHub Actions / Go 1.20.13 / Unit Tests

use of package time not in selector) (typecheck)

Check failure on line 140 in internal/streams/stream_provider_server_side.go

View workflow job for this annotation

GitHub Actions / Go 1.20.13 / Benchmarks

use of package time not in selector

Check failure on line 140 in internal/streams/stream_provider_server_side.go

View workflow job for this annotation

GitHub Actions / Go 1.21.6 / Benchmarks

use of package time not in selector

Check failure on line 140 in internal/streams/stream_provider_server_side.go

View workflow job for this annotation

GitHub Actions / Go 1.21.6 / Unit Tests

use of package time not in selector (typecheck)

Check failure on line 140 in internal/streams/stream_provider_server_side.go

View workflow job for this annotation

GitHub Actions / Go 1.21.6 / Unit Tests

use of package time not in selector) (typecheck)

Check failure on line 140 in internal/streams/stream_provider_server_side.go

View workflow job for this annotation

GitHub Actions / Go 1.21.6 / Unit Tests

use of package time not in selector) (typecheck)

Check failure on line 140 in internal/streams/stream_provider_server_side.go

View workflow job for this annotation

GitHub Actions / Go 1.21.6 / Unit Tests

use of package time not in selector) (typecheck)
} else {
r.loggers.Warnf("Ignoring invalid value for LD_STREAMING_DELAY_SECONDS: %s", delayS)
}
}

return event, nil
})

Expand Down

0 comments on commit d22388d

Please sign in to comment.