Skip to content

feat: add erlang distribution metrics #1440

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 30, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions lib/realtime/monitoring/distributed_metrics.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
defmodule Realtime.DistributedMetrics do
@moduledoc """
Gather stats for each connected node
"""

require Record
Record.defrecordp(:net_address, Record.extract(:net_address, from_lib: "kernel/include/net_address.hrl"))
@spec info() :: %{node => map}
def info do
# First check if Erlang distribution is started
if :net_kernel.get_state()[:started] != :no do
{:ok, nodes_info} = :net_kernel.nodes_info()

port_addresses =
:erlang.ports()
|> Stream.filter(fn port ->
:erlang.port_info(port, :name) == {:name, ~c"tcp_inet"}
end)
|> Stream.map(&{:inet.peername(&1), &1})
|> Stream.filter(fn
{{:ok, _peername}, _port} -> true
_ -> false
end)
|> Enum.map(fn {{:ok, peername}, port} -> {peername, port} end)
|> Enum.into(%{})

Map.new(nodes_info, &info(&1, port_addresses))
else
%{}
end
end

defp info({node, info}, port_addresses) do
dist_pid = info[:owner]
state = info[:state]

case info[:address] do
net_address(address: address) when address != :undefined ->
{node, info(node, port_addresses, dist_pid, state, address)}

_ ->
{node, %{pid: dist_pid, state: state}}
end
end

defp info(node, port_addresses, dist_pid, state, address) do
if dist_port = port_addresses[address] do
%{
inet_stats: inet_stats(dist_port),
port: dist_port,
pid: dist_pid,
state: state
}
else
%{pid: dist_pid, state: state}
end
|> Map.merge(%{
queue_size: node_queue_size(node)
})
end

defp inet_stats(port) do
case :inet.getstat(port) do
{:ok, stats} ->
stats

_ ->
nil
end
end

defp node_queue_size(node) do
case :ets.lookup(:sys_dist, node) do
[dist] ->
conn_id = elem(dist, 2)

with {:ok, _, _, queue_size} <- :erlang.dist_get_stat(conn_id) do
{:ok, queue_size}
else
_ -> {:error, :not_found}
end

_ ->
{:error, :not_found}
end
end
end
106 changes: 106 additions & 0 deletions lib/realtime/monitoring/prom_ex/plugins/distributed.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
defmodule Realtime.PromEx.Plugins.Distributed do
@moduledoc """
Distributed erlang metrics
"""

use PromEx.Plugin
alias Realtime.DistributedMetrics

@event_node_queue_size [:prom_ex, :plugin, :dist, :queue_size]
@event_recv_bytes [:prom_ex, :plugin, :dist, :recv, :bytes]
@event_recv_count [:prom_ex, :plugin, :dist, :recv, :count]
@event_send_bytes [:prom_ex, :plugin, :dist, :send, :bytes]
@event_send_count [:prom_ex, :plugin, :dist, :send, :count]
@event_send_pending_bytes [:prom_ex, :plugin, :dist, :send, :pending, :bytes]

@impl true
def polling_metrics(opts) do
poll_rate = Keyword.get(opts, :poll_rate)

[
metrics(poll_rate)
]
end

defp metrics(poll_rate) do
Polling.build(
:realtime_vm_dist,
poll_rate,
{__MODULE__, :execute_metrics, []},
[
last_value(
[:dist, :queue_size],
event_name: @event_node_queue_size,
description: "Number of bytes in the output distribution queue",
measurement: :size,
tags: [:origin_node, :target_node]
),
last_value(
[:dist, :recv_bytes],
event_name: @event_recv_bytes,
description: "Number of bytes received by the socket.",
measurement: :size,
tags: [:origin_node, :target_node]
),
last_value(
[:dist, :recv_count],
event_name: @event_recv_count,
description: "Number of packets received by the socket.",
measurement: :size,
tags: [:origin_node, :target_node]
),
last_value(
[:dist, :send_bytes],
event_name: @event_send_bytes,
description: "Number of bytes sent by the socket.",
measurement: :size,
tags: [:origin_node, :target_node]
),
last_value(
[:dist, :send_count],
event_name: @event_send_count,
description: "Number of packets sent by the socket.",
measurement: :size,
tags: [:origin_node, :target_node]
),
last_value(
[:dist, :send_pending_bytes],
event_name: @event_send_pending_bytes,
description: "Number of bytes waiting to be sent by the socket.",
measurement: :size,
tags: [:origin_node, :target_node]
)
]
)
end

def execute_metrics do
dist_info = DistributedMetrics.info()

Enum.each(dist_info, fn {node, info} ->
execute_queue_size(node, info)
execute_inet_stats(node, info)
end)
end

defp execute_inet_stats(node, info) do
if stats = info[:inet_stats] do
:telemetry.execute(@event_recv_bytes, %{size: stats[:recv_oct]}, %{origin_node: node(), target_node: node})
:telemetry.execute(@event_recv_count, %{size: stats[:recv_cnt]}, %{origin_node: node(), target_node: node})

:telemetry.execute(@event_send_bytes, %{size: stats[:send_oct]}, %{origin_node: node(), target_node: node})
:telemetry.execute(@event_send_count, %{size: stats[:send_cnt]}, %{origin_node: node(), target_node: node})

:telemetry.execute(@event_send_pending_bytes, %{size: stats[:send_pend]}, %{
origin_node: node(),
target_node: node
})
end
end

defp execute_queue_size(node, info) do
with {:ok, size} <- info[:queue_size] do
:telemetry.execute(@event_node_queue_size, %{size: size}, %{origin_node: node(), target_node: node})
end
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.57.11",
version: "2.58.0",
elixir: "~> 1.17.3",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
36 changes: 36 additions & 0 deletions test/realtime/monitoring/distributed_metrics_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
defmodule Realtime.DistributedMetricsTest do
# Async false due to Clustered usage
use ExUnit.Case, async: false

alias Realtime.DistributedMetrics

setup_all do
{:ok, node} = Clustered.start()
%{node: node}
end

describe "info/0 while connected" do
test "per node metric", %{node: node} do
assert %{
^node => %{
pid: _pid,
port: _port,
queue_size: {:ok, 0},
state: :up,
inet_stats: [
recv_oct: _,
recv_cnt: _,
recv_max: _,
recv_avg: _,
recv_dvi: _,
send_oct: _,
send_cnt: _,
send_max: _,
send_avg: _,
send_pend: _
]
}
} = DistributedMetrics.info() |> dbg()
end
end
end
77 changes: 77 additions & 0 deletions test/realtime/monitoring/prom_ex/plugins/distributed_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
defmodule Realtime.PromEx.Plugins.DistributedTest do
# Async false due to Clustered usage
use ExUnit.Case, async: false
alias Realtime.PromEx.Plugins

defmodule MetricsTest do
use PromEx, otp_app: :metrics_test
@impl true
def plugins do
[{Plugins.Distributed, poll_rate: 100}]
end
end

setup_all do
{:ok, node} = Clustered.start()
start_supervised!(MetricsTest)
# Send some data back and forth
25 = :erpc.call(node, String, :to_integer, ["25"])
# Wait for MetricsTest to fetch metrics
Process.sleep(200)
%{node: node}
end

describe "pooling metrics" do
setup do
metrics =
PromEx.get_metrics(MetricsTest)
|> String.split("\n", trim: true)

%{metrics: metrics}
end

test "send_pending_bytes", %{metrics: metrics, node: node} do
pattern = ~r/dist_send_pending_bytes{origin_node=\"#{node()}\",target_node=\"#{node}\"}\s(?<number>\d+)/
assert metric_value(metrics, pattern) == 0
end

test "send_count", %{metrics: metrics, node: node} do
pattern = ~r/dist_send_count{origin_node=\"#{node()}\",target_node=\"#{node}\"}\s(?<number>\d+)/
assert metric_value(metrics, pattern) > 0
end

test "send_bytes", %{metrics: metrics, node: node} do
pattern = ~r/dist_send_bytes{origin_node=\"#{node()}\",target_node=\"#{node}\"}\s(?<number>\d+)/
assert metric_value(metrics, pattern) > 0
end

test "recv_count", %{metrics: metrics, node: node} do
pattern = ~r/dist_recv_count{origin_node=\"#{node()}\",target_node=\"#{node}\"}\s(?<number>\d+)/
assert metric_value(metrics, pattern) > 0
end

test "recv_bytes", %{metrics: metrics, node: node} do
pattern = ~r/dist_recv_bytes{origin_node=\"#{node()}\",target_node=\"#{node}\"}\s(?<number>\d+)/
assert metric_value(metrics, pattern) > 0
end

test "queue_size", %{metrics: metrics, node: node} do
pattern = ~r/dist_queue_size{origin_node=\"#{node()}\",target_node=\"#{node}\"}\s(?<number>\d+)/
assert metric_value(metrics, pattern) == 0
end
end

defp metric_value(metrics, pattern) do
metrics
|> Enum.find_value(
"0",
fn item ->
case Regex.run(pattern, item, capture: ["number"]) do
[number] -> number
_ -> false
end
end
)
|> String.to_integer()
end
end
Loading