
Job with concurrency key blocked when it shouldn't have, eventually was run after duration expired #456

Open
leondmello opened this issue Dec 16, 2024 · 11 comments

@leondmello

leondmello commented Dec 16, 2024

We are observing that our jobs are sometimes unnecessarily blocked.
In the following example, there are only two jobs for the same concurrency key. The first one completes almost immediately, in just over 100 milliseconds (see its finished_at value).
The second one takes over 30 minutes; we suspect it is blocked until the concurrency control duration expires. The scheduled_at time of the second job is well after the first job completes.
As per our understanding, the second job should have started immediately.
We confirmed that none of the job processes bounced during that time.

... indicates redacted data.

Concurrency Config

limits_concurrency to: 1, key: ->(action, params) do
  ...
end, duration: 30.minutes
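
(For illustration only, a complete declaration of the same shape, with a made-up job class and key:)

class SyncAccountJob < ApplicationJob   # hypothetical job, for illustration
  # At most one job per key at a time; the semaphore expires after 30 minutes
  # if it is never released.
  limits_concurrency to: 1,
                     key: ->(account_id) { "account-sync/#{account_id}" },
                     duration: 30.minutes

  def perform(account_id)
    # ...
  end
end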

Queue Config

dispatchers:
    - polling_interval: 1
      batch_size: 500
workers:
    - queues: [<queue of blocked job>, "*"]
      threads: 3
      processes: <%= ENV.fetch("JOB_CONCURRENCY", 1) %>
      polling_interval: 0.1

Console queries

SolidQueue::Job.where("DATE_PART('minutes', finished_at - scheduled_at) >= 30").count
=> 1

SolidQueue::Job.where(concurrency_key: '...').select(:active_job_id, :scheduled_at, :finished_at).order(:finished_at)
=>
[#<SolidQueue::Job:0x00007f062d151a50
  active_job_id: "60916418-e7a3-4f1e-9171-357b6e52c154",
  scheduled_at: "2024-12-16 01:20:09.441199000 +0000",
  finished_at: "2024-12-16 01:20:09.648985000 +0000",
  id: nil>,
 #<SolidQueue::Job:0x00007f062d151910
  active_job_id: "3c4f34da-9db7-430e-9cbe-c1214f98ccb9",
  scheduled_at: "2024-12-16 01:54:49.287841000 +0000",
  finished_at: "2024-12-16 02:24:50.264702000 +0000",
  id: nil>]

Logs about the second job execution

I, [2024-12-16T02:24:50.154361 #84]  INFO -- : [ActiveJob] [...] [3c4f34da-9db7-430e-9cbe-c1214f98ccb9] Performing ... (Job ID: 3c4f34da-9db7-430e-9cbe-c1214f98ccb9) from SolidQueue(...) enqueued at 2024-12-16T01:54:49.287885003Z with arguments: ...
I, [2024-12-16T02:24:50.264573 #84]  INFO -- : [ActiveJob] [...] [3c4f34da-9db7-430e-9cbe-c1214f98ccb9] Performed ... (Job ID: 3c4f34da-9db7-430e-9cbe-c1214f98ccb9) from SolidQueue(...) in 110.35ms
@rosa
Member

rosa commented Dec 16, 2024

As per our understanding, the second job should have started immediately.

That's right. The first job should have unblocked the second job here 🤔 Would it be possible for you to enable Active Record logs at the debug level to see what's happening when the first job runs and tries to unblock it?

@leondmello
Author

leondmello commented Dec 16, 2024

That would be a lot of logging, unfortunately, and this isn't that common.

@leondmello
Author

I'll see if I can add it in our staging environment. My plan is to add an initializer that basically does:

module SolidQueue
  class Job
    # Toggle query-source annotations only around the unblocking step, so the
    # rest of the app doesn't pay the verbose-logging cost.
    def unblock_next_blocked_job
      ActiveRecord.verbose_query_logs = true
      super
    ensure
      ActiveRecord.verbose_query_logs = false
    end
  end
end
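
For what it's worth, this pattern assumes unblock_next_blocked_job is defined in a module that SolidQueue::Job includes, so super can still reach the original; prepending a module with Job.prepend would work regardless of where the method is defined.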

@leondmello
Author

leondmello commented Dec 17, 2024

@rosa Are debug logs prefixed with SolidQueue::Semaphore sufficient? (Or are more prefixes needed?)
I went with a different approach that, for now, lets through debug logs prefixed with SolidQueue::

class FilteredSqLogger < ActiveSupport::Logger
  # Pass through only debug lines about SolidQueue::* records. Rails' SQL log
  # subscriber hands the rendered message in as +progname+; guard against nil
  # with +to_s+ so other debug calls don't blow up.
  def debug(progname = nil, &block)
    super if progname.to_s.match?(/^\s+SolidQueue::/)
  end
end
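
One way to wire that up, as a sketch (the initializer path and $stdout destination are assumptions; the SQL lines in question come from Active Record's logger):

# config/initializers/filtered_sq_logging.rb (hypothetical path)
ActiveRecord::Base.logger = FilteredSqLogger.new($stdout)
ActiveRecord::Base.logger.level = Logger::DEBUG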

@rosa
Member

rosa commented Dec 17, 2024

I think you could try SolidQueue::Semaphore and SolidQueue::BlockedExecution; any clues would likely involve one of those records.

@leondmello
Author

leondmello commented Dec 17, 2024

Ended up going with /^\s*SolidQueue::(?!Process)/ since SolidQueue::Process was the noisiest. (Even with that excluded, it's still pretty noisy 🙂)
I will post an update to this issue once the problem recurs.

@leondmello
Author

Happened again

Records (... is for obfuscation; I searched with the actual concurrency key):

SolidQueue::Job.where(concurrency_key: '...').select(:id, :scheduled_at, :finished_at)
=>
[#<SolidQueue::Job:0x00007f0eaf044c40 id: 17031406, scheduled_at: "2024-12-23 11:54:57.301722000 +0000", finished_at: "2024-12-23 11:54:57.793273000 +0000">,
 #<SolidQueue::Job:0x00007f0eaf044b00 id: 17031411, scheduled_at: "2024-12-23 11:54:57.832971000 +0000", finished_at: "2024-12-23 12:01:15.875778000 +0000">]

Logs for that key

D, [2024-12-23T11:54:57.872605 #97] DEBUG -- :   SolidQueue::Semaphore Update All (64.5ms)  UPDATE "solid_queue_semaphores" SET value = value + 1, expires_at = '2024-12-23 11:57:57.807857' WHERE "solid_queue_semaphores"."key" = '...' AND "solid_queue_semaphores"."value" < 1

D, [2024-12-23T11:54:57.879150 #97] DEBUG -- :   SolidQueue::BlockedExecution Load (5.8ms)  SELECT "solid_queue_blocked_executions".* FROM "solid_queue_blocked_executions" WHERE "solid_queue_blocked_executions"."concurrency_key" = '...' ORDER BY "solid_queue_blocked_executions"."priority" ASC, "solid_queue_blocked_executions"."job_id" ASC LIMIT 1 FOR UPDATE SKIP LOCKED

D, [2024-12-23T12:01:14.997407 #94] DEBUG -- :   SolidQueue::Semaphore Pluck (5.1ms)  SELECT "solid_queue_semaphores"."key", "solid_queue_semaphores"."value" FROM "solid_queue_semaphores" WHERE "solid_queue_semaphores"."key" = '...'

D, [2024-12-23T12:01:15.008779 #94] DEBUG -- :   SolidQueue::BlockedExecution Load (5.6ms)  SELECT "solid_queue_blocked_executions".* FROM "solid_queue_blocked_executions" WHERE "solid_queue_blocked_executions"."concurrency_key" = '...' ORDER BY "solid_queue_blocked_executions"."priority" ASC, "solid_queue_blocked_executions"."job_id" ASC LIMIT 1 FOR UPDATE SKIP LOCKED

D, [2024-12-23T12:01:15.380319 #94] DEBUG -- :   SolidQueue::Semaphore Load (7.0ms)  SELECT "solid_queue_semaphores".* FROM "solid_queue_semaphores" WHERE "solid_queue_semaphores"."key" = '...' LIMIT 1

D, [2024-12-23T12:01:15.469601 #94] DEBUG -- :   SolidQueue::Semaphore Insert (67.5ms)  INSERT INTO "solid_queue_semaphores" ("key","value","expires_at","created_at","updated_at") VALUES ('...', 0, '2024-12-23 12:04:15.380905', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) ON CONFLICT ("key") DO NOTHING RETURNING "id"

D, [2024-12-23T12:01:15.902346 #97] DEBUG -- :   SolidQueue::Semaphore Update All (8.4ms)  UPDATE "solid_queue_semaphores" SET value = value + 1, expires_at = '2024-12-23 12:04:15.893182' WHERE "solid_queue_semaphores"."key" = '...' AND "solid_queue_semaphores"."value" < 1

D, [2024-12-23T12:01:15.908124 #97] DEBUG -- :   SolidQueue::BlockedExecution Load (5.1ms)  SELECT "solid_queue_blocked_executions".* FROM "solid_queue_blocked_executions" WHERE "solid_queue_blocked_executions"."concurrency_key" = '...' ORDER BY "solid_queue_blocked_executions"."priority" ASC, "solid_queue_blocked_executions"."job_id" ASC LIMIT 1 FOR UPDATE SKIP LOCKED

@rosa
Member

rosa commented Dec 27, 2024

Ohhhh, this is super useful, @leondmello! I think I know what's going on, thanks to your logs. It looks like a tricky race condition. Basically, your first job finishes while the second one is in the middle of being enqueued and blocked: the release happens too late for the second job to see the semaphore as free, but too early to see the second job's blocked execution. Here's the first job finishing:

finished_at: "2024-12-23 11:54:57.793273000 +0000"

Then, when that job finishes, it releases the semaphore, here:

D, [2024-12-23T11:54:57.872605 #97] DEBUG --

Here's the second job enqueued:

scheduled_at: "2024-12-23 11:54:57.832971000 +0000"

At that point the semaphore hadn't been released yet (the release only ran at 11:54:57.872605), so the second job gets blocked. However, when this SELECT runs, the second job's blocked execution must not have been committed yet:

D, [2024-12-23T11:54:57.879150 #97] DEBUG -- :   SolidQueue::BlockedExecution Load (5.8ms)  SELECT "solid_queue_blocked_executions".* FROM "solid_queue_blocked_executions" WHERE "solid_queue_blocked_executions"."concurrency_key" = '...' ORDER BY "solid_queue_blocked_executions"."priority" ASC, "solid_queue_blocked_executions"."job_id" ASC LIMIT 1 FOR UPDATE SKIP LOCKED

So it can't get unblocked, because it's not completely blocked yet. Ahh, tricky!
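
If it helps, here's a toy two-thread sketch of that interleaving (a simulation for illustration only, not Solid Queue's actual code; the sleeps stand in for the timings in the logs):

require "thread"

semaphore_free = false        # the first job still holds the semaphore
blocked = []                  # stand-in for solid_queue_blocked_executions

enqueuer = Thread.new do      # plays the second job being enqueued
  unless semaphore_free       # 11:54:57.83 - still sees the semaphore claimed
    sleep 0.2                 # its blocked-execution row isn't committed yet...
    blocked << :second_job    # ...now it is, but nobody will check again
  end
end

releaser = Thread.new do      # plays the first job finishing
  sleep 0.1                   # 11:54:57.87 - releases the semaphore
  semaphore_free = true
  puts "unblocked: #{blocked.shift.inspect}"  # finds no row => "unblocked: nil"
end

[enqueuer, releaser].each(&:join)
puts "left behind: #{blocked.inspect}"        # => left behind: [:second_job]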

What isolation level are you running?

@leondmello
Author

What isolation level are you running?

We have this in our application.rb:

    config.active_support.isolation_level = :fiber

But I don't know why that was added some time ago; trying to find out.

@rosa
Member

rosa commented Dec 27, 2024

@leondmello, no, no worries about that. I meant the transaction isolation level in your DB (I assumed you're using PostgreSQL, based on the logs), but thinking more about it, it shouldn't make any difference, so no worries about it.

It's a tricky one; I'm not sure yet how to fix it, but I'll keep thinking about it.

@rosa rosa self-assigned this Dec 28, 2024
@rosa
Member

rosa commented Dec 30, 2024

Thinking more about this and how to fix it... Claiming the semaphore and enqueuing the job happen in a single transaction, but when the semaphore already exists it's only checked, not locked. That means it can be released while the transaction is still ongoing and you wouldn't know, regardless of whether READ COMMITTED or REPEATABLE READ (the most common isolation levels) is in use, because you'd read the semaphore just once, at the beginning.

I think, to fix this, we'd have to take a lock there when checking the semaphore, and release it when the transaction commits 🤔 However, this might introduce a new kind of deadlock I can't see right now (I've fixed a couple of deadlocks here in the past), so I need to think carefully about it.
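
For the record, a very rough sketch of that direction (illustration only, not Solid Queue's actual code; blocked! and claim! are made-up placeholders for creating the BlockedExecution and claiming the semaphore):

SolidQueue::Record.transaction do
  # SELECT ... FOR UPDATE on the semaphore row: a concurrent release now has
  # to wait until this transaction, and the blocked execution it creates,
  # has committed.
  semaphore = SolidQueue::Semaphore.lock.find_by(key: concurrency_key)
  if semaphore && semaphore.value < 1
    blocked!   # placeholder: create the BlockedExecution row
  else
    claim!     # placeholder: claim or create the semaphore
  end
end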
