diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 738eb5711..20cd48c99 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -155,6 +155,8 @@ jobs: run: bash integration/js/pg_tests/run.sh - name: Ruby run: bash integration/ruby/run.sh + - name: Prepared statements (full) + run: bash integration/prepared_statements_full/run.sh - name: Java run: bash integration/java/run.sh - name: Mirror diff --git a/docs/issues/PROTOCOL_OUT_OF_SYNC.md b/docs/issues/PROTOCOL_OUT_OF_SYNC.md new file mode 100644 index 000000000..77689cebe --- /dev/null +++ b/docs/issues/PROTOCOL_OUT_OF_SYNC.md @@ -0,0 +1,315 @@ +# ProtocolOutOfSync — Known Root Causes + +`Error::ProtocolOutOfSync` fires when `ProtocolState`'s expected-response queue diverges from what +the backend actually sends. The catch site is `server.rs:394`; the connection is permanently marked +`State::Error` and discarded from the pool. + +**Queue mechanics** (`pgdog/src/backend/protocol/state.rs`). `handle()` pushes one `ExecutionItem` +per anticipated response before forwarding any client message. As server bytes arrive, `forward()` +calls `action(code)` which pops the queue front and checks the match. Two conditions raise +`ProtocolOutOfSync`: + +- **Empty queue** — a tracked message arrives but nothing was expected. +- **Ignore mismatch** — queue front is an `Ignore` slot but the server sent a different code. + +--- + +## ✅ Issue 1 — Failed `Prepare` orphans the EXECUTE ReadyForQuery + +**Severity:** High — triggered by normal server behaviour; no client misbehaviour required. + +**Location:** `pgdog/src/backend/prepared_statements.rs`, `ProtocolMessage::Prepare` arm; +`pgdog/src/backend/protocol/state.rs`, Error handler. + +### Description + +When pgdog injects a `PREPARE` to rewrite a simple-query `EXECUTE` and that `PREPARE` fails on the +server, the old Error handler incorrectly cleared the queue. The subsequent `ReadyForQuery` from the +now-orphaned `EXECUTE` hit an empty queue and raised `ProtocolOutOfSync`. + +### Code path + +The simple-query rewriter turns `EXECUTE stmt_name(args)` into two prepended messages, each handled +independently by `handle()`. After both calls the queue is: + +``` +[Ignore(CommandComplete), Ignore(ReadyForQuery), Code(ReadyForQuery)] + ↑──────────── handle(Prepare) ─────────────↑ ↑─── handle(Query) ───↑ +``` + +The old handler popped the last item, cleared the queue, and optionally re-added a trailing +`Code(ReadyForQuery)`. That assumed a flat, single-request queue. With the injected sub-request the +queue is compound, so clearing it discarded the client's own `Code(ReadyForQuery)`: + +| Step | Server sends | Old handler action | Queue after | +|---|---|---|---| +| 1 | `Error` for PREPARE | `pop_back` → `Code(RFQ)` re-added | `[Code(RFQ)]` | +| 2 | `ReadyForQuery` for PREPARE | pops `Code(RFQ)` normally | **empty** | +| 3 | `Error` for EXECUTE (statement absent) | `pop_back` → None; nothing re-added | **empty** | +| 4 | `ReadyForQuery` for EXECUTE | `pop_front` on empty → **ProtocolOutOfSync** | — | + +Under high concurrency this became near-deterministic: the pool fast-path (`Guard::drop` → `checkin` +→ `put`) hands a connection directly to a waiting client with no healthcheck and no opportunity to +drain the kernel socket buffer. The next query consumed the stale EXECUTE `Error + ReadyForQuery`, +producing `ProtocolOutOfSync`. + +### Reproduction (historical) + +```sh +cd integration/prepared_statements_full && bash run.sh +``` + +### Tests + +**State-machine unit test (`state.rs`, no backend needed)** + +- **`test_injected_prepare_error_full_lifecycle`** — builds the exact queue that + `prepared_statements.rs` produces (`add_ignore('C')`, `add_ignore('Z')`, `add('Z')`), fires + `action('E')` and asserts the intermediate queue shape `[Ignore(RFQ), Ignore(Error), Code(Z)]`, + then walks the remaining Z→Ignore, E→Ignore (fast-path), Z→Forward sequence to completion. + +**Server-level integration test (`server.rs`, requires PostgreSQL)** + +The test that previously asserted `ProtocolOutOfSync` on the fourth message now asserts `E` then +`Z` (both forwarded). All three configurations now pass. + +| Test | Pool mode | `got:` | `extended` | What it proves | +|---|---|---|---|---| +| 1 | session | Z | false | Failed PREPARE no longer orphans the EXECUTE RFQ — **fixed** | +| 2 | transaction | — | false | Stale-chain: injected E+Z drained internally; pool socket is clean — **fixed** | +| 3 | session | Z | true | `extended` now resets after RFQ drain — **fixed by Issue 4** | + +- **Test 1** (`pgdog_session`): session-pooled connection; failed PREPARE then EXECUTE; subsequent `SELECT 1` must succeed. +- **Test 2** (`pgdog_tx_single`, `pool_size=1`): same failure sequence in transaction mode. Issue 1 drains the orphaned EXECUTE E+Z internally before the connection returns to the pool, so no stale bytes shift subsequent queries. +- **Test 3** (`pgdog_session` with prior `exec_params`): `extended=true` before the failure; Issue 4 ensures the flag resets after the RFQ drains the queue. + +See `integration/prepared_statements_full/protocol_out_of_sync_spec.rb` for full test bodies. + +### Fix + +Error handler in `state.rs`, `ExecutionCode::Error` arm. On error, find the first +`Code(ReadyForQuery)` in the queue (the client's RFQ boundary), drain everything before it, count +the `Ignore(RFQ)` slots in the drained portion, and prepend one `[Ignore(RFQ), Ignore(Error)]` pair +per slot. A fast-path at the top of the arm handles subsequent errors from the same injected +sub-request — when the queue front is already `Ignore(Error)` — by popping and returning +`Action::Ignore` directly. + +See also: `test_injected_prepare_error_full_lifecycle` in `state.rs`. + +--- + +## ✅ Issue 2 — Double `action()` call in `forward()` for server CopyDone + +**Severity:** Medium — requires the client to omit a trailing `Sync`. + +**Location:** `pgdog/src/backend/prepared_statements.rs`, `forward()`. + +### Description + +`forward()` called `state.action(code)` unconditionally near the top of the function, then called +it a second time inside the `'c'` (CopyDone) match arm. Without a `Code(ReadyForQuery)` backstop in +the queue the second call hit an empty queue and raised `ProtocolOutOfSync`. + +### Code path + +Normal path (safe): `Code(ReadyForQuery)` is always in the queue. `action('Z')` pushes it back +rather than consuming it, making the double call idempotent. + +Unsafe path — client sends `Parse + Bind + Execute + Flush` (no `Sync`). `handle()` builds: + +``` +[Code(ParseComplete), Code(BindComplete), Code(ExecutionCompleted)] +``` + +No `Code(ReadyForQuery)` is added. When the server responded with CopyDone: + +``` +First action('c'): pops Code(ExecutionCompleted) — consumed +Second action('c'): empty queue → ProtocolOutOfSync +``` + +### Reproduction (historical) + +Not triggerable via the `pg` gem or any libpq-based driver — libpq always appends `Sync` after +`Execute`. Required sending raw protocol messages directly. + +```sh +cargo test -p pgdog --lib -- test_copydone_double_action_oos_without_rfq_backstop +``` + +### Tests + +**State-machine unit tests (`state.rs`, no backend needed)** + +- **`test_copydone_double_action_safe_with_rfq_backstop`** — queue `[Code(Copy), Code(ReadyForQuery)]`; + two `action('c')` calls both succeed; RFQ slot is pushed back and survives. +- **`test_copydone_double_action_oos_without_rfq_backstop`** — documents the raw state-machine + invariant: calling `action('c')` twice with no RFQ backstop still causes `ProtocolOutOfSync` + directly on the state machine. `forward()` no longer makes this second call; this path is + unreachable through normal protocol flow. Test is retained to pin the underlying invariant. + +**Server-level tests (`server.rs`, require PostgreSQL)** + +- **`test_copydone_single_action_without_sync`** — `Parse + Bind + Execute + Flush` (no Sync); + reads ParseComplete, BindComplete, CopyOutResponse, CopyData ×2, then asserts CopyDone is + forwarded successfully. The trailing CommandComplete then hits an empty queue (no RFQ backstop) + and raises `ProtocolOutOfSync` — that is the correct remaining behavior with no `Sync`. +- **`test_copydone_double_action_safe_with_sync`** — same pipeline with `Sync`; full sequence + completes without error; asserts `server.done()`. + +```sh +cargo test -p pgdog --lib -- test_copydone_double_action +cargo test -p pgdog -- test_copydone +``` + +### Fix + +Removed the redundant `self.state.action(code)?` from the `'c'` arm in `forward()`. The call at +the top of the function already advances the state machine for CopyDone; the arm body is now empty. + +--- + +## ✅ Issue 3 — Stale ReadyForQuery hits an `Ignore(ParseComplete)` slot + +**Severity:** Low — practically unreachable in normal operation. + +**Location:** `pgdog/src/backend/protocol/state.rs`, Ignore arm. + +### Description + +If a `ReadyForQuery` byte from a prior request cycle remains unread in the TCP receive buffer when +the next request starts, `action('Z')` fires while the queue front is `Ignore(ParseComplete)`. The +Ignore arm requires an exact code match; `ReadyForQuery != ParseComplete` → `ProtocolOutOfSync`. + +### Code path + +pgdog injects a Parse for a missing statement; queue front: + +``` +[Ignore(ParseComplete), Code(BindComplete), ...] +``` + +Stale `ReadyForQuery` arrives before `ParseComplete`: + +``` +action('Z'): generic pop → Ignore(ParseComplete) + → ReadyForQuery != ParseComplete → ProtocolOutOfSync +``` + +### Reproduction + +Not reproducible through normal pool operation. The `done()` guard chain prevents pool reclaim while +any `Ignore` item is present: + +- `ProtocolState::done()` = `is_empty() && !out_of_sync` → `false` while any `Ignore` slot exists. +- `PreparedStatements::done()` adds a second gate blocking reclaim while an injected Parse is in flight. +- `Pool::maybe_check_in()` discards errored connections before `can_check_in()` is evaluated. + +The precondition requires a concurrent-access bug that bypasses the pool guard, or direct TCP stream +injection. + +### Tests + +State-machine unit tests in `state.rs` cover the `action()` mismatch directly. A server-level +integration test is not practical; the precondition cannot be reached through normal sequential +protocol flow. + +```sh +cargo test -p pgdog --lib -- test_stale_rfq +``` + +### Fix + +No code change required. The existing `done()` guard chain already prevents the precondition from +arising. If it were somehow reached, the resulting `ProtocolOutOfSync` would discard the connection +before reuse, bounding the blast radius to a single request. + +--- + +## ✅ Issue 4 — `extended` flag is permanently set and never resets + +**Severity:** Low-medium — affects connection-lifecycle semantics and silently changes Error handler +behaviour for all subsequent requests on a connection. + +**Location:** `pgdog/src/backend/protocol/state.rs`, `add()` / `add_ignore()`; `Code` arm of +`action()`. + +### Description + +`ProtocolState.extended` was set to `true` the first time any parameterised query ran on a +connection and was never reset. The Error handler checked this flag to set `out_of_sync = true`; +because the flag was permanent, every subsequent error on that connection — including plain +simple-query errors — set `out_of_sync = true` spuriously. + +### Code path + +`add()` and `add_ignore()` latch the flag via `self.extended = self.extended || code.extended()` +whenever `ParseComplete ('1')` or `BindComplete ('2')` is enqueued. Once set, the Error handler +checked `self.extended` to set `out_of_sync = true`, with no reset path, so every subsequent +error on the connection triggered it regardless of whether the current request was parameterised. + +### Consequences + +- `done()` stayed `false` one extra round-trip (until RFQ cleared `out_of_sync`) on simple-query + errors for connections that had ever served a parameterised query. +- Future changes to the Error handler that added `extended`-specific behaviour would silently apply + to all long-lived connections, not just those currently mid-pipeline. +- `extended` read as "has this connection *ever* been in extended-protocol mode", not "is this + connection *currently* in extended-protocol mode" — a semantic mismatch. + +### Reproduction (historical) + +1. Connect to pgdog. +2. Execute a parameterised query (any `$1` placeholder) — permanently sets `extended = true`. +3. Execute `SELECT 1/0` (simple query). +4. Observe `server.out_of_sync() == true` immediately after the `'E'` response, before RFQ arrives. + Expected: `false`. + +```sh +cargo test -p pgdog -- test_extended_resets_after_rfq_drain +``` + +### Tests + +**State-machine unit tests (`state.rs`, no backend needed)** + +- **`test_extended_resets_on_rfq_drain`** — parameterised queue drains; `extended` is `true` before + the final RFQ and `false` after. +- **`test_extended_stays_true_mid_pipeline`** — an intermediate RFQ with items still queued behind + it does not prematurely reset `extended`; only the last RFQ that drains the queue resets it. +- **`test_no_spurious_out_of_sync_after_extended_reset`** — after a parameterised pipeline + completes and `extended` resets, a subsequent simple-query error does not set `out_of_sync`. + +**Server-level test (`server.rs`, requires PostgreSQL)** + +- **`test_extended_resets_after_rfq_drain`** — four phases on one connection: (1) baseline simple + error, no `out_of_sync`; (2) parameterised query sets `extended`, RFQ drain resets it; (3) and + (4) simple errors after reset, both assert `out_of_sync == false`. + +```sh +cargo test -p pgdog --lib -- test_extended_resets +cargo test -p pgdog -- test_extended_resets_after_rfq_drain +``` + +### Fix + +In the `Code(in_queue_code)` arm of `action()`, after `pop_front()` has already consumed the +RFQ item, `self.extended` is reset to `false` when the queue is now empty. The check must live +here — after the pop — so `is_empty()` reflects the post-pop state. Placing it in the outer +`ReadyForQuery` match arm (as originally proposed) runs before `pop_front()` and would never +observe an empty queue. Resetting only when `is_empty()` is safe: pipelined requests still in +the queue keep `extended = true` until the entire pipeline finishes. + +--- + +## Common thread + +All four issues share the same underlying fragility: the `ProtocolState` queue and the actual server +response stream diverge whenever an error or unexpected message interrupts a multi-message +sub-request injected transparently by pgdog. The Error handler was written for a single +client-visible request and did not account for the compound structures the prepared-statement +rewriter produces. + +Issue 4 was a secondary consequence: `extended` was added as a guard for the Error handler but was +attached to the connection rather than the current pipeline, so it outlived the requests it was meant +to describe. diff --git a/integration/prepared_statements_full/Gemfile b/integration/prepared_statements_full/Gemfile new file mode 100644 index 000000000..6f147c82f --- /dev/null +++ b/integration/prepared_statements_full/Gemfile @@ -0,0 +1,8 @@ +# frozen_string_literal: true + +source 'https://rubygems.org' +gem 'pg' +gem 'rails' +gem 'rspec', '~> 3.4' +gem 'rubocop' +gem 'toxiproxy' diff --git a/integration/prepared_statements_full/Gemfile.lock b/integration/prepared_statements_full/Gemfile.lock new file mode 100644 index 000000000..a9e17f4d2 --- /dev/null +++ b/integration/prepared_statements_full/Gemfile.lock @@ -0,0 +1,276 @@ +GEM + remote: https://rubygems.org/ + specs: + action_text-trix (2.1.18) + railties + actioncable (8.1.3) + actionpack (= 8.1.3) + activesupport (= 8.1.3) + nio4r (~> 2.0) + websocket-driver (>= 0.6.1) + zeitwerk (~> 2.6) + actionmailbox (8.1.3) + actionpack (= 8.1.3) + activejob (= 8.1.3) + activerecord (= 8.1.3) + activestorage (= 8.1.3) + activesupport (= 8.1.3) + mail (>= 2.8.0) + actionmailer (8.1.3) + actionpack (= 8.1.3) + actionview (= 8.1.3) + activejob (= 8.1.3) + activesupport (= 8.1.3) + mail (>= 2.8.0) + rails-dom-testing (~> 2.2) + actionpack (8.1.3) + actionview (= 8.1.3) + activesupport (= 8.1.3) + nokogiri (>= 1.8.5) + rack (>= 2.2.4) + rack-session (>= 1.0.1) + rack-test (>= 0.6.3) + rails-dom-testing (~> 2.2) + rails-html-sanitizer (~> 1.6) + useragent (~> 0.16) + actiontext (8.1.3) + action_text-trix (~> 2.1.15) + actionpack (= 8.1.3) + activerecord (= 8.1.3) + activestorage (= 8.1.3) + activesupport (= 8.1.3) + globalid (>= 0.6.0) + nokogiri (>= 1.8.5) + actionview (8.1.3) + activesupport (= 8.1.3) + builder (~> 3.1) + erubi (~> 1.11) + rails-dom-testing (~> 2.2) + rails-html-sanitizer (~> 1.6) + activejob (8.1.3) + activesupport (= 8.1.3) + globalid (>= 0.3.6) + activemodel (8.1.3) + activesupport (= 8.1.3) + activerecord (8.1.3) + activemodel (= 8.1.3) + activesupport (= 8.1.3) + timeout (>= 0.4.0) + activestorage (8.1.3) + actionpack (= 8.1.3) + activejob (= 8.1.3) + activerecord (= 8.1.3) + activesupport (= 8.1.3) + marcel (~> 1.0) + activesupport (8.1.3) + base64 + bigdecimal + concurrent-ruby (~> 1.0, >= 1.3.1) + connection_pool (>= 2.2.5) + drb + i18n (>= 1.6, < 2) + json + logger (>= 1.4.2) + minitest (>= 5.1) + securerandom (>= 0.3) + tzinfo (~> 2.0, >= 2.0.5) + uri (>= 0.13.1) + ast (2.4.3) + base64 (0.3.0) + bigdecimal (4.1.1) + builder (3.3.0) + concurrent-ruby (1.3.6) + connection_pool (3.0.2) + crass (1.0.6) + date (3.5.1) + diff-lcs (1.6.2) + drb (2.2.3) + erb (6.0.2) + erubi (1.13.1) + globalid (1.3.0) + activesupport (>= 6.1) + i18n (1.14.8) + concurrent-ruby (~> 1.0) + io-console (0.8.2) + irb (1.17.0) + pp (>= 0.6.0) + prism (>= 1.3.0) + rdoc (>= 4.0.0) + reline (>= 0.4.2) + json (2.19.3) + language_server-protocol (3.17.0.5) + lint_roller (1.1.0) + logger (1.7.0) + loofah (2.25.1) + crass (~> 1.0.2) + nokogiri (>= 1.12.0) + mail (2.9.0) + logger + mini_mime (>= 0.1.1) + net-imap + net-pop + net-smtp + marcel (1.1.0) + mini_mime (1.1.5) + minitest (6.0.3) + drb (~> 2.0) + prism (~> 1.5) + net-imap (0.6.3) + date + net-protocol + net-pop (0.1.2) + net-protocol + net-protocol (0.2.2) + timeout + net-smtp (0.5.1) + net-protocol + nio4r (2.7.5) + nokogiri (1.19.2-aarch64-linux-gnu) + racc (~> 1.4) + nokogiri (1.19.2-aarch64-linux-musl) + racc (~> 1.4) + nokogiri (1.19.2-arm-linux-gnu) + racc (~> 1.4) + nokogiri (1.19.2-arm-linux-musl) + racc (~> 1.4) + nokogiri (1.19.2-arm64-darwin) + racc (~> 1.4) + nokogiri (1.19.2-x86_64-darwin) + racc (~> 1.4) + nokogiri (1.19.2-x86_64-linux-gnu) + racc (~> 1.4) + nokogiri (1.19.2-x86_64-linux-musl) + racc (~> 1.4) + parallel (1.28.0) + parser (3.3.11.1) + ast (~> 2.4.1) + racc + pg (1.6.3) + pg (1.6.3-aarch64-linux) + pg (1.6.3-aarch64-linux-musl) + pg (1.6.3-arm64-darwin) + pg (1.6.3-x86_64-darwin) + pg (1.6.3-x86_64-linux) + pg (1.6.3-x86_64-linux-musl) + pp (0.6.3) + prettyprint + prettyprint (0.2.0) + prism (1.9.0) + psych (5.3.1) + date + stringio + racc (1.8.1) + rack (3.2.6) + rack-session (2.1.1) + base64 (>= 0.1.0) + rack (>= 3.0.0) + rack-test (2.2.0) + rack (>= 1.3) + rackup (2.3.1) + rack (>= 3) + rails (8.1.3) + actioncable (= 8.1.3) + actionmailbox (= 8.1.3) + actionmailer (= 8.1.3) + actionpack (= 8.1.3) + actiontext (= 8.1.3) + actionview (= 8.1.3) + activejob (= 8.1.3) + activemodel (= 8.1.3) + activerecord (= 8.1.3) + activestorage (= 8.1.3) + activesupport (= 8.1.3) + bundler (>= 1.15.0) + railties (= 8.1.3) + rails-dom-testing (2.3.0) + activesupport (>= 5.0.0) + minitest + nokogiri (>= 1.6) + rails-html-sanitizer (1.7.0) + loofah (~> 2.25) + nokogiri (>= 1.15.7, != 1.16.7, != 1.16.6, != 1.16.5, != 1.16.4, != 1.16.3, != 1.16.2, != 1.16.1, != 1.16.0.rc1, != 1.16.0) + railties (8.1.3) + actionpack (= 8.1.3) + activesupport (= 8.1.3) + irb (~> 1.13) + rackup (>= 1.0.0) + rake (>= 12.2) + thor (~> 1.0, >= 1.2.2) + tsort (>= 0.2) + zeitwerk (~> 2.6) + rainbow (3.1.1) + rake (13.3.1) + rdoc (7.2.0) + erb + psych (>= 4.0.0) + tsort + regexp_parser (2.12.0) + reline (0.6.3) + io-console (~> 0.5) + rspec (3.13.2) + rspec-core (~> 3.13.0) + rspec-expectations (~> 3.13.0) + rspec-mocks (~> 3.13.0) + rspec-core (3.13.6) + rspec-support (~> 3.13.0) + rspec-expectations (3.13.5) + diff-lcs (>= 1.2.0, < 2.0) + rspec-support (~> 3.13.0) + rspec-mocks (3.13.8) + diff-lcs (>= 1.2.0, < 2.0) + rspec-support (~> 3.13.0) + rspec-support (3.13.7) + rubocop (1.86.0) + json (~> 2.3) + language_server-protocol (~> 3.17.0.2) + lint_roller (~> 1.1.0) + parallel (~> 1.10) + parser (>= 3.3.0.2) + rainbow (>= 2.2.2, < 4.0) + regexp_parser (>= 2.9.3, < 3.0) + rubocop-ast (>= 1.49.0, < 2.0) + ruby-progressbar (~> 1.7) + unicode-display_width (>= 2.4.0, < 4.0) + rubocop-ast (1.49.1) + parser (>= 3.3.7.2) + prism (~> 1.7) + ruby-progressbar (1.13.0) + securerandom (0.4.1) + stringio (3.2.0) + thor (1.5.0) + timeout (0.6.1) + toxiproxy (2.0.2) + tsort (0.2.0) + tzinfo (2.0.6) + concurrent-ruby (~> 1.0) + unicode-display_width (3.2.0) + unicode-emoji (~> 4.1) + unicode-emoji (4.2.0) + uri (1.1.1) + useragent (0.16.11) + websocket-driver (0.8.0) + base64 + websocket-extensions (>= 0.1.0) + websocket-extensions (0.1.5) + zeitwerk (2.7.5) + +PLATFORMS + aarch64-linux + aarch64-linux-gnu + aarch64-linux-musl + arm-linux-gnu + arm-linux-musl + arm64-darwin + x86_64-darwin + x86_64-linux-gnu + x86_64-linux-musl + +DEPENDENCIES + pg + rails + rspec (~> 3.4) + rubocop + toxiproxy + +BUNDLED WITH + 2.7.2 diff --git a/integration/prepared_statements_full/dev.sh b/integration/prepared_statements_full/dev.sh new file mode 100755 index 000000000..f36274590 --- /dev/null +++ b/integration/prepared_statements_full/dev.sh @@ -0,0 +1,12 @@ +#!/bin/bash +set -e +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) + +pushd ${SCRIPT_DIR} + +export GEM_HOME=~/.gem +mkdir -p ${GEM_HOME} +bundle install +bundle exec rspec *_spec.rb + +popd diff --git a/integration/prepared_statements_full/protocol_out_of_sync_spec.rb b/integration/prepared_statements_full/protocol_out_of_sync_spec.rb new file mode 100644 index 000000000..1f601a965 --- /dev/null +++ b/integration/prepared_statements_full/protocol_out_of_sync_spec.rb @@ -0,0 +1,90 @@ +# frozen_string_literal: true + +require_relative 'rspec_helper' + +# Triggers the Issue 1 scenario: PREPARE fails, pgdog injects a retry PREPARE that also fails, +# leaving an orphaned EXECUTE response. After the fix, pgdog drains the orphaned E+Z internally +# so no stale bytes remain on the wire when this helper returns. +def trigger_prepare_inject_failure(conn, statement_name:) + # 1. PREPARE fails — pgdog keeps the statement in its local cache despite the error. + expect { conn.exec "PREPARE #{statement_name} AS SELECT 1 FROM __pgdog_nonexistent_table__" } + .to raise_error(PG::Error, /__pgdog_nonexistent_table__/) + + # 2. EXECUTE triggers [Prepare, Query] injection; injected PREPARE fails again. + # After fix: pgdog consumes the orphaned EXECUTE E+Z internally; nothing stale on wire. + expect { conn.exec "EXECUTE #{statement_name}" } + .to raise_error(PG::Error, /__pgdog_nonexistent_table__/) +end + +describe 'protocol out of sync regressions' do + after do + ensure_done + end + + # Issue 1 — Session mode: orphaned EXECUTE ReadyForQuery must not leak to the next query. + # Bug: stale E+Z left on wire; next query consumed stale E, orphaned Z hit empty queue → ConnectionBad. + # Fix: Error handler must preserve Code(ReadyForQuery) for the outer EXECUTE when an injected + # PREPARE fails; no stale bytes reach the client. + it 'next query succeeds after failed injected PREPARE in session mode' do + conn = connect_pgdog(user: 'pgdog_session') + begin + trigger_prepare_inject_failure(conn, statement_name: 'pgdog_prepare_inject_session') + + # After fix: no stale messages on wire; next query must succeed without ConnectionBad. + result = conn.exec 'SELECT 1 AS alive' + expect(result.first['alive']).to eq('1') + ensure + conn.close unless conn.finished? + end + end + + # Transaction mode (pool_size=1): Issue 1 fix drains orphaned EXECUTE E+Z internally; + # no stale bytes reach the pool — subsequent queries on pool-recycled connections must succeed. + it 'next query succeeds after failed injected PREPARE in transaction mode' do + conn = connect_pgdog(user: 'pgdog_tx_single') + begin + # 1. Trigger PREPARE injection failure; pgdog drains orphaned EXECUTE E+Z internally. + trigger_prepare_inject_failure(conn, statement_name: 'pgdog_prepare_inject_tx') + + tmp = "#{Process.pid}_#{rand(1_000_000)}" + + # 2. CREATE TABLE — must succeed; no stale E+'I'-Z in buffer after fix. + write_sql = "CREATE TEMP TABLE pgdog_prepare_inject_#{tmp} (id int)" + conn.exec write_sql + + # 3. INSERT (1) — must succeed with its own real response. + conn.exec "INSERT INTO pgdog_prepare_inject_#{tmp} (id) VALUES (1)" + + # 4. BEGIN — must succeed; real C+'T'-Z consumed by this query. + conn.exec 'BEGIN' + + # 5. INSERT (2) + END — must succeed; no stale 'T'-Z in pool to shift the chain. + conn.exec "INSERT INTO pgdog_prepare_inject_#{tmp} (id) VALUES (2)" + sleep 0.05 # let event loop process actual INSERT before Ruby sends END + conn.exec 'END' + ensure + conn.close unless conn.finished? + end + end + + # Issue 1 — Session mode with prior exec_params: extended=true set permanently. + # Bug: same as Test 1; extended=true additionally sets out_of_sync=true in the Error handler, + # changing connection-lifecycle behaviour. Either way, the next query must not fail. + # Fix: same root fix; extended flag behaviour (Issue 4) is a separate concern. + it 'next query succeeds after failed injected PREPARE when prior extended query ran first' do + conn = connect_pgdog(user: 'pgdog_session') + begin + # Parameterised query runs first — sets extended=true on the connection. + result = conn.exec_params('SELECT $1::int AS primer', [42]) + expect(result.first['primer']).to eq('42') + + trigger_prepare_inject_failure(conn, statement_name: 'pgdog_prepare_inject_ext') + + # After fix: stale E+Z handled internally even with extended=true; next query succeeds. + result = conn.exec 'SELECT 1 AS alive' + expect(result.first['alive']).to eq('1') + ensure + conn.close unless conn.finished? + end + end +end diff --git a/integration/prepared_statements_full/rspec_helper.rb b/integration/prepared_statements_full/rspec_helper.rb new file mode 100644 index 000000000..782cd1146 --- /dev/null +++ b/integration/prepared_statements_full/rspec_helper.rb @@ -0,0 +1,106 @@ +# frozen_string_literal: true + +require 'active_record' +require 'rspec' +require 'pg' +require 'toxiproxy' + +def admin + PG.connect('postgres://admin:pgdog@127.0.0.1:6432/admin') +end + +def admin_exec(sql) + conn = admin + conn.exec sql +ensure + conn&.close +end + +def failover + PG.connect('postgres://pgdog:pgdog@127.0.0.1:6432/failover') +end + +def admin_stats(database, column = nil) + conn = admin + stats = conn.exec 'SHOW STATS' + conn.close + stats = stats.select { |item| item['database'] == database } + return stats.map { |item| item[column].to_i } unless column.nil? + + stats +end + +def ensure_done + deadline = Time.now + 2 + pools = [] + clients = [] + servers = [] + pg_clients = [] + current_client_id = nil + + loop do + conn = PG.connect(dbname: 'admin', user: 'admin', password: 'pgdog', port: 6432, host: '127.0.0.1') + begin + pools = conn.exec 'SHOW POOLS' + current_client_id = conn.backend_pid + clients = conn.exec 'SHOW CLIENTS' + servers = conn.exec 'SHOW SERVERS' + ensure + conn.close + end + + pg_conn = PG.connect(dbname: 'pgdog', user: 'pgdog', password: 'pgdog', port: 5432, host: '127.0.0.1') + begin + pg_clients = pg_conn.exec 'SELECT state FROM pg_stat_activity'\ + " WHERE datname IN ('pgdog', 'shard_0', 'shard_1')"\ + " AND backend_type = 'client backend'"\ + " AND query NOT LIKE '%pg_stat_activity%'" + ensure + pg_conn.close + end + + pools_ready = pools.all? do |pool| + pool['sv_active'] == '0' && pool['cl_waiting'] == '0' && pool['out_of_sync'] == '0' + end + clients_ready = clients.all? do |client| + client['id'].to_i == current_client_id || client['state'] == 'idle' + end + servers_ready = servers + .select { |server| server['application_name'] != 'PgDog Pub/Sub Listener' } + .all? { |server| server['state'] == 'idle' } + pg_clients_ready = pg_clients.all? { |client| client['state'] == 'idle' } + + break if pools_ready && clients_ready && servers_ready && pg_clients_ready + break if Time.now >= deadline + + sleep 0.05 + end + + pools.each do |pool| + expect(pool['sv_active']).to eq('0') + expect(pool['cl_waiting']).to eq('0') + expect(pool['out_of_sync']).to eq('0') + end + + clients.each do |client| + next if client['id'].to_i == current_client_id + expect(client['state']).to eq('idle') + end + + servers + .select do |server| + server['application_name'] != 'PgDog Pub/Sub Listener' + end + .each do |server| + expect(server['state']).to eq('idle') + end + + pg_clients.each do |client| + expect(client['state']).to eq('idle') + end +end + + +def connect_pgdog(user: 'pgdog') + PG.connect(dbname: 'pgdog', user:, password: 'pgdog', port: 6432, host: '127.0.0.1') +end \ No newline at end of file diff --git a/integration/prepared_statements_full/run.sh b/integration/prepared_statements_full/run.sh new file mode 100755 index 000000000..7efdf24d1 --- /dev/null +++ b/integration/prepared_statements_full/run.sh @@ -0,0 +1,11 @@ +#!/bin/bash +set -e +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) +source ${SCRIPT_DIR}/../common.sh + +run_pgdog "integration/prepared_statements_full" +wait_for_pgdog + +bash ${SCRIPT_DIR}/dev.sh + +stop_pgdog diff --git a/integration/prepared_statements_full/users.toml b/integration/prepared_statements_full/users.toml index 9a8205f04..a97596a57 100644 --- a/integration/prepared_statements_full/users.toml +++ b/integration/prepared_statements_full/users.toml @@ -2,3 +2,19 @@ database = "pgdog" name = "pgdog" password = "pgdog" + + +[[users]] +name = "pgdog_session" +database = "pgdog" +password = "pgdog" +server_user = "pgdog" +pooler_mode = "session" + +[[users]] +name = "pgdog_tx_single" +database = "pgdog" +password = "pgdog" +server_user = "pgdog" +pooler_mode = "transaction" +pool_size = 1 \ No newline at end of file diff --git a/integration/ruby/lb_spec.rb b/integration/ruby/lb_spec.rb index a3a6c3c39..061d614eb 100644 --- a/integration/ruby/lb_spec.rb +++ b/integration/ruby/lb_spec.rb @@ -13,7 +13,7 @@ it 'distributes traffic evenly' do conn = failover # Reset stats and bans - admin.exec "RECONNECT" + admin_exec 'RECONNECT' before = admin_stats('failover') 250.times do diff --git a/integration/ruby/rspec_helper.rb b/integration/ruby/rspec_helper.rb index 5eb8317ae..ac55b4acf 100644 --- a/integration/ruby/rspec_helper.rb +++ b/integration/ruby/rspec_helper.rb @@ -9,6 +9,13 @@ def admin PG.connect('postgres://admin:pgdog@127.0.0.1:6432/admin') end +def admin_exec(sql) + conn = admin + conn.exec sql +ensure + conn&.close +end + def failover PG.connect('postgres://pgdog:pgdog@127.0.0.1:6432/failover') end @@ -24,20 +31,62 @@ def admin_stats(database, column = nil) end def ensure_done - conn = PG.connect(dbname: 'admin', user: 'admin', password: 'pgdog', port: 6432, host: '127.0.0.1') - pools = conn.exec 'SHOW POOLS' + deadline = Time.now + 2 + pools = [] + clients = [] + servers = [] + pg_clients = [] + current_client_id = nil + + loop do + conn = PG.connect(dbname: 'admin', user: 'admin', password: 'pgdog', port: 6432, host: '127.0.0.1') + begin + pools = conn.exec 'SHOW POOLS' + current_client_id = conn.backend_pid + clients = conn.exec 'SHOW CLIENTS' + servers = conn.exec 'SHOW SERVERS' + ensure + conn.close + end + + pg_conn = PG.connect(dbname: 'pgdog', user: 'pgdog', password: 'pgdog', port: 5432, host: '127.0.0.1') + begin + pg_clients = pg_conn.exec 'SELECT state FROM pg_stat_activity'\ + " WHERE datname IN ('pgdog', 'shard_0', 'shard_1')"\ + " AND backend_type = 'client backend'"\ + " AND query NOT LIKE '%pg_stat_activity%'" + ensure + pg_conn.close + end + + pools_ready = pools.all? do |pool| + pool['sv_active'] == '0' && pool['cl_waiting'] == '0' && pool['out_of_sync'] == '0' + end + clients_ready = clients.all? do |client| + client['id'].to_i == current_client_id || client['state'] == 'idle' + end + servers_ready = servers + .select { |server| server['application_name'] != 'PgDog Pub/Sub Listener' } + .all? { |server| server['state'] == 'idle' } + pg_clients_ready = pg_clients.all? { |client| client['state'] == 'idle' } + + break if pools_ready && clients_ready && servers_ready && pg_clients_ready + break if Time.now >= deadline + + sleep 0.05 + end + pools.each do |pool| expect(pool['sv_active']).to eq('0') expect(pool['cl_waiting']).to eq('0') expect(pool['out_of_sync']).to eq('0') end - current_client_id = conn.backend_pid - clients = conn.exec 'SHOW CLIENTS' + clients.each do |client| next if client['id'].to_i == current_client_id expect(client['state']).to eq('idle') end - servers = conn.exec 'SHOW SERVERS' + servers .select do |server| server['application_name'] != 'PgDog Pub/Sub Listener' @@ -46,12 +95,7 @@ def ensure_done expect(server['state']).to eq('idle') end - conn = PG.connect(dbname: 'pgdog', user: 'pgdog', password: 'pgdog', port: 5432, host: '127.0.0.1') - clients = conn.exec 'SELECT state FROM pg_stat_activity'\ - " WHERE datname IN ('pgdog', 'shard_0', 'shard_1')"\ - " AND backend_type = 'client backend'"\ - " AND query NOT LIKE '%pg_stat_activity%'" - clients.each do |client| + pg_clients.each do |client| expect(client['state']).to eq('idle') end -end +end \ No newline at end of file diff --git a/pgdog/src/backend/prepared_statements.rs b/pgdog/src/backend/prepared_statements.rs index 76ad8480a..93d90c38d 100644 --- a/pgdog/src/backend/prepared_statements.rs +++ b/pgdog/src/backend/prepared_statements.rs @@ -231,9 +231,7 @@ impl PreparedStatements { } // Backend told us the copy is done. - 'c' => { - self.state.action(code)?; - } + 'c' => {} _ => (), } diff --git a/pgdog/src/backend/protocol/state.rs b/pgdog/src/backend/protocol/state.rs index ca86915d2..1b4214f35 100644 --- a/pgdog/src/backend/protocol/state.rs +++ b/pgdog/src/backend/protocol/state.rs @@ -1,3 +1,5 @@ +use tracing::error; + use crate::{ net::{Message, Protocol}, stats::memory::MemoryUsage, @@ -146,17 +148,61 @@ impl ProtocolState { match code { ExecutionCode::Untracked => return Ok(Action::Forward), ExecutionCode::Error => { - // Remove everything from the execution queue. - // The connection is out of sync until client re-syncs it. + if matches!( + self.queue.front(), + Some(ExecutionItem::Ignore(ExecutionCode::Error)) + ) { + // We ignore errors only for the pgdog-injected sub-request. + // In that case the first error is already processed and + // sent to the client, for the remaining expected errors + // we've added ignores for errors and RFQ. + // The error is ignored but still be logged by [backend::server] module + self.queue.pop_front(); + return Ok(Action::Ignore); + } + + // This is the first (and client-visible) error in the chain. It is forwarded + // so the client receives exactly one Error+RFQ for their request. + // For extended-protocol pipelines also mark out-of-sync so the connection + // is not reused until the client re-syncs. if self.extended { self.out_of_sync = true; } - let last = self.queue.pop_back(); - self.queue.clear(); - if let Some(ExecutionItem::Code(ExecutionCode::ReadyForQuery)) = last { + + // find the first position for RFQ code to effectively + // separate the pgdog-injected sub-request from the remaining queries + let Some(rfq_pos) = self + .queue + .iter() + .position(|i| matches!(i, ExecutionItem::Code(ExecutionCode::ReadyForQuery))) + else { + self.queue.clear(); + return Ok(Action::Forward); + }; + + // broken_queue - pgdog-injected sub-request part that contains multiple requests + // that are not be executed properly anyway, since we've got an error previously + let broken_queue = self.queue.drain(..rfq_pos); + + // Count how many queries are expected to finish in the pgdog-injected sub-request + // The current use case is only the Prepare + Execute messages from the [backend::server] + // And in case the prepare fails the execute will fail as well. + // WARN: That is not most reliable solution in case the injected set of queries + // will extend, but it should work for now. + let count_ignores = broken_queue + .filter(|i| matches!(i, ExecutionItem::Ignore(ExecutionCode::ReadyForQuery))) + .count(); + + // For every message that we expect to run add ignore for one error and one RFQ + // For prepare it'll be a one iteration that will create the query + // [Ignore(RFQ), Ignore(Error), Code(RFQ)] + for _ in 0..count_ignores { + self.queue + .push_front(ExecutionItem::Ignore(ExecutionCode::Error)); self.queue - .push_back(ExecutionItem::Code(ExecutionCode::ReadyForQuery)); + .push_front(ExecutionItem::Ignore(ExecutionCode::ReadyForQuery)); } + return Ok(Action::Forward); } @@ -165,7 +211,10 @@ impl ProtocolState { } _ => (), }; - let in_queue = self.queue.pop_front().ok_or(Error::ProtocolOutOfSync)?; + let in_queue = self.queue.pop_front().ok_or_else(|| { + error!("Unexpected action {code:?}: queue is empty"); + Error::ProtocolOutOfSync + })?; match in_queue { // The queue is waiting for the server to send ReadyForQuery, // but it sent something else. That means the execution pipeline @@ -175,6 +224,11 @@ impl ProtocolState { && in_queue_code == ExecutionCode::ReadyForQuery { self.queue.push_front(in_queue); + } else if in_queue_code == ExecutionCode::ReadyForQuery && self.queue.is_empty() { + // The last RFQ of this pipeline was just consumed and nothing remains. + // Reset extended so subsequent simple-query errors are not spuriously + // treated as mid-extended-pipeline and do not trigger out_of_sync. + self.extended = false; } Ok(Action::Forward) @@ -185,6 +239,8 @@ impl ProtocolState { if code == in_queue { Ok(Action::Ignore) } else { + error!(?self, "Unexpected action {code:?}: expected: {in_queue:?}"); + Err(Error::ProtocolOutOfSync) } } @@ -208,11 +264,6 @@ impl ProtocolState { &self.queue } - #[cfg(test)] - pub(crate) fn queue_mut(&mut self) -> &mut VecDeque { - &mut self.queue - } - pub(crate) fn done(&self) -> bool { self.is_empty() && !self.out_of_sync } @@ -334,7 +385,8 @@ mod test { assert_eq!(state.action('C').unwrap(), Action::Forward); assert_eq!(state.action('Z').unwrap(), Action::Forward); assert!(state.is_empty()); - assert!(state.extended); + // extended resets to false once the final RFQ drains the queue. + assert!(!state.extended); } #[test] @@ -847,4 +899,219 @@ mod test { assert_eq!(state.action('Z').unwrap(), Action::Forward); assert!(state.is_empty()); } + + // Double action('c') for server CopyDone + + // Safe path: Code(ReadyForQuery) backstop makes the double action('c') call idempotent. + #[test] + fn test_copydone_double_action_safe_with_rfq_backstop() { + let mut state = ProtocolState::default(); + // 1. Queue: CopyOut slot + RFQ backstop (from Sync). + state.add('G'); // CopyOut + state.add('Z'); // ReadyForQuery backstop + + // 2. First action('c'): pops CopyOut; RFQ backstop untouched. + assert_eq!(state.action('c').unwrap(), Action::Forward); + assert_eq!(state.len(), 1); + + // 3. Second action('c'): sees RFQ at front; pushes it back (idempotent). + assert_eq!(state.action('c').unwrap(), Action::Forward); + assert_eq!(state.len(), 1); // RFQ still present for the server's ReadyForQuery + } + + // Documents raw state-machine behavior: calling action('c') twice with no RFQ backstop + // causes ProtocolOutOfSync. forward() was the only caller that did this; the second call + // has been removed from the 'c' arm in prepared_statements.rs, making this path unreachable + // through normal protocol flow. The test is kept to pin the underlying invariant. + #[test] + fn test_copydone_double_action_oos_without_rfq_backstop() { + let mut state = ProtocolState::default(); + // Queue: Execute + Flush (no Sync) — no RFQ backstop. + state.add('C'); // ExecutionCompleted + + // First action('c'): pops ExecutionCompleted; queue empty. + assert_eq!(state.action('c').unwrap(), Action::Forward); + assert!(state.is_empty()); + + // Second action('c') directly: empty queue → ProtocolOutOfSync. + // This is the raw state machine. forward() no longer makes this second call. + assert!(state.action('c').is_err()); + } + + // Stale RFQ arrives before injected ParseComplete — Ignore arm rejects the mismatch. + #[test] + fn test_stale_rfq_hits_ignore_parsecomplete() { + let mut state = ProtocolState::default(); + // 1. pgdog injects Parse; queue: [Ignore(ParseComplete), BindComplete, CommandComplete, RFQ]. + state.add_ignore('1'); // ParseComplete — injected + state.add('2'); // BindComplete + state.add('C'); // CommandComplete + state.add('Z'); // ReadyForQuery + + // Stale RFQ from prior cycle arrives before ParseComplete. + // ReadyForQuery != ParseComplete → ProtocolOutOfSync. + assert!( + state.action('Z').is_err(), + "stale RFQ against Ignore(ParseComplete) must produce ProtocolOutOfSync" + ); + } + + // Variant: stale RFQ hits Ignore(BindComplete) — same mismatch for any Ignore slot. + #[test] + fn test_stale_rfq_hits_ignore_bindcomplete() { + let mut state = ProtocolState::default(); + // Both Parse and Bind are injected (Describe path). + state.add_ignore('1'); // ParseComplete — injected + state.add_ignore('2'); // BindComplete — injected + state.add('T'); // RowDescription + state.add('C'); // CommandComplete + state.add('Z'); // ReadyForQuery + + // ParseComplete arrives normally and is swallowed. + assert_eq!(state.action('1').unwrap(), Action::Ignore); + + // Queue front is now Ignore(BindComplete). + // A stale RFQ arrives before BindComplete → ProtocolOutOfSync. + assert!( + state.action('Z').is_err(), + "stale RFQ against Ignore(BindComplete) must produce ProtocolOutOfSync" + ); + } + + // Happy path: injected ParseComplete arrives in order — silently ignored, rest forwarded. + #[test] + fn test_injected_parse_happy_path() { + let mut state = ProtocolState::default(); + state.add_ignore('1'); // ParseComplete — injected, swallowed + state.add('2'); // BindComplete + state.add('C'); // CommandComplete + state.add('Z'); // ReadyForQuery + + assert_eq!(state.action('1').unwrap(), Action::Ignore); // swallowed + assert_eq!(state.action('2').unwrap(), Action::Forward); // forwarded + assert_eq!(state.action('C').unwrap(), Action::Forward); // forwarded + assert_eq!(state.action('Z').unwrap(), Action::Forward); // forwarded + assert!(state.is_empty()); + } + + // Replicates the full lifecycle of an injected PREPARE that errors: + // + // Client sends: PREPARE foo AS ... (simple-query style) + // EXECUTE (via Query) + // + // pgdog injects ahead of the client's Query: + // add_ignore('C') — CommandComplete from PREPARE + // add_ignore('Z') — RFQ from PREPARE + // Then the client's Query adds: + // add('Z') — the client-visible RFQ + // + // Queue before first error: [Ignore(C), Ignore(Z), Code(Z)] + // + // Server responds to PREPARE with an error: + // 'E' → error branch fires: drain [Ignore(C), Ignore(Z)], count 1 Ignore(RFQ), + // push_front loop produces [Ignore(RFQ), Ignore(Error), Code(Z)]. + // Action::Forward — client receives this error. + // 'Z' → matches Ignore(RFQ) → Action::Ignore (PREPARE's RFQ suppressed) + // + // Server responds to EXECUTE (which fails because PREPARE never succeeded): + // 'E' → fast-path: front is Ignore(Error) → pop → Action::Ignore (suppressed) + // 'Z' → Code(Z) → Action::Forward — client receives the closing RFQ + #[test] + fn test_injected_prepare_error_full_lifecycle() { + let mut state = ProtocolState::default(); + + // --- setup: replicate what prepared_statements.rs does --- + // ProtocolMessage::Prepare injects: + state.add_ignore('C'); // Ignore(CommandComplete) — PREPARE response + state.add_ignore('Z'); // Ignore(RFQ) — PREPARE response + // ProtocolMessage::Query (client EXECUTE) adds: + state.add('Z'); // Code(RFQ) — client-visible + + // --- server sends Error for PREPARE --- + // Error branch: drains [Ignore(C), Ignore(Z)], finds 1 Ignore(Z), + // rebuilds queue as [Ignore(RFQ), Ignore(Error), Code(Z)]. + assert_eq!(state.action('E').unwrap(), Action::Forward); + + // --- server sends RFQ for PREPARE (now suppressed) --- + assert_eq!(state.action('Z').unwrap(), Action::Ignore); + + // --- server sends Error for EXECUTE (prepare never succeeded) --- + // Fast-path: Ignore(Error) is at front → pop and ignore. + assert_eq!(state.action('E').unwrap(), Action::Ignore); + + // --- server sends RFQ for EXECUTE --- + // Code(Z) is at front → forwarded to client. + assert_eq!(state.action('Z').unwrap(), Action::Forward); + assert!(state.is_empty()); + } + // ======================================== + // extended flag reset tests (Issue 4) + // ======================================== + + // extended resets to false once the last RFQ of a pipeline is consumed. + #[test] + fn test_extended_resets_on_rfq_drain() { + let mut state = ProtocolState::default(); + // add_ignore('1') sets extended=true (ParseComplete is an extended-protocol code). + state.add_ignore('1'); // ParseComplete — injected, sets extended=true + state.add('Z'); // RFQ — client-visible + + assert_eq!(state.action('1').unwrap(), Action::Ignore); + assert!(state.extended, "extended must be true before RFQ"); + + assert_eq!(state.action('Z').unwrap(), Action::Forward); + assert!( + !state.extended, + "extended must reset to false after last RFQ drains queue" + ); + assert!(state.is_empty()); + } + + // extended must NOT reset mid-pipeline: an RFQ that still has items behind it + // belongs to a pipelined request and should not prematurely clear the flag. + #[test] + fn test_extended_stays_true_mid_pipeline() { + let mut state = ProtocolState::default(); + state.add_ignore('1'); // ParseComplete — sets extended=true + state.add('Z'); // first pipeline RFQ + state.add('Z'); // second pipeline RFQ + + assert_eq!(state.action('1').unwrap(), Action::Ignore); + assert_eq!(state.action('Z').unwrap(), Action::Forward); // first RFQ, one item remains + assert!( + state.extended, + "extended must stay true while pipeline is not fully drained" + ); + + assert_eq!(state.action('Z').unwrap(), Action::Forward); // second RFQ drains queue + assert!( + !state.extended, + "extended must reset once queue is fully drained" + ); + } + + // After extended resets, a plain simple-query error must not set out_of_sync. + // Before the fix, extended stuck permanently and every subsequent error triggered + // out_of_sync=true spuriously. + #[test] + fn test_no_spurious_out_of_sync_after_extended_reset() { + let mut state = ProtocolState::default(); + + // Phase 1: parameterised query sets extended=true, then resets on drain. + state.add_ignore('1'); // ParseComplete + state.add('Z'); + assert_eq!(state.action('1').unwrap(), Action::Ignore); + assert_eq!(state.action('Z').unwrap(), Action::Forward); + assert!(!state.extended); + + // Phase 2: simple-query error on a now-reset connection. + state.add('Z'); // RFQ from simple query + assert_eq!(state.action('E').unwrap(), Action::Forward); // error forwarded + assert!( + !state.out_of_sync(), + "out_of_sync must be false: extended was reset" + ); + assert_eq!(state.action('Z').unwrap(), Action::Forward); + assert!(state.done()); + } } diff --git a/pgdog/src/backend/server.rs b/pgdog/src/backend/server.rs index ee5e03d01..ed149b753 100644 --- a/pgdog/src/backend/server.rs +++ b/pgdog/src/backend/server.rs @@ -388,6 +388,13 @@ impl Server { Ok(forward) => { if forward { break message; + } else if message.code() == 'E' { + // we got an error that will not be forwarded to the client, + // but it still be useful for tracing + error!( + "Ignore error from stream: {:?}", + ErrorResponse::from_bytes(message.payload()) + ); } } Err(err) => { @@ -1038,7 +1045,6 @@ impl Drop for Server { } } -// Used for testing. #[cfg(test)] pub mod test { use bytes::{BufMut, BytesMut}; @@ -1392,7 +1398,7 @@ pub mod test { let (new, name) = global.write().insert(&parse); assert!(new); let parse = parse.rename(&name); - assert_eq!(parse.name(), "__pgdog_1"); + assert!(parse.name().starts_with("__pgdog_")); let mut server = test_server().await; @@ -1401,7 +1407,7 @@ pub mod test { .send( &vec![ ProtocolMessage::from(Bind::new_params( - "__pgdog_1", + &name, &[Parameter { len: 1, data: "1".as_bytes().into(), @@ -2562,38 +2568,6 @@ pub mod test { ); } - #[tokio::test] - async fn test_protocol_out_of_sync_sets_error_state() { - let mut server = test_server().await; - - server - .send(&vec![Query::new("SELECT 1").into()].into()) - .await - .unwrap(); - - for c in ['T', 'D'] { - let msg = server.read().await.unwrap(); - assert_eq!(msg.code(), c); - } - - // simulate an unlikely, but existent out-of-sync state - server - .prepared_statements_mut() - .state_mut() - .queue_mut() - .clear(); - - let res = server.read().await; - assert!( - matches!(res, Err(Error::ProtocolOutOfSync)), - "protocol should be out of sync" - ); - assert!( - server.stats().get_state() == State::Error, - "state should be Error after detecting desync" - ) - } - #[tokio::test] async fn test_reset_clears_client_params() { let mut server = test_server().await; @@ -3430,4 +3404,183 @@ pub mod test { assert!(server.force_close()); assert_eq!(server.stats().get_state(), State::ForceClose); } + + // Failed injected PREPARE leaves EXECUTE ReadyForQuery unmatched — Error handler empties the queue. + #[tokio::test] + async fn test_prepare_execute_inject_failure_orphans_execute_rfq() { + let mut server = test_server().await; + + // 1. Send [Prepare, Query] as the rewriter injects for EXECUTE. + server + .send( + &vec![ + ProtocolMessage::Prepare { + name: "__pgdog_prepare_inject_test".to_string(), + statement: "SELECT 1 FROM __pgdog_nonexistent_table__".to_string(), + }, + ProtocolMessage::Query(Query::new("EXECUTE __pgdog_prepare_inject_test()")), + ] + .into(), + ) + .await + .unwrap(); + + // 2. PREPARE 'E' forwarded; 'Z' consumes re-added Code(RFQ) — queue empty. + let msg = server.read().await.unwrap(); + assert_eq!(msg.code(), 'E'); // 'E' PREPARE error + let msg = server.read().await.unwrap(); + assert_eq!(msg.code(), 'Z'); // 'Z' RFQ — queue now empty + } + + // Extended Execute + Flush (no Sync): single action('c') now succeeds. + // CopyDone is forwarded to client; the trailing CommandComplete then hits an empty + // queue (no RFQ backstop, no Sync) and raises ProtocolOutOfSync. + // This is distinct from the former double-action bug, which fired on CopyDone itself. + #[tokio::test] + async fn test_copydone_single_action_without_sync() { + let mut server = test_server().await; + + // 1. Parse + Bind + Execute + Flush (not Sync); no RFQ backstop in queue. + server + .send( + &vec![ + ProtocolMessage::Parse(Parse::new_anonymous("COPY (VALUES (1),(2)) TO STDOUT")), + ProtocolMessage::Bind(Bind::new_params("", &[])), + ProtocolMessage::Execute(Execute::new()), + // Flush (not Sync): prompts PostgreSQL to send buffered responses. + // handle() maps this to Other, adding nothing to the queue. + Flush.into(), + ] + .into(), + ) + .await + .unwrap(); + + // 2. ParseComplete, BindComplete, CopyOutResponse, CopyData x2 arrive normally. + assert_eq!(server.read().await.unwrap().code(), '1'); // ParseComplete + assert_eq!(server.read().await.unwrap().code(), '2'); // BindComplete + assert_eq!(server.read().await.unwrap().code(), 'H'); // CopyOutResponse + assert_eq!(server.read().await.unwrap().code(), 'd'); // CopyData row 1 + assert_eq!(server.read().await.unwrap().code(), 'd'); // CopyData row 2 + + // 3. CopyDone — fixed: single action() pops ExecutionCompleted; no second call. + assert_eq!(server.read().await.unwrap().code(), 'c'); // CopyDone forwarded + + // 4. CommandComplete hits empty queue (no RFQ backstop without Sync). + assert!( + matches!(server.read().await.unwrap_err(), Error::ProtocolOutOfSync), + "expected ProtocolOutOfSync on CommandComplete with empty queue" + ); + } + + // Safe path: Sync adds Code(RFQ) backstop — double action('c') is idempotent. + #[tokio::test] + async fn test_copydone_double_action_safe_with_sync() { + let mut server = test_server().await; + + // 1. Parse + Bind + Execute + Sync; RFQ backstop added to queue. + server + .send( + &vec![ + ProtocolMessage::Parse(Parse::new_anonymous("COPY (VALUES (1),(2)) TO STDOUT")), + ProtocolMessage::Bind(Bind::new_params("", &[])), + ProtocolMessage::Execute(Execute::new()), + ProtocolMessage::Sync(Sync), + ] + .into(), + ) + .await + .unwrap(); + + // 2. Full response sequence — CopyDone is safe with RFQ backstop. + assert_eq!(server.read().await.unwrap().code(), '1'); // ParseComplete + assert_eq!(server.read().await.unwrap().code(), '2'); // BindComplete + assert_eq!(server.read().await.unwrap().code(), 'H'); // CopyOutResponse + assert_eq!(server.read().await.unwrap().code(), 'd'); // CopyData row 1 + assert_eq!(server.read().await.unwrap().code(), 'd'); // CopyData row 2 + assert_eq!(server.read().await.unwrap().code(), 'c'); // CopyDone -- safe with RFQ backstop + assert_eq!(server.read().await.unwrap().code(), 'C'); // CommandComplete + assert_eq!(server.read().await.unwrap().code(), 'Z'); // ReadyForQuery + assert!( + server.done(), + "server must be done after full response sequence" + ); + } + + // After a parameterised query, extended resets once the RFQ drains the queue. + // Subsequent simple-query errors must NOT set out_of_sync. + #[tokio::test] + async fn test_extended_resets_after_rfq_drain() { + use crate::net::bind::Parameter; + + let mut server = test_server().await; + + // 1. Baseline: extended=false; simple-query error must not set out_of_sync. + server + .send(&vec![Query::new("SELECT 1/0").into()].into()) + .await + .unwrap(); + let msg = server.read().await.unwrap(); + assert_eq!(msg.code(), 'E'); + assert!(!server.out_of_sync()); + let msg = server.read().await.unwrap(); + assert_eq!(msg.code(), 'Z'); + assert!(server.done()); + + // 2. Parameterised query: Parse+Bind+Execute+Sync sets extended=true, then + // the final RFQ drains the queue and resets extended to false. + let bind = Bind::new_params_codes( + "", + &[Parameter { + len: 1, + data: "1".as_bytes().into(), + }], + &[Format::Text], + ); + server + .send( + &vec![ + ProtocolMessage::from(Parse::new_anonymous("SELECT $1::int")), + ProtocolMessage::from(bind), + ProtocolMessage::from(Execute::new()), + ProtocolMessage::from(Sync), + ] + .into(), + ) + .await + .unwrap(); + + for c in ['1', '2', 'D', 'C', 'Z'] { + let msg = server.read().await.unwrap(); + assert_eq!(msg.code(), c); + } + assert!(server.done()); // extended was reset when 'Z' drained the queue + + // 3. Simple-query error after extended resets: out_of_sync must be false. + server + .send(&vec![Query::new("SELECT 1/0").into()].into()) + .await + .unwrap(); + let msg = server.read().await.unwrap(); + assert_eq!(msg.code(), 'E'); + assert!( + !server.out_of_sync(), + "out_of_sync must be false: extended was reset by prior RFQ drain" + ); + let msg = server.read().await.unwrap(); + assert_eq!(msg.code(), 'Z'); + assert!(server.done()); + + // 4. Confirm: same result on the next error — extended stays reset across requests. + server + .send(&vec![Query::new("SELECT 1/0").into()].into()) + .await + .unwrap(); + let msg = server.read().await.unwrap(); + assert_eq!(msg.code(), 'E'); + assert!(!server.out_of_sync()); + let msg = server.read().await.unwrap(); + assert_eq!(msg.code(), 'Z'); + assert!(server.done()); + } }