From 95bb206636fe9e72ef0eb0d005e99f1c918c11c8 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Mon, 13 Oct 2025 00:37:41 +0200 Subject: [PATCH 01/18] Store the result of connection opts validation in StackConfig --- .../lib/electric/connection/manager.ex | 33 ++++++++++++++++--- 1 file changed, 28 insertions(+), 5 deletions(-) diff --git a/packages/sync-service/lib/electric/connection/manager.ex b/packages/sync-service/lib/electric/connection/manager.ex index 0a2e516b28..060443483a 100644 --- a/packages/sync-service/lib/electric/connection/manager.ex +++ b/packages/sync-service/lib/electric/connection/manager.ex @@ -146,6 +146,8 @@ defmodule Electric.Connection.Manager do # the same reconnection period rather than a new one. @replication_liveness_confirmation_duration 5_000 + @validated_conn_opts_config_key {__MODULE__, :validated_connection_opts} + def child_spec(init_arg) do %{ id: __MODULE__, @@ -181,6 +183,12 @@ defmodule Electric.Connection.Manager do pool_name(Access.fetch!(opts, :stack_id), Access.fetch!(opts, :role)) end + def validated_connection_opts(stack_id, type) do + stack_id + |> Electric.StackConfig.lookup!(@validated_conn_opts_config_key) + |> Map.fetch!(type) + end + def drop_replication_slot_on_stop(manager) do GenServer.cast(manager, :drop_replication_slot_on_stop) end @@ -291,14 +299,15 @@ defmodule Electric.Connection.Manager do manual_table_publishing?: Keyword.get(opts, :manual_table_publishing?, false), max_shapes: Keyword.fetch!(opts, :max_shapes) } - |> initialize_connection_opts(opts) + |> init_connection_opts(opts) + |> init_validated_connection_opts() # Wait for the connection resolver to start before continuing with # connection setup. {:ok, state} end - defp initialize_connection_opts(state, opts) do + defp init_connection_opts(state, opts) do connection_opts = Keyword.fetch!(opts, :connection_opts) replication_opts = @@ -310,14 +319,28 @@ defmodule Electric.Connection.Manager do %{state | connection_opts: connection_opts, replication_opts: replication_opts} end + defp init_validated_connection_opts(%{stack_id: stack_id} = state) do + stack_validated_connection_opts = + Electric.StackConfig.lookup(stack_id, @validated_conn_opts_config_key) + + Map.update!(state, :validated_connection_opts, fn map -> + Map.new(map, fn {type, nil} -> {type, stack_validated_connection_opts[type]} end) + end) + end + + defp update_validated_connection_opts(%{stack_id: stack_id} = state, type, validated_opts) do + validated_connection_opts = Map.put(state.validated_connection_opts, type, validated_opts) + Electric.StackConfig.put(stack_id, @validated_conn_opts_config_key, validated_connection_opts) + %{state | validated_connection_opts: validated_connection_opts} + end + defp validate_connection(conn_opts, type, state) do - if opts = state.validated_connection_opts[type] do + if opts = Map.get(state.validated_connection_opts, type) do {:ok, opts, state} else try do with {:ok, validated_opts} <- ConnectionResolver.validate(state.stack_id, conn_opts) do - {:ok, validated_opts, - Map.update!(state, :validated_connection_opts, &Map.put(&1, type, validated_opts))} + {:ok, validated_opts, update_validated_connection_opts(state, type, validated_opts)} end catch :exit, {:killed, _} -> {:error, :killed} From e2a20babe831a532625c9dcbac841aae9c6eec7b Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Mon, 13 Oct 2025 23:35:43 +0200 Subject: [PATCH 02/18] Implement periodic retained WAL size check in Restarter --- .../lib/electric/connection/restarter.ex | 67 ++++++++++++++++--- .../lib/electric/connection/supervisor.ex | 11 ++- 2 files changed, 67 insertions(+), 11 deletions(-) diff --git a/packages/sync-service/lib/electric/connection/restarter.ex b/packages/sync-service/lib/electric/connection/restarter.ex index 5205207fe0..784655d2b4 100644 --- a/packages/sync-service/lib/electric/connection/restarter.ex +++ b/packages/sync-service/lib/electric/connection/restarter.ex @@ -10,6 +10,10 @@ defmodule Electric.Connection.Restarter do - publication manager - schema reconciler + Once the connection subsystem is scaled down, Restarter starts a timer to check the retained + WAL size periodically. If the size exceeds the configured threshold, Restart will restart the + connection subsystem. Both the period and the size threshold are passed to `start_link/1`. + """ use GenServer @@ -75,30 +79,39 @@ defmodule Electric.Connection.Restarter do %{ stack_id: Keyword.fetch!(opts, :stack_id), stack_events_registry: Keyword.fetch!(opts, :stack_events_registry), - wait_until_conn_up_ref: nil + wal_size_check_period: Keyword.get(opts, :wal_size_check_period, 0), + wal_size_threshold: Keyword.get(opts, :wal_size_threshold, 1), + wait_until_conn_up_ref: nil, + wal_size_check_timer: nil }} end - def handle_cast(:stop_connection_subsystem, state) do - StatusMonitor.database_connections_going_to_sleep(state.stack_id) - Electric.Connection.Manager.Supervisor.stop_connection_manager(stack_id: state.stack_id) + def handle_cast(:stop_connection_subsystem, %{stack_id: stack_id} = state) do + StatusMonitor.database_connections_going_to_sleep(stack_id) + Electric.Connection.Manager.Supervisor.stop_connection_manager(stack_id: stack_id) Electric.StackSupervisor.dispatch_stack_event( state.stack_events_registry, - state.stack_id, + stack_id, :scaled_down_database_connections ) + state = schedule_wal_size_check(state) + {:noreply, state} end def handle_cast(:restore_connection_subsystem, %{wait_until_conn_up_ref: nil} = state) do - StatusMonitor.database_connections_waking_up(state.stack_id) - Electric.Connection.Manager.Supervisor.restart(stack_id: state.stack_id) + %{stack_id: stack_id} = state + + StatusMonitor.database_connections_waking_up(stack_id) + Electric.Connection.Manager.Supervisor.restart(stack_id: stack_id) - ref = StatusMonitor.wait_until_conn_up_async(state.stack_id) + ref = StatusMonitor.wait_until_conn_up_async(stack_id) - {:noreply, %{state | wait_until_conn_up_ref: ref}} + if timer = state.wal_size_check_timer, do: Process.cancel_timer(timer) + + {:noreply, %{state | wait_until_conn_up_ref: ref, wal_size_check_timer: nil}} end def handle_cast(:restore_connection_subsystem, state) do @@ -115,4 +128,40 @@ defmodule Electric.Connection.Restarter do def handle_info({ref, :ok}, %{wait_until_conn_up_ref: ref} = state) do {:noreply, %{state | wait_until_conn_up_ref: nil}} end + + # The timer has already been cancelled and reset, ignore this message. + def handle_info(:check_wal_size, %{wal_size_check_timer: nil} = state) do + {:noreply, state} + end + + def handle_info(:check_wal_size, state) do + state = %{state | wal_size_check_timer: nil} + + wal_size = query_retained_wal_size(state) + + state = + if wal_size >= state.wal_size_threshold do + :ok = restart_connection_subsystem(state.stack_id) + state + else + schedule_wal_size_check(state) + end + + {:noreply, state} + end + + defp schedule_wal_size_check( + %{wal_size_check_timer: nil, wal_size_check_period: period} = state + ) + when is_integer(period) and period > 0 do + timer = Process.send_after(self(), :check_wal_size, period) + %{state | wal_size_check_timer: timer} + end + + defp schedule_wal_size_check(state), do: state + + defp query_retained_wal_size(_state) do + # FIXME: placeholder + 0 + end end diff --git a/packages/sync-service/lib/electric/connection/supervisor.ex b/packages/sync-service/lib/electric/connection/supervisor.ex index ea32f196f4..8da81a79ae 100644 --- a/packages/sync-service/lib/electric/connection/supervisor.ex +++ b/packages/sync-service/lib/electric/connection/supervisor.ex @@ -57,9 +57,16 @@ defmodule Electric.Connection.Supervisor do Logger.metadata(stack_id: stack_id) Electric.Telemetry.Sentry.set_tags_context(stack_id: stack_id) + tweaks = Keyword.fetch!(opts, :tweaks) + + restarter_opts = + [ + stack_id: stack_id, + stack_events_registry: Keyword.fetch!(opts, :stack_events_registry) + ] ++ Keyword.take(tweaks, [:wal_size_check_period, :wal_size_threshold]) + children = [ - {Electric.Connection.Restarter, - stack_id: stack_id, stack_events_registry: Keyword.fetch!(opts, :stack_events_registry)}, + {Electric.Connection.Restarter, restarter_opts}, {Electric.Connection.Manager.Supervisor, opts} ] From 8ffaf6c7d93118ae47b1f4be746994cd52387e01 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Tue, 14 Oct 2025 01:01:23 +0200 Subject: [PATCH 03/18] Implement ELECTRIC_IDLE_WAL_SIZE_CHECK_PERIOD and ELECTRIC_IDLE_WAL_SIZE_THRESHOLD --- packages/sync-service/config/runtime.exs | 9 ++++++++- packages/sync-service/lib/electric/application.ex | 4 +++- packages/sync-service/lib/electric/config.ex | 10 +++++++++- packages/sync-service/lib/electric/stack_supervisor.ex | 10 +++++++++- 4 files changed, 29 insertions(+), 4 deletions(-) diff --git a/packages/sync-service/config/runtime.exs b/packages/sync-service/config/runtime.exs index 17b34d2e4d..8a49adb18d 100644 --- a/packages/sync-service/config/runtime.exs +++ b/packages/sync-service/config/runtime.exs @@ -272,7 +272,14 @@ config :electric, "ELECTRIC_REPLICATION_IDLE_TIMEOUT", &Electric.Config.parse_human_readable_time!/1, nil - ) + ), + idle_wal_size_check_period: + env!( + "ELECTRIC_IDLE_WAL_SIZE_CHECK_PERIOD", + &Electric.Config.parse_human_readable_time!/1, + nil + ), + idle_wal_size_threshold: env!("ELECTRIC_IDLE_WAL_SIZE_THRESHOLD", :integer, nil) if Electric.telemetry_enabled?() do # Disable the default telemetry_poller process since we start our own in diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex index 7ed4262612..9b401bda02 100644 --- a/packages/sync-service/lib/electric/application.ex +++ b/packages/sync-service/lib/electric/application.ex @@ -143,7 +143,9 @@ defmodule Electric.Application do shape_hibernate_after: get_env(opts, :shape_hibernate_after), shape_enable_suspend?: get_env(opts, :shape_enable_suspend?), conn_max_requests: get_env(opts, :conn_max_requests), - process_spawn_opts: get_env(opts, :process_spawn_opts) + process_spawn_opts: get_env(opts, :process_spawn_opts), + idle_wal_size_check_period: get_env(opts, :idle_wal_size_check_period), + idle_wal_size_threshold: get_env(opts, :idle_wal_size_threshold) ], manual_table_publishing?: get_env(opts, :manual_table_publishing?) ) diff --git a/packages/sync-service/lib/electric/config.ex b/packages/sync-service/lib/electric/config.ex index ad6b48398e..eb7f22dedb 100644 --- a/packages/sync-service/lib/electric/config.ex +++ b/packages/sync-service/lib/electric/config.ex @@ -50,6 +50,13 @@ defmodule Electric.Config do max_txn_size: 250 * 1024 * 1024, # Scaling down on idle is disabled by default replication_idle_timeout: 0, + # If the database provider scales down after 5 min and provided that the + # replication_idle_timeout is about a minute or less, checking WAL size once an hour + # ends up using about 10% of the compute on an otherwise idle database. + idle_wal_size_check_period: 3_600_000, + # We want to wake up and process any transactions that have accumulated in the WAL, hence + # the low threshold. + idle_wal_size_threshold: 1_000, manual_table_publishing?: false, ## HTTP API # set enable_http_api: false to turn off the HTTP server totally @@ -426,7 +433,7 @@ defmodule Electric.Config do end end - @time_units ~w[ms msec s sec m min] + @time_units ~w[ms msec s sec m min h hr] @spec parse_human_readable_time(binary | nil) :: {:ok, pos_integer} | {:error, binary} @@ -445,6 +452,7 @@ defmodule Electric.Config do defp time_multiplier(millisecond) when millisecond in ["ms", "msec"], do: 1 defp time_multiplier(second) when second in ["s", "sec"], do: 1000 defp time_multiplier(minute) when minute in ["m", "min"], do: 1000 * 60 + defp time_multiplier(hour) when hour in ["h", "hr"], do: 1000 * 60 * 60 def parse_human_readable_time!(str) do case parse_human_readable_time(str) do diff --git a/packages/sync-service/lib/electric/stack_supervisor.ex b/packages/sync-service/lib/electric/stack_supervisor.ex index e23ed6b9f8..ebf950d3fd 100644 --- a/packages/sync-service/lib/electric/stack_supervisor.ex +++ b/packages/sync-service/lib/electric/stack_supervisor.ex @@ -136,7 +136,15 @@ defmodule Electric.StackSupervisor do type: :pos_integer, default: Electric.Config.default(:conn_max_requests) ], - process_spawn_opts: [type: :map, default: %{}] + process_spawn_opts: [type: :map, default: %{}], + idle_wal_size_check_period: [ + type: :integer, + default: Electric.Config.default(:idle_wal_size_check_period) + ], + idle_wal_size_threshold: [ + type: :integer, + default: Electric.Config.default(:idle_wal_size_threshold) + ] ] ], manual_table_publishing?: [ From 6e62d86fb96b713ab87edcc26d2b393fb051e32c Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Tue, 14 Oct 2025 01:05:46 +0200 Subject: [PATCH 04/18] Add changeset --- .changeset/little-seas-guess.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/little-seas-guess.md diff --git a/.changeset/little-seas-guess.md b/.changeset/little-seas-guess.md new file mode 100644 index 0000000000..d47c077747 --- /dev/null +++ b/.changeset/little-seas-guess.md @@ -0,0 +1,5 @@ +--- +'@core/sync-service': patch +--- + +Add two new configuration options for periodic retained WAL size checks in scaled down mode: ELECTRIC_IDLE_WAL_SIZE_CHECK_PERIOD and ELECTRIC_IDLE_WAL_SIZE_THRESHOLD. From 546f19dfcf1ec54f671c47d52f60d290adc46ae1 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Thu, 11 Dec 2025 11:35:53 +0100 Subject: [PATCH 05/18] Add Electric.Postgres.OneOffConnection module to run one-off DB queries --- .../electric/postgres/one_off_connection.ex | 84 +++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 packages/sync-service/lib/electric/postgres/one_off_connection.ex diff --git a/packages/sync-service/lib/electric/postgres/one_off_connection.ex b/packages/sync-service/lib/electric/postgres/one_off_connection.ex new file mode 100644 index 0000000000..7942339a1f --- /dev/null +++ b/packages/sync-service/lib/electric/postgres/one_off_connection.ex @@ -0,0 +1,84 @@ +defmodule Electric.Postgres.OneOffConnection do + require Logger + + @behaviour Postgrex.SimpleConnection + + @default_timeout 5000 + + def query(query, kwopts) do + {connection_opts, kwopts} = Keyword.pop(kwopts, :connection_opts) + {timeout, kwopts} = Keyword.pop(kwopts, :timeout, @default_timeout) + + connection_opts = + connection_opts + |> Electric.Utils.deobfuscate_password() + |> Keyword.merge(auto_reconnect: false, sync_connect: true) + + old_flag = Process.flag(:trap_exit, true) + + {:ok, pid} = + Postgrex.SimpleConnection.start_link( + __MODULE__, + [query: query, parent_pid: self()] ++ kwopts, + connection_opts + ) + + mon = Process.monitor(pid) + + result = + receive do + {^pid, %Postgrex.Result{} = result} -> {:ok, result} + {^pid, %Postgrex.Error{} = error} -> {:error, error} + {:DOWN, ^mon, :process, ^pid, reason} -> {:error, reason} + after + timeout -> {:error, :timeout} + end + + Process.exit(pid, :shutdown) + Process.demonitor(mon, [:flush]) + + receive do + {:EXIT, ^pid, reason} -> Logger.debug("OneOffConnection exited: #{inspect(reason)}") + end + + Process.flag(:trap_exit, old_flag) + + result + end + + @impl true + def init(kwopts) do + config = + kwopts + |> Map.new() + + %{stack_id: stack_id} = config + + Process.set_label({config.label, stack_id}) + Logger.metadata(stack_id: stack_id, is_connection_process?: true) + Electric.Telemetry.Sentry.set_tags_context(stack_id: stack_id) + + {:ok, config} + end + + @impl true + def handle_connect(state) do + {:query, state.query, state} + end + + @impl true + def handle_result([%Postgrex.Result{} = result], state) do + send(state.parent_pid, {self(), result}) + {:noreply, state} + end + + def handle_result(%Postgrex.Error{} = error, state) do + send(state.parent_pid, {self(), error}) + {:noreply, state} + end + + @impl true + def notify(_channel, _payload, _state) do + :ok + end +end From 81ec97165a84eda96c2d0fe7005b2e483036b3d9 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Thu, 11 Dec 2025 11:36:14 +0100 Subject: [PATCH 06/18] Reimplement Electric.Postgres.LockBreakerConnection as a one-off query --- .../lib/electric/connection/manager.ex | 101 ++++++++++--- .../postgres/lock_breaker_connection.ex | 133 ------------------ .../test/electric/connection/manager_test.exs | 107 +++++++++++++- .../postgres/lock_breaker_connection_test.exs | 116 --------------- 4 files changed, 187 insertions(+), 270 deletions(-) delete mode 100644 packages/sync-service/lib/electric/postgres/lock_breaker_connection.ex delete mode 100644 packages/sync-service/test/electric/postgres/lock_breaker_connection_test.exs diff --git a/packages/sync-service/lib/electric/connection/manager.ex b/packages/sync-service/lib/electric/connection/manager.ex index 060443483a..42c8c1e4fd 100644 --- a/packages/sync-service/lib/electric/connection/manager.ex +++ b/packages/sync-service/lib/electric/connection/manager.ex @@ -118,7 +118,9 @@ defmodule Electric.Connection.Manager do end use GenServer, shutdown: :infinity - alias Electric.Postgres.LockBreakerConnection + + import Electric.Utils, only: [quote_string: 1] + alias Electric.Connection.Manager.ConnectionBackoff alias Electric.Connection.Manager.ConnectionResolver alias Electric.DbConnectionError @@ -566,28 +568,24 @@ defmodule Electric.Connection.Manager do def handle_continue( :check_lock_not_abandoned, - %State{replication_pg_backend_pid: pg_backend_pid} = state - ) do - if state.current_step == {:start_replication_client, :acquiring_lock} and - not is_nil(pg_backend_pid) do - with {:ok, conn_opts, state} <- - validate_connection(pooled_connection_opts(state), :pool, state), - {:ok, breaker_pid} <- - LockBreakerConnection.start(connection_opts: conn_opts, stack_id: state.stack_id) do - lock_name = Keyword.fetch!(state.replication_opts, :slot_name) - - LockBreakerConnection.stop_backends_and_close(breaker_pid, lock_name, pg_backend_pid) - else - {:error, reason} -> - # no-op, this is a one-shot attempt at fixing a lock - Logger.warning("Failed try and break stuck lock connection: #{inspect(reason)}") - :ok - end - end + %State{ + current_step: {:start_replication_client, :acquiring_lock}, + replication_pg_backend_pid: pg_backend_pid + } = state + ) + when not is_nil(pg_backend_pid) do + pooled_conn_opts = pooled_connection_opts(state) - {:noreply, state} + with {:ok, conn_opts, state} <- validate_connection(pooled_conn_opts, :pool, state) do + run_lock_breaker_query(conn_opts, state) + {:noreply, state} + else + _ -> {:noreply, state} + end end + def handle_continue(:check_lock_not_abandoned, state), do: {:noreply, state} + @impl true def handle_info( {:timeout, tref, {:check_status, :replication_lock}}, @@ -1298,4 +1296,67 @@ defmodule Electric.Connection.Manager do do: %{state | replication_configuration_timer: nil} defp nillify_timer(state, _tref), do: state + + # Electric takes out a session-level advisory lock on a separate connection to better manage the + # ownership of the replication slot. Unfortunately, we have seen instances (especially on Neon), + # where the Electric disconnects, but the lock is not auto-released. + # + # For these cases, this breaker exists - it'll connect to the database, and check that for + # a given lock name, if that lock is taken, there also exists an active replication slot with the + # same name. If not, it'll terminate the backend that is holding the lock, under the assumption + # that it's one of the abandoned locks. + def run_lock_breaker_query(conn_opts, state) do + database = Keyword.fetch!(conn_opts, :database) + lock_name = Keyword.fetch!(state.replication_opts, :slot_name) + query = lock_breaker_query(database, lock_name, state.replication_pg_backend_pid) + opts = [stack_id: state.stack_id, label: :lock_breaker_connection, connection_opts: conn_opts] + + retval = Electric.Postgres.OneOffConnection.query(query, opts) + + case retval do + {:ok, %Postgrex.Result{columns: ["pg_terminate_backend"], num_rows: 0}} -> + Logger.debug("No stuck backends found") + + {:ok, %Postgrex.Result{columns: ["pg_terminate_backend"]}} -> + Logger.notice( + "Terminated a stuck backend to free the lock #{lock_name} because slot with same name was inactive" + ) + + {:error, reason} -> + # No retries here since this is a one-shot attempt at fixing a lock + Logger.warning("Failed try and break stuck lock connection: #{inspect(reason)}") + end + + retval + end + + defp lock_breaker_query(database, lock_name, lock_connection_pg_backend_pid) + when is_integer(lock_connection_pg_backend_pid) or is_nil(lock_connection_pg_backend_pid) do + # We're using a `WITH` clause to execute all this in one statement + # - See if there are existing but inactive replication slots with the given name + # - Find all backends that are holding locks with the same name + # - Terminate those backends + # + # It's generally impossible for this to return more than one row + + """ + WITH inactive_slots AS ( + select slot_name + from pg_replication_slots + where active = false and database = #{quote_string(database)} and slot_name = #{quote_string(lock_name)} + ), + stuck_backends AS ( + select pid + from pg_locks, inactive_slots + where + hashtext(slot_name) = (classid::bigint << 32) | objid::bigint + and locktype = 'advisory' + and objsubid = 1 + and database = (select oid from pg_database where datname = #{quote_string(database)}) + and granted + and pid != #{lock_connection_pg_backend_pid || 0} + ) + SELECT pg_terminate_backend(pid) FROM stuck_backends; + """ + end end diff --git a/packages/sync-service/lib/electric/postgres/lock_breaker_connection.ex b/packages/sync-service/lib/electric/postgres/lock_breaker_connection.ex deleted file mode 100644 index eb2427bb18..0000000000 --- a/packages/sync-service/lib/electric/postgres/lock_breaker_connection.ex +++ /dev/null @@ -1,133 +0,0 @@ -defmodule Electric.Postgres.LockBreakerConnection do - @moduledoc """ - A Postgres connection that is used to break an abandoned lock. - - Electric takes out a session-level advisory lock on a separate connection to better manage the - ownership of the replication slot. Unfortunately, we have seen instances (especially on Neon), - where the Electric disconnects, but the lock is not auto-released. - - For these cases, this breaker exists - it'll connect to the database, and check that for - a given lock name, if that lock is taken, there also exists an active replication slot with the - same name. If not, it'll terminate the backend that is holding the lock, under the assumption - that it's one of the abandoned locks. - """ - require Logger - - import Electric.Utils, only: [quote_string: 1] - - @behaviour Postgrex.SimpleConnection - - @type option :: - {:connection_opts, Keyword.t()} - | {:stack_id, String.t()} - - @type options :: [option] - - @spec start(options()) :: {:ok, pid()} | {:error, Postgrex.Error.t() | term()} - def start(opts) do - {connection_opts, init_opts} = Keyword.pop(opts, :connection_opts) - - connection_opts = Electric.Utils.deobfuscate_password(connection_opts) - - with {:ok, pid} <- - Postgrex.SimpleConnection.start_link( - __MODULE__, - init_opts |> Keyword.put(:database, connection_opts[:database]), - [ - auto_reconnect: false, - sync_connect: true - ] ++ - connection_opts - ) do - # unlink the lock breaker so that if it crashes it does not affect the caller, - # since it is a one shot fix attempt anyway - Process.unlink(pid) - {:ok, pid} - end - end - - def stop_backends_and_close(server, lock_name, lock_connection_pg_backend_pid \\ nil) do - send(server, {:stop_backends_and_close, lock_name, lock_connection_pg_backend_pid}) - end - - @impl true - def init(opts) do - opts = Map.new(opts) - - Process.set_label({:lock_breaker_connection, opts.stack_id}) - - metadata = [ - is_connection_process?: true, - stack_id: opts.stack_id - ] - - Logger.metadata(metadata) - Electric.Telemetry.Sentry.set_tags_context(metadata) - - {:ok, opts} - end - - @impl true - def handle_connect(state) do - {:noreply, state} - end - - @impl true - def handle_info( - {:stop_backends_and_close, lock_name, lock_connection_pg_backend_pid}, - state - ) do - {:query, lock_breaker_query(lock_name, lock_connection_pg_backend_pid, state.database), - Map.put(state, :lock_name, lock_name)} - end - - @impl true - def handle_result([%Postgrex.Result{columns: ["pg_terminate_backend"]} = result], state) do - if result.num_rows == 0 do - Logger.debug("No stuck backends found") - else - Logger.notice( - "Terminated a stuck backend to free the lock #{state.lock_name} because slot with same name was inactive" - ) - end - - exit(:shutdown) - end - - def handle_result(%Postgrex.Error{} = error, _) do - raise error - end - - @impl true - def notify(_, _, _), do: :ok - - defp lock_breaker_query(lock_name, lock_connection_pg_backend_pid, database) - when is_integer(lock_connection_pg_backend_pid) or is_nil(lock_connection_pg_backend_pid) do - # We're using a `WITH` clause to execute all this in one statement - # - See if there are existing but inactive replication slots with the given name - # - Find all backends that are holding locks with the same name - # - Terminate those backends - # - # It's generally impossible for this to return more than one row - - """ - WITH inactive_slots AS ( - select slot_name - from pg_replication_slots - where active = false and database = #{quote_string(database)} and slot_name = #{quote_string(lock_name)} - ), - stuck_backends AS ( - select pid - from pg_locks, inactive_slots - where - hashtext(slot_name) = (classid::bigint << 32) | objid::bigint - and locktype = 'advisory' - and objsubid = 1 - and database = (select oid from pg_database where datname = #{quote_string(database)}) - and granted - and pid != #{lock_connection_pg_backend_pid || 0} - ) - SELECT pg_terminate_backend(pid) FROM stuck_backends; - """ - end -end diff --git a/packages/sync-service/test/electric/connection/manager_test.exs b/packages/sync-service/test/electric/connection/manager_test.exs index d2c006dbf9..9315680bad 100644 --- a/packages/sync-service/test/electric/connection/manager_test.exs +++ b/packages/sync-service/test/electric/connection/manager_test.exs @@ -5,8 +5,8 @@ defmodule Electric.Connection.ConnectionManagerTest do import Support.ComponentSetup import Support.DbSetup - alias Electric.Replication.ShapeLogCollector alias Electric.Connection + alias Electric.Replication.ShapeLogCollector alias Electric.StatusMonitor setup [ @@ -412,6 +412,103 @@ defmodule Electric.Connection.ConnectionManagerTest do end end + describe "lock breaker query" do + test "should break an abandoned lock if slot is inactive", ctx do + Postgrex.query!( + ctx.db_conn, + "SELECT pg_create_logical_replication_slot('#{ctx.slot_name}', 'pgoutput')" + ) + + test_pid = self() + + # Start a task that will hold the lock until the end of the test + start_supervised!({ + Task, + fn -> + DBConnection.run(ctx.db_conn, fn conn -> + Postgrex.query!(conn, "SELECT pg_advisory_lock(hashtext('#{ctx.slot_name}'))") + + send(test_pid, :lock_acquired) + + Process.sleep(:infinity) + end) + end + }) + + assert_receive :lock_acquired + + # Verify there's an entry for the acquired lock in pg_locks + assert %Postgrex.Result{rows: [[pg_backend_pid]], num_rows: 1} = query_lock_status(ctx) + + # Passing a pg_backend_pid to the run_lock_breaker_query() protects that backend pid from termination + conn_man_state = %{ + stack_id: ctx.stack_id, + replication_opts: [slot_name: ctx.slot_name], + replication_pg_backend_pid: pg_backend_pid + } + + assert {:ok, %Postgrex.Result{columns: ["pg_terminate_backend"], num_rows: 0}} = + Electric.Connection.Manager.run_lock_breaker_query(ctx.db_config, conn_man_state) + + # Verify the lock is still held + assert %Postgrex.Result{rows: [[^pg_backend_pid]], num_rows: 1} = query_lock_status(ctx) + + # Make sure we can stop the lock connection above, so we're passing nil for replication_pg_backend_pid + conn_man_state = %{ + stack_id: ctx.stack_id, + replication_opts: [slot_name: ctx.slot_name], + replication_pg_backend_pid: nil + } + + assert {:ok, + %Postgrex.Result{columns: ["pg_terminate_backend"], num_rows: 1, rows: [["t"]]}} = + Electric.Connection.Manager.run_lock_breaker_query(ctx.db_config, conn_man_state) + + # Verify that the pg_locks entry is gone + assert %Postgrex.Result{rows: [], num_rows: 0} = query_lock_status(ctx) + end + + test "doesn't break the lock if it's taken from expected lock connection", ctx do + Postgrex.query!( + ctx.db_conn, + "SELECT pg_create_logical_replication_slot('#{ctx.slot_name}', 'pgoutput')" + ) + + {:ok, replication_client_pid} = + start_supervised( + {Electric.Postgres.ReplicationClient, + stack_id: ctx.stack_id, + replication_opts: [ + connection_opts: ctx.db_config, + stack_id: ctx.stack_id, + publication_name: ctx.slot_name, + try_creating_publication?: false, + slot_name: ctx.slot_name, + handle_event: nil, + connection_manager: self() + ]} + ) + + replication_client_monitor = Process.monitor(replication_client_pid) + + assert_receive {:"$gen_cast", {:pg_info_obtained, %{pg_backend_pid: pg_backend_pid}}} + assert_receive {:"$gen_cast", :replication_client_lock_acquired} + + conn_man_state = %{ + stack_id: ctx.stack_id, + replication_opts: [slot_name: ctx.slot_name], + replication_pg_backend_pid: pg_backend_pid + } + + assert {:ok, %Postgrex.Result{columns: ["pg_terminate_backend"], num_rows: 0}} = + Electric.Connection.Manager.run_lock_breaker_query(ctx.db_config, conn_man_state) + + refute_received {:DOWN, ^replication_client_monitor, :process, _pid, _reason} + + stop_supervised(Electric.Postgres.ReplicationClient) + end + end + defp wait_until_active(stack_id) do assert_receive {:stack_status, _, :waiting_for_connection_lock} assert_receive {:stack_status, _, :connection_lock_acquired} @@ -426,4 +523,12 @@ defmodule Electric.Connection.ConnectionManagerTest do |> GenServer.whereis() |> Process.monitor() end + + defp query_lock_status(ctx) do + Postgrex.query!( + ctx.db_conn, + "SELECT pid FROM pg_locks WHERE (classid::bigint << 32) | objid::bigint = hashtext($1) AND locktype = 'advisory'", + [ctx.slot_name] + ) + end end diff --git a/packages/sync-service/test/electric/postgres/lock_breaker_connection_test.exs b/packages/sync-service/test/electric/postgres/lock_breaker_connection_test.exs deleted file mode 100644 index 18fb168f20..0000000000 --- a/packages/sync-service/test/electric/postgres/lock_breaker_connection_test.exs +++ /dev/null @@ -1,116 +0,0 @@ -defmodule Electric.Postgres.LockBreakerConnectionTest do - use ExUnit.Case, async: true - import Support.DbSetup, except: [with_publication: 1] - import Support.ComponentSetup - - alias Electric.Postgres.LockBreakerConnection - alias Electric.Postgres.ReplicationClient - - setup [ - :with_unique_db, - :with_stack_id_from_test, - :with_lsn_tracker, - :with_slot_name - ] - - test "should break an abandoned lock if slot is inactive", ctx do - Postgrex.query!( - ctx.db_conn, - "SELECT pg_create_logical_replication_slot('#{ctx.slot_name}', 'pgoutput')" - ) - - test_pid = self() - - start_supervised!({ - Task, - fn -> - DBConnection.run(ctx.db_conn, fn conn -> - Postgrex.query!( - conn, - "SELECT pg_advisory_lock(hashtext('#{ctx.slot_name}'))", - [] - ) - - send(test_pid, :lock_acquired) - - Process.sleep(:infinity) - end) - end - }) - - {:ok, lock_breaker_pid} = - Electric.Postgres.LockBreakerConnection.start( - connection_opts: ctx.db_config, - stack_id: ctx.stack_id - ) - - lock_breaker_monitor = Process.monitor(lock_breaker_pid) - - assert_receive :lock_acquired - - # Verify there's an entry for the acquired lock in pg_locks - assert %Postgrex.Result{rows: [_pg_backend_pid], num_rows: 1} = - Postgrex.query!( - ctx.db_conn, - "SELECT pid FROM pg_locks WHERE objid::bigint = hashtext($1) AND locktype = 'advisory'", - [ctx.slot_name] - ) - - # Make sure we can stop the lock connection above, so we're not specifying current pid - LockBreakerConnection.stop_backends_and_close(lock_breaker_pid, ctx.slot_name) - - assert_receive {:DOWN, ^lock_breaker_monitor, :process, ^lock_breaker_pid, :shutdown} - - # Verify that the pg_locks entry is gone - assert %Postgrex.Result{rows: [], num_rows: 0} = - Postgrex.query!( - ctx.db_conn, - "SELECT pid FROM pg_locks WHERE objid::bigint = hashtext($1) AND locktype = 'advisory'", - [ctx.slot_name] - ) - end - - test "doesn't break the lock if it's taken from expected lock connection", ctx do - Postgrex.query!( - ctx.db_conn, - "SELECT pg_create_logical_replication_slot('#{ctx.slot_name}', 'pgoutput')" - ) - - {:ok, replication_client_pid} = - start_supervised( - {ReplicationClient, - stack_id: ctx.stack_id, - replication_opts: [ - connection_opts: ctx.db_config, - stack_id: ctx.stack_id, - publication_name: ctx.slot_name, - try_creating_publication?: false, - slot_name: ctx.slot_name, - handle_event: nil, - connection_manager: self() - ]} - ) - - replication_client_monitor = Process.monitor(replication_client_pid) - - {:ok, lock_breaker_pid} = - start_supervised(%{ - id: :lock_breaker, - start: - {Electric.Postgres.LockBreakerConnection, :start, - [[connection_opts: ctx.db_config, stack_id: ctx.stack_id]]} - }) - - lock_breaker_monitor = Process.monitor(lock_breaker_pid) - - assert_receive {:"$gen_cast", {:pg_info_obtained, %{pg_backend_pid: pg_backend_pid}}} - assert_receive {:"$gen_cast", :replication_client_lock_acquired} - - LockBreakerConnection.stop_backends_and_close(lock_breaker_pid, ctx.slot_name, pg_backend_pid) - - assert_receive {:DOWN, ^lock_breaker_monitor, :process, ^lock_breaker_pid, :shutdown} - refute_received {:DOWN, ^replication_client_monitor, :process, _pid, _reason} - - stop_supervised(ReplicationClient) - end -end From 7f00c736d7ca9cb821dde621c7d4d501f3533982 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Thu, 11 Dec 2025 13:18:59 +0100 Subject: [PATCH 07/18] Remove Electric.Connection.Manager.ConnectionResolver's internal SimpleConnection and use OneOffConnection --- .../lib/electric/connection/manager.ex | 2 +- .../connection/manager/connection_resolver.ex | 54 ++--------- .../electric/postgres/one_off_connection.ex | 41 +++++++-- .../manager/connection_resolver_test.exs | 92 +++---------------- 4 files changed, 53 insertions(+), 136 deletions(-) diff --git a/packages/sync-service/lib/electric/connection/manager.ex b/packages/sync-service/lib/electric/connection/manager.ex index 42c8c1e4fd..ad958a44da 100644 --- a/packages/sync-service/lib/electric/connection/manager.ex +++ b/packages/sync-service/lib/electric/connection/manager.ex @@ -1311,7 +1311,7 @@ defmodule Electric.Connection.Manager do query = lock_breaker_query(database, lock_name, state.replication_pg_backend_pid) opts = [stack_id: state.stack_id, label: :lock_breaker_connection, connection_opts: conn_opts] - retval = Electric.Postgres.OneOffConnection.query(query, opts) + retval = Electric.Postgres.OneOffConnection.attempt_connection(opts, query) case retval do {:ok, %Postgrex.Result{columns: ["pg_terminate_backend"], num_rows: 0}} -> diff --git a/packages/sync-service/lib/electric/connection/manager/connection_resolver.ex b/packages/sync-service/lib/electric/connection/manager/connection_resolver.ex index 1f0b6a63a1..6addd252f2 100644 --- a/packages/sync-service/lib/electric/connection/manager/connection_resolver.ex +++ b/packages/sync-service/lib/electric/connection/manager/connection_resolver.ex @@ -4,21 +4,6 @@ defmodule Electric.Connection.Manager.ConnectionResolver do require Logger - defmodule Connection do - @moduledoc false - @behaviour Postgrex.SimpleConnection - - def init(stack_id) do - Logger.metadata(stack_id: stack_id, is_connection_process?: true) - Electric.Telemetry.Sentry.set_tags_context(stack_id: stack_id) - {:ok, []} - end - - def notify(_channel, _payload, _state) do - :ok - end - end - def name(stack_ref) do Electric.ProcessRegistry.name(stack_ref, __MODULE__) end @@ -29,29 +14,20 @@ defmodule Electric.Connection.Manager.ConnectionResolver do end end - def validate(stack_id, db_connection) do - GenServer.call(name(stack_id), {:validate, db_connection}, :infinity) + def validate(stack_id, conn_opts) do + GenServer.call(name(stack_id), {:validate, conn_opts}, :infinity) end @impl GenServer def init(opts) do stack_id = Keyword.fetch!(opts, :stack_id) - # ignore exits from connection processes that fail to start due to - # connection errors or from us killing the connection after we're - # done - Process.flag(:trap_exit, true) - Process.set_label({:connection_resolver, stack_id}) metadata = [is_connection_process?: true, stack_id: stack_id] Logger.metadata(metadata) Electric.Telemetry.Sentry.set_tags_context(metadata) - {_m, _f, _a} = - connection_mod = - Keyword.get(opts, :connection_mod, {Postgrex.SimpleConnection, :start_link, []}) - - {:ok, %{connection_mod: connection_mod, stack_id: stack_id}, {:continue, :notify_ready}} + {:ok, %{stack_id: stack_id}, {:continue, :notify_ready}} end @impl GenServer @@ -62,11 +38,11 @@ defmodule Electric.Connection.Manager.ConnectionResolver do end @impl GenServer - def handle_call({:validate, connection}, _from, state) do + def handle_call({:validate, conn_opts}, _from, state) do # convert to postgrex style for return to conn.manager - connection = populate_connection_opts(connection) + conn_opts = populate_connection_opts(conn_opts) - result = attempt_connection({:cont, connection}, state) + result = attempt_connection({:cont, conn_opts}, state) {:reply, result, state, :hibernate} end @@ -80,22 +56,10 @@ defmodule Electric.Connection.Manager.ConnectionResolver do def handle_info({:EXIT, _pid, _reason}, state), do: {:noreply, state} defp attempt_connection({:cont, conn_opts}, state) do - %{ - connection_mod: {connection_mod, connection_fun, connection_args} - } = state - - connection_opts = - Keyword.merge(Electric.Utils.deobfuscate_password(conn_opts), - auto_reconnect: false, - sync_connect: true - ) - - args = [Connection, state.stack_id, connection_opts | connection_args] - - case apply(connection_mod, connection_fun, args) do - {:ok, conn} -> - Process.exit(conn, :kill) + opts = [stack_id: state.stack_id, label: :connection_resolver, connection_opts: conn_opts] + case Electric.Postgres.OneOffConnection.attempt_connection(opts) do + :success -> {:ok, conn_opts} {:error, error} -> diff --git a/packages/sync-service/lib/electric/postgres/one_off_connection.ex b/packages/sync-service/lib/electric/postgres/one_off_connection.ex index 7942339a1f..fc5978d3ba 100644 --- a/packages/sync-service/lib/electric/postgres/one_off_connection.ex +++ b/packages/sync-service/lib/electric/postgres/one_off_connection.ex @@ -5,7 +5,7 @@ defmodule Electric.Postgres.OneOffConnection do @default_timeout 5000 - def query(query, kwopts) do + def attempt_connection(kwopts, query \\ nil) do {connection_opts, kwopts} = Keyword.pop(kwopts, :connection_opts) {timeout, kwopts} = Keyword.pop(kwopts, :timeout, @default_timeout) @@ -16,13 +16,32 @@ defmodule Electric.Postgres.OneOffConnection do old_flag = Process.flag(:trap_exit, true) - {:ok, pid} = - Postgrex.SimpleConnection.start_link( - __MODULE__, - [query: query, parent_pid: self()] ++ kwopts, - connection_opts - ) + result = + with {:ok, pid} <- + Postgrex.SimpleConnection.start_link( + __MODULE__, + [parent_pid: self(), query: query] ++ kwopts, + connection_opts + ) do + handle_connection(pid, query, timeout) + end + + Process.flag(:trap_exit, old_flag) + + result + end + + defp handle_connection(pid, nil, _timeout) do + Process.exit(pid, :shutdown) + receive do + {:EXIT, ^pid, reason} -> Logger.debug("OneOffConnection exited: #{inspect(reason)}") + end + + :success + end + + defp handle_connection(pid, _query, timeout) do mon = Process.monitor(pid) result = @@ -41,8 +60,6 @@ defmodule Electric.Postgres.OneOffConnection do {:EXIT, ^pid, reason} -> Logger.debug("OneOffConnection exited: #{inspect(reason)}") end - Process.flag(:trap_exit, old_flag) - result end @@ -63,7 +80,11 @@ defmodule Electric.Postgres.OneOffConnection do @impl true def handle_connect(state) do - {:query, state.query, state} + if query = state[:query] do + {:query, query, state} + else + {:noreply, state} + end end @impl true diff --git a/packages/sync-service/test/electric/connection/manager/connection_resolver_test.exs b/packages/sync-service/test/electric/connection/manager/connection_resolver_test.exs index 44563af061..225ae8fafe 100644 --- a/packages/sync-service/test/electric/connection/manager/connection_resolver_test.exs +++ b/packages/sync-service/test/electric/connection/manager/connection_resolver_test.exs @@ -7,40 +7,16 @@ defmodule Electric.Connection.Manager.ConnectionResolverTest do import Support.ComponentSetup, only: [with_stack_id_from_test: 1] import Support.DbSetup - defp start_connection_resolver!(ctx, connection_mod \\ nil) do - opts = [stack_id: ctx.stack_id] - - opts = - if connection_mod do - Keyword.put(opts, :connection_mod, connection_mod) - else - opts - end - - start_supervised!({ConnectionResolver, opts}) - end - setup [ :with_unique_db, - :with_stack_id_from_test + :with_stack_id_from_test, + :start_connection_resolver ] - defmodule ErrorConnection do - def start_link(_handler, _args, conn_opts, match_fun) do - match_fun.(conn_opts) - end - end - - defp assert_obfuscated_password(conn_opts) do - assert is_function(Keyword.get(conn_opts, :password), 0) - end - # actually connect to make sure we can do that # overwrite :connection_mod with custom modules that implement start_link but exit with some pre-defined postgres error # need to assert that the connection options are mutated between attempts test "valid connection opts", ctx do - start_connection_resolver!(ctx) - db_config = Keyword.put(ctx.db_config, :sslmode, :disable) assert {:ok, resolved_db_config} = ConnectionResolver.validate(ctx.stack_id, db_config) @@ -55,8 +31,6 @@ defmodule Electric.Connection.Manager.ConnectionResolverTest do end test "fallback to no-ssl works with ssmodle: :prefer", ctx do - start_connection_resolver!(ctx) - db_config = Keyword.put(ctx.db_config, :sslmode, :prefer) assert {:ok, resolved_db_config} = ConnectionResolver.validate(ctx.stack_id, db_config) @@ -71,34 +45,13 @@ defmodule Electric.Connection.Manager.ConnectionResolverTest do end test "sslmode: :require with no ssl returns error", ctx do - start_connection_resolver!(ctx) db_config = Keyword.put(ctx.db_config, :sslmode, :require) assert {:error, %Postgrex.Error{message: "ssl not available"}} = ConnectionResolver.validate(ctx.stack_id, db_config) end - test "fly connection can fallback to no ssl", ctx do - conn = spawn(fn -> Process.sleep(:infinity) end) - - start_connection_resolver!( - ctx, - {ErrorConnection, :start_link, - [ - fn conn_opts -> - if Keyword.get(conn_opts, :ssl) do - {:error, - %DBConnection.ConnectionError{ - message: "ssl connect: closed", - severity: :error - }} - else - {:ok, conn} - end - end - ]} - ) - + test "connection can fallback to no ssl", ctx do db_config = Keyword.put(ctx.db_config, :sslmode, :prefer) assert {:ok, resolved_db_config} = ConnectionResolver.validate(ctx.stack_id, db_config) @@ -113,8 +66,6 @@ defmodule Electric.Connection.Manager.ConnectionResolverTest do end test "fallback to ipv4 works", ctx do - start_connection_resolver!(ctx) - db_config = Keyword.merge(ctx.db_config, ipv6: true, hostname: "local-ipv4-only.electric-sql.dev") @@ -130,27 +81,8 @@ defmodule Electric.Connection.Manager.ConnectionResolverTest do end test "fallback to ipv4 handles various error results", ctx do - conn = spawn(fn -> Process.sleep(:infinity) end) - - start_connection_resolver!( - ctx, - {ErrorConnection, :start_link, - [ - fn conn_opts -> - if Keyword.get(conn_opts, :ipv6, true) do - {:error, - %DBConnection.ConnectionError{ - message: ipv6_error_message("localhost"), - severity: :error - }} - else - {:ok, conn} - end - end - ]} - ) - - db_config = Keyword.put(ctx.db_config, :ipv6, true) + # Use an IPv4 address for the hostname to ensure that connection attempts with socket_options: [:inet6] fail. + db_config = Keyword.merge(ctx.db_config, hostname: "127.0.0.1", ipv6: true) assert {:ok, resolved_db_config} = ConnectionResolver.validate(ctx.stack_id, db_config) @@ -163,12 +95,12 @@ defmodule Electric.Connection.Manager.ConnectionResolverTest do assert_obfuscated_password(resolved_db_config) end - defp ipv6_error_message(hostname) do - "tcp connect (#{hostname}): " <> - Enum.random([ - "non-existing domain - :nxdomain", - "host is unreachable - :ehostunreach", - "network is unreachable - :enetunreach" - ]) + defp start_connection_resolver(ctx) do + _pid = start_supervised!({ConnectionResolver, stack_id: ctx.stack_id}) + :ok + end + + defp assert_obfuscated_password(conn_opts) do + assert is_function(Keyword.get(conn_opts, :password), 0) end end From aca04e8f784b4d9ce9c3cf633af32658e30b295d Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Thu, 18 Dec 2025 13:05:21 +0100 Subject: [PATCH 08/18] Split OneOffConnection's API into separate attempt_connection() and query() functions --- .../lib/electric/connection/manager.ex | 2 +- .../electric/postgres/one_off_connection.ex | 68 ++++++++++++++----- 2 files changed, 52 insertions(+), 18 deletions(-) diff --git a/packages/sync-service/lib/electric/connection/manager.ex b/packages/sync-service/lib/electric/connection/manager.ex index ad958a44da..42c8c1e4fd 100644 --- a/packages/sync-service/lib/electric/connection/manager.ex +++ b/packages/sync-service/lib/electric/connection/manager.ex @@ -1311,7 +1311,7 @@ defmodule Electric.Connection.Manager do query = lock_breaker_query(database, lock_name, state.replication_pg_backend_pid) opts = [stack_id: state.stack_id, label: :lock_breaker_connection, connection_opts: conn_opts] - retval = Electric.Postgres.OneOffConnection.attempt_connection(opts, query) + retval = Electric.Postgres.OneOffConnection.query(query, opts) case retval do {:ok, %Postgrex.Result{columns: ["pg_terminate_backend"], num_rows: 0}} -> diff --git a/packages/sync-service/lib/electric/postgres/one_off_connection.ex b/packages/sync-service/lib/electric/postgres/one_off_connection.ex index fc5978d3ba..9dcf7b02ba 100644 --- a/packages/sync-service/lib/electric/postgres/one_off_connection.ex +++ b/packages/sync-service/lib/electric/postgres/one_off_connection.ex @@ -1,47 +1,75 @@ defmodule Electric.Postgres.OneOffConnection do + @moduledoc """ + A wrapper around Postgrex.SimpleConnection that provides synchronous API for querying the + database. + """ + require Logger @behaviour Postgrex.SimpleConnection @default_timeout 5000 - def attempt_connection(kwopts, query \\ nil) do + @doc """ + Attempt a database connection using the given connection options. + + This function is useful to verify that a database connection can be established using the + provided connection options. Once a connection has been established, the connection process + shuts down synchronously before this function returns. + """ + @spec attempt_connection(keyword()) :: :success | {:error, Postgrex.Error.t()} + def attempt_connection(kwopts) do + connect_and_maybe_query(kwopts, &handle_connection/1) + end + + @doc """ + Open a one-off database connection and execute a simple query. + + Once a connection has been established, the query is executed, the connection process shuts + down and the query result is returned from the function. + """ + @spec query(String.t(), keyword()) :: + {:ok, Postgrex.Result.t()} | {:error, Postgrex.Error.t() | :timeout} + def query(query, kwopts) do + timeout = Keyword.get(kwopts, :timeout, @default_timeout) + connect_and_maybe_query([query: query] ++ kwopts, &handle_query_result(&1, timeout)) + end + + ### + + defp connect_and_maybe_query(kwopts, on_connected_callback_fn) do {connection_opts, kwopts} = Keyword.pop(kwopts, :connection_opts) - {timeout, kwopts} = Keyword.pop(kwopts, :timeout, @default_timeout) connection_opts = connection_opts |> Electric.Utils.deobfuscate_password() |> Keyword.merge(auto_reconnect: false, sync_connect: true) - old_flag = Process.flag(:trap_exit, true) + trap_exit_val = Process.flag(:trap_exit, true) result = with {:ok, pid} <- Postgrex.SimpleConnection.start_link( __MODULE__, - [parent_pid: self(), query: query] ++ kwopts, + [parent_pid: self()] ++ kwopts, connection_opts ) do - handle_connection(pid, query, timeout) + on_connected_callback_fn.(pid) end - Process.flag(:trap_exit, old_flag) + Process.flag(:trap_exit, trap_exit_val) result end - defp handle_connection(pid, nil, _timeout) do - Process.exit(pid, :shutdown) - - receive do - {:EXIT, ^pid, reason} -> Logger.debug("OneOffConnection exited: #{inspect(reason)}") - end - + # Callback executed after Postgrex.SimpleConnection has successfully connected and there's no query to run. + defp handle_connection(pid) do + exit_connection_process(pid) :success end - defp handle_connection(pid, _query, timeout) do + # Callback executed after Postgrex.SimpleConnection has successfully connected and sent off a query to the database. + defp handle_query_result(pid, timeout) do mon = Process.monitor(pid) result = @@ -53,16 +81,22 @@ defmodule Electric.Postgres.OneOffConnection do timeout -> {:error, :timeout} end - Process.exit(pid, :shutdown) Process.demonitor(mon, [:flush]) + exit_connection_process(pid) + + result + end + + defp exit_connection_process(pid) do + Process.exit(pid, :shutdown) receive do {:EXIT, ^pid, reason} -> Logger.debug("OneOffConnection exited: #{inspect(reason)}") end - - result end + ### + @impl true def init(kwopts) do config = From 599eb07fcdd0255aca013102167d52d639064867 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Thu, 11 Dec 2025 13:29:18 +0100 Subject: [PATCH 09/18] Query retained WAL size in Restarter periodically --- .../wal-size-check-while-scaled-down.lux | 82 +++++++++++++++++++ .../lib/electric/connection/restarter.ex | 50 +++++++++-- 2 files changed, 127 insertions(+), 5 deletions(-) create mode 100644 integration-tests/tests/wal-size-check-while-scaled-down.lux diff --git a/integration-tests/tests/wal-size-check-while-scaled-down.lux b/integration-tests/tests/wal-size-check-while-scaled-down.lux new file mode 100644 index 0000000000..0dae34dd30 --- /dev/null +++ b/integration-tests/tests/wal-size-check-while-scaled-down.lux @@ -0,0 +1,82 @@ +[doc Verify Electric can scale down database connections after a lull in stream transactions] + +[include _macros.luxinc] + +[global pg_container_name=db-connection-scaledown__pg] +[global database_url=postgresql://electric_test:password@localhost:$pg_host_port/electric?sslmode=disable] + +### + +# Start Postgres and create an additional role that will be used by Electric in this test +[invoke setup_pg_with_shell_name \ + "pg" \ + "-e INIT_DB_SQL=\"\ + CREATE ROLE electric_test LOGIN PASSWORD 'password' REPLICATION;\ + GRANT CREATE ON DATABASE electric TO electric_test\"" \ + "" \ + "-v $(realpath ../scripts/init_db.sh):/docker-entrypoint-initdb.d/initdb-init_db.sh"] + +# Create a table for subsequent shape requests +[invoke start_psql] + +[shell psql] + !create table items(val text); + ??CREATE + + !insert into items values ('1'), ('2'); + ??INSERT + + !alter table items owner to electric_test; + ??ALTER TABLE + +# Start Electric and wait for it to finish initialization. +# Have to disable the ConnectionManagerPing process, otherwise it will error when +# Connection.Manager itself goes down as part of the test scenario. +[invoke setup_electric_with_env "DO_NOT_START_CONN_MAN_PING=true ELECTRIC_IDLE_WAL_SIZE_CHECK_PERIOD=1s"] +[shell electric] + [timeout 10] + ??[debug] Replication client started streaming + +# Force Electric to scale down by sending the :idle_check message to the replication client. +# Normally the client performs the check once a minute, so we're speeding it up here for +# testing purposes. +[shell electric] + # [sleep 2] + + !Electric.Postgres.ReplicationClient.name("single_stack") |> GenServer.whereis() |> send(:check_if_idle) + ??[notice] Closing all database connections after the replication stream has been idle for + + ??[debug] Terminating connection manager with reason :shutdown. + +# Confirm that Postgres no longer sees any open connections for the electric user +[shell psql] + [sleep 2] + + !select datname, usename, backend_type, query from pg_stat_activity where usename = 'electric_test'; + ??(0 rows) + + +# Sleep until it's time to verify that Electric has checked retained WAL size in Postgres after ELECTRIC_IDLE_WAL_SIZE_CHECK_PERIOD +[shell electric] + [sleep 2] + + ??Opening a database connection to check the retained WAL size + ?Retained WAL size [0-9]+ is below the threshold [0-9]+\. Scheduling the next check to take place after [0-9]+ + +# Write to the database enough data to force the retained WAL size to go over ELECTRIC_IDLE_WAL_SIZE_THRESHOLD +[shell psql] + !insert into items select * from generate_series(1,1000); + ??INSERT + +# Sleep and verify that Electric wakes up after checking the WAL size and seeing it has exceeded the threshold +[shell electric] + ??Opening a database connection to check the retained WAL size + ?Retained WAL size [0-9]+ has exceeded the threshold [0-9]+\. Time to wake up the connection subsystem + + ??[info] Acquiring lock from postgres + ??[info] Starting replication from postgres + + ??[debug] Chunked 200 in + +[cleanup] + [invoke teardown] diff --git a/packages/sync-service/lib/electric/connection/restarter.ex b/packages/sync-service/lib/electric/connection/restarter.ex index 784655d2b4..f9a2aa8bbf 100644 --- a/packages/sync-service/lib/electric/connection/restarter.ex +++ b/packages/sync-service/lib/electric/connection/restarter.ex @@ -20,6 +20,8 @@ defmodule Electric.Connection.Restarter do alias Electric.StatusMonitor + require Logger + def name(stack_ref) do Electric.ProcessRegistry.name(stack_ref, __MODULE__) end @@ -121,7 +123,7 @@ defmodule Electric.Connection.Restarter do end def handle_call(:restart_connection_subsystem, _from, state) do - :ok = Electric.Connection.Manager.Supervisor.restart(stack_id: state.stack_id) + :ok = do_restart_connection_subsystem(state.stack_id) {:reply, :ok, state} end @@ -141,15 +143,27 @@ defmodule Electric.Connection.Restarter do state = if wal_size >= state.wal_size_threshold do - :ok = restart_connection_subsystem(state.stack_id) + Logger.info( + "Retained WAL size #{wal_size} has exceeded the threshold #{state.wal_size_threshold}. Time to wake up the connection subsystem." + ) + + :ok = do_restart_connection_subsystem(state.stack_id) state else + Logger.info( + "Retained WAL size #{wal_size} is below the threshold #{state.wal_size_threshold}. Scheduling the next check to take place after #{state.wal_size_check_period}" + ) + schedule_wal_size_check(state) end {:noreply, state} end + defp do_restart_connection_subsystem(stack_id) do + Electric.Connection.Manager.Supervisor.restart(stack_id: stack_id) + end + defp schedule_wal_size_check( %{wal_size_check_timer: nil, wal_size_check_period: period} = state ) @@ -160,8 +174,34 @@ defmodule Electric.Connection.Restarter do defp schedule_wal_size_check(state), do: state - defp query_retained_wal_size(_state) do - # FIXME: placeholder - 0 + @retained_wal_size_query """ + SELECT + pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) AS retained_wal_size + FROM + pg_replication_slots + WHERE + slot_name = + """ + + defp query_retained_wal_size(state) do + Logger.info("Opening a database connection to check the retained WAL size") + + query = @retained_wal_size_query <> Electric.Utils.quote_string(state.slot_name) + + opts = [ + stack_id: state.stack_id, + label: :retained_wal_size_query, + connection_opts: + Electric.Connection.Manager.validated_connection_opts(state.stack_id, :pool) + ] + + case Electric.Postgres.OneOffConnection.query(query, opts) do + {:ok, %Postgrex.Result{columns: ["retained_wal_size"], rows: [[wal_bytes_str]]}} -> + String.to_integer(wal_bytes_str) + + error -> + Logger.info("Error querying retained WAL size: #{inspect(error)}") + 0 + end end end From dcefc738faf93a6467b10b87242d531897065a1f Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Thu, 18 Dec 2025 13:28:20 +0100 Subject: [PATCH 10/18] Pass the user config for periodic WAL size checks to Restarter --- .../lib/electric/connection/restarter.ex | 7 +++++-- .../lib/electric/connection/supervisor.ex | 11 +++-------- .../sync-service/lib/electric/core_supervisor.ex | 4 +--- .../sync-service/lib/electric/stack_supervisor.ex | 12 +++++++++++- .../test/electric/connection/manager_test.exs | 10 +++++++++- 5 files changed, 29 insertions(+), 15 deletions(-) diff --git a/packages/sync-service/lib/electric/connection/restarter.ex b/packages/sync-service/lib/electric/connection/restarter.ex index f9a2aa8bbf..2e86546cfb 100644 --- a/packages/sync-service/lib/electric/connection/restarter.ex +++ b/packages/sync-service/lib/electric/connection/restarter.ex @@ -81,8 +81,11 @@ defmodule Electric.Connection.Restarter do %{ stack_id: Keyword.fetch!(opts, :stack_id), stack_events_registry: Keyword.fetch!(opts, :stack_events_registry), - wal_size_check_period: Keyword.get(opts, :wal_size_check_period, 0), - wal_size_threshold: Keyword.get(opts, :wal_size_threshold, 1), + slot_name: Keyword.fetch!(opts, :slot_name), + # NOTE(alco): These defaults are really only used in testing when these config options + # aren't set. The actual real-world defaults are defined in Electric.Config. + wal_size_check_period: Keyword.get(opts, :wal_size_check_period, 3600 * 1000), + wal_size_threshold: Keyword.get(opts, :wal_size_threshold, 100 * 1024 * 1024), wait_until_conn_up_ref: nil, wal_size_check_timer: nil }} diff --git a/packages/sync-service/lib/electric/connection/supervisor.ex b/packages/sync-service/lib/electric/connection/supervisor.ex index 8da81a79ae..5b0df11bdc 100644 --- a/packages/sync-service/lib/electric/connection/supervisor.ex +++ b/packages/sync-service/lib/electric/connection/supervisor.ex @@ -57,17 +57,12 @@ defmodule Electric.Connection.Supervisor do Logger.metadata(stack_id: stack_id) Electric.Telemetry.Sentry.set_tags_context(stack_id: stack_id) - tweaks = Keyword.fetch!(opts, :tweaks) - - restarter_opts = - [ - stack_id: stack_id, - stack_events_registry: Keyword.fetch!(opts, :stack_events_registry) - ] ++ Keyword.take(tweaks, [:wal_size_check_period, :wal_size_threshold]) + restarter_opts = Keyword.fetch!(opts, :restarter_opts) + connection_manager_opts = Keyword.fetch!(opts, :connection_manager_opts) children = [ {Electric.Connection.Restarter, restarter_opts}, - {Electric.Connection.Manager.Supervisor, opts} + {Electric.Connection.Manager.Supervisor, connection_manager_opts} ] Supervisor.init(children, strategy: :rest_for_one) diff --git a/packages/sync-service/lib/electric/core_supervisor.ex b/packages/sync-service/lib/electric/core_supervisor.ex index 52eb3ab3b4..b3290c46a6 100644 --- a/packages/sync-service/lib/electric/core_supervisor.ex +++ b/packages/sync-service/lib/electric/core_supervisor.ex @@ -24,10 +24,8 @@ defmodule Electric.CoreSupervisor do Logger.metadata(stack_id: stack_id) Electric.Telemetry.Sentry.set_tags_context(stack_id: stack_id) - connection_manager_opts = Keyword.fetch!(opts, :connection_manager_opts) - children = [ - {Electric.Connection.Supervisor, connection_manager_opts} + {Electric.Connection.Supervisor, opts} ] Supervisor.init(children, strategy: :one_for_one, auto_shutdown: :any_significant) diff --git a/packages/sync-service/lib/electric/stack_supervisor.ex b/packages/sync-service/lib/electric/stack_supervisor.ex index ebf950d3fd..a141e61a88 100644 --- a/packages/sync-service/lib/electric/stack_supervisor.ex +++ b/packages/sync-service/lib/electric/stack_supervisor.ex @@ -355,6 +355,14 @@ defmodule Electric.StackSupervisor do manual_table_publishing?: config.manual_table_publishing? ] + restarter_opts = [ + stack_id: stack_id, + stack_events_registry: config.stack_events_registry, + slot_name: Keyword.fetch!(config.replication_opts, :slot_name), + wal_size_check_period: Keyword.fetch!(config.tweaks, :idle_wal_size_check_period), + wal_size_threshold: Keyword.fetch!(config.tweaks, :idle_wal_size_threshold) + ] + registry_partitions = Keyword.get(config.tweaks, :registry_partitions, System.schedulers_online()) @@ -385,7 +393,9 @@ defmodule Electric.StackSupervisor do {Electric.Postgres.Inspector.EtsInspector, stack_id: stack_id, pool: metadata_db_pool, persistent_kv: config.persistent_kv}, {Electric.MonitoredCoreSupervisor, - stack_id: stack_id, connection_manager_opts: connection_manager_opts} + stack_id: stack_id, + connection_manager_opts: connection_manager_opts, + restarter_opts: restarter_opts} ] |> Enum.reject(&is_nil/1) diff --git a/packages/sync-service/test/electric/connection/manager_test.exs b/packages/sync-service/test/electric/connection/manager_test.exs index 9315680bad..c83825a090 100644 --- a/packages/sync-service/test/electric/connection/manager_test.exs +++ b/packages/sync-service/test/electric/connection/manager_test.exs @@ -54,10 +54,18 @@ defmodule Electric.Connection.ConnectionManagerTest do stack_events_registry: stack_events_registry ] + restarter_opts = [ + stack_id: stack_id, + stack_events_registry: stack_events_registry, + slot_name: ctx.slot_name + ] + core_sup = start_link_supervised!( {Electric.CoreSupervisor, - stack_id: stack_id, connection_manager_opts: connection_manager_opts}, + stack_id: stack_id, + connection_manager_opts: connection_manager_opts, + restarter_opts: restarter_opts}, # The test supervisor under which this one is started has `auto_shutdown` set to # `:never`, so we need to make sure the core supervisor is not a significant # child, otherwise we'd get the following error: From 5f12c5fa688722cd31cdd2c9b76861496286b366 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Thu, 18 Dec 2025 14:48:48 +0100 Subject: [PATCH 11/18] Pretty-print retained WAL size and threshold when logging Restarter notices --- .../wal-size-check-while-scaled-down.lux | 9 +-- .../lib/electric/connection/restarter.ex | 11 ++- packages/sync-service/lib/electric/utils.ex | 76 +++++++++++++++++++ 3 files changed, 89 insertions(+), 7 deletions(-) diff --git a/integration-tests/tests/wal-size-check-while-scaled-down.lux b/integration-tests/tests/wal-size-check-while-scaled-down.lux index 0dae34dd30..53dd7fc290 100644 --- a/integration-tests/tests/wal-size-check-while-scaled-down.lux +++ b/integration-tests/tests/wal-size-check-while-scaled-down.lux @@ -32,7 +32,7 @@ # Start Electric and wait for it to finish initialization. # Have to disable the ConnectionManagerPing process, otherwise it will error when # Connection.Manager itself goes down as part of the test scenario. -[invoke setup_electric_with_env "DO_NOT_START_CONN_MAN_PING=true ELECTRIC_IDLE_WAL_SIZE_CHECK_PERIOD=1s"] +[invoke setup_electric_with_env "DO_NOT_START_CONN_MAN_PING=true ELECTRIC_IDLE_WAL_SIZE_CHECK_PERIOD=1s ELECTRIC_IDLE_WAL_SIZE_THRESHOLD=10000"] [shell electric] [timeout 10] ??[debug] Replication client started streaming @@ -61,7 +61,7 @@ [sleep 2] ??Opening a database connection to check the retained WAL size - ?Retained WAL size [0-9]+ is below the threshold [0-9]+\. Scheduling the next check to take place after [0-9]+ + ?Retained WAL size [0-9]+B is below the threshold of ~10\.0KB\. Scheduling the next check to take place after 1sec # Write to the database enough data to force the retained WAL size to go over ELECTRIC_IDLE_WAL_SIZE_THRESHOLD [shell psql] @@ -71,12 +71,11 @@ # Sleep and verify that Electric wakes up after checking the WAL size and seeing it has exceeded the threshold [shell electric] ??Opening a database connection to check the retained WAL size - ?Retained WAL size [0-9]+ has exceeded the threshold [0-9]+\. Time to wake up the connection subsystem + ?Retained WAL size ~[0-9.]+KB has exceeded the threshold of ~10\.0KB\. Time to wake up the connection subsystem ??[info] Acquiring lock from postgres ??[info] Starting replication from postgres - - ??[debug] Chunked 200 in + ??[debug] Replication client started streaming [cleanup] [invoke teardown] diff --git a/packages/sync-service/lib/electric/connection/restarter.ex b/packages/sync-service/lib/electric/connection/restarter.ex index 2e86546cfb..ed95e9a424 100644 --- a/packages/sync-service/lib/electric/connection/restarter.ex +++ b/packages/sync-service/lib/electric/connection/restarter.ex @@ -143,18 +143,25 @@ defmodule Electric.Connection.Restarter do state = %{state | wal_size_check_timer: nil} wal_size = query_retained_wal_size(state) + formatted_wal_size = Electric.Utils.format_bytes_to_human_readable_size(wal_size) + + formatted_threshold = + Electric.Utils.format_bytes_to_human_readable_size(state.wal_size_threshold) + + formatted_period = + Electric.Utils.format_milliseconds_to_human_readable_interval(state.wal_size_check_period) state = if wal_size >= state.wal_size_threshold do Logger.info( - "Retained WAL size #{wal_size} has exceeded the threshold #{state.wal_size_threshold}. Time to wake up the connection subsystem." + "Retained WAL size #{formatted_wal_size} has exceeded the threshold of #{formatted_threshold}. Time to wake up the connection subsystem." ) :ok = do_restart_connection_subsystem(state.stack_id) state else Logger.info( - "Retained WAL size #{wal_size} is below the threshold #{state.wal_size_threshold}. Scheduling the next check to take place after #{state.wal_size_check_period}" + "Retained WAL size #{formatted_wal_size} is below the threshold of #{formatted_threshold}. Scheduling the next check to take place after #{formatted_period}" ) schedule_wal_size_check(state) diff --git a/packages/sync-service/lib/electric/utils.ex b/packages/sync-service/lib/electric/utils.ex index 878770f7d1..d93df53629 100644 --- a/packages/sync-service/lib/electric/utils.ex +++ b/packages/sync-service/lib/electric/utils.ex @@ -599,4 +599,80 @@ defmodule Electric.Utils do Map.new([{prefix, nested} | rest]) end end + + @doc """ + Format the given milliseconds into a human-readable time interval string. + + ## Examples + + iex> format_milliseconds_to_human_readable_interval(100) + "100ms" + + iex> format_milliseconds_to_human_readable_interval(13500) + "13sec 500ms" + + iex> format_milliseconds_to_human_readable_interval(960001) + "16min 1ms" + + iex> format_milliseconds_to_human_readable_interval(3630000) + "1hr 30sec" + """ + def format_milliseconds_to_human_readable_interval(millisec) do + hours = div(millisec, 3600 * 1000) + remainder = millisec - hours * 3600 * 1000 + + minutes = div(remainder, 60 * 1000) + remainder = remainder - minutes * 60 * 1000 + + seconds = div(remainder, 1000) + remainder = remainder - seconds * 1000 + + hrs_unit = if hours > 1, do: "hrs", else: "hr" + + [{hours, hrs_unit}, {minutes, "min"}, {seconds, "sec"}, {remainder, "ms"}] + |> Enum.reject(fn {num, _} -> num == 0 end) + |> Enum.map(fn {num, units} -> to_string(num) <> units end) + |> Enum.join(" ") + end + + @doc """ + Format the given bytes into a human-readable size string. + + ## Examples + + iex> format_bytes_to_human_readable_size(100) + "100B" + + iex> format_bytes_to_human_readable_size(999) + "999B" + + iex> format_bytes_to_human_readable_size(1400) + "~1.4KB" + + iex> format_bytes_to_human_readable_size(123444) + "~123.4KB" + + iex> format_bytes_to_human_readable_size(98_800_431) + "~98.8MB" + + iex> format_bytes_to_human_readable_size(999_999_999) + "~1000.0MB" + + iex> format_bytes_to_human_readable_size(1_234_567_890) + "~1.2GB" + """ + def format_bytes_to_human_readable_size(bytes) do + cond do + bytes >= 1_000_000_000 -> "~#{round_decimal(bytes / 1_000_000_000)}GB" + bytes >= 1_000_000 -> "~#{round_decimal(bytes / 1_000_000)}MB" + bytes >= 1_000 -> "~#{round_decimal(bytes / 1_000)}KB" + true -> "#{bytes}B" + end + end + + defp round_decimal(float) do + float + |> Decimal.from_float() + |> Decimal.round(1) + end end From 1fdacf232bd92010a579cc77ec98122c7de10561 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Thu, 18 Dec 2025 14:50:35 +0100 Subject: [PATCH 12/18] Set a more sensible default for WAL size threshold --- packages/sync-service/lib/electric/config.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/sync-service/lib/electric/config.ex b/packages/sync-service/lib/electric/config.ex index eb7f22dedb..f08cd3eef4 100644 --- a/packages/sync-service/lib/electric/config.ex +++ b/packages/sync-service/lib/electric/config.ex @@ -56,7 +56,7 @@ defmodule Electric.Config do idle_wal_size_check_period: 3_600_000, # We want to wake up and process any transactions that have accumulated in the WAL, hence # the low threshold. - idle_wal_size_threshold: 1_000, + idle_wal_size_threshold: 100 * 1024 * 1024, manual_table_publishing?: false, ## HTTP API # set enable_http_api: false to turn off the HTTP server totally From f2f0ec963b464567094a4a6d7b473413bb50c4c4 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Thu, 18 Dec 2025 14:52:51 +0100 Subject: [PATCH 13/18] Remove superfluous logging from OneOffConnection --- .../sync-service/lib/electric/postgres/one_off_connection.ex | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/packages/sync-service/lib/electric/postgres/one_off_connection.ex b/packages/sync-service/lib/electric/postgres/one_off_connection.ex index 9dcf7b02ba..d7dbfdce2c 100644 --- a/packages/sync-service/lib/electric/postgres/one_off_connection.ex +++ b/packages/sync-service/lib/electric/postgres/one_off_connection.ex @@ -4,8 +4,6 @@ defmodule Electric.Postgres.OneOffConnection do database. """ - require Logger - @behaviour Postgrex.SimpleConnection @default_timeout 5000 @@ -91,7 +89,7 @@ defmodule Electric.Postgres.OneOffConnection do Process.exit(pid, :shutdown) receive do - {:EXIT, ^pid, reason} -> Logger.debug("OneOffConnection exited: #{inspect(reason)}") + {:EXIT, ^pid, _reason} -> :ok end end From 21011528b16b0093ae5253341c6a122625a9dfd5 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Sat, 20 Dec 2025 01:44:57 +0100 Subject: [PATCH 14/18] Do not persist validation connection opts across conn man restarts To avoid the failure state where Electric can no longer connect to the db without first doing the revalidation --- .../lib/electric/connection/manager.ex | 10 ++++------ .../sync-service/lib/electric/stack_config.ex | 18 ++++++++++++------ 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/packages/sync-service/lib/electric/connection/manager.ex b/packages/sync-service/lib/electric/connection/manager.ex index 42c8c1e4fd..6a822cdfa4 100644 --- a/packages/sync-service/lib/electric/connection/manager.ex +++ b/packages/sync-service/lib/electric/connection/manager.ex @@ -322,12 +322,10 @@ defmodule Electric.Connection.Manager do end defp init_validated_connection_opts(%{stack_id: stack_id} = state) do - stack_validated_connection_opts = - Electric.StackConfig.lookup(stack_id, @validated_conn_opts_config_key) - - Map.update!(state, :validated_connection_opts, fn map -> - Map.new(map, fn {type, nil} -> {type, stack_validated_connection_opts[type]} end) - end) + # Wipe any previously stored validated connection opts on startup to avoid persisting a + # configuration that is no longer valid. We always start connection manager with the + # options specified by the user to avoid gettting into an unrecoverable failure state. + Electric.StackConfig.erase(stack_id, @validated_conn_opts_config_key) end defp update_validated_connection_opts(%{stack_id: stack_id} = state, type, validated_opts) do diff --git a/packages/sync-service/lib/electric/stack_config.ex b/packages/sync-service/lib/electric/stack_config.ex index cc614cd055..a7935dc90b 100644 --- a/packages/sync-service/lib/electric/stack_config.ex +++ b/packages/sync-service/lib/electric/stack_config.ex @@ -9,12 +9,6 @@ defmodule Electric.StackConfig do :ets.lookup_element(table(stack_id), key, 2, default) end - def spawn_opts(stack_id, process_name) do - stack_id - |> lookup(:process_spawn_opts, %{}) - |> Map.get(process_name, []) - end - def lookup!(stack_id, key) do :ets.lookup_element(table(stack_id), key, 2) rescue @@ -23,6 +17,18 @@ defmodule Electric.StackConfig do message: "stack config value #{inspect(key)} is missing for stack #{stack_id}" end + def erase(stack_id, key) do + :ets.delete(table(stack_id), key) + rescue + ArgumentError -> :ok + end + + def spawn_opts(stack_id, process_name) do + stack_id + |> lookup(:process_spawn_opts, %{}) + |> Map.get(process_name, []) + end + @doc false # Should provide all required values not defined dynamically at stack init def default_seed_config do From 7314c01ad75d47a2b7320dc50acb623f1ca8507b Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Mon, 22 Dec 2025 00:16:06 +0100 Subject: [PATCH 15/18] fixup! Do not persist validation connection opts across conn man restarts --- packages/sync-service/lib/electric/connection/manager.ex | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/sync-service/lib/electric/connection/manager.ex b/packages/sync-service/lib/electric/connection/manager.ex index 6a822cdfa4..3db72ab629 100644 --- a/packages/sync-service/lib/electric/connection/manager.ex +++ b/packages/sync-service/lib/electric/connection/manager.ex @@ -326,6 +326,7 @@ defmodule Electric.Connection.Manager do # configuration that is no longer valid. We always start connection manager with the # options specified by the user to avoid gettting into an unrecoverable failure state. Electric.StackConfig.erase(stack_id, @validated_conn_opts_config_key) + state end defp update_validated_connection_opts(%{stack_id: stack_id} = state, type, validated_opts) do From 9d32a33cbba6b7a05e084591d41c872e37bf924b Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Mon, 22 Dec 2025 00:31:49 +0100 Subject: [PATCH 16/18] Use consistent prefix ELECTRIC_REPLICATION_IDLE for the new configuration opts --- .changeset/little-seas-guess.md | 2 +- .../tests/wal-size-check-while-scaled-down.lux | 6 +++--- packages/sync-service/config/runtime.exs | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/.changeset/little-seas-guess.md b/.changeset/little-seas-guess.md index d47c077747..b435e40782 100644 --- a/.changeset/little-seas-guess.md +++ b/.changeset/little-seas-guess.md @@ -2,4 +2,4 @@ '@core/sync-service': patch --- -Add two new configuration options for periodic retained WAL size checks in scaled down mode: ELECTRIC_IDLE_WAL_SIZE_CHECK_PERIOD and ELECTRIC_IDLE_WAL_SIZE_THRESHOLD. +Add two new configuration options for periodic retained WAL size checks in scaled down mode: ELECTRIC_REPLICATION_IDLE_WAL_SIZE_CHECK_PERIOD and ELECTRIC_REPLICATION_IDLE_WAL_SIZE_THRESHOLD. diff --git a/integration-tests/tests/wal-size-check-while-scaled-down.lux b/integration-tests/tests/wal-size-check-while-scaled-down.lux index 53dd7fc290..56d8b5b63f 100644 --- a/integration-tests/tests/wal-size-check-while-scaled-down.lux +++ b/integration-tests/tests/wal-size-check-while-scaled-down.lux @@ -32,7 +32,7 @@ # Start Electric and wait for it to finish initialization. # Have to disable the ConnectionManagerPing process, otherwise it will error when # Connection.Manager itself goes down as part of the test scenario. -[invoke setup_electric_with_env "DO_NOT_START_CONN_MAN_PING=true ELECTRIC_IDLE_WAL_SIZE_CHECK_PERIOD=1s ELECTRIC_IDLE_WAL_SIZE_THRESHOLD=10000"] +[invoke setup_electric_with_env "DO_NOT_START_CONN_MAN_PING=true ELECTRIC_REPLICATION_IDLE_WAL_SIZE_CHECK_PERIOD=1s ELECTRIC_REPLICATION_IDLE_WAL_SIZE_THRESHOLD=10000"] [shell electric] [timeout 10] ??[debug] Replication client started streaming @@ -56,14 +56,14 @@ ??(0 rows) -# Sleep until it's time to verify that Electric has checked retained WAL size in Postgres after ELECTRIC_IDLE_WAL_SIZE_CHECK_PERIOD +# Sleep until it's time to verify that Electric has checked retained WAL size in Postgres after ELECTRIC_REPLICATION_IDLE_WAL_SIZE_CHECK_PERIOD [shell electric] [sleep 2] ??Opening a database connection to check the retained WAL size ?Retained WAL size [0-9]+B is below the threshold of ~10\.0KB\. Scheduling the next check to take place after 1sec -# Write to the database enough data to force the retained WAL size to go over ELECTRIC_IDLE_WAL_SIZE_THRESHOLD +# Write to the database enough data to force the retained WAL size to go over ELECTRIC_REPLICATION_IDLE_WAL_SIZE_THRESHOLD [shell psql] !insert into items select * from generate_series(1,1000); ??INSERT diff --git a/packages/sync-service/config/runtime.exs b/packages/sync-service/config/runtime.exs index 8a49adb18d..8c93b7386f 100644 --- a/packages/sync-service/config/runtime.exs +++ b/packages/sync-service/config/runtime.exs @@ -275,11 +275,11 @@ config :electric, ), idle_wal_size_check_period: env!( - "ELECTRIC_IDLE_WAL_SIZE_CHECK_PERIOD", + "ELECTRIC_REPLICATION_IDLE_WAL_SIZE_CHECK_PERIOD", &Electric.Config.parse_human_readable_time!/1, nil ), - idle_wal_size_threshold: env!("ELECTRIC_IDLE_WAL_SIZE_THRESHOLD", :integer, nil) + idle_wal_size_threshold: env!("ELECTRIC_REPLICATION_IDLE_WAL_SIZE_THRESHOLD", :integer, nil) if Electric.telemetry_enabled?() do # Disable the default telemetry_poller process since we start our own in From 0c93d47623644ad07da56bcf0f4b36518e2728e4 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Sat, 27 Dec 2025 11:07:53 +0100 Subject: [PATCH 17/18] Cleanups for the new Lux test --- .../tests/wal-size-check-while-scaled-down.lux | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/integration-tests/tests/wal-size-check-while-scaled-down.lux b/integration-tests/tests/wal-size-check-while-scaled-down.lux index 56d8b5b63f..1e3d69db1d 100644 --- a/integration-tests/tests/wal-size-check-while-scaled-down.lux +++ b/integration-tests/tests/wal-size-check-while-scaled-down.lux @@ -2,7 +2,7 @@ [include _macros.luxinc] -[global pg_container_name=db-connection-scaledown__pg] +[global pg_container_name=wal-size-check-while-scaled-down__pg] [global database_url=postgresql://electric_test:password@localhost:$pg_host_port/electric?sslmode=disable] ### @@ -41,8 +41,6 @@ # Normally the client performs the check once a minute, so we're speeding it up here for # testing purposes. [shell electric] - # [sleep 2] - !Electric.Postgres.ReplicationClient.name("single_stack") |> GenServer.whereis() |> send(:check_if_idle) ??[notice] Closing all database connections after the replication stream has been idle for @@ -68,7 +66,7 @@ !insert into items select * from generate_series(1,1000); ??INSERT -# Sleep and verify that Electric wakes up after checking the WAL size and seeing it has exceeded the threshold +# Verify that Electric wakes up after checking the WAL size and seeing it has exceeded the threshold [shell electric] ??Opening a database connection to check the retained WAL size ?Retained WAL size ~[0-9.]+KB has exceeded the threshold of ~10\.0KB\. Time to wake up the connection subsystem From 3d534372f9cc1aefe98f409b6108a7d06b4795e0 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Sat, 27 Dec 2025 11:08:12 +0100 Subject: [PATCH 18/18] Parse WAL size threshold as human-readable size value --- packages/sync-service/config/runtime.exs | 7 ++- packages/sync-service/lib/electric/config.ex | 64 +++++++++++++++++++- 2 files changed, 69 insertions(+), 2 deletions(-) diff --git a/packages/sync-service/config/runtime.exs b/packages/sync-service/config/runtime.exs index 8c93b7386f..e49d2871a9 100644 --- a/packages/sync-service/config/runtime.exs +++ b/packages/sync-service/config/runtime.exs @@ -279,7 +279,12 @@ config :electric, &Electric.Config.parse_human_readable_time!/1, nil ), - idle_wal_size_threshold: env!("ELECTRIC_REPLICATION_IDLE_WAL_SIZE_THRESHOLD", :integer, nil) + idle_wal_size_threshold: + env!( + "ELECTRIC_REPLICATION_IDLE_WAL_SIZE_THRESHOLD", + &Electric.Config.parse_human_readable_size!/1, + nil + ) if Electric.telemetry_enabled?() do # Disable the default telemetry_poller process since we start our own in diff --git a/packages/sync-service/lib/electric/config.ex b/packages/sync-service/lib/electric/config.ex index f08cd3eef4..113b3b8fd1 100644 --- a/packages/sync-service/lib/electric/config.ex +++ b/packages/sync-service/lib/electric/config.ex @@ -435,7 +435,7 @@ defmodule Electric.Config do @time_units ~w[ms msec s sec m min h hr] - @spec parse_human_readable_time(binary | nil) :: {:ok, pos_integer} | {:error, binary} + @spec parse_human_readable_time(binary) :: {:ok, pos_integer} | {:error, binary} def parse_human_readable_time(str) do with {num, suffix} <- Float.parse(str), @@ -461,6 +461,68 @@ defmodule Electric.Config do end end + @doc """ + Parse human-readable memory/storage size string into bytes. + + ## Examples + + iex> parse_human_readable_size("1GiB") + {:ok, #{1024 * 1024 * 1024}} + + iex> parse_human_readable_size("2.23GB") + {:ok, 2_230_000_000} + + iex> parse_human_readable_size("256MiB") + {:ok, #{256 * 1024 * 1024}} + + iex> parse_human_readable_size("377MB") + {:ok, 377_000_000} + + iex> parse_human_readable_size("430KiB") + {:ok, #{430 * 1024}} + + iex> parse_human_readable_size("142888KB") + {:ok, 142_888_000} + + iex> parse_human_readable_size("123456789") + {:ok, 123_456_789} + + iex> parse_human_readable_size("") + {:error, ~S'invalid size unit: "". Must be one of ["KB", "KiB", "MB", "MiB", "GB", "GiB"]'} + + iex> parse_human_readable_size("foo") + {:error, ~S'invalid size unit: "foo". Must be one of ["KB", "KiB", "MB", "MiB", "GB", "GiB"]'} + """ + @spec parse_human_readable_size(binary) :: {:ok, pos_integer} | {:error, binary} + + @size_units ~w[KB KiB MB MiB GB GiB] + + def parse_human_readable_size(str) do + with {num, suffix} <- Float.parse(str), + true <- num > 0, + suffix = String.trim(suffix), + true <- suffix == "" or suffix in @size_units do + {:ok, trunc(num * size_multiplier(suffix))} + else + _ -> {:error, "invalid size unit: #{inspect(str)}. Must be one of #{inspect(@size_units)}"} + end + end + + defp size_multiplier(""), do: 1 + defp size_multiplier("KB"), do: 1_000 + defp size_multiplier("KiB"), do: 1024 + defp size_multiplier("MB"), do: 1_000_000 + defp size_multiplier("MiB"), do: 1024 * 1024 + defp size_multiplier("GB"), do: 1_000_000_000 + defp size_multiplier("GiB"), do: 1024 * 1024 * 1024 + + def parse_human_readable_size!(str) do + case parse_human_readable_size(str) do + {:ok, result} -> result + {:error, message} -> raise Dotenvy.Error, message: message + end + end + def validate_security_config!(secret, insecure) do cond do insecure && secret != nil ->