Skip to content

Commit

Permalink
Revert "lll-fixer can fix comments already, making errors 78->58"
Browse files Browse the repository at this point in the history
to use a shorter len and version that handles litterals

This reverts commit 8b54291.
  • Loading branch information
ldemailly committed Mar 21, 2024
1 parent 8b54291 commit 95e5564
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 30 deletions.
30 changes: 10 additions & 20 deletions app.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,7 @@ var doNotProcessChannels = map[string]time.Time{}

func CheckError(err string) (retryable bool, pause bool, description string) {
// Special case for channel_not_found, we don't want to retry this one right away.
// We are making it a 'soft failure' so that we don't keep retrying it for a period of time for any message that is
// sent to a channel that doesn't exist.
// We are making it a 'soft failure' so that we don't keep retrying it for a period of time for any message that is sent to a channel that doesn't exist.
if err == "channel_not_found" {
return true, true, "Channel not found"
}
Expand Down Expand Up @@ -125,8 +124,7 @@ func (s *SlackClient) PostMessage(request SlackPostMessageRequest, url string, t

// Charset is required to remove warnings from Slack. Maybe it's nice to have it configurable. /shrug
req.Header.Set("Content-Type", "application/json; charset=utf-8")
// Documentation says that you are allowed the POST the token instead, however that does simply not work. Hence why we
// are using the Authorization header.
// Documentation says that you are allowed the POST the token instead, however that does simply not work. Hence why we are using the Authorization header.
req.Header.Set("Authorization", "Bearer "+token)

resp, err := s.client.Do(req)
Expand Down Expand Up @@ -168,27 +166,22 @@ func (app *App) Shutdown() {
//nolint:gocognit // but could probably use a refactor.
func (app *App) processQueue(ctx context.Context, maxRetries int, initialBackoff time.Duration, burst int, slackRequestRate time.Duration) {
// This is the rate limiter, which will block until it is allowed to continue on r.Wait(ctx).
// I kept the rate at 1 per second, as doing more than that will cause Slack to reject the messages anyways. We can
// burst however.
// Do note that this is best effort, in case of failures, we will exponentially backoff and retry, which will cause the
// rate to be lower than 1 per second due to obvious reasons.
// I kept the rate at 1 per second, as doing more than that will cause Slack to reject the messages anyways. We can burst however.
// Do note that this is best effort, in case of failures, we will exponentially backoff and retry, which will cause the rate to be lower than 1 per second due to obvious reasons.
r := rate.NewLimiter(rate.Every(slackRequestRate), burst)

for {
select {
case msg, ok := <-app.slackQueue:
// We do check if the channel is closed, but its important to note is that the channel will be closed when the queue is
// empty and the Shutdown() is called.
// Simply calling close(app.slackQueue) will not close the channel, it will only prevent any more messages from being
// sent to the channel.
// We do check if the channel is closed, but its important to note is that the channel will be closed when the queue is empty and the Shutdown() is called.
// Simply calling close(app.slackQueue) will not close the channel, it will only prevent any more messages from being sent to the channel.
// Only once the channel is empty, will it be closed.
if !ok {
return
}
log.S(log.Debug, "Got message from queue", log.Any("message", msg))

// Rate limiter was initially before fetching a message from the queue, but that caused problems by indefinitely
// looping even if there was no message in the queue.
// Rate limiter was initially before fetching a message from the queue, but that caused problems by indefinitely looping even if there was no message in the queue.
// On shutdown, it would cancel the context, even if the queue was stopped (thus no messages would even come in).
err := r.Wait(ctx)
if err != nil {
Expand All @@ -201,12 +194,10 @@ func (app *App) processQueue(ctx context.Context, maxRetries int, initialBackoff

retryCount := 0
for {
// Check if the channel is in the doNotProcessChannels map, if it is, check if it's been more than 15 minutes since we
// last tried to send a message to it.
// Check if the channel is in the doNotProcessChannels map, if it is, check if it's been more than 15 minutes since we last tried to send a message to it.
if (doNotProcessChannels[msg.Channel] != time.Time{}) {
if time.Since(doNotProcessChannels[msg.Channel]) >= 15*time.Minute {
// Remove the channel from the map, so that we can process it again. If the channel isn't created in the meantime, we
// will just add it again.
// Remove the channel from the map, so that we can process it again. If the channel isn't created in the meantime, we will just add it again.
delete(doNotProcessChannels, msg.Channel)
} else {
log.S(log.Info, "Channel is on the doNotProcess list, not trying to post this message", log.String("channel", msg.Channel))
Expand Down Expand Up @@ -257,8 +248,7 @@ func (app *App) processQueue(ctx context.Context, maxRetries int, initialBackoff
}
}

// Need to call this to clean up the wg, which is vital for the shutdown to work (so that we process all the messages
// in the queue before exiting cleanly)
// Need to call this to clean up the wg, which is vital for the shutdown to work (so that we process all the messages in the queue before exiting cleanly)
app.wg.Done()

case <-ctx.Done():
Expand Down
3 changes: 1 addition & 2 deletions app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,7 @@ func TestApp_TestSlackRequestRate(t *testing.T) {
diffInSeconds := endTime.Sub(startTime).Seconds()
log.S(log.Debug, "diffInSeconds", log.Float64("diffInSeconds", diffInSeconds))

// The sum is always: (Amount of messages * RPS * delay in seconds) minus burst. In this case 20 * 4 * 1 - 10 = 5
// seconds.
// The sum is always: (Amount of messages * RPS * delay in seconds) minus burst. In this case 20 * 4 * 1 - 10 = 5 seconds.
if math.RoundToEven(diffInSeconds) != 5 {
t.Fatal("Expected processQueue finish the job in ~5 seconds, give or take. Got", diffInSeconds)
}
Expand Down
6 changes: 2 additions & 4 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ func (app *App) handleRequest(w http.ResponseWriter, r *http.Request) {
maxQueueSize := int(float64(cap(app.slackQueue)) * 0.9)
// Reject requests if the queue is almost full
// If the channel is full, the request will block until there is space in the channel.
// Ideally we don't reject at 90%, but initially after some tests I got blocked. So I decided to be a bit more
// conservative.
// Ideally we don't reject at 90%, but initially after some tests I got blocked. So I decided to be a bit more conservative.
// ToDo: Fix this behavior so we can reach 100% channel size without problems.
if len(app.slackQueue) >= maxQueueSize {
log.S(log.Warning, "Queue is almost full, returning StatusServiceUnavailable", log.Int("queueSize", len(app.slackQueue)))
Expand Down Expand Up @@ -101,8 +100,7 @@ func (app *App) handleRequest(w http.ResponseWriter, r *http.Request) {
request.Channel = app.channelOverride
}

// Add a counter to the wait group, this is important to wait for all the messages to be processed before shutting down
// the server.
// Add a counter to the wait group, this is important to wait for all the messages to be processed before shutting down the server.
app.wg.Add(1)
// Send the message to the slackQueue to be processed
app.slackQueue <- request
Expand Down
6 changes: 2 additions & 4 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,7 @@ func TestStartServer(t *testing.T) {
}()

// Give server some time to start
// If you are running on a non-priviledged account, and get a popup asking for permission to accept incoming
// connections, you can increase this time...
// If you are running on a non-priviledged account, and get a popup asking for permission to accept incoming connections, you can increase this time...
time.Sleep(1 * time.Second)

// Make a sample request to ensure server is running
Expand All @@ -125,8 +124,7 @@ func TestStartServer(t *testing.T) {
t.Fatal("Expected error making POST request after server shut down, got none")
}

// to avoid confusion; we _are_ expecting a err, but for the edge-case it doesn't (resp != nil), we should close the
// body.
// to avoid confusion; we _are_ expecting a err, but for the edge-case it doesn't (resp != nil), we should close the body.
if secondResp != nil {
defer secondResp.Body.Close()
}
Expand Down

0 comments on commit 95e5564

Please sign in to comment.