diff --git a/lib/archethic.ex b/lib/archethic.ex index b0a681357..8a7bde0aa 100644 --- a/lib/archethic.ex +++ b/lib/archethic.ex @@ -494,10 +494,7 @@ defmodule Archethic do TransactionChain.fetch_genesis_address(address, nodes) end - defdelegate list_transactions_summaries_from_current_slot(), - to: BeaconChain - - defdelegate list_transactions_summaries_from_current_slot(date), + defdelegate list_transactions_summaries_from_current_slot(date \\ DateTime.utc_now()), to: BeaconChain @doc """ diff --git a/lib/archethic/beacon_chain.ex b/lib/archethic/beacon_chain.ex index 897351e1a..bb1f161c1 100644 --- a/lib/archethic/beacon_chain.ex +++ b/lib/archethic/beacon_chain.ex @@ -130,7 +130,7 @@ defmodule Archethic.BeaconChain do beacon_subset: Base.encode16(subset) ) - SummaryCache.add_slot(subset, slot, node_public_key) + SummaryCache.add_slot(slot, node_public_key) {:error, reason} -> Logger.error("Invalid beacon slot - #{inspect(reason)}") @@ -193,34 +193,36 @@ defmodule Archethic.BeaconChain do @doc """ Get all slots of a subset from summary cache and return unique transaction summaries """ - @spec get_summary_slots(binary()) :: list(TransactionSummary.t()) + @spec get_summary_slots(subset :: binary()) :: list(TransactionSummary.t()) def get_summary_slots(subset) when is_binary(subset) do - SummaryCache.stream_current_slots(subset) - |> Stream.map(fn {slot, _} -> slot end) - |> Stream.flat_map(fn %Slot{transaction_attestations: transaction_attestations} -> - transaction_summaries = - transaction_attestations - |> Enum.map(& &1.transaction_summary) - - transaction_summaries - end) - |> Stream.uniq_by(fn %TransactionSummary{address: address} -> address end) - |> Enum.to_list() + summary_time = DateTime.utc_now() |> SummaryTimer.next_summary() + + summary_time + |> SummaryCache.stream_summaries(subset) + |> Enum.uniq_by(fn %TransactionSummary{address: address} -> address end) end @doc """ Returns the current summary's replication attestations that current node have """ - @spec get_current_summary_replication_attestations(subset :: binary()) :: + @spec get_current_summary_replication_attestations(subsets :: list(binary())) :: Enumerable.t() | list(ReplicationAttestation.t()) - def get_current_summary_replication_attestations(subset) do - %Slot{transaction_attestations: replication_attestations} = Subset.get_current_slot(subset) - - SummaryCache.stream_current_slots(subset) - |> Stream.flat_map(fn {%Slot{transaction_attestations: replication_attestations}, _} -> - replication_attestations + def get_current_summary_replication_attestations(subsets) when is_list(subsets) do + summary_time = SummaryTimer.next_summary(DateTime.utc_now()) + + Task.Supervisor.async_stream(Archethic.task_supervisors(), subsets, fn subset -> + cache_replication_attestations = + summary_time + |> SummaryCache.stream_slots(subset) + |> Enum.flat_map(fn {%Slot{transaction_attestations: replication_attestations}, _} -> + replication_attestations + end) + + %Slot{transaction_attestations: replication_attestations} = Subset.get_current_slot(subset) + replication_attestations ++ cache_replication_attestations end) - |> Stream.concat(replication_attestations) + |> Stream.filter(&match?({:ok, _}, &1)) + |> Stream.flat_map(fn {:ok, x} -> x end) |> ReplicationAttestation.reduce_confirmations() end @@ -298,8 +300,11 @@ defmodule Archethic.BeaconChain do start_time = System.monotonic_time() authorized_nodes = - download_nodes - |> Enum.reject(&(&1.first_public_key == Crypto.first_node_public_key())) + if length(download_nodes) > 1 do + Enum.reject(download_nodes, &(&1.first_public_key == Crypto.first_node_public_key())) + else + download_nodes + end # get the summaries addresses to download per node summaries_by_node = @@ -354,7 +359,7 @@ defmodule Archethic.BeaconChain do Function only used by the explorer to retrieve current slot transactions. We only ask 3 nodes because it's OK if it's not 100% accurate. """ - @spec list_transactions_summaries_from_current_slot(DateTime.t()) :: + @spec list_transactions_summaries_from_current_slot(datetime :: DateTime.t()) :: list(TransactionSummary.t()) def list_transactions_summaries_from_current_slot(datetime = %DateTime{} \\ DateTime.utc_now()) do Task.Supervisor.async_stream_nolink( @@ -369,6 +374,7 @@ defmodule Archethic.BeaconChain do ) |> Stream.filter(&match?({:ok, _}, &1)) |> Stream.flat_map(fn {:ok, summaries} -> summaries end) + |> Enum.to_list() # remove duplicates & sort |> Stream.uniq_by(& &1.address) @@ -426,23 +432,13 @@ defmodule Archethic.BeaconChain do end defp fetch_current_summaries(node, subsets) do - Task.Supervisor.async_stream_nolink( - Archethic.task_supervisors(), - Stream.chunk_every(subsets, 10), - fn subsets -> - case P2P.send_message(node, %GetCurrentSummaries{subsets: subsets}) do - {:ok, %TransactionSummaryList{transaction_summaries: transaction_summaries}} -> - transaction_summaries - - _ -> - [] - end - end, - on_timeout: :kill_task - ) - |> Stream.filter(&match?({:ok, _}, &1)) - |> Stream.flat_map(&elem(&1, 1)) - |> Enum.to_list() + case P2P.send_message(node, %GetCurrentSummaries{subsets: subsets}) do + {:ok, %TransactionSummaryList{transaction_summaries: transaction_summaries}} -> + transaction_summaries + + _ -> + [] + end end defp fetch_current_summary_replication_attestations_from_node(node, subsets) do @@ -536,8 +532,8 @@ defmodule Archethic.BeaconChain do @doc """ Retrieve the network stats for a given subset from the cached slots """ - @spec get_network_stats(binary()) :: %{Crypto.key() => Slot.net_stats()} - def get_network_stats(subset) when is_binary(subset) do - NetworkCoordinates.aggregate_network_stats(subset) + @spec get_network_stats(binary(), DateTime.t()) :: %{Crypto.key() => Slot.net_stats()} + def get_network_stats(subset, summary_time) when is_binary(subset) do + NetworkCoordinates.aggregate_network_stats(subset, summary_time) end end diff --git a/lib/archethic/beacon_chain/network_coordinates.ex b/lib/archethic/beacon_chain/network_coordinates.ex index 294aa5eda..33db8c129 100644 --- a/lib/archethic/beacon_chain/network_coordinates.ex +++ b/lib/archethic/beacon_chain/network_coordinates.ex @@ -386,10 +386,10 @@ defmodule Archethic.BeaconChain.NetworkCoordinates do The aggregation is using some weighted logistic regression. """ - @spec aggregate_network_stats(binary()) :: %{Crypto.key() => Slot.net_stats()} - def aggregate_network_stats(subset) when is_binary(subset) do - subset - |> SummaryCache.stream_current_slots() + @spec aggregate_network_stats(binary(), DateTime.t()) :: %{Crypto.key() => Slot.net_stats()} + def aggregate_network_stats(subset, summary_time = %DateTime{}) when is_binary(subset) do + summary_time + |> SummaryCache.stream_slots(subset) |> Stream.filter(&match?({%Slot{p2p_view: %{network_stats: [_ | _]}}, _}, &1)) |> Stream.map(fn {%Slot{p2p_view: %{network_stats: net_stats}}, node} -> diff --git a/lib/archethic/beacon_chain/subset.ex b/lib/archethic/beacon_chain/subset.ex index 4efa130e5..5ba4e5d2a 100644 --- a/lib/archethic/beacon_chain/subset.ex +++ b/lib/archethic/beacon_chain/subset.ex @@ -336,7 +336,7 @@ defmodule Archethic.BeaconChain.Subset do # Avoid to store or dispatch an empty beacon's slot unless Slot.empty?(current_slot) do if summary_time?(time) do - SummaryCache.add_slot(subset, current_slot, Crypto.first_node_public_key()) + SummaryCache.add_slot(current_slot, Crypto.first_node_public_key()) else next_summary_time = SummaryTimer.next_summary(time) broadcast_beacon_slot(subset, next_summary_time, current_slot) @@ -376,9 +376,9 @@ defmodule Archethic.BeaconChain.Subset do defp handle_summary(time, subset) do beacon_slots = - subset - |> SummaryCache.stream_current_slots() + SummaryCache.stream_slots(time, subset) |> Stream.map(fn {slot, _} -> slot end) + |> Enum.to_list() if Enum.empty?(beacon_slots) do :ok diff --git a/lib/archethic/beacon_chain/subset/stats_collector.ex b/lib/archethic/beacon_chain/subset/stats_collector.ex index 6db07ace0..bd336c11d 100644 --- a/lib/archethic/beacon_chain/subset/stats_collector.ex +++ b/lib/archethic/beacon_chain/subset/stats_collector.ex @@ -44,8 +44,9 @@ defmodule Archethic.BeaconChain.Subset.StatsCollector do JobCache.get!( {:get, summary_time}, function: fn -> - get_current_node_subsets(summary_time) - |> do_get_stats(timeout) + summary_time + |> get_current_node_subsets() + |> do_get_stats(summary_time, timeout) end, timeout: timeout ) @@ -119,7 +120,7 @@ defmodule Archethic.BeaconChain.Subset.StatsCollector do function: fn -> case action do :get -> - do_get_stats(subsets, NetworkCoordinates.timeout()) + do_get_stats(subsets, summary_time, NetworkCoordinates.timeout()) :fetch -> do_fetch_stats(summary_time, NetworkCoordinates.timeout()) @@ -133,12 +134,11 @@ defmodule Archethic.BeaconChain.Subset.StatsCollector do JobCache.stop(key) end - defp do_get_stats(subsets, timeout) do + defp do_get_stats(subsets, summary_time, timeout) do subsets |> Task.async_stream( fn subset -> - stats = BeaconChain.get_network_stats(subset) - + stats = BeaconChain.get_network_stats(subset, summary_time) {subset, stats} end, timeout: timeout, diff --git a/lib/archethic/beacon_chain/subset/summary_cache.ex b/lib/archethic/beacon_chain/subset/summary_cache.ex index 2db9f6dc6..3a60de4b7 100644 --- a/lib/archethic/beacon_chain/subset/summary_cache.ex +++ b/lib/archethic/beacon_chain/subset/summary_cache.ex @@ -3,8 +3,8 @@ defmodule Archethic.BeaconChain.Subset.SummaryCache do Handle the caching of the beacon slots defined for the summary """ - alias Archethic.BeaconChain alias Archethic.BeaconChain.Slot + alias Archethic.TransactionChain.TransactionSummary alias Archethic.BeaconChain.SummaryTimer alias Archethic.Crypto @@ -16,40 +16,21 @@ defmodule Archethic.BeaconChain.Subset.SummaryCache do use GenServer @vsn 2 - @table_name :archethic_summary_cache + @batch_read_size 102_400 def start_link(arg \\ []) do - GenServer.start_link(__MODULE__, arg) + GenServer.start_link(__MODULE__, arg, name: __MODULE__) end def init(_) do - :ets.new(@table_name, [ - :bag, - :named_table, - :public, - read_concurrency: true - ]) - - :ok = recover_slots(SummaryTimer.next_summary(DateTime.utc_now())) - PubSub.register_to_current_epoch_of_slot_time() PubSub.register_to_node_status() - PubSub.register_to_self_repair() {:ok, %{}} end def code_change(_version, state, _extra), do: {:ok, state} - def handle_info(:self_repair_sync, state) do - previous_summary_time = SummaryTimer.previous_summary(DateTime.utc_now()) - - BeaconChain.list_subsets() - |> Enum.each(&clean_previous_summary_slots(&1, previous_summary_time)) - - {:noreply, state} - end - def handle_info({:current_epoch_of_slot_timer, slot_time}, state) do if SummaryTimer.match_interval?(slot_time), do: delete_old_backup_file(slot_time) @@ -60,58 +41,40 @@ defmodule Archethic.BeaconChain.Subset.SummaryCache do previous_summary_time = SummaryTimer.previous_summary(DateTime.utc_now()) delete_old_backup_file(previous_summary_time) - BeaconChain.list_subsets() - |> Enum.each(&clean_previous_summary_slots(&1, previous_summary_time)) - {:noreply, state} end def handle_info(:node_down, state), do: {:noreply, state} @doc """ - Stream all the entries for a subset + Stream all the transaction summaries """ - @spec stream_current_slots(subset :: binary()) :: - Enumerable.t() | list({Slot.t(), Crypto.key()}) - def stream_current_slots(subset) do - # generate match pattern - # :ets.fun2ms(fn {key, value} when key == subset -> value end) - match_pattern = [{{:"$1", :"$2"}, [{:==, :"$1", subset}], [:"$2"]}] - - Stream.resource( - fn -> - # Fix the table to avoid "invalid continuation" error - # source: https://www.erlang.org/doc/man/ets#safe_fixtable-2 - :ets.safe_fixtable(@table_name, true) - :ets.select(@table_name, match_pattern, 1) - end, - &do_stream_current_slots/1, - fn _ -> - :ets.safe_fixtable(@table_name, false) - :ok - end - ) - end - - defp do_stream_current_slots(:"$end_of_table") do - {:halt, :"$end_of_table"} - end - - defp do_stream_current_slots({slot, continuation}) do - {slot, :ets.select(continuation)} + @spec stream_summaries(DateTime.utc_now(), pos_integer()) :: + list(TransactionSummary.t()) + def stream_summaries(summary_time, subset) do + summary_time + |> stream_slots(subset) + |> Stream.flat_map(fn {%Slot{transaction_attestations: attestations}, _} -> attestations end) + |> Stream.map(& &1.transaction_summary) + |> Enum.to_list() end @doc """ Add new beacon slots to the summary's cache """ - @spec add_slot(subset :: binary(), Slot.t(), Crypto.key()) :: :ok - def add_slot(subset, slot = %Slot{}, node_public_key) do - true = :ets.insert(@table_name, {subset, {slot, node_public_key}}) + @spec add_slot(Slot.t(), Crypto.key()) :: :ok + def add_slot(slot = %Slot{}, node_public_key) do + GenServer.call(__MODULE__, {:add_slot, slot, node_public_key}) + end + + def handle_call({:add_slot, slot, node_public_key}, _from, state) do backup_slot(slot, node_public_key) + {:reply, :ok, state} end defp delete_old_backup_file(previous_summary_time) do # We keep 2 backup, the current one and the last one + previous_backup_path = recover_path(previous_summary_time) Utils.mut_dir("slot_backup*") @@ -121,9 +84,15 @@ defmodule Archethic.BeaconChain.Subset.SummaryCache do end defp recover_path(summary_time = %DateTime{}), - do: Utils.mut_dir("slot_backup-#{DateTime.to_unix(summary_time)}") + do: Utils.mut_dir(Path.join(["slot_backup", "#{DateTime.to_unix(summary_time)}"])) + + defp recover_path(summary_time = %DateTime{}, subset), + do: + Utils.mut_dir( + Path.join(["slot_backup", "#{DateTime.to_unix(summary_time)}", Base.encode16(subset)]) + ) - defp backup_slot(slot = %Slot{slot_time: slot_time}, node_public_key) do + defp backup_slot(slot = %Slot{slot_time: slot_time, subset: subset}, node_public_key) do content = serialize(slot, node_public_key) summary_time = @@ -131,23 +100,28 @@ defmodule Archethic.BeaconChain.Subset.SummaryCache do do: slot_time, else: SummaryTimer.next_summary(slot_time) - summary_time - |> recover_path() - |> File.write!(content, [:append, :binary]) + path = recover_path(summary_time) + + unless File.dir?(path) do + File.mkdir_p!(path) + end + + File.write!(Path.join(path, Base.encode16(subset)), content, [:append, :binary]) end - defp recover_slots(summary_time) do - backup_file_path = recover_path(summary_time) + @spec stream_slots(DateTime.t(), subset :: binary) :: + Enumerable.t() | list({slot :: Slot.t(), node_public_key :: Crypto.key()}) + def stream_slots(summary_time, subset) do + backup_file_path = recover_path(summary_time, subset) if File.exists?(backup_file_path) do - content = File.read!(backup_file_path) - - deserialize(content, []) - |> Enum.each(fn {slot = %Slot{subset: subset}, node_public_key} -> - true = :ets.insert(@table_name, {subset, {slot, node_public_key}}) + backup_file_path + |> File.stream!([], @batch_read_size) + |> Stream.transform(<<>>, fn content, rest -> + deserialize(<>) end) else - :ok + [] end end @@ -158,25 +132,19 @@ defmodule Archethic.BeaconChain.Subset.SummaryCache do <> end - defp deserialize(<<>>, acc), do: acc + defp deserialize(rest, acc \\ []) + defp deserialize(<<>>, acc), do: {Enum.reverse(acc), <<>>} defp deserialize(rest, acc) do - {slot_size, rest} = VarInt.get_value(rest) - <> = rest - {slot, _} = Slot.deserialize(slot_bin) - {node_public_key, rest} = Utils.deserialize_public_key(rest) - deserialize(rest, [{slot, node_public_key} | acc]) - end - - defp clean_previous_summary_slots(subset, previous_summary_time) do - subset - |> stream_current_slots() - |> Stream.filter(fn {%Slot{slot_time: slot_time}, _} -> - DateTime.compare(slot_time, previous_summary_time) != :gt - end) - |> Stream.each(fn item -> - :ets.delete_object(@table_name, {subset, item}) - end) - |> Stream.run() + with {slot_size, rest} <- VarInt.get_value(rest), + <> <- rest, + {slot, _} = Slot.deserialize(slot_bin), + {node_public_key, rest} <- Utils.deserialize_public_key(rest) do + deserialize(rest, [{slot, node_public_key} | acc]) + else + _ -> + # This happens when the content is not a complete entry + {acc, rest} + end end end diff --git a/lib/archethic/p2p/message/get_current_replication_attestations.ex b/lib/archethic/p2p/message/get_current_replication_attestations.ex index 750bc7031..7895a1777 100644 --- a/lib/archethic/p2p/message/get_current_replication_attestations.ex +++ b/lib/archethic/p2p/message/get_current_replication_attestations.ex @@ -5,7 +5,7 @@ defmodule Archethic.P2P.Message.GetCurrentReplicationAttestations do """ @enforce_keys [:subsets] - defstruct [:subsets] + defstruct [:subsets, page: 0] alias Archethic.BeaconChain alias Archethic.Crypto @@ -19,7 +19,7 @@ defmodule Archethic.P2P.Message.GetCurrentReplicationAttestations do %CurrentReplicationAttestations{ replication_attestations: subsets - |> Stream.flat_map(&BeaconChain.get_current_summary_replication_attestations/1) + |> BeaconChain.get_current_summary_replication_attestations() |> Enum.to_list() } end diff --git a/lib/archethic/p2p/message/get_current_summaries.ex b/lib/archethic/p2p/message/get_current_summaries.ex index b4d13fe8e..289f92fc7 100644 --- a/lib/archethic/p2p/message/get_current_summaries.ex +++ b/lib/archethic/p2p/message/get_current_summaries.ex @@ -19,18 +19,16 @@ defmodule Archethic.P2P.Message.GetCurrentSummaries do @spec process(__MODULE__.t(), Crypto.key()) :: TransactionSummaryList.t() def process(%__MODULE__{subsets: subsets}, _) do transaction_summaries = - Enum.flat_map(subsets, fn subset -> - transaction_summaries = BeaconChain.get_summary_slots(subset) - + Task.async_stream(subsets, fn subset -> %Slot{transaction_attestations: transaction_attestations} = Subset.get_current_slot(subset) - Enum.reduce( - transaction_attestations, - transaction_summaries, - &[&1.transaction_summary | &2] - ) + transaction_attestations + |> Enum.map(& &1.transaction_summary) + |> Enum.concat(BeaconChain.get_summary_slots(subset)) end) + |> Enum.filter(&match?({:ok, _}, &1)) + |> Enum.flat_map(fn {:ok, x} -> x end) %TransactionSummaryList{ transaction_summaries: transaction_summaries @@ -44,9 +42,8 @@ defmodule Archethic.P2P.Message.GetCurrentSummaries do end @spec deserialize(bitstring()) :: {t(), bitstring} - def deserialize(<>) do - subsets_bin = :binary.part(rest, 0, nb_subsets) + def deserialize(<>) do subsets = for <>, do: <> - {%__MODULE__{subsets: subsets}, <<>>} + {%__MODULE__{subsets: subsets}, rest} end end diff --git a/lib/archethic_web/api/graphql/schema/beacon_chain_summary_type.ex b/lib/archethic_web/api/graphql/schema/beacon_chain_summary_type.ex index 9445ea739..e55a3ac3e 100644 --- a/lib/archethic_web/api/graphql/schema/beacon_chain_summary_type.ex +++ b/lib/archethic_web/api/graphql/schema/beacon_chain_summary_type.ex @@ -2,43 +2,17 @@ defmodule ArchethicWeb.API.GraphQL.Schema.BeaconChainSummary do @moduledoc false use Absinthe.Schema.Notation - alias Archethic.BeaconChain.SummaryAggregate @desc """ [Beacon Chain Summary] represents the beacon chain aggregate for a certain date """ - @default_limit 100 - object :beacon_chain_summary do field(:version, :integer) field(:summary_time, :timestamp) field(:availability_adding_time, :integer) field(:p2p_availabilities, :p2p_availabilities) - - field(:transaction_summaries, list_of(:transaction_summary)) do - arg(:paging_offset, :non_neg_integer) - arg(:limit, :pos_integer) - - resolve(fn args, - %{ - source: %SummaryAggregate{ - replication_attestations: attestations - } - } -> - limit = Map.get(args, :limit, @default_limit) - paging_offset = Map.get(args, :paging_offset, 0) - - # TODO: Replace transaction summaries by attestations - result = - attestations - |> Stream.map(& &1.transaction_summary) - |> Stream.drop(paging_offset) - |> Enum.take(limit) - - {:ok, result} - end) - end + field(:transaction_summaries, list_of(:transaction_summary)) end @desc """ diff --git a/lib/archethic_web/api/graphql/schema/resolver.ex b/lib/archethic_web/api/graphql/schema/resolver.ex index 43af41c49..1bda1671b 100644 --- a/lib/archethic_web/api/graphql/schema/resolver.ex +++ b/lib/archethic_web/api/graphql/schema/resolver.ex @@ -8,7 +8,7 @@ defmodule ArchethicWeb.API.GraphQL.Schema.Resolver do alias Archethic.P2P alias Archethic.BeaconChain - alias Archethic.BeaconChain.ReplicationAttestation + # alias Archethic.BeaconChain.ReplicationAttestation alias Archethic.BeaconChain.SummaryAggregate alias Archethic.BeaconChain.Subset.P2PSampling @@ -242,18 +242,15 @@ defmodule ArchethicWeb.API.GraphQL.Schema.Resolver do transform_beacon_chain_summary(res, next_datetime_summary_time) end - defp create_empty_beacon_summary_aggregate(transactions_list, datetime = %DateTime{}) do - attestations = - Enum.map( - transactions_list, - &%ReplicationAttestation{transaction_summary: &1, confirmations: []} - ) - - %SummaryAggregate{ + defp create_empty_beacon_summary_aggregate( + transactions_list, + datetime = %DateTime{} + ) do + %{ summary_time: datetime, availability_adding_time: [], version: 1, - replication_attestations: attestations, + transaction_summaries: transactions_list, p2p_availabilities: %{} } end diff --git a/lib/archethic_web/explorer/live/chains/beacon_chain_live.ex b/lib/archethic_web/explorer/live/chains/beacon_chain_live.ex index c307deee6..1d2561758 100644 --- a/lib/archethic_web/explorer/live/chains/beacon_chain_live.ex +++ b/lib/archethic_web/explorer/live/chains/beacon_chain_live.ex @@ -110,6 +110,17 @@ defmodule ArchethicWeb.Explorer.BeaconChainLive do list_transactions_from_summaries(date) end) + transactions = + case transactions do + [] -> + date + |> list_transactions_from_summaries() + |> tap(fn txs -> TransactionCache.put(date, txs) end) + + txs -> + txs + end + new_assign = socket |> assign(:fetching, false) @@ -125,6 +136,17 @@ defmodule ArchethicWeb.Explorer.BeaconChainLive do list_transactions_from_aggregate(date) end) + transactions = + case transactions do + [] -> + date + |> list_transactions_from_aggregate() + |> tap(fn txs -> TransactionCache.put(date, txs) end) + + txs -> + txs + end + new_assign = socket |> assign(:fetching, false) diff --git a/test/archethic/beacon_chain/subset/stats_collector_test.exs b/test/archethic/beacon_chain/subset/stats_collector_test.exs index 0b943976e..e87719510 100644 --- a/test/archethic/beacon_chain/subset/stats_collector_test.exs +++ b/test/archethic/beacon_chain/subset/stats_collector_test.exs @@ -44,7 +44,7 @@ defmodule Archethic.BeaconChain.Subset.StatsCollectorTest do with_mocks([ {JobCache, [], start: fn _ -> :ok end, stop: fn _ -> :ok end}, - {BeaconChain, [:passthrough], get_network_stats: fn _ -> %{} end}, + {BeaconChain, [:passthrough], get_network_stats: fn _, _ -> %{} end}, {NetworkCoordinates, [], timeout: fn -> @timeout end, fetch_network_stats: fn _summary_time, _ -> Nx.tensor(0) end} ]) do @@ -70,13 +70,13 @@ defmodule Archethic.BeaconChain.Subset.StatsCollectorTest do with_mocks([ {BeaconChain, [:passthrough], get_network_stats: fn - ^subset1 -> + ^subset1, _ -> %{ node1_public_key => [%{latency: 1}], node2_public_key => [%{latency: 1}] } - ^subset2 -> + ^subset2, _ -> %{ node3_public_key => [%{latency: 10}], node4_public_key => [%{latency: 10}] @@ -132,13 +132,13 @@ defmodule Archethic.BeaconChain.Subset.StatsCollectorTest do with_mocks([ {BeaconChain, [:passthrough], get_network_stats: fn - ^subset1 -> + ^subset1, _ -> %{ node1_public_key => [%{latency: 1}], node2_public_key => [%{latency: 1}] } - ^subset2 -> + ^subset2, _ -> %{ node3_public_key => [%{latency: 10}], node4_public_key => [%{latency: 10}] @@ -202,7 +202,7 @@ defmodule Archethic.BeaconChain.Subset.StatsCollectorTest do ]) with_mocks([ - {BeaconChain, [:passthrough], get_network_stats: fn _ -> %{} end}, + {BeaconChain, [:passthrough], get_network_stats: fn _, _ -> %{} end}, {NetworkCoordinates, [], timeout: fn -> @timeout end, fetch_network_stats: fn _summary_time, _ -> @@ -234,7 +234,7 @@ defmodule Archethic.BeaconChain.Subset.StatsCollectorTest do |> length with_mocks([ - {BeaconChain, [:passthrough], get_network_stats: fn _ -> %{} end}, + {BeaconChain, [:passthrough], get_network_stats: fn _, _ -> %{} end}, {NetworkCoordinates, [], timeout: fn -> @timeout end, fetch_network_stats: fn _summary_time, _ -> diff --git a/test/archethic/beacon_chain/subset/summary_cache_test.exs b/test/archethic/beacon_chain/subset/summary_cache_test.exs index 2a946e9d4..893cfd619 100644 --- a/test/archethic/beacon_chain/subset/summary_cache_test.exs +++ b/test/archethic/beacon_chain/subset/summary_cache_test.exs @@ -14,8 +14,6 @@ defmodule Archethic.BeaconChain.Subset.SummaryCacheTest do alias Archethic.TransactionChain.TransactionSummary - import Mock - test "should clean the previous backup on summary time" do Application.put_env(:archethic, SummaryTimer, interval: "0 * * * * *") @@ -41,9 +39,9 @@ defmodule Archethic.BeaconChain.Subset.SummaryCacheTest do summary_time = ~U[2023-01-01 08:01:00Z] - SummaryCache.add_slot(subset, slot_pre_summary, "node_key") - SummaryCache.add_slot(subset, slot_pre_summary2, "node_key") - SummaryCache.add_slot(subset, slot_post_summary, "node_key") + SummaryCache.add_slot(slot_pre_summary, "node_key") + SummaryCache.add_slot(slot_pre_summary2, "node_key") + SummaryCache.add_slot(slot_post_summary, "node_key") send(pid, {:current_epoch_of_slot_timer, summary_time}) Process.sleep(100) @@ -53,67 +51,18 @@ defmodule Archethic.BeaconChain.Subset.SummaryCacheTest do refute File.exists?(recover_path) end - test_with_mock "should clean the previous backup and ets table on node up", - DateTime, - [:passthrough], - utc_now: fn -> ~U[2023-01-01 08:00:50Z] end do - {:ok, pid} = SummaryCache.start_link() - File.mkdir_p!(Utils.mut_dir()) - - subset = <<0>> - - slot_in_old_backup = %Slot{ - slot_time: ~U[2023-01-01 07:58:50Z], - subset: subset - } - - slot_pre_summary = %Slot{ - slot_time: ~U[2023-01-01 07:59:50Z], - subset: subset - } - - slot_pre_summary2 = %Slot{ - slot_time: ~U[2023-01-01 08:00:00Z], - subset: subset - } - - slot_post_summary = %Slot{ - slot_time: ~U[2023-01-01 08:00:20Z], - subset: subset - } - - summary_time = ~U[2023-01-01 08:00:00Z] - - SummaryCache.add_slot(subset, slot_in_old_backup, "node_key") - SummaryCache.add_slot(subset, slot_pre_summary, "node_key") - SummaryCache.add_slot(subset, slot_pre_summary2, "node_key") - SummaryCache.add_slot(subset, slot_post_summary, "node_key") - - send(pid, :node_up) - Process.sleep(100) - - assert [{^slot_post_summary, "node_key"}] = - subset - |> SummaryCache.stream_current_slots() - |> Enum.to_list() - - previous_summary_time = SummaryTimer.previous_summary(summary_time) - recover_path = Utils.mut_dir("slot_backup-#{DateTime.to_unix(previous_summary_time)}") - refute File.exists?(recover_path) - end - - test "summary cache should backup a slot, recover it on restart" do + test "stream_slots/2 should stream the slot from file" do Application.put_env(:archethic, SummaryTimer, interval: "0 0 * * * *") File.mkdir_p!(Utils.mut_dir()) next_summary_time = SummaryTimer.next_summary(DateTime.utc_now()) - path = Utils.mut_dir("slot_backup-#{DateTime.to_unix(next_summary_time)}") + # path = Utils.mut_dir("slot_backup-#{DateTime.to_unix(next_summary_time)}") - {:ok, pid} = SummaryCache.start_link() + {:ok, _pid} = SummaryCache.start_link() slot = %Slot{ subset: <<0>>, - slot_time: DateTime.add(next_summary_time, -10, :minute), + slot_time: DateTime.add(next_summary_time, -20, :minute), transaction_attestations: [ %ReplicationAttestation{ transaction_summary: %TransactionSummary{ @@ -147,7 +96,7 @@ defmodule Archethic.BeaconChain.Subset.SummaryCacheTest do slot2 = %Slot{ subset: <<0>>, - slot_time: next_summary_time, + slot_time: DateTime.add(next_summary_time, -10, :minute), transaction_attestations: [], end_of_node_synchronizations: [], p2p_view: %{ @@ -160,20 +109,14 @@ defmodule Archethic.BeaconChain.Subset.SummaryCacheTest do } node_key = Crypto.first_node_public_key() - :ok = SummaryCache.add_slot(<<0>>, slot, node_key) - :ok = SummaryCache.add_slot(<<0>>, slot2, node_key) - - assert [{^slot, ^node_key}, {^slot2, ^node_key}] = - :ets.lookup_element(:archethic_summary_cache, <<0>>, 2) - - assert File.exists?(path) - - GenServer.stop(pid) - assert Process.alive?(pid) == false + :ok = SummaryCache.add_slot(slot, node_key) + :ok = SummaryCache.add_slot(slot2, node_key) - {:ok, _} = SummaryCache.start_link() + slots = + next_summary_time + |> SummaryCache.stream_slots(<<0>>) + |> Enum.sort_by(fn {slot, _} -> slot.slot_time end, {:asc, DateTime}) - slots = SummaryCache.stream_current_slots(<<0>>) |> Enum.to_list() assert [{^slot, ^node_key}, {^slot2, ^node_key}] = slots end @@ -197,15 +140,15 @@ defmodule Archethic.BeaconChain.Subset.SummaryCacheTest do subset: subset } - SummaryCache.add_slot(subset, slot_pre_summary, node_key) - SummaryCache.add_slot(subset, slot_post_summary, node_key) + SummaryCache.add_slot(slot_pre_summary, node_key) + SummaryCache.add_slot(slot_post_summary, node_key) send(pid, :self_repair_sync) Process.sleep(50) assert [{^slot_post_summary, ^node_key}] = - subset - |> SummaryCache.stream_current_slots() + slot_post_summary.slot_time + |> SummaryCache.stream_slots(<<0>>) |> Enum.to_list() end end diff --git a/test/archethic/beacon_chain/subset_test.exs b/test/archethic/beacon_chain/subset_test.exs index 6551b7c44..9cacf81b2 100644 --- a/test/archethic/beacon_chain/subset_test.exs +++ b/test/archethic/beacon_chain/subset_test.exs @@ -264,14 +264,14 @@ defmodule Archethic.BeaconChain.SubsetTest do Map.update!(state, :current_slot, fn slot -> %Slot{slot | slot_time: slot_time} end) end) - attestation = create_attestation(subset, ~U[2023-07-11 00:55:00Z]) + attestation = create_attestation(subset, ~U[2023-07-11 00:55:00.000Z]) send(pid, {:new_replication_attestation, attestation}) # Add old slot in SummaryCache to ensure it will be deleted %{current_slot: slot} = :sys.get_state(pid) old_slot = %Slot{slot | slot_time: ~U[2023-07-11 00:50:00Z]} - SummaryCache.add_slot(subset, old_slot, Crypto.first_node_public_key()) + SummaryCache.add_slot(old_slot, Crypto.first_node_public_key()) me = self() diff --git a/test/archethic/beacon_chain_test.exs b/test/archethic/beacon_chain_test.exs index 68ec6951c..783719ee0 100644 --- a/test/archethic/beacon_chain_test.exs +++ b/test/archethic/beacon_chain_test.exs @@ -37,7 +37,7 @@ defmodule Archethic.BeaconChainTest do import Mock setup do - Application.put_env(:archethic, SlotTimer, interval: "0 0 * * * *") + Application.put_env(:archethic, SlotTimer, interval: "0 * * * * *") Application.put_env(:archethic, SummaryTimer, interval: "0 0 * * * *") Enum.map(BeaconChain.list_subsets(), &start_supervised({Subset, subset: &1}, id: &1)) @@ -100,7 +100,11 @@ defmodule Archethic.BeaconChainTest do Process.sleep(500) assert [{%Slot{subset: <<0>>}, _}] = - SummaryCache.stream_current_slots(<<0>>) |> Enum.to_list() + slot.slot_time + |> SummaryTimer.next_summary() + |> SummaryCache.stream_slots(<<0>>) + |> Enum.filter(&match?({%Slot{subset: <<0>>}, _}, &1)) + |> Enum.to_list() end end @@ -563,7 +567,7 @@ defmodule Archethic.BeaconChainTest do subset: <<0>>, slot_time: DateTime.utc_now(), p2p_view: %{ - availabilities: <<>>, + availabilities: <<0::16, 0::16, 0::16>>, network_stats: [%{latency: 100}, %{latency: 200}, %{latency: 50}] } }, @@ -571,7 +575,7 @@ defmodule Archethic.BeaconChainTest do subset: <<0>>, slot_time: DateTime.utc_now() |> DateTime.add(10), p2p_view: %{ - availabilities: <<>>, + availabilities: <<0::16, 0::16, 0::16>>, network_stats: [%{latency: 110}, %{latency: 150}, %{latency: 70}] } }, @@ -579,7 +583,7 @@ defmodule Archethic.BeaconChainTest do subset: <<0>>, slot_time: DateTime.utc_now() |> DateTime.add(20), p2p_view: %{ - availabilities: <<>>, + availabilities: <<0::16, 0::16, 0::16>>, network_stats: [%{latency: 130}, %{latency: 110}, %{latency: 80}] } } @@ -590,7 +594,7 @@ defmodule Archethic.BeaconChainTest do subset: <<0>>, slot_time: DateTime.utc_now(), p2p_view: %{ - availabilities: <<>>, + availabilities: <<0::16, 0::16, 0::16>>, network_stats: [%{latency: 80}, %{latency: 110}, %{latency: 150}] } }, @@ -598,7 +602,7 @@ defmodule Archethic.BeaconChainTest do subset: <<0>>, slot_time: DateTime.utc_now() |> DateTime.add(10), p2p_view: %{ - availabilities: <<>>, + availabilities: <<0::16, 0::16, 0::16>>, network_stats: [%{latency: 70}, %{latency: 140}, %{latency: 100}] } }, @@ -606,7 +610,7 @@ defmodule Archethic.BeaconChainTest do subset: <<0>>, slot_time: DateTime.utc_now() |> DateTime.add(20), p2p_view: %{ - availabilities: <<>>, + availabilities: <<0::16, 0::16, 0::16>>, network_stats: [%{latency: 70}, %{latency: 100}, %{latency: 120}] } } @@ -615,13 +619,20 @@ defmodule Archethic.BeaconChainTest do File.mkdir_p!(Utils.mut_dir()) SummaryCache.start_link() - Enum.map(node1_slots, &SummaryCache.add_slot(<<0>>, &1, "node1")) - Enum.map(node2_slots, &SummaryCache.add_slot(<<0>>, &1, "node2")) + node_key1 = ArchethicCase.random_public_key() + node_key2 = ArchethicCase.random_public_key() + + Enum.map(node1_slots, &SummaryCache.add_slot(&1, node_key1)) + Enum.map(node2_slots, &SummaryCache.add_slot(&1, node_key2)) + + network_stats = + BeaconChain.get_network_stats(<<0>>, SummaryTimer.next_summary(DateTime.utc_now())) - assert %{ - "node1" => [%{latency: 118}, %{latency: 138}, %{latency: 71}], - "node2" => [%{latency: 75}, %{latency: 118}, %{latency: 128}] - } = BeaconChain.get_network_stats(<<0>>) + assert [%{latency: 118}, %{latency: 138}, %{latency: 71}] = + Map.get(network_stats, node_key1) + + assert [%{latency: 71}, %{latency: 115}, %{latency: 118}] = + Map.get(network_stats, node_key2) end end @@ -656,11 +667,7 @@ defmodule Archethic.BeaconChainTest do end) summaries = BeaconChain.list_transactions_summaries_from_current_slot() - - # there are 256 subsets, and we query the summaries 10 by 10 - # so we call the GetCurrentSummaries 26 times - # each call to GetCurrentSummaries return a list of 1 transactionSummary (mock above) - assert length(summaries) == 26 + assert length(summaries) == 1 end end diff --git a/test/archethic_web/api/graphql/schema_test.exs b/test/archethic_web/api/graphql/schema_test.exs index 2ae45afe8..d7cb353de 100644 --- a/test/archethic_web/api/graphql/schema_test.exs +++ b/test/archethic_web/api/graphql/schema_test.exs @@ -1000,7 +1000,7 @@ defmodule ArchethicWeb.API.GraphQL.SchemaTest do }) MockClient - |> expect(:send_message, 26, fn + |> stub(:send_message, fn _, %GetCurrentSummaries{}, _ -> {:ok, %TransactionSummaryList{transaction_summaries: []}} end)