Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions packages/sync-service/lib/electric/postgres/inspector.ex
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ defmodule Electric.Postgres.Inspector do
@doc """
Clean up all information about a given relation using a provided inspector.
"""
@spec clean(relation_id(), inspector()) :: :ok
def clean(relation_id, {module, opts}) when is_relation_id(relation_id) do
module.clean(relation_id, opts)
@spec purge_relation_info(relation_id(), inspector()) :: :ok
def purge_relation_info(relation_id, {module, opts}) when is_relation_id(relation_id) do
module.purge_relation_info(relation_id, opts)
end

@doc """
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,5 +242,5 @@ defmodule Electric.Postgres.Inspector.DirectInspector do
end

@impl Electric.Postgres.Inspector
def clean(_, _), do: :ok
def purge_relation_info(_, _), do: :ok
end
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ defmodule Electric.Postgres.Inspector.EtsInspector do
end

@impl Inspector
@spec clean(Electric.relation_id(), opts :: term()) :: :ok
def clean(relation_id, opts) when is_relation_id(relation_id) do
GenServer.call(opts[:server], {:clean, relation_id}, :infinity)
@spec purge_relation_info(Electric.relation_id(), opts :: term()) :: :ok
def purge_relation_info(relation_id, opts) when is_relation_id(relation_id) do
GenServer.call(opts[:server], {:purge_relation_info, relation_id}, :infinity)
end

@impl Inspector
Expand Down Expand Up @@ -159,7 +159,7 @@ defmodule Electric.Postgres.Inspector.EtsInspector do
{:reply, response, state}
end

def handle_call({:clean, oid}, _from, state) do
def handle_call({:purge_relation_info, oid}, _from, state) do
{:reply, :ok, delete_relation_info(state, oid)}
end

Expand Down
13 changes: 6 additions & 7 deletions packages/sync-service/lib/electric/shapes/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ defmodule Electric.Shapes.Consumer do
# Schema changed while we were creating stuff, which means shape is functionally invalid.
# Return a 409 to trigger a fresh start with validation against the new schema.
%{shape: %Shape{root_table_id: root_table_id}} = state
clean_table(root_table_id, state)
purge_relation_from_inspector_cache(root_table_id, state)
end

state
Expand Down Expand Up @@ -420,8 +420,7 @@ defmodule Electric.Shapes.Consumer do
State.reply_to_snapshot_waiters(state, {:error, "Shape terminated before snapshot was ready"})
end

# Any relation that gets let through by the `ShapeLogCollector` (as coupled with `Shapes.Dispatcher`)
# is a signal that we need to terminate the shape.
# Any relation that gets let through by the `ShapeLogCollector` is a signal that we need to terminate the shape.
defp handle_event(%Changes.Relation{}, state) do
%{shape: %Shape{root_table_id: root_table_id, root_table: root_table}} = state

Expand All @@ -431,7 +430,7 @@ defmodule Electric.Shapes.Consumer do

# We clean up the relation info from ETS as it has changed and we want
# to source the fresh info from postgres for the next shape creation
clean_table(root_table_id, state)
purge_relation_from_inspector_cache(root_table_id, state)

state
|> State.reply_to_snapshot_waiters(
Expand Down Expand Up @@ -801,9 +800,9 @@ defmodule Electric.Shapes.Consumer do
end)
end

defp clean_table(table_oid, state) do
inspector = Electric.StackConfig.lookup!(state.stack_id, :inspector)
Inspector.clean(table_oid, inspector)
defp purge_relation_from_inspector_cache(table_oid, state) do
inspector = Inspector.for_stack(state.stack_id)
Inspector.purge_relation_info(table_oid, inspector)
end

defp handle_materializer_down(reason, state) do
Expand Down
Loading