Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
nshoes committed Jan 23, 2025
1 parent 9726b69 commit ab30c3d
Show file tree
Hide file tree
Showing 10 changed files with 77 additions and 137 deletions.
3 changes: 0 additions & 3 deletions lib/mix/tasks/gen.devices.ex
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ defmodule Mix.Tasks.NervesHub.Gen.Devices do
org_id: org.id,
product_id: product.id,
identifier: "generated-#{i}",
connection_status: :connected,
connection_established_at: DateTime.now!("Etc/UTC"),
connection_last_seen_at: DateTime.now!("Etc/UTC"),
connection_metadata: %{
"location" => %{
"longitude" => lng,
Expand Down
42 changes: 18 additions & 24 deletions lib/nerves_hub/devices.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ defmodule NervesHub.Devices do
alias NervesHub.Deployments.Deployment
alias NervesHub.Deployments.Orchestrator
alias NervesHub.Devices.CACertificate
alias NervesHub.Devices.Connections
alias NervesHub.Devices.Device
alias NervesHub.Devices.DeviceCertificate
alias NervesHub.Devices.DeviceHealth
Expand All @@ -36,13 +35,6 @@ defmodule NervesHub.Devices do
Repo.get(Device, device_id)
end

def get_device(device_id, :preload_latest_connection) when is_integer(device_id) do
Device
|> where(id: ^device_id)
|> Connections.preload_latest_connection()
|> Repo.one()
end

def get_active_device(filters) do
Device
|> Repo.exclude_deleted()
Expand All @@ -61,15 +53,6 @@ defmodule NervesHub.Devices do
|> Repo.all()
end

def get_devices_by_org_id_and_product_id(org_id, product_id, :preload_latest_connection) do
Device
|> where([d], d.org_id == ^org_id)
|> where([d], d.product_id == ^product_id)
|> Connections.preload_latest_connection()
|> Repo.exclude_deleted()
|> Repo.all()
end

def get_devices_by_org_id_and_product_id(org_id, product_id, opts) do
{entries, _pager} = get_devices_by_org_id_and_product_id_with_pager(org_id, product_id, opts)
entries
Expand All @@ -89,11 +72,22 @@ defmodule NervesHub.Devices do
|> join(:left, [d, o], p in assoc(d, :product))
|> join(:left, [d, o, p], dp in assoc(d, :deployment))
|> join(:left, [d, o, p, dp], f in assoc(dp, :firmware))
|> join(:left, [d, o, p, dp, f], lc in assoc(d, :latest_connection))
|> Repo.exclude_deleted()
|> order_by(^sort_devices(sorting))
|> then(fn query ->
case sorting do
{:asc, :connection_last_seen_at} ->
order_by(query, [d, o, p, dp, lc], asc_nulls_first: lc.last_seen_at)

{:desc, :connection_last_seen_at} ->
order_by(query, [d, o, p, dp, lc], desc_nulls_last: lc.last_seen_at)

sorting ->
sorting
end
end)
|> Filtering.build_filters(filters)
|> preload([d, o, p, dp, f], org: o, product: p, deployment: {dp, firmware: f})
|> Connections.preload_latest_connection()
|> preload([d, o, p, dp, f, _lc], org: o, product: p, deployment: {dp, firmware: f})
|> Flop.run(flop)
end

Expand Down Expand Up @@ -126,7 +120,7 @@ defmodule NervesHub.Devices do

Device
|> where([d], d.product_id == ^product_id)
|> Connections.preload_latest_connection()
|> preload(:latest_connection)
|> Repo.exclude_deleted()
|> Filtering.build_filters(filters)
|> order_by(^sort_devices(sorting))
Expand All @@ -145,14 +139,15 @@ defmodule NervesHub.Devices do

def get_minimal_device_location_by_org_id_and_product_id(org_id, product_id) do
Device
|> join(:inner, [d], dc in DeviceConnection, on: d.latest_connection_id == dc.id)
|> where(org_id: ^org_id)
|> where(product_id: ^product_id)
|> where([d], not is_nil(fragment("?->'location'->'latitude'", d.connection_metadata)))
|> where([d], not is_nil(fragment("?->'location'->'longitude'", d.connection_metadata)))
|> select([d, dc], %{
id: d.id,
identifier: d.identifier,
connection_status: d.connection_status,
connection_status: dc.connection_status,
latitude: fragment("?->'location'->'latitude'", d.connection_metadata),
longitude: fragment("?->'location'->'longitude'", d.connection_metadata),
firmware_uuid: fragment("?->'uuid'", d.firmware_metadata)
Expand Down Expand Up @@ -298,8 +293,7 @@ defmodule NervesHub.Devices do
end

defp join_and_preload(query, :latest_connection) do
query
|> Connections.preload_latest_connection()
preload(query, :latest_connection)
end

@spec get_shared_secret_auth(String.t()) ::
Expand Down
48 changes: 24 additions & 24 deletions lib/nerves_hub/devices/connections.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule NervesHub.Devices.Connections do
"""
import Ecto.Query

alias Ecto.Multi
alias NervesHub.Devices.Device
alias NervesHub.Devices.DeviceConnection
alias NervesHub.Repo
Expand Down Expand Up @@ -31,15 +32,6 @@ defmodule NervesHub.Devices.Connections do
|> Repo.one()
end

@doc """
Preload latest respective connection in a device query.
"""
@spec preload_latest_connection(Ecto.Query.t()) :: Ecto.Query.t()
def preload_latest_connection(query) do
query
|> preload(device_connections: ^distinct_on_device())
end

@doc """
Creates a device connection, reported from device socket
"""
Expand All @@ -48,14 +40,25 @@ defmodule NervesHub.Devices.Connections do
def device_connected(device_id) do
now = DateTime.utc_now()

%{
device_id: device_id,
established_at: now,
last_seen_at: now,
status: :connected
}
|> DeviceConnection.create_changeset()
|> Repo.insert()
changeset =
DeviceConnection.create_changeset(%{
device_id: device_id,
established_at: now,
last_seen_at: now,
status: :connected
})

case Repo.insert(changeset) do
{:ok, device_connection} ->
Device
|> where(id: ^device_id)
|> Repo.update_all(set: [latest_connection_id: device_connection.id])

{:ok, device_connection}

{:error, _} = error ->
error
end
end

@doc """
Expand Down Expand Up @@ -123,12 +126,6 @@ defmodule NervesHub.Devices.Connections do
})
end

defp distinct_on_device() do
DeviceConnection
|> distinct(:device_id)
|> order_by([:device_id, desc: :last_seen_at])
end

def clean_stale_connections() do
interval = Application.get_env(:nerves_hub, :device_last_seen_update_interval_minutes)
a_minute_ago = DateTime.shift(DateTime.utc_now(), minute: -(interval + 1))
Expand All @@ -150,7 +147,10 @@ defmodule NervesHub.Devices.Connections do
days_ago = DateTime.shift(DateTime.utc_now(), day: -interval)

DeviceConnection
|> where([d], d.last_seen_at < ^days_ago)
|> join(:inner, [dc], d in Device, on: dc.device_id == d.id)
|> where([dc, _d], dc.last_seen_at < ^days_ago)
|> where([dc, _d], dc.status != :connected)
|> where([dc, d], dc.id != d.latest_connection_id)
|> Repo.delete_all()
end
end
24 changes: 11 additions & 13 deletions lib/nerves_hub/devices/device.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@ defmodule NervesHub.Devices.Device do
:updates_blocked_until,
:connecting_code,
:deployment_id,
:connection_status,
:connection_established_at,
:connection_disconnected_at,
:connection_last_seen_at,
:connection_types,
:connection_metadata,
:status,
Expand All @@ -44,6 +40,7 @@ defmodule NervesHub.Devices.Device do
embeds_one(:firmware_metadata, FirmwareMetadata, on_replace: :update)
has_many(:device_certificates, DeviceCertificate, on_delete: :delete_all)
has_many(:device_connections, DeviceConnection, on_delete: :delete_all)
has_one(:latest_connection, DeviceConnection)
has_many(:device_metrics, DeviceMetric, on_delete: :delete_all)

field(:identifier, :string)
Expand All @@ -67,15 +64,16 @@ defmodule NervesHub.Devices.Device do

timestamps()

# Deprecated fields, replaced with device_connections table.
field(:connection_status, Ecto.Enum,
values: [:connected, :disconnected, :not_seen],
default: :not_seen
)

field(:connection_established_at, :utc_datetime)
field(:connection_disconnected_at, :utc_datetime)
field(:connection_last_seen_at, :utc_datetime)
# Deprecated fields, remove these any time after 29/1/2025.
# Also remove index from NervesHub.Repo.Migrations.AddConnectionStatusIndexToDevices.
# field(:connection_status, Ecto.Enum,
# values: [:connected, :disconnected, :not_seen],
# default: :not_seen
# )
# field(:connection_established_at, :utc_datetime)
# field(:connection_disconnected_at, :utc_datetime)
# field(:connection_last_seen_at, :utc_datetime)
field(:latest_connection_id, :binary_id)
embeds_one(:extensions, DeviceExtensionsSetting, on_replace: :update)
end

Expand Down
6 changes: 5 additions & 1 deletion lib/nerves_hub/tracker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule NervesHub.Tracker do
"""

alias NervesHub.Devices.Device
alias NervesHub.Repo

def online(%{} = device) do
online(device.identifier)
Expand Down Expand Up @@ -69,7 +70,10 @@ defmodule NervesHub.Tracker do
Returns `true` if device's latest connections has a status of `:connected`,
otherwise `false`.
"""
def online?(%{device_connections: [%{status: :connected}]}), do: true
def online?(%{latest_connection: %Ecto.Association.NotLoaded{}} = device),
do: Repo.preload(device, :latest_connection)

def online?(%{latest_connection: %{status: :connected}}), do: true
def online?(_), do: false

@doc """
Expand Down
4 changes: 3 additions & 1 deletion lib/nerves_hub_web/live/devices/show.ex
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ defmodule NervesHubWeb.Live.Devices.Show do
# Get device with its latest connection data preloaded
{:ok, device} = Devices.get_device_by_identifier(org, device.identifier, :latest_connection)

dbg(device)

socket
|> assign(:device, device)
|> assign(:device_connection, device_connection(device))
Expand Down Expand Up @@ -468,7 +470,7 @@ defmodule NervesHubWeb.Live.Devices.Show do
end)
end

defp device_connection(%{device_connections: [connection]}), do: connection
defp device_connection(%{latest_connection: latest_connection}), do: latest_connection
defp device_connection(_), do: nil

defp assign_metadata(%{assigns: %{device: device}} = socket) do
Expand Down
11 changes: 7 additions & 4 deletions lib/nerves_hub_web/views/api/device_view.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
defmodule NervesHubWeb.API.DeviceView do
use NervesHubWeb, :api_view

alias NervesHub.Repo
alias NervesHub.Tracker

def render("index.json", %{devices: devices, pagination: pagination}) do
Expand All @@ -15,15 +16,14 @@ defmodule NervesHubWeb.API.DeviceView do
end

def render("device.json", %{device: device}) do
device = Repo.preload(device, :latest_connection)

%{
identifier: device.identifier,
tags: device.tags,
version: version(device),
online: Tracker.sync_online?(device),
connection_status: device.connection_status,
connection_established_at: device.connection_established_at,
connection_disconnected_at: device.connection_disconnected_at,
connection_last_seen_at: device.connection_last_seen_at,
connection_status: connection_status(device),
# deprecated
last_communication: connection_last_seen_at(device),
description: device.description,
Expand All @@ -50,4 +50,7 @@ defmodule NervesHubWeb.API.DeviceView do

defp connection_last_seen_at(%{connection_last_seen_at: nil}), do: "never"
defp connection_last_seen_at(%{connection_last_seen_at: dt}), do: to_string(dt)

defp connection_status(%{latest_connection: %{status: status}}), do: status
defp connection_status(_), do: :not_seen
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
defmodule NervesHub.Repo.Migrations.AddLatestConnectionIdToDevices do
use Ecto.Migration

def change() do
alter table(:devices) do
add(:latest_connection_id, :binary)
end
end
end
9 changes: 0 additions & 9 deletions test/nerves_hub/device_connections_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,4 @@ defmodule NervesHub.DeviceConnectionsTest do

assert %DeviceConnection{status: :disconnected} = Connections.get_latest_for_device(device.id)
end

test "get device with latest connection preloaded", %{device: device} do
assert {:ok, %DeviceConnection{}} = Connections.device_connected(device.id)

%{device_connections: [connection]} =
Devices.get_device(device.id, :preload_latest_connection)

assert connection.status == :connected
end
end
58 changes: 0 additions & 58 deletions test/nerves_hub/devices_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -740,64 +740,6 @@ defmodule NervesHub.DevicesTest do
end
end

describe "clean up device connection statuses" do
test "don't change the connection status of devices with a recent heartbeat", %{
org: org,
product: product,
firmware: firmware
} do
Fixtures.device_fixture(org, product, firmware, %{
connection_status: :connected,
connection_established_at: DateTime.shift(DateTime.utc_now(), minute: -10),
connection_last_seen_at: DateTime.shift(DateTime.utc_now(), minute: -1)
})

Fixtures.device_fixture(org, product, firmware, %{
connection_status: :connected,
connection_established_at: DateTime.shift(DateTime.utc_now(), minute: -9),
connection_last_seen_at: DateTime.shift(DateTime.utc_now(), minute: -2)
})

Fixtures.device_fixture(org, product, firmware, %{
connection_status: :connected,
connection_established_at: DateTime.shift(DateTime.utc_now(), minute: -11),
connection_last_seen_at: DateTime.shift(DateTime.utc_now(), minute: -1)
})

assert Devices.connected_count(product) == 3
Devices.clean_connection_states()
assert Devices.connected_count(product) == 3
end

test "clean connection status of devices not seen recently", %{
org: org,
product: product,
firmware: firmware
} do
Fixtures.device_fixture(org, product, firmware, %{
connection_status: :connected,
connection_established_at: DateTime.shift(DateTime.utc_now(), minute: -10),
connection_last_seen_at: DateTime.shift(DateTime.utc_now(), minute: -1)
})

Fixtures.device_fixture(org, product, firmware, %{
connection_status: :connected,
connection_established_at: DateTime.shift(DateTime.utc_now(), minute: -25),
connection_last_seen_at: DateTime.shift(DateTime.utc_now(), minute: -15)
})

Fixtures.device_fixture(org, product, firmware, %{
connection_status: :connected,
connection_established_at: DateTime.shift(DateTime.utc_now(), minute: -47),
connection_last_seen_at: DateTime.shift(DateTime.utc_now(), minute: -9)
})

assert Devices.connected_count(product) == 3
Devices.clean_connection_states()
assert Devices.connected_count(product) == 1
end
end

defp update_firmware_uuid(device, uuid) do
firmware_metadata = %{
architecture: "x86_64",
Expand Down

0 comments on commit ab30c3d

Please sign in to comment.