Shut down the connection supervisor in case of an unrecoverable replication error #2189

Open · wants to merge 2 commits into main
5 changes: 5 additions & 0 deletions .changeset/lemon-taxis-stare.md
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

Avoid stopping the BEAM process when an unrecoverable error is encountered. Instead, stop the stack's main OTP supervisor. This is required for multi-tenancy.
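For context, the OTP mechanism this relies on can be sketched in a few lines. Everything below is illustrative: only the `:significant` and `:auto_shutdown` options mirror what this PR configures, and the module names are made up.

```elixir
defmodule Sketch.Stack do
  # Stand-in for a stack-level supervisor (needs OTP 24+ / an Elixir release that
  # supports :significant children and :auto_shutdown).
  #
  # As in the PR, the supervisor itself is :transient so that :auto_shutdown can be
  # passed to Supervisor.init/2 below.
  use Supervisor, restart: :transient

  def start_link(arg), do: Supervisor.start_link(__MODULE__, arg, name: __MODULE__)

  @impl true
  def init(_arg) do
    children = [
      # Stand-in for the connection supervisor: transient and significant, so a
      # {:shutdown, reason} exit of this child also stops Sketch.Stack itself,
      # without System.stop/1 taking the whole BEAM node down.
      %{
        id: :connections,
        start: {Agent, :start_link, [fn -> :ok end, [name: Sketch.Connections]]},
        restart: :transient,
        significant: true
      }
    ]

    Supervisor.init(children, strategy: :one_for_one, auto_shutdown: :any_significant)
  end
end
```

With this in place, calling `Agent.stop(Sketch.Connections, {:shutdown, :fatal_replication_error})` stops the significant child, and the supervisor then shuts itself down as well, while the rest of the node (other tenants) keeps running.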
35 changes: 23 additions & 12 deletions integration-tests/tests/invalidated-replication-slot.lux
@@ -6,12 +6,13 @@

[my invalidated_slot_error=
"""
[error] :gen_statem {:"Elixir.Electric.ProcessRegistry:single_stack", {Electric.Postgres.ReplicationClient, nil}} terminating
** (Postgrex.Error) ERROR 55000 (object_not_in_prerequisite_state) cannot read from logical replication slot "electric_slot_integration"

This slot has been invalidated because it exceeded the maximum reserved size.
"""]

[my stack_id="single_stack"]

###

## Start a new Postgres cluster configured for easy replication slot invalidation.
@@ -25,8 +26,11 @@
[shell electric]
??[info] Starting replication from postgres

# Reset the failure pattern because we'll be matching on an error.
-
# Verify that the stack supervisor is registered using regular process registration. If we
# change this at any point, the line below will catch it and we'll be able to correct the
# check further down that verifies that the stack supervisor is no longer running.
!IO.puts("Stack supervisor pid: #{inspect Process.whereis(Electric.StackSupervisor)}")
??Stack supervisor pid: #PID<

## Seed the database with enough data to exceed max_wal_size and force a checkpoint that
## will invalidate the replication slot.
@@ -36,21 +40,28 @@
[shell pg]
?invalidating slot "electric_slot_integration" because its restart_lsn [\d\w]+/[\d\w]+ exceeds max_slot_wal_keep_size

[macro verify_connection_and_stack_supervisors_shutdown stack_id invalidated_slot_error]
??$invalidated_slot_error
??[error] Stopping connection supervisor with stack_id=$stack_id due to an unrecoverable error

!IO.puts("Stack supervisor pid: #{inspect Process.whereis(Electric.StackSupervisor)}")
??Stack supervisor pid: nil
[endmacro]

## Observe the fatal connection error.
[shell electric]
??$invalidated_slot_error
# Reset the failure pattern because we'll be matching on an error.
-

# Confirm Electric process exit.
??$PS1
[invoke verify_connection_and_stack_supervisors_shutdown $stack_id $invalidated_slot_error]

## Start the sync service once again to verify that it crashes due to the invalidated slot error.
[invoke setup_electric]
# Restart the OTP application to verify that the supervisors shut down again due to the invalidated slot.
!:ok = Application.stop(:electric)
!:ok = Application.start(:electric)

[shell electric]
??[info] Starting replication from postgres
-
??$invalidated_slot_error
??$PS1

[invoke verify_connection_and_stack_supervisors_shutdown $stack_id $invalidated_slot_error]

[cleanup]
[invoke teardown]
3 changes: 2 additions & 1 deletion packages/sync-service/lib/electric/application.ex
@@ -71,7 +71,8 @@ defmodule Electric.Application do
],
pool_opts: [pool_size: Electric.Config.get_env(:db_pool_size)],
storage: storage,
chunk_bytes_threshold: Electric.Config.get_env(:chunk_bytes_threshold)
chunk_bytes_threshold: Electric.Config.get_env(:chunk_bytes_threshold),
name: Electric.StackSupervisor
},
{Electric.Telemetry, stack_id: stack_id, storage: storage},
{Bandit,
44 changes: 25 additions & 19 deletions packages/sync-service/lib/electric/connection/manager.ex
@@ -365,13 +365,13 @@ defmodule Electric.Connection.Manager do
# connection and the DB pool. If any of the latter two shut down, Connection.Manager will
# itself terminate to be restarted by its supervisor in a clean state.
def handle_info({:EXIT, pid, reason}, %State{replication_client_pid: pid} = state) do
halt_if_fatal_error!(reason)
with false <- stop_if_fatal_error(reason, state) do
Logger.debug(
"Handling the exit of the replication client #{inspect(pid)} with reason #{inspect(reason)}"
)

Logger.debug(
"Handling the exit of the replication client #{inspect(pid)} with reason #{inspect(reason)}"
)

{:noreply, %{state | replication_client_pid: nil}, {:continue, :start_replication_client}}
{:noreply, %{state | replication_client_pid: nil}, {:continue, :start_replication_client}}
end
end

# The most likely reason for the lock connection or the DB pool to exit is the database
@@ -524,8 +524,13 @@
end

defp handle_connection_error(error, state, mode) do
halt_if_fatal_error!(error)
with false <- stop_if_fatal_error(error, state) do
state = schedule_reconnection_after_error(error, state, mode)
{:noreply, state}
end
end

defp schedule_reconnection_after_error(error, state, mode) do
message =
case error do
%DBConnection.ConnectionError{message: message} ->
@@ -553,8 +558,7 @@
is_nil(state.pool_pid) -> :start_connection_pool
end

state = schedule_reconnection(step, state)
{:noreply, state}
schedule_reconnection(step, state)
end

defp pg_error_extra_info(pg_error) do
@@ -573,23 +577,25 @@
end
end

@invalid_slot_detail "This slot has been invalidated because it exceeded the maximum reserved size."

defp halt_if_fatal_error!(
defp stop_if_fatal_error(
%Postgrex.Error{
postgres: %{
code: :object_not_in_prerequisite_state,
detail: @invalid_slot_detail,
pg_code: "55000",
routine: "StartLogicalReplication"
detail: "This slot has been invalidated" <> _,
pg_code: "55000"
}
} = error
} = error,
state
) do
System.stop(1)
exit(error)
# Perform supervisor shutdown in a task to avoid a circular dependency where the manager
# process is waiting for the supervisor to shut down its children, one of which is the
# manager process itself.
Task.start(Electric.Connection.Supervisor, :shutdown, [state.stack_id, error])
Contributor comment:
Great job, but let's call dispatch_stack_event here too, so that the control plane can be aware of what's happened and why this is down.
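A rough sketch of that suggestion, as it might slot in just before the `Task.start` call above. The `stack_events_registry` field on the manager state and the event payload are assumptions; only `dispatch_stack_event/3` itself is part of this PR (see stack_supervisor.ex further down):

```elixir
# Hypothetical: let stack-event subscribers know why the stack is going away
# before tearing the connection supervisor down. The :stack_events_registry
# field and the event tuple are assumed names, not taken from this PR.
Electric.StackSupervisor.dispatch_stack_event(
  state.stack_events_registry,
  state.stack_id,
  {:connection_error, %{error: error, unrecoverable: true}}
)

Task.start(Electric.Connection.Supervisor, :shutdown, [state.stack_id, error])
{:noreply, state}
```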


{:noreply, state}
end

defp halt_if_fatal_error!(_), do: nil
defp stop_if_fatal_error(_, _), do: false

defp schedule_reconnection(step, %State{backoff: {backoff, _}} = state) do
{time, backoff} = :backoff.fail(backoff)
18 changes: 17 additions & 1 deletion packages/sync-service/lib/electric/connection/supervisor.ex
@@ -18,7 +18,14 @@ defmodule Electric.Connection.Supervisor do
has successfully initialized a database connection pool.
"""

use Supervisor
# This supervisor is meant to be a child of Electric.StackSupervisor.
#
# The `restart: :transient, significant: true` combo allows for shutting the supervisor down
# and signalling the parent supervisor to shut itself down as well if that one has
# `:auto_shutdown` set to `:any_significant` or `:all_significant`.
use Supervisor, restart: :transient, significant: true

require Logger

def name(opts) do
Electric.ProcessRegistry.name(opts[:stack_id], __MODULE__)
@@ -28,6 +35,15 @@
Supervisor.start_link(__MODULE__, opts, name: name(opts))
end

def shutdown(stack_id, reason) do
Logger.error(
"Stopping connection supervisor with stack_id=#{inspect(stack_id)} " <>
"due to an unrecoverable error: #{inspect(reason)}"
)

Supervisor.stop(name(stack_id: stack_id), {:shutdown, reason}, 1_000)
end

def init(opts) do
Process.set_label({:connection_supervisor, opts[:stack_id]})
Logger.metadata(stack_id: opts[:stack_id])
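Note that `shutdown/2` above is never called in-line from the connection manager; the manager hands it to `Task.start/3` (see manager.ex earlier in this diff) because the stopping supervisor waits for its children, one of which is the manager itself. A hypothetical, stripped-down illustration of that pattern, with made-up module names:

```elixir
defmodule Sketch.Manager do
  # Hypothetical child process that needs to take down the supervisor it runs under.
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts), do: {:ok, Map.new(opts)}

  @impl true
  def handle_info({:fatal_error, reason}, state) do
    # Calling Supervisor.stop/3 right here would make the supervisor wait for this
    # process to exit while this process waits for Supervisor.stop/3 to return,
    # i.e. the circular dependency described in the manager.ex comment. A spawned
    # task sits outside the supervision tree, so it can drive the stop to completion.
    Task.start(fn -> Supervisor.stop(state.supervisor, {:shutdown, reason}) end)
    {:noreply, state}
  end
end
```

In the PR the same indirection shows up as `Task.start(Electric.Connection.Supervisor, :shutdown, [state.stack_id, error])`.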
7 changes: 7 additions & 0 deletions packages/sync-service/lib/electric/stack_supervisor.ex
@@ -28,6 +28,8 @@ defmodule Electric.StackSupervisor do
2. `Electric.Replication.ShapeLogCollector` collects transactions from the replication connection, fanning them out to `Electric.Shapes.Consumer` (4.1.1.2)
3. `Electric.ShapeCache` coordinates shape creation and handle allocation, shape metadata
"""

# Setting `restart: :transient` is required for passing the `:auto_shutdown` to `Supervisor.init()` below.
use Supervisor, restart: :transient

@opts_schema NimbleOptions.new!(
@@ -119,6 +121,11 @@
Registry.register(registry, {:stack_status, stack_id}, value)
end

# noop if there's no registry running
def dispatch_stack_event(nil, _stack_id, _event) do
:ok
end

def dispatch_stack_event(registry, stack_id, event) do
Registry.dispatch(registry, {:stack_status, stack_id}, fn entries ->
for {pid, ref} <- entries do
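For context on the new nil-registry clause: `dispatch_stack_event/3` reaches every process registered under `{:stack_status, stack_id}`. A hedged subscriber sketch follows; the registration call mirrors the line above, but the concrete message a subscriber receives comes from the dispatch body that is cut off in this diff, so no particular shape is assumed:

```elixir
defmodule Sketch.StackEventListener do
  # Hypothetical subscriber; not part of this PR.
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    registry = Keyword.fetch!(opts, :registry)
    stack_id = Keyword.fetch!(opts, :stack_id)

    # Same key as StackSupervisor uses above; registration is per-process, so this
    # must run in the subscriber process itself (init/1 does).
    {:ok, _owner} = Registry.register(registry, {:stack_status, stack_id}, nil)
    {:ok, %{stack_id: stack_id}}
  end

  @impl true
  def handle_info(message, state) do
    # The exact message format is defined by the Registry.dispatch body elided above.
    IO.inspect(message, label: "stack event for #{state.stack_id}")
    {:noreply, state}
  end
end
```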