Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Leverage disk for the summary cache to stream summaries #1621

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions lib/archethic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -494,10 +494,7 @@ defmodule Archethic do
TransactionChain.fetch_genesis_address(address, nodes)
end

defdelegate list_transactions_summaries_from_current_slot(),
to: BeaconChain

defdelegate list_transactions_summaries_from_current_slot(date),
defdelegate list_transactions_summaries_from_current_slot(date \\ DateTime.utc_now()),
to: BeaconChain

@doc """
Expand Down
84 changes: 40 additions & 44 deletions lib/archethic/beacon_chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ defmodule Archethic.BeaconChain do
beacon_subset: Base.encode16(subset)
)

SummaryCache.add_slot(subset, slot, node_public_key)
SummaryCache.add_slot(slot, node_public_key)

{:error, reason} ->
Logger.error("Invalid beacon slot - #{inspect(reason)}")
Expand Down Expand Up @@ -193,34 +193,36 @@ defmodule Archethic.BeaconChain do
@doc """
Get all slots of a subset from summary cache and return unique transaction summaries
"""
@spec get_summary_slots(binary()) :: list(TransactionSummary.t())
@spec get_summary_slots(subset :: binary()) :: list(TransactionSummary.t())
def get_summary_slots(subset) when is_binary(subset) do
SummaryCache.stream_current_slots(subset)
|> Stream.map(fn {slot, _} -> slot end)
|> Stream.flat_map(fn %Slot{transaction_attestations: transaction_attestations} ->
transaction_summaries =
transaction_attestations
|> Enum.map(& &1.transaction_summary)

transaction_summaries
end)
|> Stream.uniq_by(fn %TransactionSummary{address: address} -> address end)
|> Enum.to_list()
summary_time = DateTime.utc_now() |> SummaryTimer.next_summary()

summary_time
|> SummaryCache.stream_summaries(subset)
|> Enum.uniq_by(fn %TransactionSummary{address: address} -> address end)
end

@doc """
Returns the current summary's replication attestations that current node have
"""
@spec get_current_summary_replication_attestations(subset :: binary()) ::
@spec get_current_summary_replication_attestations(subsets :: list(binary())) ::
Enumerable.t() | list(ReplicationAttestation.t())
def get_current_summary_replication_attestations(subset) do
%Slot{transaction_attestations: replication_attestations} = Subset.get_current_slot(subset)

SummaryCache.stream_current_slots(subset)
|> Stream.flat_map(fn {%Slot{transaction_attestations: replication_attestations}, _} ->
replication_attestations
def get_current_summary_replication_attestations(subsets) when is_list(subsets) do
summary_time = SummaryTimer.next_summary(DateTime.utc_now())

Task.Supervisor.async_stream(Archethic.task_supervisors(), subsets, fn subset ->
cache_replication_attestations =
summary_time
|> SummaryCache.stream_slots(subset)
|> Enum.flat_map(fn {%Slot{transaction_attestations: replication_attestations}, _} ->
replication_attestations
end)

%Slot{transaction_attestations: replication_attestations} = Subset.get_current_slot(subset)
replication_attestations ++ cache_replication_attestations
end)
|> Stream.concat(replication_attestations)
|> Stream.filter(&match?({:ok, _}, &1))
|> Stream.flat_map(fn {:ok, x} -> x end)
|> ReplicationAttestation.reduce_confirmations()
end

Expand Down Expand Up @@ -298,8 +300,11 @@ defmodule Archethic.BeaconChain do
start_time = System.monotonic_time()

authorized_nodes =
download_nodes
|> Enum.reject(&(&1.first_public_key == Crypto.first_node_public_key()))
if length(download_nodes) > 1 do
Enum.reject(download_nodes, &(&1.first_public_key == Crypto.first_node_public_key()))
else
download_nodes
end

# get the summaries addresses to download per node
summaries_by_node =
Expand Down Expand Up @@ -354,7 +359,7 @@ defmodule Archethic.BeaconChain do
Function only used by the explorer to retrieve current slot transactions.
We only ask 3 nodes because it's OK if it's not 100% accurate.
"""
@spec list_transactions_summaries_from_current_slot(DateTime.t()) ::
@spec list_transactions_summaries_from_current_slot(datetime :: DateTime.t()) ::
list(TransactionSummary.t())
def list_transactions_summaries_from_current_slot(datetime = %DateTime{} \\ DateTime.utc_now()) do
Task.Supervisor.async_stream_nolink(
Expand All @@ -369,6 +374,7 @@ defmodule Archethic.BeaconChain do
)
|> Stream.filter(&match?({:ok, _}, &1))
|> Stream.flat_map(fn {:ok, summaries} -> summaries end)
|> Enum.to_list()

# remove duplicates & sort
|> Stream.uniq_by(& &1.address)
Expand Down Expand Up @@ -426,23 +432,13 @@ defmodule Archethic.BeaconChain do
end

defp fetch_current_summaries(node, subsets) do
Task.Supervisor.async_stream_nolink(
Archethic.task_supervisors(),
Stream.chunk_every(subsets, 10),
fn subsets ->
case P2P.send_message(node, %GetCurrentSummaries{subsets: subsets}) do
{:ok, %TransactionSummaryList{transaction_summaries: transaction_summaries}} ->
transaction_summaries

_ ->
[]
end
end,
on_timeout: :kill_task
)
|> Stream.filter(&match?({:ok, _}, &1))
|> Stream.flat_map(&elem(&1, 1))
|> Enum.to_list()
case P2P.send_message(node, %GetCurrentSummaries{subsets: subsets}) do
{:ok, %TransactionSummaryList{transaction_summaries: transaction_summaries}} ->
transaction_summaries

_ ->
[]
end
end

defp fetch_current_summary_replication_attestations_from_node(node, subsets) do
Expand Down Expand Up @@ -536,8 +532,8 @@ defmodule Archethic.BeaconChain do
@doc """
Retrieve the network stats for a given subset from the cached slots
"""
@spec get_network_stats(binary()) :: %{Crypto.key() => Slot.net_stats()}
def get_network_stats(subset) when is_binary(subset) do
NetworkCoordinates.aggregate_network_stats(subset)
@spec get_network_stats(binary(), DateTime.t()) :: %{Crypto.key() => Slot.net_stats()}
def get_network_stats(subset, summary_time) when is_binary(subset) do
NetworkCoordinates.aggregate_network_stats(subset, summary_time)
end
end
8 changes: 4 additions & 4 deletions lib/archethic/beacon_chain/network_coordinates.ex
Original file line number Diff line number Diff line change
Expand Up @@ -386,10 +386,10 @@ defmodule Archethic.BeaconChain.NetworkCoordinates do

The aggregation is using some weighted logistic regression.
"""
@spec aggregate_network_stats(binary()) :: %{Crypto.key() => Slot.net_stats()}
def aggregate_network_stats(subset) when is_binary(subset) do
subset
|> SummaryCache.stream_current_slots()
@spec aggregate_network_stats(binary(), DateTime.t()) :: %{Crypto.key() => Slot.net_stats()}
def aggregate_network_stats(subset, summary_time = %DateTime{}) when is_binary(subset) do
summary_time
|> SummaryCache.stream_slots(subset)
|> Stream.filter(&match?({%Slot{p2p_view: %{network_stats: [_ | _]}}, _}, &1))
|> Stream.map(fn
{%Slot{p2p_view: %{network_stats: net_stats}}, node} ->
Expand Down
6 changes: 3 additions & 3 deletions lib/archethic/beacon_chain/subset.ex
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ defmodule Archethic.BeaconChain.Subset do
# Avoid to store or dispatch an empty beacon's slot
unless Slot.empty?(current_slot) do
if summary_time?(time) do
SummaryCache.add_slot(subset, current_slot, Crypto.first_node_public_key())
SummaryCache.add_slot(current_slot, Crypto.first_node_public_key())
else
next_summary_time = SummaryTimer.next_summary(time)
broadcast_beacon_slot(subset, next_summary_time, current_slot)
Expand Down Expand Up @@ -376,9 +376,9 @@ defmodule Archethic.BeaconChain.Subset do

defp handle_summary(time, subset) do
beacon_slots =
subset
|> SummaryCache.stream_current_slots()
SummaryCache.stream_slots(time, subset)
|> Stream.map(fn {slot, _} -> slot end)
|> Enum.to_list()

if Enum.empty?(beacon_slots) do
:ok
Expand Down
12 changes: 6 additions & 6 deletions lib/archethic/beacon_chain/subset/stats_collector.ex
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ defmodule Archethic.BeaconChain.Subset.StatsCollector do
JobCache.get!(
{:get, summary_time},
function: fn ->
get_current_node_subsets(summary_time)
|> do_get_stats(timeout)
summary_time
|> get_current_node_subsets()
|> do_get_stats(summary_time, timeout)
end,
timeout: timeout
)
Expand Down Expand Up @@ -119,7 +120,7 @@ defmodule Archethic.BeaconChain.Subset.StatsCollector do
function: fn ->
case action do
:get ->
do_get_stats(subsets, NetworkCoordinates.timeout())
do_get_stats(subsets, summary_time, NetworkCoordinates.timeout())

:fetch ->
do_fetch_stats(summary_time, NetworkCoordinates.timeout())
Expand All @@ -133,12 +134,11 @@ defmodule Archethic.BeaconChain.Subset.StatsCollector do
JobCache.stop(key)
end

defp do_get_stats(subsets, timeout) do
defp do_get_stats(subsets, summary_time, timeout) do
subsets
|> Task.async_stream(
fn subset ->
stats = BeaconChain.get_network_stats(subset)

stats = BeaconChain.get_network_stats(subset, summary_time)
{subset, stats}
end,
timeout: timeout,
Expand Down
Loading
Loading