diff --git a/.changeset/dirty-trains-fry.md b/.changeset/dirty-trains-fry.md new file mode 100644 index 0000000000..cc1860b431 --- /dev/null +++ b/.changeset/dirty-trains-fry.md @@ -0,0 +1,5 @@ +--- +'@core/elixir-client': patch +--- + +Sync CDN-resilience fixes from the TypeScript client: add a cache-buster on every 409, self-heal a stuck expired-handle cache, and emit a synthetic must-refetch control message for every 409 diff --git a/packages/elixir-client/lib/electric/client/expired_shapes_cache.ex b/packages/elixir-client/lib/electric/client/expired_shapes_cache.ex index cd3839a480..1b5e7d68f4 100644 --- a/packages/elixir-client/lib/electric/client/expired_shapes_cache.ex +++ b/packages/elixir-client/lib/electric/client/expired_shapes_cache.ex @@ -55,6 +55,14 @@ defmodule Electric.Client.ExpiredShapesCache do GenServer.call(__MODULE__, :clear) end + @doc """ + Clear the expired handle for a single shape key. + """ + @spec clear_handle(String.t()) :: :ok + def clear_handle(shape_key) do + GenServer.call(__MODULE__, {:clear_handle, shape_key}) + end + @doc """ Get the current number of entries in the cache. @@ -102,6 +110,12 @@ defmodule Electric.Client.ExpiredShapesCache do {:reply, :ok, state} end + @impl true + def handle_call({:clear_handle, shape_key}, _from, state) do + :ets.delete(@table_name, shape_key) + {:reply, :ok, state} + end + @impl true def handle_cast({:touch, shape_key}, state) do timestamp = System.monotonic_time() diff --git a/packages/elixir-client/lib/electric/client/poll.ex b/packages/elixir-client/lib/electric/client/poll.ex index 59b3751c9e..a346ad7e80 100644 --- a/packages/elixir-client/lib/electric/client/poll.ex +++ b/packages/elixir-client/lib/electric/client/poll.ex @@ -144,7 +144,7 @@ defmodule Electric.Client.Poll do # the client loops infinitely (the URL never changes). 
cond do response_handle == expired_handle -> - handle_stale_response(state) + handle_stale_response(state, shape_key) # Normal: process response true -> @@ -171,15 +171,35 @@ defmodule Electric.Client.Poll do {:ok, messages, new_state} end - defp handle_stale_response(state) do - if state.stale_cache_retry_count >= @max_stale_retries do - {:error, - %Client.Error{ - message: - "CDN continues serving stale cached responses after #{@max_stale_retries} retry attempts" - }} - else - {:stale_retry, ShapeState.enter_stale_retry(state)} + defp handle_stale_response(state, shape_key) do + cond do + state.stale_cache_retry_count < @max_stale_retries -> + {:stale_retry, ShapeState.enter_stale_retry(state)} + + state.self_heal_attempted? -> + {:error, + %Client.Error{ + message: + "CDN continues serving stale cached responses after #{@max_stale_retries} " <> + "retry attempts and one self-heal attempt" + }} + + true -> + # Self-heal: clear the expired entry from local cache so the next + # request omits the `expired_handle` param. Since the server never + # reuses handles (SPEC.md S0), the next response should bypass stale + # detection. If it doesn't (broken CDN), we error on the next pass via + # the `self_heal_attempted?` branch above. + ExpiredShapesCache.clear_handle(shape_key) + + new_state = + ShapeState.enter_stale_retry(%{ + state + | self_heal_attempted?: true, + stale_cache_retry_count: 0 + }) + + {:stale_retry, new_state} end end @@ -196,13 +216,25 @@ defmodule Electric.Client.Poll do new_state = ShapeState.reset(state, handle) new_state = handle_schema(resp, client, new_state) new_state = ShapeState.clear_stale_retry(new_state) - - %{value_mapper_fun: value_mapper_fun} = new_state - - messages = - resp.body - |> ensure_enum() - |> Enum.flat_map(&Message.parse(&1, handle, value_mapper_fun, resp.request_timestamp)) + # Add a cache-buster on every 409 so that the next request URL cannot match + # a URL the CDN has cached. 
Without this, a CDN that strips the + # `expired_handle` query param from its cache key keeps serving the same + # cached 409 indefinitely. + new_state = %{new_state | stale_cache_buster: ShapeState.generate_cache_buster()} + + # Always emit a synthetic must-refetch control message rather than + # forwarding whatever the server (or a misbehaving proxy) put in the 409 + # body. Subscribers must receive the signal to clear local state on every + # 409, even when the body is empty or stripped of the control message. + # Any data rows present in the body refer to the old, expired handle, so + # discarding them is correct. + messages = [ + %Message.ControlMessage{ + control: :must_refetch, + handle: handle, + request_timestamp: resp.request_timestamp + } + ] {:must_refetch, messages, new_state} end diff --git a/packages/elixir-client/lib/electric/client/shape_state.ex b/packages/elixir-client/lib/electric/client/shape_state.ex index f76961291a..765b5f485a 100644 --- a/packages/elixir-client/lib/electric/client/shape_state.ex +++ b/packages/elixir-client/lib/electric/client/shape_state.ex @@ -46,6 +46,7 @@ defmodule Electric.Client.ShapeState do tag_to_keys: %{}, key_data: %{}, stale_cache_retry_count: 0, + self_heal_attempted?: false, disjunct_positions: nil, recent_requests: [], fast_loop_consecutive_count: 0 @@ -63,6 +64,7 @@ defmodule Electric.Client.ShapeState do disjunct_positions: [[non_neg_integer()]] | nil, stale_cache_buster: String.t() | nil, stale_cache_retry_count: non_neg_integer(), + self_heal_attempted?: boolean(), recent_requests: [{integer(), Offset.t()}], fast_loop_consecutive_count: non_neg_integer() } @@ -164,7 +166,8 @@ defmodule Electric.Client.ShapeState do %{ state | stale_cache_buster: nil, - stale_cache_retry_count: 0 + stale_cache_retry_count: 0, + self_heal_attempted?: false } end diff --git a/packages/elixir-client/test/electric/client_test.exs b/packages/elixir-client/test/electric/client_test.exs index 91ce33ba19..32e765acc1 100644 --- 
a/packages/elixir-client/test/electric/client_test.exs +++ b/packages/elixir-client/test/electric/client_test.exs @@ -2616,6 +2616,74 @@ defmodule Electric.ClientTest do assert params["expired_handle"] == "old-expired-handle" end + test "post-409 request includes cache-buster parameter", ctx do + # When a 409 response arrives, the next request URL must include a fresh + # cache-buster query param so it cannot match any URL the CDN has cached. + # Without this, a CDN that strips the `expired_handle` parameter from its + # cache key can keep serving the same stale 409 forever. + parent = self() + {:ok, request_count} = Agent.start_link(fn -> 0 end) + + schema = Jason.encode!(%{"id" => %{type: "text"}}) + + body1 = + Jason.encode!([ + %{ + "headers" => %{"operation" => "insert"}, + "offset" => "1_0", + "value" => %{"id" => "1"} + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9998}} + ]) + + body409 = Jason.encode!([%{"headers" => %{"control" => "must-refetch"}}]) + + body2 = + Jason.encode!([ + %{ + "headers" => %{"operation" => "insert"}, + "offset" => "1_0", + "value" => %{"id" => "2"} + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9999}} + ]) + + Bypass.stub(ctx.bypass, "GET", "/v1/shape", fn conn -> + count = Agent.get_and_update(request_count, fn c -> {c + 1, c + 1} end) + send(parent, {:request, count, conn.query_params}) + + case count do + 1 -> + bypass_resp(conn, body1, + shape_handle: "my-shape", + last_offset: "1_0", + schema: schema + ) + + 2 -> + bypass_resp(conn, body409, status: 409, shape_handle: "my-shape-2") + + _ -> + bypass_resp(conn, body2, + shape_handle: "my-shape-2", + last_offset: "1_0", + schema: schema + ) + end + end) + + stream(ctx, 5) + + assert_receive {:request, 1, _params1} + assert_receive {:request, 2, _params2} + assert_receive {:request, 3, params3} + + assert Map.has_key?(params3, "cache-buster"), + "Expected cache-buster on first request after 409, got params: 
#{inspect(params3)}. " <> + "Without cache-buster, a CDN that strips expired_handle from its cache key " <> + "can keep serving the cached 409 indefinitely." + end + test "stale response triggers retry with cache-buster parameter", ctx do alias Electric.Client.ShapeKey alias Electric.Client.ExpiredShapesCache @@ -2667,7 +2735,165 @@ defmodule Electric.ClientTest do assert Map.has_key?(params2, "cache-buster") end - test "fails after max stale cache retries exceeded", ctx do + test "self-heals after max stale retries by clearing expired entry and retrying without expired_handle", + ctx do + # If a proxy strips the cache-buster query param from its cache key, the + # CDN keeps serving the same stale response no matter how many retries + # we attempt. Once stale retries are exhausted, the client should clear + # the expired entry from its local cache and retry once without the + # `expired_handle` parameter — since the server never reuses handles, + # this lets the request bypass stale detection on subsequent responses. + alias Electric.Client.ShapeKey + alias Electric.Client.ExpiredShapesCache + + parent = self() + {:ok, request_count} = Agent.start_link(fn -> 0 end) + + shape_params = ShapeDefinition.params(ctx.shape) + shape_key = ShapeKey.canonical(ctx.client.endpoint, shape_params) + ExpiredShapesCache.mark_expired(shape_key, "stuck-stale-handle") + + schema = Jason.encode!(%{"id" => %{type: "text"}}) + + body_fresh = + Jason.encode!([ + %{ + "headers" => %{"operation" => "insert"}, + "offset" => "1_0", + "value" => %{"id" => "fresh"} + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9999}} + ]) + + Bypass.stub(ctx.bypass, "GET", "/v1/shape", fn conn -> + count = Agent.get_and_update(request_count, fn c -> {c + 1, c + 1} end) + send(parent, {:request, count, conn.query_params}) + + if Map.has_key?(conn.query_params, "expired_handle") do + # Broken CDN: keeps serving the stale handle regardless of the + # cache-buster value. 
+ bypass_resp(conn, "[]", + shape_handle: "stuck-stale-handle", + last_offset: "1_0", + schema: schema + ) + else + # Self-heal succeeded: client dropped expired_handle, so we get + # a fresh response. + bypass_resp(conn, body_fresh, + shape_handle: "fresh-handle", + last_offset: "1_0", + schema: schema + ) + end + end) + + msgs = stream(ctx, 2) + + insert = + Enum.find(msgs, &match?(%ChangeMessage{headers: %{operation: :insert}}, &1)) + + assert insert, "Expected client to recover via self-healing, got messages: #{inspect(msgs)}" + assert insert.value["id"] == "fresh" + + # Expired entry should have been cleared during self-healing. + assert ExpiredShapesCache.get_expired_handle(shape_key) == nil, + "Expected expired entry to be cleared after self-healing" + + # The first @max_stale_retries+1 requests should include expired_handle; + # the self-healing request must drop it. + heal_request_n = + Enum.reduce_while(1..10, nil, fn n, _ -> + receive do + {:request, ^n, params} -> + if Map.has_key?(params, "expired_handle"), + do: {:cont, nil}, + else: {:halt, n} + after + 500 -> {:halt, nil} + end + end) + + assert heal_request_n != nil, + "Expected the client to drop expired_handle after exhausting stale retries" + + assert heal_request_n >= 4, + "Self-heal fired too early at request #{heal_request_n}; " <> + "expected at least 4 requests with expired_handle before self-healing" + end + + test "emits synthetic must-refetch control message when 409 body has none", ctx do + # The sync service does include a must-refetch control message in 409 + # bodies, but a misbehaving proxy might strip it. The client must + # synthesise a must-refetch control message on every 409 so that + # downstream subscribers always receive the signal to clear local state. 
+ body1 = + Jason.encode!([ + %{ + "headers" => %{"operation" => "insert"}, + "offset" => "1_0", + "value" => %{"id" => "1"} + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9998}} + ]) + + body2 = + Jason.encode!([ + %{ + "headers" => %{"operation" => "insert"}, + "offset" => "1_0", + "value" => %{"id" => "2"} + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9999}} + ]) + + schema = Jason.encode!(%{"id" => %{type: "text"}}) + + Bypass.stub(ctx.bypass, "GET", "/v1/shape", fn conn -> + offset = conn.query_params["offset"] + handle = conn.query_params["handle"] + + case {offset, handle} do + {"-1", nil} -> + bypass_resp(conn, body1, + shape_handle: "my-shape", + last_offset: "1_0", + schema: schema + ) + + {"1_0", "my-shape"} -> + # 409 with EMPTY body — no must-refetch control message included. + # Subscribers should still see one (synthesised by the client). + bypass_resp(conn, "[]", status: 409, shape_handle: "my-shape-2") + + {"-1", "my-shape-2"} -> + bypass_resp(conn, body2, + shape_handle: "my-shape-2", + last_offset: "1_0", + schema: schema + ) + + _ -> + # Live poll after recovery — return up-to-date with no changes. + bypass_resp( + conn, + Jason.encode!([ + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 10_000}} + ]), + shape_handle: "my-shape-2", + last_offset: "1_0" + ) + end + end) + + msgs = stream(ctx, 5) + + assert Enum.any?(msgs, &match?(%ControlMessage{control: :must_refetch}, &1)), + "Expected a synthetic must-refetch ControlMessage in stream output. 
" <> + "Got: #{inspect(msgs)}" + end + + test "raises when CDN is permanently broken (stale retries + self-heal both fail)", ctx do alias Electric.Client.ShapeKey alias Electric.Client.ExpiredShapesCache @@ -2678,7 +2904,10 @@ defmodule Electric.ClientTest do schema = Jason.encode!(%{"id" => %{type: "text"}}) - # Always return the stale handle (simulating broken CDN) + # Always return the stale handle (simulating broken CDN). After + # self-healing clears the expired entry, this same handle is accepted as + # fresh, but offset never advances — so fast-loop detection eventually + # fires. Bypass.expect(ctx.bypass, fn conn -> bypass_resp(conn, "[]", shape_handle: "permanently-stale-handle", @@ -2687,8 +2916,7 @@ defmodule Electric.ClientTest do ) end) - # Should raise error after 3 retries - assert_raise Client.Error, ~r/stale cached responses/, fn -> + assert_raise Client.Error, ~r/stale cached responses|fast retry loop/, fn -> stream(ctx) |> Enum.take(1) end end