Skip to content

Commit

Permalink
Feature: Make slack request rate configurable (#42)
Browse files Browse the repository at this point in the history
If multiple pods of this app is deployed, they will not respect the
slack API request rate of 1 RPS. To have multiple pods of this app using
the same slack bot token, we need to have the SlackRequestRPS
configurable.

---------

Co-authored-by: aalur <[email protected]>
  • Loading branch information
ani1311 and aalur authored Mar 13, 2024
1 parent 001cd36 commit d33be52
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 52 deletions.
51 changes: 27 additions & 24 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Many applications posting messages to Slack either overlook Slack's rate limits
By being a 1:1 forwarding proxy, you simply POST to this application instead, and it will get forwarded to Slack.

Furthermore, by adding observability, we can have a much clearer picture of:

- Requests per second
- To which channel?
- Are there failures and at what rate?
Expand All @@ -29,7 +30,6 @@ These type of insights are currently not possible to know via Slack, and only vi

We don't try to 'mock' the Slack API. We make a fair assumption that the message you post to the proxy **is already tested** and meets the API spec. In other words, if you got a new (custom) application where you are testing the API, I would highly recommend you do that to Slack directly. Once you have 'battletested' your implementation, you then simply convert the URL to this proxy and gain out of the box retries and rate limit behaviour with included metrics.


## Features

### SlackProxy Metrics
Expand Down Expand Up @@ -65,7 +65,6 @@ The `slackproxy` service provides several metrics to monitor and gauge the perfo
- Metric: `slackproxy_queue_size`
- Description: The current size of the proxy's queue.


### Queue

Monitor the queue size with the `slackproxy_queue_size` metric. This isn't a persistent queue. If the application crashes abruptly, the queue is lost. However, during a clean application shutdown, the queue processes, given adequate time. If, for instance, there's a prolonged Slack outage or if you face an outage, the queue might be lost. While the queue size is configurable, remember that the processing rate is a maximum of 1 message per second. If the queue consistently reaches its limit, consider horizontal scaling.
Expand All @@ -87,10 +86,10 @@ Permanent errors are logged in detail, including the complete POST request. Conc
- How to run multiple replicas with each their own API key?
- Add some basic sanity check if the basics are part of the request (channel, some body, etc)


## Slack Application manifest

This manifest is required when making an application that can:

- Use a single token
- Post to any (public) channel
- Change it's name
Expand Down Expand Up @@ -121,41 +120,45 @@ settings:

### Required

- `--token` : Bearer token for the Slack API.
- Example: `--token=YOUR_BEARER_TOKEN`
- `--token` : Bearer token for the Slack API.
- Example: `--token=YOUR_BEARER_TOKEN`

### Optional
### Optional

> I would recommend not altering these values until you have a good understanding how it performs for your workload
- `--maxRetries` : Maximum number of retries for posting a message.
- Default: *`3`*
- Example: `--maxRetries=5`
- Default: *`3`*
- Example: `--maxRetries=5`

- `--initialBackoffMs` : Initial backoff in milliseconds for retries.
- Default: *`1000`*
- Example: `--initialBackoffMs=2000`
- Default: *`1000`*
- Example: `--initialBackoffMs=2000`

- `--slackURL` : Slack Post Message API URL.
- Default: *`https://slack.com/api/chat.postMessage`*
- Example: `--slackURL=https://api.slack.com/your-endpoint`
- Default: *`https://slack.com/api/chat.postMessage`*
- Example: `--slackURL=https://api.slack.com/your-endpoint`

- `--queueSize` : Maximum number of messages in the queue.
- Default: *`100`*
- Example: `--queueSize=200`
- Default: *`100`*
- Example: `--queueSize=200`

- `--burst` : Maximum number of burst messages to allow.
- Default: *`3`*
- Example: `--burst=2`
- Default: *`3`*
- Example: `--burst=2`

- `--metricsPort` : Port used for the `/metrics` endpoint
- Default: *`:9090`*
- Example: `--metricsPort :9090`
- Default: *`:9090`*
- Example: `--metricsPort :9090`

- `--applicationPort` : Port used for the application endpoint (where you send your requests to)
- Default: *`:8080`*
- Example: `--applicationPort :8080`

- `--channelOverride` : Override on sending _all_ messages to this defined channel. This is useful for debugging or if you want to force to use a single channel
- Default: *``*
- Example: `--channelOverride #debug-notifications`
- Default: *`:8080`*
- Example: `--applicationPort :8080`

- `--channelOverride` : Override on sending *all* messages to this defined channel. This is useful for debugging or if you want to force to use a single channel
- Default: *``*
- Example: `--channelOverride #debug-notifications`

- `--slackRequestRate` : Request rate for slack requests in milliseconds.
- Default: *`1000`*
- Example: `--slackRequestRate=500`
20 changes: 11 additions & 9 deletions app.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,14 @@ func (s *SlackClient) PostMessage(request SlackPostMessageRequest, url string, t
return nil
}

func NewApp(queueSize int, httpClient *http.Client, metrics *Metrics, channelOverride string) *App {
func NewApp(queueSize int, httpClient *http.Client, metrics *Metrics, channelOverride, slackPostMessageURL, slackToken string) *App {
return &App{
slackQueue: make(chan SlackPostMessageRequest, queueSize),
messenger: &SlackClient{client: httpClient},
metrics: metrics,
channelOverride: channelOverride,
slackQueue: make(chan SlackPostMessageRequest, queueSize),
messenger: &SlackClient{client: httpClient},
SlackPostMessageURL: slackPostMessageURL,
SlackToken: slackToken,
metrics: metrics,
channelOverride: channelOverride,
}
}

Expand All @@ -162,11 +164,11 @@ func (app *App) Shutdown() {
}

//nolint:gocognit // but could probably use a refactor.
func (app *App) processQueue(ctx context.Context, maxRetries int, initialBackoffMs int, slackPostMessageURL string, tokenFlag string, burst int) {
func (app *App) processQueue(ctx context.Context, maxRetries int, initialBackoff time.Duration, burst int, slackRequestRate time.Duration) {
// This is the rate limiter, which will block until it is allowed to continue on r.Wait(ctx).
// I kept the rate at 1 per second, as doing more than that will cause Slack to reject the messages anyways. We can burst however.
// Do note that this is best effort, in case of failures, we will exponentially backoff and retry, which will cause the rate to be lower than 1 per second due to obvious reasons.
r := rate.NewLimiter(rate.Every(1*time.Second), burst)
r := rate.NewLimiter(rate.Every(slackRequestRate), burst)

for {
select {
Expand Down Expand Up @@ -204,7 +206,7 @@ func (app *App) processQueue(ctx context.Context, maxRetries int, initialBackoff
}
}

err := app.messenger.PostMessage(msg, slackPostMessageURL, tokenFlag)
err := app.messenger.PostMessage(msg, app.SlackPostMessageURL, app.SlackToken)
//nolint:nestif // but simplify by not having else at least.
if err != nil {
retryable, pause, description := CheckError(err.Error())
Expand Down Expand Up @@ -232,7 +234,7 @@ func (app *App) processQueue(ctx context.Context, maxRetries int, initialBackoff

if retryCount < maxRetries {
retryCount++
backoffDuration := time.Duration(initialBackoffMs*int(math.Pow(2, float64(retryCount-1)))) * time.Millisecond
backoffDuration := initialBackoff * time.Duration(math.Pow(2, float64(retryCount-1)))
time.Sleep(backoffDuration)
} else {
log.S(log.Error, "Message failed after retries", log.Any("err", err), log.Int("retryCount", retryCount))
Expand Down
67 changes: 57 additions & 10 deletions app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,17 @@ func TestApp_singleBurst_Success(t *testing.T) {

messenger := &MockSlackMessenger{}
app := &App{
slackQueue: make(chan SlackPostMessageRequest, 2),
messenger: messenger,
metrics: metrics,
slackQueue: make(chan SlackPostMessageRequest, 2),
messenger: messenger,
metrics: metrics,
SlackPostMessageURL: "http://mock.url",
SlackToken: "mockToken",
}

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

go app.processQueue(ctx, 3, 1000, "http://mock.url", "mockToken", 1)
go app.processQueue(ctx, 3, 1000, 1, 1000)

startTime := time.Now()

Expand All @@ -59,7 +61,7 @@ func TestApp_singleBurst_Success(t *testing.T) {
diffInSeconds := endTime.Sub(startTime).Seconds()
log.S(log.Debug, "diffInSeconds", log.Float64("diffInSeconds", diffInSeconds))

// The sum is always: (Amount of messages * delay in seconds) minus burst. In this case 10 * 1 - 1 = 9 seconds.
// The sum is always: (Amount of messages * RPS * delay in seconds) minus burst. In this case 20 * 1 - 10 = 10 seconds.
if math.RoundToEven(diffInSeconds) != 9 {
t.Fatal("Expected processQueue finish the job in ~9 seconds, give or take. Got", diffInSeconds)
}
Expand All @@ -71,15 +73,17 @@ func TestApp_MultiBurst_Success(t *testing.T) {

messenger := &MockSlackMessenger{}
app := &App{
slackQueue: make(chan SlackPostMessageRequest, 2),
messenger: messenger,
metrics: metrics,
slackQueue: make(chan SlackPostMessageRequest, 2),
messenger: messenger,
metrics: metrics,
SlackPostMessageURL: "http://mock.url",
SlackToken: "mockToken",
}

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

go app.processQueue(ctx, 3, 1000, "http://mock.url", "mockToken", 10)
go app.processQueue(ctx, 3, 1000, 10, 1000)

startTime := time.Now()

Expand All @@ -100,8 +104,51 @@ func TestApp_MultiBurst_Success(t *testing.T) {
diffInSeconds := endTime.Sub(startTime).Seconds()
log.S(log.Debug, "diffInSeconds", log.Float64("diffInSeconds", diffInSeconds))

// The sum is always: (Amount of messages * delay in seconds) minus burst. In this case 20 * 1 - 10 = 10 seconds.
// The sum is always: (Amount of messages * RPS * delay in seconds) minus burst. In this case 20 * 1 - 10 = 10 seconds.
if math.RoundToEven(diffInSeconds) != 10 {
t.Fatal("Expected processQueue finish the job in ~9 seconds, give or take. Got", diffInSeconds)
}
}

func TestApp_TestSlackRequestRate(t *testing.T) {
r := prometheus.NewRegistry()
metrics := NewMetrics(r)

messenger := &MockSlackMessenger{}
app := &App{
slackQueue: make(chan SlackPostMessageRequest, 2),
messenger: messenger,
metrics: metrics,
SlackPostMessageURL: "http://mock.url",
SlackToken: "mockToken",
}

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

go app.processQueue(ctx, 3, 1000, 1, 250)

startTime := time.Now()

count := 20
for i := 0; i < count; i++ {
app.wg.Add(1)
app.slackQueue <- SlackPostMessageRequest{
Channel: "mockChannel",
}
}

log.S(log.Debug, "Posting messages done")

app.wg.Wait()

endTime := time.Now()

diffInSeconds := endTime.Sub(startTime).Seconds()
log.S(log.Debug, "diffInSeconds", log.Float64("diffInSeconds", diffInSeconds))

// The sum is always: (Amount of messages * RPS * delay in seconds) minus burst. In this case 20 * 4 * 1 - 10 = 5 seconds.
if math.RoundToEven(diffInSeconds) != 5 {
t.Fatal("Expected processQueue finish the job in ~5 seconds, give or take. Got", diffInSeconds)
}
}
22 changes: 13 additions & 9 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,13 @@ type SlackPostMessageRequest struct {
}

type App struct {
slackQueue chan SlackPostMessageRequest
wg sync.WaitGroup
messenger SlackMessenger
metrics *Metrics
channelOverride string
slackQueue chan SlackPostMessageRequest
wg sync.WaitGroup
messenger SlackMessenger
SlackPostMessageURL string
SlackToken string
metrics *Metrics
channelOverride string
}

// podIndex retrieves the index of the current pod based on the HOSTNAME environment variable.
Expand Down Expand Up @@ -91,18 +93,20 @@ func getSlackTokens() []string {
func main() {
var (
maxRetries = 2
initialBackoffMs = 1000
initialBackoff = 1000 * time.Millisecond
slackPostMessageURL = "https://slack.com/api/chat.postMessage"
maxQueueSize = 100
burst = 3
metricsPort = ":9090"
applicationPort = ":8080"
channelOverride string
slackRequestRate = 1000 * time.Millisecond
)

// Define the flags with the default values // TODO: move the ones that can change to dflag
flag.IntVar(&maxRetries, "maxRetries", maxRetries, "Maximum number of retries for posting a message")
flag.IntVar(&initialBackoffMs, "initialBackoffMs", initialBackoffMs, "Initial backoff in milliseconds for retries")
flag.Duration("initialBackoffMs", initialBackoff, "Initial backoff in milliseconds for retries")
flag.Duration("slackRequestRate", slackRequestRate, "Rate limit for slack requests in milliseconds")
flag.StringVar(&slackPostMessageURL, "slackURL", slackPostMessageURL, "Slack Post Message API URL")
flag.IntVar(&maxQueueSize, "queueSize", maxQueueSize, "Maximum number of messages in the queue")
flag.IntVar(&burst, "burst", burst, "Maximum number of burst to allow")
Expand Down Expand Up @@ -142,7 +146,7 @@ func main() {
// Initialize the app, metrics are passed along so they are accessible
app := NewApp(maxQueueSize, &http.Client{
Timeout: 10 * time.Second,
}, metrics, channelOverride)
}, metrics, channelOverride, slackPostMessageURL, token)

log.Infof("Starting metrics server.")
StartMetricServer(r, metricsPort)
Expand All @@ -156,7 +160,7 @@ func main() {
defer serverCancel()

log.Infof("Starting main app logic")
go app.processQueue(ctx, maxRetries, initialBackoffMs, slackPostMessageURL, token, burst)
go app.processQueue(ctx, maxRetries, initialBackoff, burst, slackRequestRate)
log.Infof("Starting receiver server")
// Check error return of app.StartServer in go routine anon function:
go func() {
Expand Down

0 comments on commit d33be52

Please sign in to comment.