Skip to content

Commit

Permalink
Leverage disk for the summary cache to stream summaries
Browse files Browse the repository at this point in the history
  • Loading branch information
samuelmanzanera committed Dec 19, 2024
1 parent 2813bb7 commit 10b914c
Show file tree
Hide file tree
Showing 16 changed files with 278 additions and 299 deletions.
3 changes: 0 additions & 3 deletions lib/archethic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -494,9 +494,6 @@ defmodule Archethic do
TransactionChain.fetch_genesis_address(address, nodes)
end

defdelegate list_transactions_summaries_from_current_slot(),
to: BeaconChain

defdelegate list_transactions_summaries_from_current_slot(date),
to: BeaconChain

Expand Down
81 changes: 40 additions & 41 deletions lib/archethic/beacon_chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ defmodule Archethic.BeaconChain do
beacon_subset: Base.encode16(subset)
)

SummaryCache.add_slot(subset, slot, node_public_key)
SummaryCache.add_slot(slot, node_public_key)

{:error, reason} ->
Logger.error("Invalid beacon slot - #{inspect(reason)}")
Expand Down Expand Up @@ -193,34 +193,37 @@ defmodule Archethic.BeaconChain do
@doc """
Get all slots of a subset from summary cache and return unique transaction summaries
"""
@spec get_summary_slots(binary()) :: list(TransactionSummary.t())
def get_summary_slots(subset) when is_binary(subset) do
SummaryCache.stream_current_slots(subset)
|> Stream.map(fn {slot, _} -> slot end)
|> Stream.flat_map(fn %Slot{transaction_attestations: transaction_attestations} ->
transaction_summaries =
transaction_attestations
|> Enum.map(& &1.transaction_summary)

transaction_summaries
end)
|> Stream.uniq_by(fn %TransactionSummary{address: address} -> address end)
|> Enum.to_list()
@spec get_summary_slots(subset :: binary()) :: list(TransactionSummary.t())
def get_summary_slots(subset) do
summary_time = DateTime.utc_now() |> SummaryTimer.next_summary()

summary_time
|> SummaryCache.stream_summaries(subset)
|> Enum.uniq_by(fn %TransactionSummary{address: address} -> address end)
end

@doc """
Returns the current summary's replication attestations that current node have
"""
@spec get_current_summary_replication_attestations(subset :: binary()) ::
@spec get_current_summary_replication_attestations(subsets :: list(binary())) ::
Enumerable.t() | list(ReplicationAttestation.t())
def get_current_summary_replication_attestations(subset) do
%Slot{transaction_attestations: replication_attestations} = Subset.get_current_slot(subset)
def get_current_summary_replication_attestations(subsets) do
summary_time = SummaryTimer.next_summary(DateTime.utc_now())

SummaryCache.stream_current_slots(subset)
|> Stream.flat_map(fn {%Slot{transaction_attestations: replication_attestations}, _} ->
replication_attestations
subsets
|> Task.async_stream(fn subset ->
cache_replication_attestations =
summary_time
|> SummaryCache.stream_slots(subset)
|> Enum.flat_map(fn {%Slot{transaction_attestations: replication_attestations}, _} ->
replication_attestations
end)

%Slot{transaction_attestations: replication_attestations} = Subset.get_current_slot(subset)
replication_attestations ++ cache_replication_attestations
end)
|> Stream.concat(replication_attestations)
|> Stream.filter(&match?({:ok, _}, &1))
|> Stream.flat_map(fn {:ok, x} -> x end)
|> ReplicationAttestation.reduce_confirmations()
end

Expand Down Expand Up @@ -354,19 +357,22 @@ defmodule Archethic.BeaconChain do
Function only used by the explorer to retrieve current slot transactions.
We only ask 3 nodes because it's OK if it's not 100% accurate.
"""
@spec list_transactions_summaries_from_current_slot(DateTime.t()) ::
@spec list_transactions_summaries_from_current_slot(datetime :: DateTime.t()) ::
list(TransactionSummary.t())
def list_transactions_summaries_from_current_slot(datetime = %DateTime{} \\ DateTime.utc_now()) do
get_next_summary_elected_subsets_by_nodes(datetime)
|> Task.async_stream(
Task.Supervisor.async_stream_nolink(
Archethic.task_supervisors(),
get_next_summary_elected_subsets_by_nodes(datetime),
fn {node, subsets} ->
fetch_current_summaries(node, subsets)
end,
ordered: false,
max_concurrency: 256
max_concurrency: 256,
on_timeout: :kill_task
)
|> Stream.filter(&match?({:ok, _}, &1))
|> Stream.flat_map(fn {:ok, summaries} -> summaries end)
|> Enum.to_list()

# remove duplicates & sort
|> Stream.uniq_by(& &1.address)
Expand Down Expand Up @@ -424,20 +430,13 @@ defmodule Archethic.BeaconChain do
end

defp fetch_current_summaries(node, subsets) do
subsets
|> Stream.chunk_every(10)
|> Task.async_stream(fn subsets ->
case P2P.send_message(node, %GetCurrentSummaries{subsets: subsets}) do
{:ok, %TransactionSummaryList{transaction_summaries: transaction_summaries}} ->
transaction_summaries
case P2P.send_message(node, %GetCurrentSummaries{subsets: subsets}) do
{:ok, %TransactionSummaryList{transaction_summaries: transaction_summaries}} ->
transaction_summaries

_ ->
[]
end
end)
|> Stream.filter(&match?({:ok, _}, &1))
|> Stream.flat_map(&elem(&1, 1))
|> Enum.to_list()
_ ->
[]
end
end

defp fetch_current_summary_replication_attestations_from_node(node, subsets) do
Expand Down Expand Up @@ -531,8 +530,8 @@ defmodule Archethic.BeaconChain do
@doc """
Retrieve the network stats for a given subset from the cached slots
"""
@spec get_network_stats(binary()) :: %{Crypto.key() => Slot.net_stats()}
def get_network_stats(subset) when is_binary(subset) do
NetworkCoordinates.aggregate_network_stats(subset)
@spec get_network_stats(binary(), DateTime.t()) :: %{Crypto.key() => Slot.net_stats()}
def get_network_stats(subset, summary_time) when is_binary(subset) do
NetworkCoordinates.aggregate_network_stats(subset, summary_time)
end
end
8 changes: 4 additions & 4 deletions lib/archethic/beacon_chain/network_coordinates.ex
Original file line number Diff line number Diff line change
Expand Up @@ -386,10 +386,10 @@ defmodule Archethic.BeaconChain.NetworkCoordinates do
The aggregation is using some weighted logistic regression.
"""
@spec aggregate_network_stats(binary()) :: %{Crypto.key() => Slot.net_stats()}
def aggregate_network_stats(subset) when is_binary(subset) do
subset
|> SummaryCache.stream_current_slots()
@spec aggregate_network_stats(binary(), DateTime.t()) :: %{Crypto.key() => Slot.net_stats()}
def aggregate_network_stats(subset, summary_time = %DateTime{}) when is_binary(subset) do
summary_time
|> SummaryCache.stream_slots(subset)
|> Stream.filter(&match?({%Slot{p2p_view: %{network_stats: [_ | _]}}, _}, &1))
|> Stream.map(fn
{%Slot{p2p_view: %{network_stats: net_stats}}, node} ->
Expand Down
6 changes: 3 additions & 3 deletions lib/archethic/beacon_chain/subset.ex
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ defmodule Archethic.BeaconChain.Subset do
# Avoid to store or dispatch an empty beacon's slot
unless Slot.empty?(current_slot) do
if summary_time?(time) do
SummaryCache.add_slot(subset, current_slot, Crypto.first_node_public_key())
SummaryCache.add_slot(current_slot, Crypto.first_node_public_key())
else
next_summary_time = SummaryTimer.next_summary(time)
broadcast_beacon_slot(subset, next_summary_time, current_slot)
Expand Down Expand Up @@ -376,9 +376,9 @@ defmodule Archethic.BeaconChain.Subset do

defp handle_summary(time, subset) do
beacon_slots =
subset
|> SummaryCache.stream_current_slots()
SummaryCache.stream_slots(time, subset)
|> Stream.map(fn {slot, _} -> slot end)
|> Enum.to_list()

if Enum.empty?(beacon_slots) do
:ok
Expand Down
12 changes: 6 additions & 6 deletions lib/archethic/beacon_chain/subset/stats_collector.ex
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ defmodule Archethic.BeaconChain.Subset.StatsCollector do
JobCache.get!(
{:get, summary_time},
function: fn ->
get_current_node_subsets(summary_time)
|> do_get_stats(timeout)
summary_time
|> get_current_node_subsets()
|> do_get_stats(summary_time, timeout)
end,
timeout: timeout
)
Expand Down Expand Up @@ -119,7 +120,7 @@ defmodule Archethic.BeaconChain.Subset.StatsCollector do
function: fn ->
case action do
:get ->
do_get_stats(subsets, NetworkCoordinates.timeout())
do_get_stats(subsets, summary_time, NetworkCoordinates.timeout())

:fetch ->
do_fetch_stats(summary_time, NetworkCoordinates.timeout())
Expand All @@ -133,12 +134,11 @@ defmodule Archethic.BeaconChain.Subset.StatsCollector do
JobCache.stop(key)
end

defp do_get_stats(subsets, timeout) do
defp do_get_stats(subsets, summary_time, timeout) do
subsets
|> Task.async_stream(
fn subset ->
stats = BeaconChain.get_network_stats(subset)

stats = BeaconChain.get_network_stats(subset, summary_time)
{subset, stats}
end,
timeout: timeout,
Expand Down
Loading

0 comments on commit 10b914c

Please sign in to comment.