Add partitioned tables support #2197

Open · wants to merge 3 commits into base: main
3 changes: 1 addition & 2 deletions packages/sync-service/lib/electric/plug/utils.ex
@@ -127,8 +127,7 @@ defmodule Electric.Plug.Utils do
stack_ready_timeout = Access.get(conn.assigns.config, :stack_ready_timeout, 5_000)
stack_events_registry = conn.assigns.config[:stack_events_registry]

ref = make_ref()
Electric.StackSupervisor.subscribe_to_stack_events(stack_events_registry, stack_id, ref)
ref = Electric.StackSupervisor.subscribe_to_stack_events(stack_events_registry, stack_id)

if Electric.ProcessRegistry.alive?(stack_id, Electric.Replication.Supervisor) do
conn
@@ -211,7 +211,7 @@ defmodule Electric.Postgres.Configuration do
FROM input_relations ir
JOIN pg_class pc ON pc.relname = ir.tablename
JOIN pg_namespace pn ON pn.oid = pc.relnamespace
WHERE pn.nspname = ir.schemaname AND pc.relkind = 'r';
WHERE pn.nspname = ir.schemaname AND pc.relkind IN ('r', 'p');
"""

relations = Map.keys(filters)
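For context on the relkind change above: in pg_class a declaratively partitioned table has relkind = 'p', while its partitions and plain tables have relkind = 'r', so matching only 'r' would silently skip partitioned roots. A minimal sketch, not part of this PR (the connection options and the measurements tables are hypothetical), of what the catalog reports:

# Hypothetical local connection; any database with a partitioned table will do.
{:ok, conn} = Postgrex.start_link(database: "electric_dev")

%Postgrex.Result{rows: rows} =
  Postgrex.query!(
    conn,
    """
    SELECT relname, relkind
    FROM pg_catalog.pg_class
    WHERE relname IN ('measurements', 'measurements_2024')
    ORDER BY relname
    """,
    []
  )

# With `measurements` partitioned by range and `measurements_2024` as one of its
# partitions, this returns [["measurements", "p"], ["measurements_2024", "r"]],
# which is why the publication query now accepts relkind IN ('r', 'p').
IO.inspect(rows)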
22 changes: 17 additions & 5 deletions packages/sync-service/lib/electric/postgres/inspector.ex
@@ -1,7 +1,9 @@
defmodule Electric.Postgres.Inspector do
alias Electric.Replication.Eval.Parser

@type relation :: Electric.relation()
@type relation_id :: Electric.relation_id()
@type relation_kind :: :ordinary_table | :partitioned_table

@type column_info :: %{
name: String.t(),
@@ -17,15 +19,19 @@ defmodule Electric.Postgres.Inspector do

@type relation_info :: %{
relation_id: relation_id(),
relation: relation()
relation: relation(),
kind: relation_kind(),
parent: nil | relation(),
children: nil | [relation(), ...]
}

@callback load_relation(String.t(), opts :: term()) ::
@callback load_relation(String.t() | relation(), opts :: term()) ::
{:ok, relation_info()} | {:error, String.t()}

@callback load_column_info(relation(), opts :: term()) ::
{:ok, [column_info()]} | :table_not_found

# @callback introspect_relation()
@callback clean(relation(), opts :: term()) :: true

@type inspector :: {module(), opts :: term()}
@@ -40,9 +46,14 @@ defmodule Electric.Postgres.Inspector do
`"Users"` would return `{"public", "Users"}`,
`some_schema.users` would return `{"some_schema", "users"}`.
"""
@spec load_relation(String.t(), inspector()) :: {:ok, relation_info()} | {:error, String.t()}
def load_relation(table, {module, opts}),
do: module.load_relation(table, opts)
def load_relation(%{schema: schema, table: table}, inspector),
do: load_relation({schema, table}, inspector)

@spec load_relation(String.t() | relation(), inspector()) ::
{:ok, relation_info()} | {:error, String.t()}
def load_relation(table, {module, opts}) do
module.load_relation(table, opts)
end

@doc """
Load column information about a given table using a provided inspector.
@@ -55,6 +66,7 @@ defmodule Electric.Postgres.Inspector do
@doc """
Clean up all information about a given relation using a provided inspector.
"""

@spec clean(relation(), inspector()) :: true
def clean(relation, {module, opts}), do: module.clean(relation, opts)
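To make the new fields concrete, here is a hedged sketch, not taken from the PR (table names and OIDs are made up), of the relation_info() maps an inspector would return for a partitioned table and for one of its partitions:

# Partitioned (parent) table: kind is :partitioned_table, children lists its partitions.
parent_info = %{
  relation_id: 16401,
  relation: {"public", "measurements"},
  kind: :partitioned_table,
  parent: nil,
  children: [{"public", "measurements_2024"}]
}

# One of its partitions: an ordinary table whose parent points back at the root.
partition_info = %{
  relation_id: 16407,
  relation: {"public", "measurements_2024"},
  kind: :ordinary_table,
  parent: {"public", "measurements"},
  children: nil
}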

@@ -4,30 +4,79 @@ defmodule Electric.Postgres.Inspector.DirectInspector do
@doc """
Returns the PG relation from the table name.
"""
def load_relation(table, conn) do
def load_relation(table, conn) when is_binary(table) do
# The extra cast from $1 to text is needed because of Postgrex' OID type encoding
# see: https://github.com/elixir-ecto/postgrex#oid-type-encoding
query = """
SELECT nspname, relname, pg_class.oid
FROM pg_class
JOIN pg_namespace ON relnamespace = pg_namespace.oid
WHERE
relkind = 'r' AND
pg_class.oid = $1::text::regclass
"""
query = load_relation_query("$1::text::regclass")
do_load_relation(conn, query, [table])
end

def load_relation({schema, name}, conn) when is_binary(schema) and is_binary(name) do
query = load_relation_query("format('%I.%I', $1::text, $2::text)::regclass")
do_load_relation(conn, query, [schema, name])
end

case Postgrex.query(conn, query, [table]) do
defp do_load_relation(conn, query, params) do
case Postgrex.query(conn, query, params) do
{:ok, result} ->
# We expect exactly one row: the query didn't fail, so the cast to regclass
# succeeded and the relation exists
[[schema, table, oid]] = result.rows
{:ok, %{relation_id: oid, relation: {schema, table}}}
[[schema, table, oid, kind, parent, children]] = result.rows

{:ok,
%{
relation_id: oid,
relation: {schema, table},
kind: resolve_kind(kind),
parent: map_relations(parent),
children: map_relations(children)
}}

{:error, err} ->
{:error, Exception.message(err)}
end
end

defp load_relation_query(match) do
# partitions can live in namespaces other than the parent/root table's, so we
# need to keep track of them
[
"""
SELECT pn.nspname, pc.relname, pc.oid, pc.relkind, pi_parent.parent, pi_children.children
FROM pg_catalog.pg_class pc
JOIN pg_catalog.pg_namespace pn ON pc.relnamespace = pn.oid
LEFT OUTER JOIN ( -- get schema and name of parent table (if any)
SELECT pi.inhrelid, ARRAY[pn.nspname, pc.relname] parent
FROM pg_catalog.pg_inherits pi
JOIN pg_catalog.pg_class pc ON pi.inhparent = pc.oid
JOIN pg_catalog.pg_namespace pn ON pc.relnamespace = pn.oid
) pi_parent ON pc.oid = pi_parent.inhrelid
LEFT OUTER JOIN ( -- get list of child partitions (if any)
SELECT pi.inhparent, ARRAY_AGG(ARRAY[pn.nspname, pc.relname]) AS children
FROM pg_catalog.pg_inherits pi
JOIN pg_catalog.pg_class pc ON pi.inhrelid = pc.oid
JOIN pg_catalog.pg_namespace pn ON pc.relnamespace = pn.oid
GROUP BY pi.inhparent
) pi_children ON pc.oid = pi_children.inhparent
WHERE
pc.relkind IN ('r', 'p') AND
""",
"pc.oid = ",
match
]
end

defp resolve_kind("r"), do: :ordinary_table
defp resolve_kind("p"), do: :partitioned_table

defp map_relations(nil), do: nil

defp map_relations([schema, name]) when is_binary(schema) and is_binary(name),
do: {schema, name}

defp map_relations(relations) when is_list(relations),
do: Enum.map(relations, &map_relations/1)

@doc """
Load table information (refs) from the database
"""
@@ -49,7 +98,7 @@ defmodule Electric.Postgres.Inspector.DirectInspector do
JOIN pg_type ON atttypid = pg_type.oid
LEFT JOIN pg_index ON indrelid = pg_class.oid AND indisprimary
LEFT JOIN pg_type AS elem_pg_type ON pg_type.typelem = elem_pg_type.oid
WHERE relname = $1 AND nspname = $2 AND relkind = 'r'
WHERE relname = $1 AND nspname = $2 AND relkind IN ('r', 'p')
ORDER BY pg_class.oid, attnum
"""

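A hedged sketch, not part of the diff, of how one row returned by load_relation_query/1 becomes the relation_info map, reusing the hypothetical measurements tables from above; the parent/children columns arrive as text arrays and map_relations/1 turns them into relation tuples:

# One row as Postgrex would hand it to do_load_relation/3 for a partition:
# [nspname, relname, oid, relkind, parent, children]
row = ["public", "measurements_2024", 16407, "r", ["public", "measurements"], nil]

[schema, table, oid, kind, parent, children] = row

# Inline stand-ins for resolve_kind/1 and map_relations/1 defined above.
resolve_kind = fn
  "r" -> :ordinary_table
  "p" -> :partitioned_table
end

map_relations = fn
  nil -> nil
  [s, n] when is_binary(s) and is_binary(n) -> {s, n}
  list when is_list(list) -> Enum.map(list, fn [s, n] -> {s, n} end)
end

# Matches the relation_info() shape declared in the Inspector behaviour.
info = %{
  relation_id: oid,
  relation: {schema, table},
  kind: resolve_kind.(kind),
  parent: map_relations.(parent),
  children: map_relations.(children)
}

IO.inspect(info)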
@@ -1,6 +1,8 @@
defmodule Electric.Postgres.Inspector.EtsInspector do
alias Electric.Postgres.Inspector.DirectInspector
use GenServer

alias Electric.Postgres.Inspector.DirectInspector

@behaviour Electric.Postgres.Inspector

## Public API
@@ -117,7 +119,7 @@ defmodule Electric.Postgres.Inspector.EtsInspector do
{:error, err} ->
{:reply, {:error, err}, state}

{:ok, relation} ->
{:ok, %{relation: relation} = info} ->
# We keep the mapping in both directions:
# - Forward: user-provided table name -> PG relation (many-to-one)
# e.g. `~s|users|` -> `{"public", "users"}`
@@ -127,9 +129,11 @@ defmodule Electric.Postgres.Inspector.EtsInspector do
#
# The forward direction allows for efficient lookup (based on user-provided table name)
# the backward direction allows for efficient cleanup (based on PG relation)
:ets.insert(state.pg_info_table, {{table, :table_to_relation}, relation})
:ets.insert(state.pg_relation_table, {{relation, :relation_to_table}, table})
{:reply, {:ok, relation}, state}
:ets.insert(state.pg_info_table, {{table, :table_to_relation}, info})
:ets.insert(state.pg_info_table, {{relation, :table_to_relation}, info})
:ets.insert(state.pg_relation_table, {{info, :relation_to_table}, table})
:ets.insert(state.pg_relation_table, {{info, :relation_to_table}, relation})
{:reply, {:ok, info}, state}
end

relation ->
@@ -159,12 +163,26 @@ defmodule Electric.Postgres.Inspector.EtsInspector do
end

@pg_rel_position 2
defp relation_from_ets(table, opts_or_state) do
defp relation_from_ets(table, opts_or_state) when is_binary(table) do
ets_table = get_column_info_table(opts_or_state)

:ets.lookup_element(ets_table, {table, :table_to_relation}, @pg_rel_position, :not_found)
end

defp relation_from_ets({_schema, _name} = relation, opts_or_state) do
ets_table = get_column_info_table(opts_or_state)

with info when is_map(info) <-
:ets.lookup_element(
ets_table,
{relation, :table_to_relation},
@pg_rel_position,
:not_found
) do
info
end
end

@pg_table_idx 1
defp tables_from_ets(relation, opts_or_state) do
ets_table = get_relation_table(opts_or_state)
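A hedged sketch, not from the PR, of the double-keyed cache this EtsInspector change sets up, using a throwaway ETS table in place of the GenServer state; both the user-supplied table name and the canonical {schema, name} tuple now map to the same info, which is what lets relation_from_ets/2 accept either form:

pg_info_table = :ets.new(:pg_info_demo, [:set, :public])

# Hypothetical info map, shaped like DirectInspector.load_relation/2 output.
info = %{
  relation_id: 16401,
  relation: {"public", "measurements"},
  kind: :partitioned_table,
  parent: nil,
  children: [{"public", "measurements_2024"}]
}

# Forward mapping stored under both key shapes, mirroring the two inserts above.
:ets.insert(pg_info_table, {{"measurements", :table_to_relation}, info})
:ets.insert(pg_info_table, {{{"public", "measurements"}, :table_to_relation}, info})

# Either key resolves to the same cached map (element 2 of the stored tuple).
^info = :ets.lookup_element(pg_info_table, {"measurements", :table_to_relation}, 2)
^info = :ets.lookup_element(pg_info_table, {{"public", "measurements"}, :table_to_relation}, 2)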
@@ -58,7 +58,8 @@ defmodule Electric.Replication.ShapeLogCollector do
state = Map.merge(opts, %{producer: nil, subscriptions: {0, MapSet.new()}})
# start in demand: :accumulate mode so that the ShapeCache is able to start
# all active consumers before we start sending transactions
{:producer, state, dispatcher: Electric.Shapes.Dispatcher, demand: opts.demand}
{:producer, state,
dispatcher: {Electric.Shapes.Dispatcher, inspector: state.inspector}, demand: opts.demand}
end

def handle_subscribe(:consumer, _opts, from, state) do
@@ -148,14 +149,43 @@ defmodule Electric.Replication.ShapeLogCollector do

OpenTelemetry.add_span_attributes("rel.is_dropped": true)

reload_partitioned_table(rel, state)

{:reply, :ok, [], state}
end

defp handle_relation(rel, from, state) do
OpenTelemetry.add_span_attributes("rel.is_dropped": false)
reload_partitioned_table(rel, state)
{:noreply, [rel], %{state | producer: from}}
end

defp reload_partitioned_table(rel, state) do
case Inspector.load_relation(rel, state.inspector) do
{:ok, %{parent: nil}} ->
:ok

{:ok, %{parent: {_, _} = parent}} ->
# probably a new partition for an existing partitioned table
# so force a reload of the relation info

# TODO: we should probably have a way to clean the inspector cache
# based on the relation alone; there's a chance that this results in
# a query to pg just to then drop the info
with {:ok, info} <- Inspector.load_relation(parent, state.inspector) do
Inspector.clean(info, state.inspector)
end

{:ok, _} ->
# probably a malformed value from a test inspector
:ok

{:error, _} ->
# just ignore errors here, they're unlikely anyway
:ok
end
end
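The effect of reload_partitioned_table/2, sketched below with plain maps standing in for inspector results (the relations are hypothetical, not from the PR): only a relation that has a parent, i.e. a partition, causes the parent's cached info to be dropped so it is re-read from Postgres on the next shape creation.

invalidate_parent? = fn
  %{parent: {_, _} = parent} -> {:clean_parent, parent}
  _other -> :noop
end

# A relation message for a freshly attached partition invalidates its parent.
{:clean_parent, {"public", "measurements"}} =
  invalidate_parent?.(%{
    relation: {"public", "measurements_2024_q2"},
    parent: {"public", "measurements"}
  })

# A message for an ordinary or root table leaves the inspector cache alone.
:noop = invalidate_parent?.(%{relation: {"public", "measurements"}, parent: nil})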

defp remove_subscription(from, %{subscriptions: {count, set}} = state) do
subscriptions =
if MapSet.member?(set, from) do
57 changes: 42 additions & 15 deletions packages/sync-service/lib/electric/shapes/consumer.ex
@@ -3,11 +3,12 @@ defmodule Electric.Shapes.Consumer do
restart: :temporary,
significant: true

alias Electric.ShapeCache.LogChunker
alias Electric.LogItems
alias Electric.Postgres.Inspector
alias Electric.Replication.Changes
alias Electric.Replication.Changes.Transaction
alias Electric.ShapeCache
alias Electric.ShapeCache.LogChunker
alias Electric.Shapes.Shape
alias Electric.Telemetry.OpenTelemetry
alias Electric.Utils
@@ -182,25 +183,51 @@ defmodule Electric.Shapes.Consumer do

# `Shapes.Dispatcher` only works with single-events, so we can safely assert
# that here
def handle_events([%Changes.Relation{}], _from, state) do
%{shape: %{root_table: root_table}, inspector: {inspector, inspector_opts}} = state
def handle_events([%Changes.Relation{} = relation], _from, state) do
%{shape: %{root_table: root_table} = shape, inspector: inspector} = state

# We now receive relation messages from partitions as well as ones affecting
# our root table, so we need to be clear about what we're getting: if the
# relation message refers to our root table, we need to drop the shape because
# something has changed. If the relation is a new partition, i.e. its parent is
# our root table, we just need to add that partition to our shape so that txns
# from the new partition are properly mapped to our root table.
if relation.id == shape.root_table_id do
Logger.info(
"Schema for the table #{Utils.inspect_relation(root_table)} changed - terminating shape #{state.shape_handle}"
)

Logger.info(
"Schema for the table #{Utils.inspect_relation(root_table)} changed - terminating shape #{state.shape_handle}"
)
# We clean up the relation info from ETS as it has changed and we want
# to source the fresh info from postgres for the next shape creation
Inspector.clean(root_table, inspector)

# We clean up the relation info from ETS as it has changed and we want
# to source the fresh info from postgres for the next shape creation
inspector.clean(root_table, inspector_opts)
state =
reply_to_snapshot_waiters(
{:error, "Shape relation changed before snapshot was ready"},
state
)

state =
reply_to_snapshot_waiters(
{:error, "Shape relation changed before snapshot was ready"},
state
cleanup(state)

{:stop, :normal, state}
else
# if we're receiving this relation message but the relation doesn't refer
# to the root table for the shape, then it **must** be because of the addition of a partition
# to the root table

{:ok, %{parent: ^root_table, relation: table}} =
Inspector.load_relation({relation.schema, relation.table}, inspector)

# a new partition has been added
Logger.info(
"New partition #{Utils.inspect_relation(table)} for table #{Utils.inspect_relation(root_table)}"
)

cleanup(state)
{:stop, :normal, state}
shape = Shape.add_partition(shape, root_table, table)

{:noreply, [], %{state | shape: shape}}
end
end
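A hedged sketch, not from the PR (the ids and names are made up, and plain maps stand in for %Changes.Relation{} structs), of the branch above: a relation message either matches the shape's root table id, which terminates the shape, or describes a new partition whose parent is the root table, which widens the shape in place:

root_table = {"public", "measurements"}
root_table_id = 16401

decide = fn rel, parent ->
  cond do
    # Schema change on the root table itself: the shape must be terminated.
    rel.id == root_table_id -> :terminate_shape
    # New partition whose parent is our root table: extend the shape.
    parent == root_table -> {:add_partition, {rel.schema, rel.table}}
    true -> :ignore
  end
end

:terminate_shape =
  decide.(%{id: 16401, schema: "public", table: "measurements"}, nil)

{:add_partition, {"public", "measurements_2024_q3"}} =
  decide.(
    %{id: 16455, schema: "public", table: "measurements_2024_q3"},
    root_table
  )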

# Buffer incoming transactions until we know our xmin