Skip to content

Commit

Permalink
Telemetry Fishish (#457)
Browse files Browse the repository at this point in the history
  • Loading branch information
maennchen authored Sep 14, 2020
1 parent e922d4a commit e896c20
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 44 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ This project adheres to [Semantic Versioning](http://semver.org/).

## Unreleased

### Added
- Telemetry Support (#415)

### Fixed
- Properly override jobs with duplicate name (#392)
- Simplify `TaskRegistry` and make tests deterministic
Expand Down
25 changes: 16 additions & 9 deletions lib/quantum/executor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,17 @@ defmodule Quantum.Executor do

# Ececute the given function on a given node via the task supervisor
@spec run(Node.t(), Job.t(), GenServer.server(), boolean(), atom()) :: Task.t()
defp run(node, %{name: job_name, task: task}, task_supervisor, debug_logging, scheduler) do
defp run(
node,
%Job{name: job_name, task: task} = job,
task_supervisor,
debug_logging,
scheduler
) do
debug_logging &&
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Task for job #{inspect(job_name)} started on node #{
inspect(node)
node
}"
end)

Expand All @@ -93,8 +99,8 @@ defmodule Quantum.Executor do
start_monotonic_time = :erlang.monotonic_time()

:telemetry.execute([:quantum, :job, :start], %{system_time: start_monotonic_time}, %{
job_name: job_name,
node: inspect(node),
job: job,
node: node,
scheduler: scheduler
})

Expand All @@ -112,8 +118,8 @@ defmodule Quantum.Executor do
duration = :erlang.monotonic_time() - start_monotonic_time

:telemetry.execute([:quantum, :job, :exception], %{duration: duration}, %{
job_name: job_name,
node: inspect(node),
job: job,
node: node,
reason: value,
stacktrace: __STACKTRACE__,
scheduler: scheduler
Expand All @@ -130,9 +136,10 @@ defmodule Quantum.Executor do
duration = :erlang.monotonic_time() - start_monotonic_time

:telemetry.execute([:quantum, :job, :stop], %{duration: duration}, %{
job_name: job_name,
node: inspect(node),
scheduler: scheduler
job: job,
node: node,
scheduler: scheduler,
result: result
})
end

Expand Down
26 changes: 2 additions & 24 deletions lib/quantum/job_broadcaster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,10 @@ defmodule Quantum.JobBroadcaster do
"[#{inspect(Node.self())}][#{__MODULE__}] Loading Initial Jobs from Storage, skipping config"
end)

for %Job{state: :active, name: name} = job <- storage_jobs do
for %Job{state: :active} = job <- storage_jobs do
# Send event to telemetry incase the end user wants to monitor events
:telemetry.execute([:quantum, :job, :add], %{}, %{
job_name: name,
job: job,
node: inspect(Node.self()),
scheduler: scheduler
})
end
Expand Down Expand Up @@ -111,9 +109,7 @@ defmodule Quantum.JobBroadcaster do

# Send event to telemetry incase the end user wants to monitor events
:telemetry.execute([:quantum, :job, :update], %{}, %{
job_name: job_name,
job: job,
node: inspect(Node.self()),
scheduler: state.scheduler
})

Expand All @@ -131,9 +127,7 @@ defmodule Quantum.JobBroadcaster do

# Send event to telemetry incase the end user wants to monitor events
:telemetry.execute([:quantum, :job, :update], %{}, %{
job_name: job_name,
job: job,
node: inspect(Node.self()),
scheduler: state.scheduler
})

Expand All @@ -150,9 +144,7 @@ defmodule Quantum.JobBroadcaster do

# Send event to telemetry incase the end user wants to monitor events
:telemetry.execute([:quantum, :job, :add], %{}, %{
job_name: job_name,
job: job,
node: inspect(Node.self()),
scheduler: state.scheduler
})

Expand Down Expand Up @@ -180,9 +172,7 @@ defmodule Quantum.JobBroadcaster do

# Send event to telemetry incase the end user wants to monitor events
:telemetry.execute([:quantum, :job, :update], %{}, %{
job_name: job_name,
job: job,
node: inspect(Node.self()),
scheduler: state.scheduler
})

Expand All @@ -199,9 +189,7 @@ defmodule Quantum.JobBroadcaster do

# Send event to telemetry incase the end user wants to monitor events
:telemetry.execute([:quantum, :job, :update], %{}, %{
job_name: job_name,
job: job,
node: inspect(Node.self()),
scheduler: state.scheduler
})

Expand All @@ -218,9 +206,7 @@ defmodule Quantum.JobBroadcaster do

# Send event to telemetry incase the end user wants to monitor events
:telemetry.execute([:quantum, :job, :add], %{}, %{
job_name: job_name,
job: job,
node: inspect(Node.self()),
scheduler: state.scheduler
})

Expand Down Expand Up @@ -248,9 +234,7 @@ defmodule Quantum.JobBroadcaster do
{:ok, %{state: :active, name: name} = job} ->
# Send event to telemetry incase the end user wants to monitor events
:telemetry.execute([:quantum, :job, :delete], %{}, %{
job_name: name,
job: job,
node: inspect(Node.self()),
scheduler: state.scheduler
})

Expand All @@ -261,9 +245,7 @@ defmodule Quantum.JobBroadcaster do
{:ok, %{state: :inactive, name: name} = job} ->
# Send event to telemetry incase the end user wants to monitor events
:telemetry.execute([:quantum, :job, :delete], %{}, %{
job_name: name,
job: job,
node: inspect(Node.self()),
scheduler: state.scheduler
})

Expand Down Expand Up @@ -300,9 +282,7 @@ defmodule Quantum.JobBroadcaster do
{:ok, job} ->
# Send event to telemetry incase the end user wants to monitor events
:telemetry.execute([:quantum, :job, :update], %{}, %{
job_name: name,
job: job,
node: inspect(Node.self()),
scheduler: state.scheduler
})

Expand Down Expand Up @@ -334,12 +314,10 @@ defmodule Quantum.JobBroadcaster do
"[#{inspect(Node.self())}][#{__MODULE__}] Deleting all jobs"
end)

for {name, %Job{} = job} <- jobs do
for {_name, %Job{} = job} <- jobs do
# Send event to telemetry incase the end user wants to monitor events
:telemetry.execute([:quantum, :job, :delete], %{}, %{
job_name: name,
job: job,
node: inspect(Node.self()),
scheduler: state.scheduler
})
end
Expand Down
3 changes: 2 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ defmodule Quantum.Mixfile do
"pages/configuration.md",
"pages/runtime.md",
"pages/crontab-format.md",
"pages/run-strategies.md"
"pages/run-strategies.md",
"pages/telemetry.md"
],
groups_for_modules: [
"Run Strategy": [
Expand Down
79 changes: 79 additions & 0 deletions pages/telemetry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Telemetry

Sice version [`3.2.0`](https://github.com/quantum-elixir/quantum-core/releases/tag/v3.2.0) `quantum` supports [`:telemetry`](https://hexdocs.pm/telemetry) metrics.

<!--
large parts of this docs are copied from https://raw.githubusercontent.com/phoenixframework/phoenix/master/guides/telemetry.md
thanks phoenix :heart:
-->

## Overview

The [`:telemetry`](https://hexdocs.pm/telemetry) library allows you to emit events at various stages of an application's lifecycle. You can then respond to these events by, among other things, aggregating them as metrics and sending the metrics data to a reporting destination.

Telemetry stores events by their name in an ETS table, along with the handler for each event. Then, when a given event is executed, Telemetry looks up its handler and invokes it.

## Telemetry Events

Many Elixir libraries (including Quantum) are already using
the [`:telemetry`](http://hexdocs.pm/telemetry) package as a
way to give users more insight into the behavior of their
applications, by emitting events at key moments in the
application lifecycle.

A Telemetry event is made up of the following:

* `name` - A string (e.g. `"my_app.worker.stop"`) or a
list of atoms that uniquely identifies the event.

* `measurements` - A map of atom keys (e.g. `:duration`)
and numeric values.

* `metadata` - A map of key/value pairs that can be used
for tagging metrics.

### A Quantum Example

Here is an example of an event from your endpoint:

* `[:quantum, :job, :stop]` - dispatched whenever a job
execution is done

* Measurement: `%{duration: native_time}`

* Metadata: `%{job: Quantum.Job.t(), node: Node.t(), scheduler: atom()}`

This means that after each job execution, `Quantum`, via `:telemetry`,
will emit a "stop" event, with a measurement of how long it
took to execute the job:

```elixir
:telemetry.execute([:quantum, :job, :start], %{system_time: system_time}, %{
job: job,
node: node,
scheduler: scheduler
})
```

### Quantum Telemetry Events

The following events are published by Quantum with the following measurements and metadata:

* `[:quantum, :job, :start]` - dispatched on job executiuon start
* Measurement: `%{system_time: system_time}`
* Metadata: `%{job: Quantum.Job.t(), node: Node.t(), scheduler: atom()}`
* `[:quantum, :job, :exception]` - dispatched on job executiuon fail
* Measurement: `%{duration: native_time}`
* Metadata: `%{job: Quantum.Job.t(), node: Node.t(), scheduler: atom(), reason: term(), stacktrace: __STACKTRACE__}`
* `[:quantum, :job, :stop]` - dispatched on job executiuon end
* Measurement: `%{duration: native_time}`
* Metadata: `%{job: Quantum.Job.t(), node: Node.t(), scheduler: atom(), result: term()}`
* `[:quantum, :job, :add]` - dispatched when a job is added
* Measurement: `%{}`
* Metadata: `%{job: Quantum.Job.t(), scheduler: atom()}`
* `[:quantum, :job, :update]` - dispatched when a job is updated
* Measurement: `%{}`
* Metadata: `%{job: Quantum.Job.t(), scheduler: atom()}`
* `[:quantum, :job, :delete]` - dispatched when a job is deleted
* Measurement: `%{}`
* Metadata: `%{job: Quantum.Job.t(), scheduler: atom()}`
13 changes: 7 additions & 6 deletions test/quantum/executor_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ defmodule Quantum.ExecutorTest do
end

defmodule TelemetryTestHandler do
require Logger
@moduledoc false

def handle_event(
[:quantum, :job, :start],
%{system_time: _system_time} = _measurements,
%{job_name: job_name, node: _node, scheduler: _scheduler} = _metadata,
%{job: %Job{name: job_name}, node: _node, scheduler: _scheduler} = _metadata,
%{parent_thread: parent_thread, test_id: test_id}
) do
send(parent_thread, %{test_id: test_id, job_name: job_name, type: :start})
Expand All @@ -32,7 +32,8 @@ defmodule Quantum.ExecutorTest do
def handle_event(
[:quantum, :job, :stop],
%{duration: _duration} = _measurements,
%{job_name: job_name, node: _node, scheduler: _scheduler} = _metadata,
%{job: %Job{name: job_name}, node: _node, scheduler: _scheduler, result: _result} =
_metadata,
%{parent_thread: parent_thread, test_id: test_id}
) do
send(parent_thread, %{test_id: test_id, job_name: job_name, type: :stop})
Expand All @@ -42,11 +43,11 @@ defmodule Quantum.ExecutorTest do
[:quantum, :job, :exception],
%{duration: _duration} = _measurements,
%{
job_name: job_name,
job: %Job{name: job_name},
node: _node,
scheduler: _scheduler,
reason: reason,
stacktrace: stacktrace,
scheduler: _scheduler
stacktrace: stacktrace
} = _metadata,
%{parent_thread: parent_thread, test_id: test_id}
) do
Expand Down
8 changes: 4 additions & 4 deletions test/quantum/job_broadcaster_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ defmodule Quantum.JobBroadcasterTest do
end

defmodule TelemetryTestHandler do
require Logger
@moduledoc false

def handle_event(
[:quantum, :job, :add],
_measurements,
%{job_name: job_name, job: _job, node: _node, scheduler: _scheduler} = _metadata,
%{job: %Job{name: job_name}, scheduler: _scheduler} = _metadata,
%{parent_thread: parent_thread, test_id: test_id}
) do
send(parent_thread, %{test_id: test_id, job_name: job_name, type: :add})
Expand All @@ -36,7 +36,7 @@ defmodule Quantum.JobBroadcasterTest do
def handle_event(
[:quantum, :job, :delete],
_measurements,
%{job_name: job_name, job: _job, node: _node, scheduler: _scheduler} = _metadata,
%{job: %Job{name: job_name}, scheduler: _scheduler} = _metadata,
%{parent_thread: parent_thread, test_id: test_id}
) do
send(parent_thread, %{test_id: test_id, job_name: job_name, type: :delete})
Expand All @@ -45,7 +45,7 @@ defmodule Quantum.JobBroadcasterTest do
def handle_event(
[:quantum, :job, :update],
_measurements,
%{job_name: job_name, job: _job, node: _node, scheduler: _scheduler} = _metadata,
%{job: %Job{name: job_name}, scheduler: _scheduler} = _metadata,
%{parent_thread: parent_thread, test_id: test_id}
) do
send(parent_thread, %{test_id: test_id, job_name: job_name, type: :update})
Expand Down

0 comments on commit e896c20

Please sign in to comment.