Skip to content

Commit

Permalink
fixed all the lll mostly with lll-fixer + manual few more
Browse files Browse the repository at this point in the history
  • Loading branch information
ldemailly committed Mar 21, 2024
1 parent 95e5564 commit a1f670e
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 69 deletions.
156 changes: 100 additions & 56 deletions app.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,53 +46,77 @@ var slackPermanentErrors = map[string]string{
"restricted_action_thread_only_channel": "Cannot post top-level messages into a thread-only channel.",
"slack_connect_canvas_sharing_blocked": "Admin has disabled Canvas File sharing in all Slack Connect communications",
"slack_connect_file_link_sharing_blocked": "Admin has disabled Slack File sharing in all Slack Connect communications",
"team_access_not_granted": "The token used is not granted the specific workspace access required to complete this request.",
"too_many_attachments": "Too many attachments were provided with this message. A maximum of 100 attachments are allowed on a message.",
"too_many_contact_cards": "Too many contact_cards were provided with this message. A maximum of 10 contact cards are allowed on a message.",
"cannot_reply_to_message": "This message type cannot have thread replies.",
"access_denied": "Access to a resource specified in the request is denied.",
"account_inactive": "Authentication token is for a deleted user or workspace when using a bot token.",
"deprecated_endpoint": "The endpoint has been deprecated.",
"enterprise_is_restricted": "The method cannot be called from an Enterprise.",
"invalid_auth": "Some aspect of authentication cannot be validated. Either the provided token is invalid or the request originates from an IP address disallowed from making the request.",
"method_deprecated": "The method has been deprecated.",
"missing_scope": "The token used is not granted the specific scope permissions required to complete this request.",
"not_allowed_token_type": "The token type used in this request is not allowed.",
"not_authed": "No authentication token provided.",
"no_permission": "The workspace token used in this request does not have the permissions necessary to complete the request. Make sure your app is a member of the conversation it's attempting to post a message to.",
"org_login_required": "The workspace is undergoing an enterprise migration and will not be available until migration is complete.",
"token_expired": "Authentication token has expired",
"token_revoked": "Authentication token is for a deleted user or workspace or the app has been removed when using a user token.",
"two_factor_setup_required": "Two factor setup is required.",
"accesslimited": "Access to this method is limited on the current network",
"fatal_error": "The server could not complete your operation(s) without encountering a catastrophic error. It's possible some aspect of the operation succeeded before the error was raised.",
"internal_error": "The server could not complete your operation(s) without encountering an error, likely due to a transient issue on our end. It's possible some aspect of the operation succeeded before the error was raised.",
"invalid_arg_name": "The method was passed an argument whose name falls outside the bounds of accepted or expected values. This includes very long names and names with non-alphanumeric characters other than _. If you get this error, it is typically an indication that you have made a very malformed API call.",
"invalid_arguments": "The method was either called with invalid arguments or some detail about the arguments passed is invalid, which is more likely when using complex arguments like blocks or attachments.",
"invalid_array_arg": "The method was passed an array as an argument. Please only input valid strings.",
"invalid_charset": "The method was called via a POST request, but the charset specified in the Content-Type header was invalid. Valid charset names are: utf-8 iso-8859-1.",
"invalid_form_data": "The method was called via a POST request with Content-Type application/x-www-form-urlencoded or multipart/form-data, but the form data was either missing or syntactically invalid.",
"invalid_post_type": "The method was called via a POST request, but the specified Content-Type was invalid. Valid types are: application/json application/x-www-form-urlencoded multipart/form-data text/plain.",
"missing_post_type": "The method was called via a POST request and included a data payload, but the request did not include a Content-Type header.",
"ratelimited": "The request has been ratelimited. Refer to the Retry-After header for when to retry the request.",
"service_unavailable": "The service is temporarily unavailable",
"team_added_to_org": "The workspace associated with your request is currently undergoing migration to an Enterprise Organization. Web API and other platform operations will be intermittently unavailable until the transition is complete.",

"team_access_not_granted": "The token used is not granted the specific workspace access required to complete this request.",
"too_many_attachments": "Too many attachments were provided with this message. A maximum of 100 attachments are allowed on" +
" a message.",
"too_many_contact_cards": "Too many contact_cards were provided with this message. A maximum of 10 contact cards are allowed" +
" on a message.",
"cannot_reply_to_message": "This message type cannot have thread replies.",
"access_denied": "Access to a resource specified in the request is denied.",
"account_inactive": "Authentication token is for a deleted user or workspace when using a bot token.",
"deprecated_endpoint": "The endpoint has been deprecated.",
"enterprise_is_restricted": "The method cannot be called from an Enterprise.",
"invalid_auth": "Some aspect of authentication cannot be validated. Either the provided token is invalid or the" +
" request originates from an IP address disallowed from making the request.",
"method_deprecated": "The method has been deprecated.",
"missing_scope": "The token used is not granted the specific scope permissions required to complete this request.",
"not_allowed_token_type": "The token type used in this request is not allowed.",
"not_authed": "No authentication token provided.",
"no_permission": "The workspace token used in this request does not have the permissions necessary to complete the" +
" request. Make sure your app is a member of the conversation it's attempting to post a message to.",
"org_login_required": "The workspace is undergoing an enterprise migration and will not be available until migration is" +
" complete.",
"token_expired": "Authentication token has expired",
"token_revoked": "Authentication token is for a deleted user or workspace or the app has been removed when using a" +
" user token.",
"two_factor_setup_required": "Two factor setup is required.",
"accesslimited": "Access to this method is limited on the current network",
"fatal_error": "The server could not complete your operation(s) without encountering a catastrophic error. It's" +
" possible some aspect of the operation succeeded before the error was raised.",
"internal_error": "The server could not complete your operation(s) without encountering an error, likely due to a" +
" transient issue on our end. It's possible some aspect of the operation succeeded before the error" +
" was raised.",
"invalid_arg_name": "The method was passed an argument whose name falls outside the bounds of accepted or expected" +
" values. This includes very long names and names with non-alphanumeric characters other than _. If" +
" you get this error, it is typically an indication that you have made a very malformed API call.",
"invalid_arguments": "The method was either called with invalid arguments or some detail about the arguments passed is" +
" invalid, which is more likely when using complex arguments like blocks or attachments.",
"invalid_array_arg": "The method was passed an array as an argument. Please only input valid strings.",
"invalid_charset": "The method was called via a POST request, but the charset specified in the Content-Type header was" +
" invalid. Valid charset names are: utf-8 iso-8859-1.",
"invalid_form_data": "The method was called via a POST request with Content-Type application/x-www-form-urlencoded or" +
" multipart/form-data, but the form data was either missing or syntactically invalid.",
"invalid_post_type": "The method was called via a POST request, but the specified Content-Type was invalid. Valid types" +
" are: application/json application/x-www-form-urlencoded multipart/form-data text/plain.",
"missing_post_type": "The method was called via a POST request and included a data payload, but the request did not" +
" include a Content-Type header.",
"ratelimited": "The request has been ratelimited. Refer to the Retry-After header for when to retry the request.",
"service_unavailable": "The service is temporarily unavailable",
"team_added_to_org": "The workspace associated with your request is currently undergoing migration to an Enterprise" +
" Organization. Web API and other platform operations will be intermittently unavailable until the" +
" transition is complete.",
}

var slackRetryErrors = map[string]string{
"message_limit_exceeded": "Members on this team are sending too many messages. For more details, see https://slack.com/help/articles/115002422943-Usage-limits-for-free-workspaces",
"rate_limited": "Application has posted too many messages, read the Rate Limit documentation for more information",
"fatal_error": "The server could not complete your operation(s) without encountering a catastrophic error. It's possible some aspect of the operation succeeded before the error was raised.",
"internal_error": "The server could not complete your operation(s) without encountering an error, likely due to a transient issue on our end. It's possible some aspect of the operation succeeded before the error was raised.",
"ratelimited": "The request has been ratelimited. Refer to the Retry-After header for when to retry the request.",
"request_timeout": "The method was called via a POST request, but the POST data was either missing or truncated.",
"message_limit_exceeded": "Members on this team are sending too many messages. For more details, see" +
" https://slack.com/help/articles/115002422943-Usage-limits-for-free-workspaces",
"rate_limited": "Application has posted too many messages, read the Rate Limit documentation for more information",
"fatal_error": "The server could not complete your operation(s) without encountering a catastrophic error. It's" +
" possible some aspect of the operation succeeded before the error was raised.",
"internal_error": "The server could not complete your operation(s) without encountering an error, likely due to a" +
" transient issue on our end. It's possible some aspect of the operation succeeded before the error" +
" was raised.",
"ratelimited": "The request has been ratelimited. Refer to the Retry-After header for when to retry the request.",
"request_timeout": "The method was called via a POST request, but the POST data was either missing or truncated.",
}

var doNotProcessChannels = map[string]time.Time{}

func CheckError(err string) (retryable bool, pause bool, description string) {
// Special case for channel_not_found, we don't want to retry this one right away.
// We are making it a 'soft failure' so that we don't keep retrying it for a period of time for any message that is sent to a channel that doesn't exist.
// We are making it a 'soft failure' so that we don't keep retrying it for a period of time for any
// message that is sent to a channel that doesn't exist.
if err == "channel_not_found" {
return true, true, "Channel not found"
}
Expand All @@ -116,15 +140,18 @@ func (s *SlackClient) PostMessage(request SlackPostMessageRequest, url string, t
if err != nil {
return err
}
// Detach from the caller/new context. TODO: have some timeout (or use jrpc package functions which do that already)
// Detach from the caller/new context. TODO: have some timeout (or use jrpc package functions which
// do that already)
req, err := http.NewRequestWithContext(context.Background(), http.MethodPost, url, bytes.NewBuffer(jsonValue))
if err != nil {
return err
}

// Charset is required to remove warnings from Slack. Maybe it's nice to have it configurable. /shrug
// Charset is required to remove warnings from Slack. Maybe it's nice to have it configurable.
// /shrug
req.Header.Set("Content-Type", "application/json; charset=utf-8")
// Documentation says that you are allowed the POST the token instead, however that does simply not work. Hence why we are using the Authorization header.
// Documentation says that you are allowed the POST the token instead, however that does simply not
// work. Hence why we are using the Authorization header.
req.Header.Set("Authorization", "Bearer "+token)

resp, err := s.client.Do(req)
Expand All @@ -146,7 +173,9 @@ func (s *SlackClient) PostMessage(request SlackPostMessageRequest, url string, t
return nil
}

func NewApp(queueSize int, httpClient *http.Client, metrics *Metrics, channelOverride, slackPostMessageURL, slackToken string) *App {
func NewApp(queueSize int, httpClient *http.Client,
metrics *Metrics, channelOverride, slackPostMessageURL, slackToken string,
) *App {
return &App{
slackQueue: make(chan SlackPostMessageRequest, queueSize),
messenger: &SlackClient{client: httpClient},
Expand All @@ -164,28 +193,37 @@ func (app *App) Shutdown() {
}

//nolint:gocognit // but could probably use a refactor.
func (app *App) processQueue(ctx context.Context, maxRetries int, initialBackoff time.Duration, burst int, slackRequestRate time.Duration) {
func (app *App) processQueue(ctx context.Context, maxRetries int,
initialBackoff time.Duration, burst int, slackRequestRate time.Duration,
) {
// This is the rate limiter, which will block until it is allowed to continue on r.Wait(ctx).
// I kept the rate at 1 per second, as doing more than that will cause Slack to reject the messages anyways. We can burst however.
// Do note that this is best effort, in case of failures, we will exponentially backoff and retry, which will cause the rate to be lower than 1 per second due to obvious reasons.
// I kept the rate at 1 per second, as doing more than that will cause Slack to reject the messages
// anyways. We can burst however.
// Do note that this is best effort, in case of failures, we will exponentially backoff and retry,
// which will cause the rate to be lower than 1 per second due to obvious reasons.
r := rate.NewLimiter(rate.Every(slackRequestRate), burst)

for {
select {
case msg, ok := <-app.slackQueue:
// We do check if the channel is closed, but its important to note is that the channel will be closed when the queue is empty and the Shutdown() is called.
// Simply calling close(app.slackQueue) will not close the channel, it will only prevent any more messages from being sent to the channel.
// We do check if the channel is closed, but its important to note is that the channel will be
// closed when the queue is empty and the Shutdown() is called.
// Simply calling close(app.slackQueue) will not close the channel, it will only prevent any more
// messages from being sent to the channel.
// Only once the channel is empty, will it be closed.
if !ok {
return
}
log.S(log.Debug, "Got message from queue", log.Any("message", msg))

// Rate limiter was initially before fetching a message from the queue, but that caused problems by indefinitely looping even if there was no message in the queue.
// On shutdown, it would cancel the context, even if the queue was stopped (thus no messages would even come in).
// Rate limiter was initially before fetching a message from the queue, but that caused problems by
// indefinitely looping even if there was no message in the queue.
// On shutdown, it would cancel the context, even if the queue was stopped (thus no messages would
// even come in).
err := r.Wait(ctx)
if err != nil {
log.Fatalf("Error while waiting for rate limiter. This should not happen, provide debug info + error message to an issue if it does: %v", err)
log.Fatalf("Error while waiting for rate limiter. This should not happen, provide debug info + error message"+
" to an issue if it does: %v", err)
return
}

Expand All @@ -194,10 +232,12 @@ func (app *App) processQueue(ctx context.Context, maxRetries int, initialBackoff

retryCount := 0
for {
// Check if the channel is in the doNotProcessChannels map, if it is, check if it's been more than 15 minutes since we last tried to send a message to it.
// Check if the channel is in the doNotProcessChannels map, if it is, check if it's been more than
// 15 minutes since we last tried to send a message to it.
if (doNotProcessChannels[msg.Channel] != time.Time{}) {
if time.Since(doNotProcessChannels[msg.Channel]) >= 15*time.Minute {
// Remove the channel from the map, so that we can process it again. If the channel isn't created in the meantime, we will just add it again.
// Remove the channel from the map, so that we can process it again. If the channel isn't created
// in the meantime, we will just add it again.
delete(doNotProcessChannels, msg.Channel)
} else {
log.S(log.Info, "Channel is on the doNotProcess list, not trying to post this message", log.String("channel", msg.Channel))
Expand All @@ -221,14 +261,17 @@ func (app *App) processQueue(ctx context.Context, maxRetries int, initialBackoff

if !retryable {
app.metrics.RequestsFailedTotal.WithLabelValues(msg.Channel).Inc()
log.S(log.Error, "Permanent error, message will not be retried", log.Any("err", err), log.String("description", description), log.String("channel", msg.Channel), log.Any("message", msg))
log.S(log.Error, "Permanent error, message will not be retried", log.Any("err", err),
log.String("description", description), log.String("channel", msg.Channel), log.Any("message", msg))
break
}

if description == "Unknown error" {
log.S(log.Error, "Unknown error, since we can't infer what type of error it is, we will retry it. However, please create a ticket/issue for this project for this error", log.Any("err", err))
log.S(log.Error, "Unknown error, since we can't infer what type of error it is, we will retry it. However, please"+
" create a ticket/issue for this project for this error", log.Any("err", err))
}
log.S(log.Warning, "Temporary error, message will be retried", log.Any("err", err), log.String("description", description), log.String("channel", msg.Channel), log.Any("message", msg))
log.S(log.Warning, "Temporary error, message will be retried", log.Any("err", err),
log.String("description", description), log.String("channel", msg.Channel), log.Any("message", msg))

app.metrics.RequestsRetriedTotal.WithLabelValues(msg.Channel).Inc()

Expand All @@ -248,7 +291,8 @@ func (app *App) processQueue(ctx context.Context, maxRetries int, initialBackoff
}
}

// Need to call this to clean up the wg, which is vital for the shutdown to work (so that we process all the messages in the queue before exiting cleanly)
// Need to call this to clean up the wg, which is vital for the shutdown to work (so that we
// process all the messages in the queue before exiting cleanly)
app.wg.Done()

case <-ctx.Done():
Expand Down
Loading

0 comments on commit a1f670e

Please sign in to comment.