Skip to content

Commit

Permalink
Merge pull request #8 from NFIBrokerage/allow-dynamic-named-processes
Browse files Browse the repository at this point in the history
Allow beeline to match Elixir's OTP naming
  • Loading branch information
pmonson711 authored Oct 12, 2022
2 parents c005cbc + a1046da commit 9674bf9
Show file tree
Hide file tree
Showing 11 changed files with 278 additions and 68 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to
## UNRELEASED
-->
## 1.0.1 - 2022-10-12

This change adds the ability to use :global and Registry names for the process
and topology.

## 1.0.0 - 2022-05-11

Expand Down
11 changes: 6 additions & 5 deletions lib/beeline.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
defmodule Beeline do
import Beeline.ProcessNaming.Guards
@schema Beeline.Config.schema()
@producer_schema Beeline.Producer.schema()

Expand Down Expand Up @@ -140,10 +141,10 @@ defmodule Beeline do
iex> Beeline.restart_stages(MyEventHandler)
:ok
"""
@spec restart_stages(module()) :: :ok | {:error, term()}
def restart_stages(beeline) when is_atom(beeline) do
@spec restart_stages(GenServer.name()) :: :ok | {:error, term()}
def restart_stages(beeline) when is_beeline_name(beeline) do
beeline
|> Module.concat(Topology)
|> Beeline.ProcessNaming.name(Topology)
|> GenServer.call(:restart_stages)
end

Expand Down Expand Up @@ -215,9 +216,9 @@ defmodule Beeline do
This function can be used to test running events through a topology.
If there are multiple producers, one is picked at random.
"""
def test_events(events, beeline) when is_atom(beeline) do
def test_events(events, beeline) when is_beeline_name(beeline) do
beeline
|> Module.concat(Topology)
|> Beeline.ProcessNaming.name(Topology)
|> GenServer.call({:test_events, events})
end
end
5 changes: 3 additions & 2 deletions lib/beeline/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ defmodule Beeline.Config do
The GenServer name for the topology. The topology will build on this
name, using it as a prefix.
""",
type: :atom
type:
{:or, [:atom, {:tuple, [:atom, :any]}, {:tuple, [:atom, :atom, :any]}]}
],
producers: [
doc: """
Expand Down Expand Up @@ -195,7 +196,7 @@ defmodule Beeline.Config do

@doc false
def add_default_producer_opt({:name, nil}, acc, key, all_opts) do
name = Module.concat(all_opts[:name], "Producer_#{key}")
name = Beeline.ProcessNaming.name(all_opts[:name], "Producer_#{key}")

[{:name, name} | acc]
end
Expand Down
33 changes: 33 additions & 0 deletions lib/beeline/process_naming.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
defmodule Beeline.ProcessNaming do
@moduledoc false

defmodule Guards do
@moduledoc false
# coveralls-ignore-start
defguard is_beeline_name(name)
when is_atom(name) or
(is_tuple(name) and elem(name, 0) == :global and
tuple_size(name) == 2) or
(is_tuple(name) and elem(name, 0) == :via and
elem(name, 1) == Registry and tuple_size(name) == 3)

# coveralls-ignore-stop
end

# Provides common logic for standard OTP server_name types.
def(name(%Beeline.Config{name: name}, appended_name)) do
name(name, appended_name)
end

def name(base_name, appended_name) when is_atom(base_name) do
Module.concat(base_name, appended_name)
end

def name({:global, base_name}, appended_name) do
{:global, {base_name, appended_name}}
end

def name({:via, Registry, {registry_name, base_name}}, appended_name) do
{:via, Registry, {registry_name, {base_name, appended_name}}}
end
end
7 changes: 3 additions & 4 deletions lib/beeline/topology.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@ defmodule Beeline.Topology do

@behaviour GenServer

import Beeline.ProcessNaming, only: [name: 2]
alias __MODULE__.StageSupervisor

defstruct [:supervisor_pid, :config]

def start_link(config) do
GenServer.start_link(__MODULE__, config,
name: Module.concat(config.name, "Topology")
)
GenServer.start_link(__MODULE__, config, name: name(config.name, Topology))
end

@impl GenServer
Expand Down Expand Up @@ -70,7 +69,7 @@ defmodule Beeline.Topology do

Supervisor.start_link(children,
strategy: :one_for_one,
name: Module.concat(config.name, Supervisor)
name: name(config.name, Supervisor)
)
end
end
13 changes: 4 additions & 9 deletions lib/beeline/topology/stage_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ defmodule Beeline.Topology.StageSupervisor do

@behaviour Supervisor

import Beeline.ProcessNaming, only: [name: 2]
alias Beeline.Topology.{Producer, Consumer}

def child_spec(config) do
Expand All @@ -25,16 +26,10 @@ defmodule Beeline.Topology.StageSupervisor do
}
end

def name(%Beeline.Config{name: name}) do
name(name)
end

def name(base_name) when is_atom(base_name) do
Module.concat(base_name, "StageSupervisor")
end

def start_link(config) do
Supervisor.start_link(__MODULE__, config, name: name(config))
Supervisor.start_link(__MODULE__, config,
name: name(config, StageSupervisor)
)
end

@impl Supervisor
Expand Down
14 changes: 7 additions & 7 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
%{
"bless": {:hex, :bless, "1.2.0", "5e8190738dcf2fedcbcee2433b44f0e4629a1b0ce84052e5ba17f8dff48d42bd", [:mix], [], "hexpm", "046692560e401dc33c2bb3223bb3f210c18fafbae24431f0056516bb5c8be805"},
"bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm", "7af5c7e09fe1d40f76c8e4f9dd2be7cebd83909f31fee7cd0e9eadc567da8353"},
"bunt": {:hex, :bunt, "0.2.1", "e2d4792f7bc0ced7583ab54922808919518d0e57ee162901a16a1b6664ef3b14", [:mix], [], "hexpm", "a330bfb4245239787b15005e66ae6845c9cd524a288f0d141c148b02603777a5"},
"certifi": {:hex, :certifi, "2.9.0", "6f2a475689dd47f19fb74334859d460a2dc4e3252a3324bd2111b8f0429e7e21", [:rebar3], [], "hexpm", "266da46bdb06d6c6d35fde799bcb28d36d985d424ad7c08b5bb48f5b5cdd4641"},
"connection": {:hex, :connection, "1.1.0", "ff2a49c4b75b6fb3e674bfc5536451607270aac754ffd1bdfe175abe4a6d7a68", [:mix], [], "hexpm", "722c1eb0a418fbe91ba7bd59a47e28008a189d47e37e0e7bb85585a016b2869c"},
"credo": {:hex, :credo, "1.6.4", "ddd474afb6e8c240313f3a7b0d025cc3213f0d171879429bf8535d7021d9ad78", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "c28f910b61e1ff829bffa056ef7293a8db50e87f2c57a9b5c3f57eee124536b7"},
"earmark_parser": {:hex, :earmark_parser, "1.4.25", "2024618731c55ebfcc5439d756852ec4e85978a39d0d58593763924d9a15916f", [:mix], [], "hexpm", "56749c5e1c59447f7b7a23ddb235e4b3defe276afc220a6227237f3efe83f51e"},
"credo": {:hex, :credo, "1.6.7", "323f5734350fd23a456f2688b9430e7d517afb313fbd38671b8a4449798a7854", [:mix], [{:bunt, "~> 0.2.1", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "41e110bfb007f7eda7f897c10bf019ceab9a0b269ce79f015d54b0dcf4fc7dd3"},
"earmark_parser": {:hex, :earmark_parser, "1.4.28", "0bf6546eb7cd6185ae086cbc5d20cd6dbb4b428aad14c02c49f7b554484b4586", [:mix], [], "hexpm", "501cef12286a3231dc80c81352a9453decf9586977f917a96e619293132743fb"},
"elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"},
"event_store_db_gpb_protobufs": {:hex, :event_store_db_gpb_protobufs, "2.1.4", "ec48ce5637e54b89d8873164cf96ccf6d8d03df800e65387c760ae1e2e3f6edc", [:rebar3], [{:gpb, "~> 4.0", [hex: :gpb, repo: "hexpm", optional: false]}], "hexpm", "c3af9b05bc453690abf083a22c6d8e247a53690747356d5ecf272eb6f2448244"},
"ex_doc": {:hex, :ex_doc, "0.28.4", "001a0ea6beac2f810f1abc3dbf4b123e9593eaa5f00dd13ded024eae7c523298", [:mix], [{:earmark_parser, "~> 1.4.19", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "bf85d003dd34911d89c8ddb8bda1a958af3471a274a4c2150a9c01c78ac3f8ed"},
"excoveralls": {:hex, :excoveralls, "0.14.4", "295498f1ae47bdc6dce59af9a585c381e1aefc63298d48172efaaa90c3d251db", [:mix], [{:hackney, "~> 1.16", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "e3ab02f2df4c1c7a519728a6f0a747e71d7d6e846020aae338173619217931c1"},
"ex_doc": {:hex, :ex_doc, "0.28.5", "3e52a6d2130ce74d096859e477b97080c156d0926701c13870a4e1f752363279", [:mix], [{:earmark_parser, "~> 1.4.19", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "d2c4b07133113e9aa3e9ba27efb9088ba900e9e51caa383919676afdf09ab181"},
"excoveralls": {:hex, :excoveralls, "0.14.6", "610e921e25b180a8538229ef547957f7e04bd3d3e9a55c7c5b7d24354abbba70", [:mix], [{:hackney, "~> 1.16", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "0eceddaa9785cfcefbf3cd37812705f9d8ad34a758e513bb975b081dce4eb11e"},
"exprotobuf": {:hex, :exprotobuf, "1.2.17", "3003937da617f588a8fb63ebdd7b127a18d78d6502623c272076fd54c07c4de1", [:mix], [{:gpb, "~> 4.0", [hex: :gpb, repo: "hexpm", optional: false]}], "hexpm", "e07ec1e5ae6f8c1c8521450d5f6b658c8c700b1f34c70356e91ece0766f4361a"},
"extreme": {:hex, :extreme, "1.0.1", "885bca760f988a9df724c965303ff0575965bbd93dba05a45111c20eeeaba74c", [:mix], [{:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:exprotobuf, "~> 1.2.9", [hex: :exprotobuf, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "92304fb3be75e5f7d0954d40253152842ab178fc153f90b231a87053c03aaf60"},
"file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"},
Expand All @@ -17,15 +17,15 @@
"hackney": {:hex, :hackney, "1.18.1", "f48bf88f521f2a229fc7bae88cf4f85adc9cd9bcf23b5dc8eb6a1788c662c4f6", [:rebar3], [{:certifi, "~>2.9.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~>6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~>1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~>1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.3.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~>1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "a4ecdaff44297e9b5894ae499e9a070ea1888c84afdd1fd9b7b2bc384950128e"},
"hpax": {:hex, :hpax, "0.1.1", "2396c313683ada39e98c20a75a82911592b47e5c24391363343bde74f82396ca", [:mix], [], "hexpm", "0ae7d5a0b04a8a60caf7a39fcf3ec476f35cc2cc16c05abea730d3ce6ac6c826"},
"idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"},
"jason": {:hex, :jason, "1.3.0", "fa6b82a934feb176263ad2df0dbd91bf633d4a46ebfdffea0c8ae82953714946", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "53fc1f51255390e0ec7e50f9cb41e751c260d065dcba2bf0d08dc51a4002c2ac"},
"jason": {:hex, :jason, "1.4.0", "e855647bc964a44e2f67df589ccf49105ae039d4179db7f6271dfd3843dc27e6", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "79a3791085b2a0f743ca04cec0f7be26443738779d09302e01318f97bdb82121"},
"kelvin": {:hex, :kelvin, "1.0.0", "8092d14793e07731d02738dfa716dea1ae956696ad94d9fefdbb6ab146b32a42", [:mix], [{:extreme, "~> 1.0", [hex: :extreme, repo: "hexpm", optional: false]}, {:gen_stage, "~> 1.0", [hex: :gen_stage, repo: "hexpm", optional: false]}], "hexpm", "add52a21676b2830b5221f2694c05b3c436e427b2edf42603b6d433ea0bfdfad"},
"makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"},
"makeup_elixir": {:hex, :makeup_elixir, "0.16.0", "f8c570a0d33f8039513fbccaf7108c5d750f47d8defd44088371191b76492b0b", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "28b2cbdc13960a46ae9a8858c4bebdec3c9a6d7b4b9e7f4ed1502f8159f338e7"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"},
"metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"},
"mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"},
"mint": {:hex, :mint, "1.4.1", "49b3b6ea35a9a38836d2ad745251b01ca9ec062f7cb66f546bf22e6699137126", [:mix], [{:castore, "~> 0.1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "cd261766e61011a9079cccf8fa9d826e7a397c24fbedf0e11b49312bea629b58"},
"nimble_options": {:hex, :nimble_options, "0.4.0", "c89babbab52221a24b8d1ff9e7d838be70f0d871be823165c94dd3418eea728f", [:mix], [], "hexpm", "e6701c1af326a11eea9634a3b1c62b475339ace9456c1a23ec3bc9a847bca02d"},
"nimble_options": {:hex, :nimble_options, "0.5.0", "ac126f49101977fbbd5dcf19125af1a7cb5829bd7ebaeb565c34ba5bec0bcef6", [:mix], [], "hexpm", "a26a7e34bcd716d22f5d72fd52edbf8addd2e1d62e5f52fec17394dcad81d8e7"},
"nimble_parsec": {:hex, :nimble_parsec, "1.2.3", "244836e6e3f1200c7f30cb56733fd808744eca61fd182f731eac4af635cc6d0b", [:mix], [], "hexpm", "c8d789e39b9131acf7b99291e93dae60ab48ef14a7ee9d58c6964f59efb570b0"},
"parse_trans": {:hex, :parse_trans, "3.3.1", "16328ab840cc09919bd10dab29e431da3af9e9e7e7e6f0089dd5a2d2820011d8", [:rebar3], [], "hexpm", "07cd9577885f56362d414e8c4c4e6bdf10d43a8767abb92d24cbe8b24c54888b"},
"spear": {:hex, :spear, "1.0.0", "c02efd2d8602f776a2085c7a06258840ea86d6af98563d7fc3849685f8cd9554", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:event_store_db_gpb_protobufs, "~> 2.1", [hex: :event_store_db_gpb_protobufs, repo: "hexpm", optional: false]}, {:gpb, "~> 4.0", [hex: :gpb, repo: "hexpm", optional: false]}, {:jason, ">= 0.0.0", [hex: :jason, repo: "hexpm", optional: true]}, {:mint, "~> 1.0", [hex: :mint, repo: "hexpm", optional: false]}], "hexpm", "8b01af4efb4565f349230f9316b8284e0385f019115a2b5b060fadba20a58c8c"},
Expand Down
46 changes: 5 additions & 41 deletions test/beeline/dummy_producer_test.exs
Original file line number Diff line number Diff line change
@@ -1,52 +1,16 @@
defmodule Beeline.DummyProducerFixture do
@moduledoc """
A fixture event handler that subscribes to a dummy producer
"""

use Beeline

def start_link(test_proc) do
Beeline.start_link(__MODULE__,
name: __MODULE__,
producers: [
default: [
adapter: :dummy,
connection: nil,
stream_name: "dummy-stream"
]
],
spawn_health_checkers?: false,
# these options don't matter in test mode
auto_subscribe?: fn _producer -> false end,
get_stream_position: fn _producer -> -1 end,
context: test_proc
)
end

@impl GenStage
def handle_events([subscription_event], _from, test_proc) do
event = Beeline.decode_event(subscription_event)

if match?(%{poison?: true}, event) do
raise "inconceivable!"
end

send(test_proc, {:event, event})

{:noreply, [], test_proc}
end
end

defmodule Beeline.DummyProducerTest do
use ExUnit.Case, async: true

@moduletag :capture_log

@producer_id {Beeline.Topology.Producer, :default}
@fixture Beeline.DummyProducerFixture
@fixture Beeline.DummyNameFixture

setup do
[beeline_pid: start_supervised!({@fixture, self()})]
[
beeline_pid:
start_supervised!({@fixture, %{name: @fixture, proc: self()}})
]
end

test "the dummy handler can handle events" do
Expand Down
85 changes: 85 additions & 0 deletions test/beeline/global_registry_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
defmodule Beeline.GlobalRegistryTest do
use ExUnit.Case, async: true

@moduletag :capture_log

@producer_id {Beeline.Topology.Producer, :default}
@fixture Beeline.DummyNameFixture
@name {:global, @fixture}

setup do
[beeline_pid: start_supervised!({@fixture, %{name: @name, proc: self()}})]
end

test "the dummy handler can handle events" do
events = [%{foo: "bar"}, %{foo: "bar"}, %{foo: "bar"}]

:ok = Beeline.test_events(events, {:global, @fixture})

assert_receive {:event, event_a}
assert_receive {:event, event_b}
assert_receive {:event, event_c}

assert [event_a, event_b, event_c] == events
end

test "the dummy producer and handler are both restarted with restart_stages/1" do
%{
@producer_id => producer_pid,
@fixture => consumer_pid
} = stage_children()

producer_ref = Process.monitor(producer_pid)
consumer_ref = Process.monitor(consumer_pid)

assert Beeline.restart_stages({:global, @fixture}) == :ok

assert_receive {:DOWN, ^producer_ref, :process, ^producer_pid, :shutdown}
assert_receive {:DOWN, ^consumer_ref, :process, ^consumer_pid, :shutdown}

%{
@producer_id => producer_pid,
@fixture => consumer_pid
} = stage_children()

assert Process.alive?(producer_pid)
assert Process.alive?(consumer_pid)
end

test "when the consumer raises on an event, it kills the producer as well" do
%{
@producer_id => producer_pid,
@fixture => consumer_pid
} = stage_children()

producer_ref = Process.monitor(producer_pid)
consumer_ref = Process.monitor(consumer_pid)

good_event = %{foo: "bar"}
bad_event = %{poison?: true}

:ok = Beeline.test_events([good_event, bad_event], {:global, @fixture})

assert_receive {:event, ^good_event}
refute_receive {:event, ^bad_event}

assert_receive {:DOWN, ^producer_ref, :process, ^producer_pid, :shutdown}
assert_receive {:DOWN, ^consumer_ref, :process, ^consumer_pid, _error}

# then the producer and consumer are restarted
%{
@producer_id => producer_pid,
@fixture => consumer_pid
} = stage_children()

assert Process.alive?(producer_pid)
assert Process.alive?(consumer_pid)
end

defp stage_children do
{:global, @fixture}
|> Beeline.ProcessNaming.name(StageSupervisor)
|> Supervisor.which_children()
|> Enum.into(%{}, fn {id, pid, _, _} -> {id, pid} end)
end
end
Loading

0 comments on commit 9674bf9

Please sign in to comment.