Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add delayed_jobs_ready to DelayedJobs plugin and collect_by_queue option for GoodJob plugin #302

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,7 @@ end
| Counter | `delayed_jobs_total` | Total number of delayed jobs executed | `job_name` |
| Gauge | `delayed_jobs_enqueued` | Number of enqueued delayed jobs | - |
| Gauge | `delayed_jobs_pending` | Number of pending delayed jobs | - |
| Gauge | `delayed_jobs_ready` | Number of ready delayed jobs      | - |
| Counter | `delayed_failed_jobs_total` | Total number of failed delayed jobs executed | `job_name` |
| Counter | `delayed_jobs_max_attempts_reached_total` | Total number of delayed jobs that reached max attempts | - |
| Summary | `delayed_job_duration_seconds_summary` | Summary of the time it takes jobs to execute | `status` |
Expand Down Expand Up @@ -639,6 +640,9 @@ installation, you'll need to start the instrumentation:
# e.g. config/initializers/good_job.rb
require 'prometheus_exporter/instrumentation'
PrometheusExporter::Instrumentation::GoodJob.start

# or, to collect metrics labelled by their queue name
PrometheusExporter::Instrumentation::GoodJob.start(collect_by_queue: true)
```

#### Metrics collected by GoodJob Instrumentation
Expand Down Expand Up @@ -884,7 +888,7 @@ prometheus_exporter -p 8080 \
--prefix 'foo_'
```

You can use `-b` option to bind the `prometheus_exporter` web server to any IPv4 interface with `-b 0.0.0.0`,
You can use `-b` option to bind the `prometheus_exporter` web server to any IPv4 interface with `-b 0.0.0.0`,
any IPv6 interface with `-b ::`, or `-b ANY` to any IPv4/IPv6 interfaces available on your host system.

#### Enabling Basic Authentication
Expand Down
9 changes: 6 additions & 3 deletions lib/prometheus_exporter/instrumentation/delayed_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ def register_plugin(client: nil)
max_attempts = Delayed::Worker.max_attempts
enqueued_count = Delayed::Job.where(queue: job.queue).count
pending_count = Delayed::Job.where(attempts: 0, locked_at: nil, queue: job.queue).count
instrumenter.call(job, max_attempts, enqueued_count, pending_count, *args, &block)
# It may be necessary to coalesce the run_at time with the created_at timestamp to get a more accurate count
ready_count = Delayed::Job.where(queue: job.queue, run_at: ..Time.current).count
instrumenter.call(job, max_attempts, enqueued_count, pending_count, ready_count, *args, &block)
end
end
end
Expand All @@ -28,7 +30,7 @@ def initialize(client: nil)
@client = client || PrometheusExporter::Client.default
end

def call(job, max_attempts, enqueued_count, pending_count, *args, &block)
def call(job, max_attempts, enqueued_count, pending_count, ready_count, *args, &block)
success = false
start = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)
latency = Time.current - job.run_at
Expand All @@ -49,7 +51,8 @@ def call(job, max_attempts, enqueued_count, pending_count, *args, &block)
attempts: attempts,
max_attempts: max_attempts,
enqueued: enqueued_count,
pending: pending_count
pending: pending_count,
ready: ready_count
)
end
end
Expand Down
34 changes: 24 additions & 10 deletions lib/prometheus_exporter/instrumentation/good_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,42 @@
# collects stats from GoodJob
module PrometheusExporter::Instrumentation
class GoodJob < PeriodicStats
def self.start(client: nil, frequency: 30)
def self.start(client: nil, frequency: 30, collect_by_queue: false)
good_job_collector = new
client ||= PrometheusExporter::Client.default

worker_loop do
client.send_json(good_job_collector.collect)
client.send_json(good_job_collector.collect(collect_by_queue))
end

super
end

def collect
def collect(by_queue = false)
queue_names = by_queue ? ::GoodJob::Job.distinct.pluck(:queue_name) : nil
{
type: "good_job",
scheduled: ::GoodJob::Job.scheduled.size,
retried: ::GoodJob::Job.retried.size,
queued: ::GoodJob::Job.queued.size,
running: ::GoodJob::Job.running.size,
finished: ::GoodJob::Job.finished.size,
succeeded: ::GoodJob::Job.succeeded.size,
discarded: ::GoodJob::Job.discarded.size
by_queue: by_queue,
scheduled: compute_stats(::GoodJob::Job.scheduled, by_queue, queue_names),
retried: compute_stats(::GoodJob::Job.retried, by_queue, queue_names),
queued: compute_stats(::GoodJob::Job.queued, by_queue, queue_names),
running: compute_stats(::GoodJob::Job.running, by_queue, queue_names),
finished: compute_stats(::GoodJob::Job.finished, by_queue, queue_names),
succeeded: compute_stats(::GoodJob::Job.succeeded, by_queue, queue_names),
discarded: compute_stats(::GoodJob::Job.discarded, by_queue, queue_names)
}
end

private

# Produces the statistic for a single job scope.
#
# When +by_queue+ is falsy the result is simply +scope.size+. Otherwise the
# scope is grouped by +:queue_name+ and the grouped +size+ is returned, after
# filling in a 0 for every entry of +queue_names+ that has no value yet, so
# each listed queue is always present in the result.
#
# NOTE(review): +scope+ is presumably an ActiveRecord relation, making the
# grouped result a Hash of queue name => count - confirm against callers.
def compute_stats(scope, by_queue, queue_names)
  if by_queue
    per_queue = scope.group(:queue_name).size
    # Backfill queues that are known but absent from this particular scope.
    queue_names.each { |name| per_queue[name] ||= 0 }
    per_queue
  else
    scope.size
  end
end
end
end
9 changes: 8 additions & 1 deletion lib/prometheus_exporter/server/delayed_job_collector.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ def initialize
@delayed_job_attempts_summary = nil
@delayed_jobs_enqueued = nil
@delayed_jobs_pending = nil
@delayed_jobs_ready = nil
end

def type
Expand All @@ -36,13 +37,14 @@ def collect(obj)
@delayed_job_attempts_summary.observe(obj["attempts"], counter_labels) if obj["success"]
@delayed_jobs_enqueued.observe(obj["enqueued"], gauge_labels)
@delayed_jobs_pending.observe(obj["pending"], gauge_labels)
@delayed_jobs_ready.observe(obj["ready"], gauge_labels)
end

def metrics
if @delayed_jobs_total
[@delayed_job_duration_seconds, @delayed_job_latency_seconds_total, @delayed_jobs_total, @delayed_failed_jobs_total,
@delayed_jobs_max_attempts_reached_total, @delayed_job_duration_seconds_summary, @delayed_job_attempts_summary,
@delayed_jobs_enqueued, @delayed_jobs_pending]
@delayed_jobs_enqueued, @delayed_jobs_pending, @delayed_jobs_ready]
else
[]
end
Expand Down Expand Up @@ -73,6 +75,11 @@ def ensure_delayed_job_metrics
PrometheusExporter::Metric::Gauge.new(
"delayed_jobs_pending", "Number of pending delayed jobs.")

@delayed_jobs_ready =
PrometheusExporter::Metric::Gauge.new(
"delayed_jobs_ready", "Number of ready delayed jobs."
)

@delayed_failed_jobs_total =
PrometheusExporter::Metric::Counter.new(
"delayed_failed_jobs_total", "Total number failed delayed jobs executed.")
Expand Down
35 changes: 21 additions & 14 deletions lib/prometheus_exporter/server/good_job_collector.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,9 @@ def type
end

def metrics
return [] if good_job_metrics.length == 0

good_job_metrics.map do |metric|
labels = metric.fetch("custom_labels", {})

GOOD_JOB_GAUGES.map do |name, help|
value = metric[name.to_s]

if value
gauge = gauges[name] ||= PrometheusExporter::Metric::Gauge.new("good_job_#{name}", help)
gauge.observe(value, labels)
end
end
end
return [] if good_job_metrics.length.zero?

good_job_metrics.each(&method(:process_metric))
gauges.values
end

Expand All @@ -48,5 +36,24 @@ def collect(object)
private

attr_reader :good_job_metrics, :gauges

# Feeds one collected metric payload into the per-name gauges.
#
# Labels are read from the payload's "custom_labels" entry (an empty hash
# when absent). For each entry of GOOD_JOB_GAUGES whose stringified name has
# a truthy value in the payload, the matching gauge is looked up in +gauges+
# (created as "good_job_<name>" on first use) and the value is handed to
# +observe_metric+. Names without a value in the payload are skipped.
def process_metric(metric)
  custom_labels = metric.fetch("custom_labels", {})

  GOOD_JOB_GAUGES.each do |gauge_name, description|
    observed = metric[gauge_name.to_s]

    if observed
      # Lazily create the gauge the first time this metric name shows up.
      gauge = gauges[gauge_name] ||= PrometheusExporter::Metric::Gauge.new("good_job_#{gauge_name}", description)
      observe_metric(gauge, metric, custom_labels, observed)
    end
  end
end

# Records +value+ on +gauge+.
#
# Without a truthy "by_queue" entry in +metric+, +value+ is observed once
# with +labels+ unchanged. With it, +value+ is iterated as queue name/count
# pairs and each count is observed with +labels+ extended by a +queue_name+
# label.
def observe_metric(gauge, metric, labels, value)
  return gauge.observe(value, labels) unless metric["by_queue"]

  value.each do |queue_name, count|
    gauge.observe(count, labels.merge(queue_name: queue_name))
  end
end
end
end
8 changes: 4 additions & 4 deletions lib/prometheus_exporter/server/metrics_container.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@ def size(&blk)
end
alias_method :length, :size

def map(&blk)
wrap_expire(:map, &blk)
end

def each(&blk)
wrap_expire(:each, &blk)
end

def map(&blk)
wrap_expire(:map, &blk)
end

def expire(time: nil, new_metric: nil)
time ||= get_time

Expand Down