From 43e4c0b0186631749381d616dd7e02e8875d9f83 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Wed, 6 May 2026 17:02:12 +0200 Subject: [PATCH 1/9] feat(sync-service): add shape_suspend_after configuration Add new config option to control the delay between hibernation and suspension. Default is 10 minutes. This prepares for hibernate-then-suspend behavior where consumers first hibernate (to trigger GC) before suspending. Co-Authored-By: Claude Opus 4.5 --- packages/sync-service/config/runtime.exs | 4 ++++ packages/sync-service/lib/electric/application.ex | 1 + packages/sync-service/lib/electric/config.ex | 2 ++ packages/sync-service/lib/electric/stack_config.ex | 1 + packages/sync-service/lib/electric/stack_supervisor.ex | 6 ++++++ 5 files changed, 14 insertions(+) diff --git a/packages/sync-service/config/runtime.exs b/packages/sync-service/config/runtime.exs index 955218d8e0..89be072e01 100644 --- a/packages/sync-service/config/runtime.exs +++ b/packages/sync-service/config/runtime.exs @@ -164,6 +164,9 @@ shape_hibernate_after = shape_enable_suspend? 
= env!("ELECTRIC_SHAPE_SUSPEND_CONSUMER", :boolean, nil) +shape_suspend_after = + env!("ELECTRIC_SHAPE_SUSPEND_AFTER", &Electric.Config.parse_human_readable_time!/1, nil) + system_metrics_poll_interval = env!( "ELECTRIC_SYSTEM_METRICS_POLL_INTERVAL", @@ -264,6 +267,7 @@ config :electric, env!("ELECTRIC_SUBQUERY_BUFFER_MAX_TRANSACTIONS", :integer, nil), shape_hibernate_after: shape_hibernate_after, shape_enable_suspend?: shape_enable_suspend?, + shape_suspend_after: shape_suspend_after, storage_dir: storage_dir, storage: storage_spec, cleanup_interval_ms: diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex index 2221552035..086dd0d85f 100644 --- a/packages/sync-service/lib/electric/application.ex +++ b/packages/sync-service/lib/electric/application.ex @@ -145,6 +145,7 @@ defmodule Electric.Application do cleanup_interval_ms: get_env(opts, :cleanup_interval_ms), shape_hibernate_after: get_env(opts, :shape_hibernate_after), shape_enable_suspend?: get_env(opts, :shape_enable_suspend?), + shape_suspend_after: get_env(opts, :shape_suspend_after), conn_max_requests: get_env(opts, :conn_max_requests), handler_fullsweep_after: get_env(opts, :handler_fullsweep_after), process_spawn_opts: get_env(opts, :process_spawn_opts) diff --git a/packages/sync-service/lib/electric/config.ex b/packages/sync-service/lib/electric/config.ex index c33ac3c271..68e6163448 100644 --- a/packages/sync-service/lib/electric/config.ex +++ b/packages/sync-service/lib/electric/config.ex @@ -89,6 +89,8 @@ defmodule Electric.Config do # Should we terminate consumer processes after `shape_hibernate_after` ms # or just hibernate them? 
shape_enable_suspend?: false, + # How long after hibernation before suspending (terminating) the consumer + shape_suspend_after: :timer.minutes(10), # Sets max_requests for Bandit handler processes: # https://hexdocs.pm/bandit/Bandit.html#t:http_1_options/0 # "The maximum number of requests to serve in a single HTTP/{1,2} diff --git a/packages/sync-service/lib/electric/stack_config.ex b/packages/sync-service/lib/electric/stack_config.ex index cc614cd055..462d474953 100644 --- a/packages/sync-service/lib/electric/stack_config.ex +++ b/packages/sync-service/lib/electric/stack_config.ex @@ -30,6 +30,7 @@ defmodule Electric.StackConfig do snapshot_timeout_to_first_data: :timer.seconds(30), shape_hibernate_after: Electric.Config.default(:shape_hibernate_after), shape_enable_suspend?: Electric.Config.default(:shape_enable_suspend?), + shape_suspend_after: Electric.Config.default(:shape_suspend_after), chunk_bytes_threshold: Electric.ShapeCache.LogChunker.default_chunk_size_threshold(), feature_flags: [], process_spawn_opts: %{} diff --git a/packages/sync-service/lib/electric/stack_supervisor.ex b/packages/sync-service/lib/electric/stack_supervisor.ex index af22b52e35..98d3ba27e1 100644 --- a/packages/sync-service/lib/electric/stack_supervisor.ex +++ b/packages/sync-service/lib/electric/stack_supervisor.ex @@ -136,6 +136,10 @@ defmodule Electric.StackSupervisor do type: :boolean, default: Electric.Config.default(:shape_enable_suspend?) ], + shape_suspend_after: [ + type: :non_neg_integer, + default: Electric.Config.default(:shape_suspend_after) + ], snapshot_timeout_to_first_data: [ type: :pos_integer, default: Electric.Config.default(:snapshot_timeout_to_first_data) @@ -352,6 +356,7 @@ defmodule Electric.StackSupervisor do shape_hibernate_after = Keyword.fetch!(config.tweaks, :shape_hibernate_after) shape_enable_suspend? = Keyword.fetch!(config.tweaks, :shape_enable_suspend?) 
+ shape_suspend_after = Keyword.fetch!(config.tweaks, :shape_suspend_after) process_spawn_opts = Keyword.fetch!(config.tweaks, :process_spawn_opts) shape_cache_opts = [ @@ -401,6 +406,7 @@ defmodule Electric.StackSupervisor do inspector: inspector, shape_hibernate_after: shape_hibernate_after, shape_enable_suspend?: shape_enable_suspend?, + shape_suspend_after: shape_suspend_after, process_spawn_opts: process_spawn_opts, feature_flags: Map.get(config, :feature_flags, []) ]}, From 2f9edda11433aef2bc3213134d35a960d1eea8c8 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Wed, 6 May 2026 17:06:08 +0200 Subject: [PATCH 2/9] feat(sync-service): add suspend_timer and suspend_after to Consumer.State Add state fields to track the scheduled suspend timer and the configured delay between hibernation and suspension. Co-Authored-By: Claude Opus 4.5 --- .../lib/electric/shapes/consumer/state.ex | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/packages/sync-service/lib/electric/shapes/consumer/state.ex b/packages/sync-service/lib/electric/shapes/consumer/state.ex index 9414f07571..9897e2ef29 100644 --- a/packages/sync-service/lib/electric/shapes/consumer/state.ex +++ b/packages/sync-service/lib/electric/shapes/consumer/state.ex @@ -41,7 +41,11 @@ defmodule Electric.Shapes.Consumer.State do # When a {Storage, :flushed, offset} message arrives during a pending # transaction, we defer the notification and store the max flushed offset # here. Multiple deferred notifications are collapsed into a single most recent offset. 
- pending_flush_offset: nil + pending_flush_offset: nil, + # Timer reference for scheduled suspend, set when entering hibernation + suspend_timer: nil, + # How long after hibernation to suspend (in ms) + suspend_after: nil ] @type pg_snapshot() :: SnapshotQuery.pg_snapshot() @@ -95,6 +99,12 @@ defmodule Electric.Shapes.Consumer.State do :shape_hibernate_after, Electric.Config.default(:shape_hibernate_after) ), + suspend_after: + Electric.StackConfig.lookup( + stack_id, + :shape_suspend_after, + Electric.Config.default(:shape_suspend_after) + ), buffering?: true } end From 6fdc08cc2153915c35724d0fb939896df80b9d01 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Wed, 6 May 2026 17:10:05 +0200 Subject: [PATCH 3/9] feat(sync-service): implement hibernate-then-suspend in Consumer When suspend is enabled, consumers now: 1. Hibernate first on timeout (triggering GC) 2. Schedule a suspend timer for suspend_after ms later 3. Suspend (terminate) when the suspend timer fires Any activity cancels the pending suspend timer, restarting the cycle. This ensures GC runs before eventual process termination. 
Co-Authored-By: Claude Opus 4.5 --- .../lib/electric/shapes/consumer.ex | 60 ++++++++++++------- .../lib/electric/shapes/consumer/state.ex | 6 +- .../lib/electric/shapes/consumer_registry.ex | 17 +++--- .../test/electric/shapes/consumer_test.exs | 14 ++++- .../test/support/component_setup.ex | 1 + 5 files changed, 62 insertions(+), 36 deletions(-) diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index c00721e323..61ea5b0ea8 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -186,8 +186,8 @@ defmodule Electric.Shapes.Consumer do def handle_call(:await_snapshot_start, from, state) do Logger.debug("Starting a wait on the snapshot #{state.shape_handle} for #{inspect(from)}}") - - {:noreply, State.add_waiter(state, from), state.hibernate_after} + state = State.add_waiter(state, from) + {:noreply, state, state.hibernate_after} end def handle_call({:handle_event, event, trace_context}, _from, state) do @@ -205,9 +205,8 @@ defmodule Electric.Shapes.Consumer do def handle_call({:subscribe_materializer, pid}, _from, state) do Logger.debug("Subscribing materializer for #{state.shape_handle}") Process.monitor(pid, tag: :materializer_down) - - {:reply, {:ok, state.latest_offset}, %{state | materializer_subscribed?: true}, - state.hibernate_after} + state = %{state | materializer_subscribed?: true} + {:reply, {:ok, state.latest_offset}, state, state.hibernate_after} end def handle_call({:stop, reason}, _from, state) do @@ -386,30 +385,37 @@ defmodule Electric.Shapes.Consumer do {:stop, reason, state} end - # Set a new value for hibernate after and set a timeout between - # hibernate_after and max_timeout in order to spread - # consumer suspend events. 
- def handle_info({:configure_suspend, hibernate_after, jitter_period}, state) do - {:noreply, %{state | hibernate_after: hibernate_after}, - Enum.random(hibernate_after..jitter_period)} + # Set new values for hibernate_after and suspend_after, and set a jittered + # timeout between hibernate_after and jitter_period to spread hibernation + # events. Each consumer will hibernate at the jittered timeout, then schedule + # suspension for suspend_after ms later. + def handle_info({:configure_suspend, hibernate_after, suspend_after, jitter_period}, state) do + state = %{state | hibernate_after: hibernate_after, suspend_after: suspend_after} + {:noreply, state, Enum.random(hibernate_after..jitter_period)} end def handle_info(:timeout, state) do - # we can only suspend (terminate) the consumer process if - # - # 1. Consumer suspend has been enabled in the stack config - # 2. we're not waiting for snapshot information - # 3. we are not part of a subquery dependency tree, that is either - # a. we have no dependent shapes - # b. we don't have a materializer subscribed - - if consumer_suspend_enabled?(state) and consumer_can_suspend?(state) do + state = %{state | writer: ShapeCache.Storage.hibernate(state.writer)} + + state = + if consumer_suspend_enabled?(state) and consumer_can_suspend?(state), + do: schedule_suspend_timer(state), + else: state + + {:noreply, state, :hibernate} + end + + # Suspend timer uses a generation number to detect stale timers. The generation + # only changes when a later hibernation schedules a new timer, so activity alone + # does not invalidate a pending timer (its generation still matches). 
+ def handle_info({:suspend_timeout, generation}, state) do + if generation == state.suspend_generation and + consumer_suspend_enabled?(state) and consumer_can_suspend?(state) do Logger.debug(fn -> ["Suspending consumer ", to_string(state.shape_handle)] end) {:stop, ShapeCleaner.consumer_suspend_reason(), state} else - state = %{state | writer: ShapeCache.Storage.hibernate(state.writer)} - - {:noreply, state, :hibernate} + # Stale timer or conditions changed - just restart the hibernate timeout + {:noreply, state, state.hibernate_after} end end @@ -422,6 +428,14 @@ defmodule Electric.Shapes.Consumer do not state.materializer_subscribed? end + defp schedule_suspend_timer(%{suspend_after: nil} = state), do: state + + defp schedule_suspend_timer(%{suspend_after: suspend_after, suspend_generation: gen} = state) do + next_gen = gen + 1 + :erlang.send_after(suspend_after, self(), {:suspend_timeout, next_gen}) + %{state | suspend_generation: next_gen} + end + @impl GenServer def terminate(reason, state) do Logger.debug(fn -> diff --git a/packages/sync-service/lib/electric/shapes/consumer/state.ex b/packages/sync-service/lib/electric/shapes/consumer/state.ex index 9897e2ef29..2b366021b2 100644 --- a/packages/sync-service/lib/electric/shapes/consumer/state.ex +++ b/packages/sync-service/lib/electric/shapes/consumer/state.ex @@ -42,8 +42,10 @@ defmodule Electric.Shapes.Consumer.State do # transaction, we defer the notification and store the max flushed offset # here. Multiple deferred notifications are collapsed into a single most recent offset. pending_flush_offset: nil, - # Timer reference for scheduled suspend, set when entering hibernation - suspend_timer: nil, + # Generation counter for suspend timers - incremented each time we schedule + # a new suspend timer. When a timer fires, it checks if its generation matches + # the current one; if not, activity occurred and the timer is stale (ignored). 
+ suspend_generation: 0, # How long after hibernation to suspend (in ms) suspend_after: nil ] diff --git a/packages/sync-service/lib/electric/shapes/consumer_registry.ex b/packages/sync-service/lib/electric/shapes/consumer_registry.ex index 935420b2d0..7f39f08af9 100644 --- a/packages/sync-service/lib/electric/shapes/consumer_registry.ex +++ b/packages/sync-service/lib/electric/shapes/consumer_registry.ex @@ -223,27 +223,28 @@ defmodule Electric.Shapes.ConsumerRegistry do To re-enable consumer suspend: - # set the hibernation timeout to 1 minute but phase the suspension of - # existing consumers over a 20 minute period - Electric.Shapes.ConsumerRegistry.enable_suspend(stack_id, 60_000, 60_000 * 20) + # set the hibernation timeout to 1 minute, suspend 1 minute after hibernation, + # and phase the suspension of existing consumers over a 20 minute period + Electric.Shapes.ConsumerRegistry.enable_suspend(stack_id, 60_000, 60_000, 60_000 * 20) Disabling suspension is as easy as: Electric.StackConfig.put(stack_id, :shape_enable_suspend?, false) """ - @spec enable_suspend(stack_id(), pos_integer(), pos_integer()) :: + @spec enable_suspend(stack_id(), pos_integer(), pos_integer(), pos_integer()) :: consumer_count :: non_neg_integer() - def enable_suspend(stack_id, hibernate_after, jitter_period) - when is_integer(hibernate_after) and is_integer(jitter_period) and - jitter_period > hibernate_after do + def enable_suspend(stack_id, hibernate_after, suspend_after, jitter_period) + when is_integer(hibernate_after) and is_integer(suspend_after) and + is_integer(jitter_period) and jitter_period > hibernate_after do Electric.StackConfig.put(stack_id, :shape_hibernate_after, hibernate_after) + Electric.StackConfig.put(stack_id, :shape_suspend_after, suspend_after) Electric.StackConfig.put(stack_id, :shape_enable_suspend?, true) :ets.foldl( fn {_shape_handle, pid}, n -> if Process.alive?(pid), - do: send(pid, {:configure_suspend, hibernate_after, jitter_period}) + do: send(pid, 
{:configure_suspend, hibernate_after, suspend_after, jitter_period}) n + 1 end, diff --git a/packages/sync-service/test/electric/shapes/consumer_test.exs b/packages/sync-service/test/electric/shapes/consumer_test.exs index 40b27576c5..367a088595 100644 --- a/packages/sync-service/test/electric/shapes/consumer_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_test.exs @@ -598,6 +598,12 @@ defmodule Electric.Shapes.ConsumerTest do Map.get(ctx, :hibernate_after, 10_000) ) + Electric.StackConfig.put( + ctx.stack_id, + :shape_suspend_after, + Map.get(ctx, :shape_suspend_after, 10_000) + ) + if not Map.get(ctx, :allow_subqueries, true) do Electric.StackConfig.put(ctx.stack_id, :feature_flags, []) end @@ -1375,7 +1381,8 @@ defmodule Electric.Shapes.ConsumerTest do assert_receive {:flush_boundary_updated, 301}, 1_000 end - @tag hibernate_after: 10, with_pure_file_storage_opts: [flush_period: 1] + @tag hibernate_after: 10, shape_suspend_after: 10 + @tag with_pure_file_storage_opts: [flush_period: 1] @tag suspend: true test "should terminate after :hibernate_after ms", ctx do register_as_replication_client(ctx.stack_id) @@ -1409,7 +1416,8 @@ defmodule Electric.Shapes.ConsumerTest do refute Consumer.whereis(ctx.stack_id, shape_handle) end - @tag hibernate_after: 10, with_pure_file_storage_opts: [flush_period: 1] + @tag hibernate_after: 10, shape_suspend_after: 10 + @tag with_pure_file_storage_opts: [flush_period: 1] @tag suspend: true test "should hibernate not suspend if has dependencies", ctx do register_as_replication_client(ctx.stack_id) @@ -1494,7 +1502,7 @@ defmodule Electric.Shapes.ConsumerTest do assert Consumer.whereis(ctx.stack_id, shape_handle) - Shapes.ConsumerRegistry.enable_suspend(ctx.stack_id, 5, 10) + Shapes.ConsumerRegistry.enable_suspend(ctx.stack_id, 5, 5, 10) Process.sleep(60) diff --git a/packages/sync-service/test/support/component_setup.ex b/packages/sync-service/test/support/component_setup.ex index bcb4ee0949..c4d27dc09d 100644 --- 
a/packages/sync-service/test/support/component_setup.ex +++ b/packages/sync-service/test/support/component_setup.ex @@ -110,6 +110,7 @@ defmodule Support.ComponentSetup do shape_changes_registry: Map.get(ctx, :registry, Electric.StackSupervisor.registry_name(stack_id)), shape_hibernate_after: Map.get(ctx, :shape_hibernate_after, 1_000), + shape_suspend_after: Map.get(ctx, :shape_suspend_after, 1_000), shape_enable_suspend?: Map.get(ctx, :suspend, false), feature_flags: Electric.Config.get_env(:feature_flags), process_spawn_opts: Map.get(ctx, :process_spawn_opts, %{}) From 2b280767fe75599d0fdf86bc464df87d04cab231 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Wed, 6 May 2026 17:15:27 +0200 Subject: [PATCH 4/9] feat(sync-service): update enable_suspend to include suspend_after Change enable_suspend/3 to enable_suspend/4, adding the suspend_after parameter to configure the delay between hibernation and suspension. Co-Authored-By: Claude Opus 4.5 --- .../sync-service/lib/electric/shapes/consumer_registry.ex | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/sync-service/lib/electric/shapes/consumer_registry.ex b/packages/sync-service/lib/electric/shapes/consumer_registry.ex index 7f39f08af9..c474fcb5a9 100644 --- a/packages/sync-service/lib/electric/shapes/consumer_registry.ex +++ b/packages/sync-service/lib/electric/shapes/consumer_registry.ex @@ -223,9 +223,9 @@ defmodule Electric.Shapes.ConsumerRegistry do To re-enable consumer suspend: - # set the hibernation timeout to 1 minute, suspend 1 minute after hibernation, + # set the hibernation timeout to 1 minute, suspend 4 minutes after hibernation, # and phase the suspension of existing consumers over a 20 minute period - Electric.Shapes.ConsumerRegistry.enable_suspend(stack_id, 60_000, 60_000, 60_000 * 20) + Electric.Shapes.ConsumerRegistry.enable_suspend(stack_id, 60_000, 4 * 60_000, 60_000 * 20) Disabling suspension is as easy as: From e2ac1f974158675800d8a65a0a1bee66038a8beb Mon 
Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Wed, 6 May 2026 17:22:28 +0200 Subject: [PATCH 5/9] test(sync-service): add tests for hibernate-then-suspend behavior Add tests verifying: - Consumer hibernates first, then suspends after suspend_after ms - Activity during hibernation cancels the pending suspend timer - Update enable_suspend test for new 4-arity function - Update existing suspend test to use explicit suspend_after tag Co-Authored-By: Claude Opus 4.5 --- .../test/electric/shapes/consumer_test.exs | 119 +++++++++++++++++- 1 file changed, 116 insertions(+), 3 deletions(-) diff --git a/packages/sync-service/test/electric/shapes/consumer_test.exs b/packages/sync-service/test/electric/shapes/consumer_test.exs index 367a088595..1f68110b5d 100644 --- a/packages/sync-service/test/electric/shapes/consumer_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_test.exs @@ -601,7 +601,7 @@ defmodule Electric.Shapes.ConsumerTest do Electric.StackConfig.put( ctx.stack_id, :shape_suspend_after, - Map.get(ctx, :shape_suspend_after, 10_000) + Map.get(ctx, :shape_suspend_after, 60_000) ) if not Map.get(ctx, :allow_subqueries, true) do @@ -1381,10 +1381,10 @@ defmodule Electric.Shapes.ConsumerTest do assert_receive {:flush_boundary_updated, 301}, 1_000 end - @tag hibernate_after: 10, shape_suspend_after: 10 + @tag hibernate_after: 10, shape_suspend_after: 20 @tag with_pure_file_storage_opts: [flush_period: 1] @tag suspend: true - test "should terminate after :hibernate_after ms", ctx do + test "should suspend after hibernate_after + shape_suspend_after ms", ctx do register_as_replication_client(ctx.stack_id) {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape1, ctx.stack_id) @@ -1502,6 +1502,7 @@ defmodule Electric.Shapes.ConsumerTest do assert Consumer.whereis(ctx.stack_id, shape_handle) + # hibernate_after=5, shape_suspend_after=5, jitter_period=10 Shapes.ConsumerRegistry.enable_suspend(ctx.stack_id, 5, 5, 10) Process.sleep(60) @@ -1511,6 
+1512,118 @@ defmodule Electric.Shapes.ConsumerTest do refute Consumer.whereis(ctx.stack_id, shape_handle) end + @tag hibernate_after: 10, shape_suspend_after: 150, with_pure_file_storage_opts: [flush_period: 1] + @tag suspend: true + test "should hibernate first then suspend after shape_suspend_after ms", ctx do + register_as_replication_client(ctx.stack_id) + + {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape1, ctx.stack_id) + lsn1 = Lsn.from_integer(300) + + :started = ShapeCache.await_snapshot_start(shape_handle, ctx.stack_id) + + consumer_pid = Consumer.whereis(ctx.stack_id, shape_handle) + assert is_pid(consumer_pid) + ref = Process.monitor(consumer_pid) + + txn = + complete_txn_fragment(2, lsn1, [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "21"}, + log_offset: LogOffset.new(lsn1, 0) + } + ]) + + assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id) + + assert_receive {:flush_boundary_updated, 300}, 1_000 + + # Wait for hibernate_after (10ms) + small buffer + # Suspend won't happen until hibernate_after + shape_suspend_after = 10 + 150 = 160ms + Process.sleep(50) + + # Should be hibernated, not suspended yet (we're at ~50ms, suspend at ~160ms) + assert {:current_function, {:gen_server, :loop_hibernate, 4}} = + Process.info(consumer_pid, :current_function) + + refute_receive {:DOWN, ^ref, :process, ^consumer_pid, {:shutdown, :suspend}}, 0 + + # Wait for shape_suspend_after (150ms from hibernate) to complete + # We're at ~50ms, need to wait another ~150ms to be past 160ms + Process.sleep(180) + + # Now should be suspended + assert_receive {:DOWN, ^ref, :process, ^consumer_pid, {:shutdown, :suspend}} + + refute Consumer.whereis(ctx.stack_id, shape_handle) + end + + @tag hibernate_after: 10, shape_suspend_after: 200, with_pure_file_storage_opts: [flush_period: 1] + @tag suspend: true + test "activity during hibernation cancels pending suspend", ctx do + register_as_replication_client(ctx.stack_id) + + 
{shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape1, ctx.stack_id) + lsn1 = Lsn.from_integer(300) + lsn2 = Lsn.from_integer(301) + + :started = ShapeCache.await_snapshot_start(shape_handle, ctx.stack_id) + + consumer_pid = Consumer.whereis(ctx.stack_id, shape_handle) + assert is_pid(consumer_pid) + ref = Process.monitor(consumer_pid) + + txn1 = + complete_txn_fragment(2, lsn1, [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "21"}, + log_offset: LogOffset.new(lsn1, 0) + } + ]) + + assert :ok = ShapeLogCollector.handle_event(txn1, ctx.stack_id) + assert_receive {:flush_boundary_updated, 300}, 1_000 + + # Wait for hibernate (hibernate_after=10ms + buffer) + Process.sleep(30) + + # Should be hibernated + assert {:current_function, {:gen_server, :loop_hibernate, 4}} = + Process.info(consumer_pid, :current_function) + + # Wait ~50ms so suspend timer has been running but not fired yet + # (shape_suspend_after=200ms, so we're at ~80ms total, well before 200ms) + Process.sleep(50) + + # Send another transaction - this should cancel the suspend timer + txn2 = + complete_txn_fragment(3, lsn2, [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "22"}, + log_offset: LogOffset.new(lsn2, 0) + } + ]) + + assert :ok = ShapeLogCollector.handle_event(txn2, ctx.stack_id) + assert_receive {:flush_boundary_updated, 301}, 1_000 + + # Wait past what would have been the original shape_suspend_after window + # Original timer started at ~30ms, would fire at ~230ms + # We're now at ~80ms, wait 160ms to reach ~240ms + # But new timer started at ~80ms, would fire at ~280ms + # So at ~240ms the process should still be alive + Process.sleep(160) + + # Should NOT have suspended because activity reset the timer + refute_receive {:DOWN, ^ref, :process, ^consumer_pid, {:shutdown, :suspend}}, 0 + + # Process should still be alive (hibernated again) + assert Process.alive?(consumer_pid) + end + @tag 
with_pure_file_storage_opts: [compaction_period: 5, keep_complete_chunks: 133] test "compaction is scheduled and invoked for a shape that has compaction enabled", ctx do parent = self() From 3d0045991b65893275c67b9f2fd8445cb0ac12ea Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Wed, 6 May 2026 17:27:06 +0200 Subject: [PATCH 6/9] docs(sync-service): clarify hibernate/suspend config comments Co-Authored-By: Claude Opus 4.5 --- packages/sync-service/lib/electric/config.ex | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/packages/sync-service/lib/electric/config.ex b/packages/sync-service/lib/electric/config.ex index 68e6163448..290fbfa4f3 100644 --- a/packages/sync-service/lib/electric/config.ex +++ b/packages/sync-service/lib/electric/config.ex @@ -85,11 +85,14 @@ defmodule Electric.Config do otel_sampling_ratio: 0.01, metrics_sampling_ratio: 1, ## Memory + # After this duration of inactivity, consumer processes will hibernate + # to allow garbage collection shape_hibernate_after: :timer.seconds(30), - # Should we terminate consumer processes after `shape_hibernate_after` ms - # or just hibernate them? + # If enabled, terminate (suspend) consumer processes after hibernating. + # This frees memory more aggressively than hibernation alone. shape_enable_suspend?: false, - # How long after hibernation before suspending (terminating) the consumer + # After hibernating, wait this duration before suspending (terminating). + # Only applies when shape_enable_suspend? is true. 
shape_suspend_after: :timer.minutes(10), # Sets max_requests for Bandit handler processes: # https://hexdocs.pm/bandit/Bandit.html#t:http_1_options/0 From 8788a01b49a1cade6a9da20cfefc6c680f04fd5a Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Wed, 6 May 2026 23:17:57 +0200 Subject: [PATCH 7/9] chore: add changeset for hibernate-then-suspend Co-Authored-By: Claude Opus 4.5 --- .changeset/hibernate-then-suspend.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/hibernate-then-suspend.md diff --git a/.changeset/hibernate-then-suspend.md b/.changeset/hibernate-then-suspend.md new file mode 100644 index 0000000000..6b18ca0957 --- /dev/null +++ b/.changeset/hibernate-then-suspend.md @@ -0,0 +1,5 @@ +--- +'@core/sync-service': minor +--- + +Add hibernate-then-suspend behavior for Consumer processes. When suspend is enabled, consumers now hibernate first (triggering GC) before suspending. Adds `shape_suspend_after` config (default 10 minutes) to control the delay between hibernation and suspension. Any activity cancels the pending suspend timer, restarting the cycle. 
From c452d96de9378e74a2f6bf0f91df2b7f626fd37e Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Wed, 6 May 2026 23:28:18 +0200 Subject: [PATCH 8/9] fix(sync-service): address review feedback - Clarify enable_suspend/4 docstring: jitter controls hibernation timing, suspension happens suspend_after ms after hibernation - Update :configure_suspend handler comment to describe hibernate-then-suspend - Add defensive guard for suspend_after: nil in schedule_suspend_timer/1 Co-Authored-By: Claude Opus 4.5 --- .../lib/electric/shapes/consumer_registry.ex | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/packages/sync-service/lib/electric/shapes/consumer_registry.ex b/packages/sync-service/lib/electric/shapes/consumer_registry.ex index c474fcb5a9..b98ba330ae 100644 --- a/packages/sync-service/lib/electric/shapes/consumer_registry.ex +++ b/packages/sync-service/lib/electric/shapes/consumer_registry.ex @@ -217,14 +217,15 @@ defmodule Electric.Shapes.ConsumerRegistry do disabled, because the configuration message will have the side-effect of waking all consumers from hibernation. - The `jitter_period` value allows for spreading the suspension of existing - consumers over a large time period to avoid a sudden rush of consumer - shutdowns after `hibernate_after` ms. + The `jitter_period` value allows for spreading the hibernation of existing + consumers over a time period to avoid a sudden rush of hibernation events. + Each consumer picks a random timeout between `hibernate_after` and `jitter_period`, + then hibernates and schedules suspension for `suspend_after` ms later. 
To re-enable consumer suspend: - # set the hibernation timeout to 1 minute, suspend 4 minutes after hibernation, - # and phase the suspension of existing consumers over a 20 minute period + # hibernation timeout: 1 min, suspend timeout: 4 min, jitter window: 20 min + # Consumers will hibernate between 1-20 min, then suspend 4 min after hibernating Electric.Shapes.ConsumerRegistry.enable_suspend(stack_id, 60_000, 4 * 60_000, 60_000 * 20) Disabling suspension is as easy as: From 6029612cda716388bc5d19747d19e1bd172c43aa Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Thu, 7 May 2026 10:50:30 +0200 Subject: [PATCH 9/9] fix(integration-tests): set ELECTRIC_SHAPE_SUSPEND_AFTER in suspension test With hibernate-then-suspend, consumers hibernate first then wait suspend_after before terminating. Set ELECTRIC_SHAPE_SUSPEND_AFTER=200ms alongside ELECTRIC_SHAPE_HIBERNATE_AFTER=200ms so the test completes within the expected timeframe. Co-Authored-By: Claude Opus 4.5 --- integration-tests/tests/shape-suspension-resumption.lux | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration-tests/tests/shape-suspension-resumption.lux b/integration-tests/tests/shape-suspension-resumption.lux index 198917c27c..e3035151ba 100644 --- a/integration-tests/tests/shape-suspension-resumption.lux +++ b/integration-tests/tests/shape-suspension-resumption.lux @@ -20,7 +20,7 @@ # Start Electric and wait for it to finish initialization. # Set a very small hibernation timeout so consumers will shutdown quickly -[invoke setup_electric_with_env "ELECTRIC_SHAPE_HIBERNATE_AFTER=200ms ELECTRIC_SHAPE_SUSPEND_CONSUMER=true"] +[invoke setup_electric_with_env "ELECTRIC_SHAPE_HIBERNATE_AFTER=200ms ELECTRIC_SHAPE_SUSPEND_AFTER=200ms ELECTRIC_SHAPE_SUSPEND_CONSUMER=true"] [shell electric] [timeout 10] ??[debug] Replication client started streaming