
Conversation

@mlarribe mlarribe commented Oct 15, 2025

Context:

I was testing Mongo.replace_one with the options upsert: true, retryable_writes: true when I found out that none of my documents were getting updated after the very first replace_one, even though the documents changed from one extract to the next.

The digging

After some investigation I found that the issue was linked to the retryable_writes behaviour, which was reusing the same session's lsid and txnNumber.
According to the documentation, reusing the same lsid is fine, but not the same txnNumber. I was surprised to discover that both the lsid and the txnNumber stayed exactly the same after multiple find_one calls, each followed by a minor change to the doc and another attempt at Mongo.replace_one (the server session was reused without increasing the txnNumber).

Thanks

Big shout-out to the log feature (log: &IO.inspect/1), which helped me start diagnosing this even as a novice with the driver itself.

I did some quick tests on my own, and it solved my issue.
I may still have missed edge cases, so peer review will be very appreciated; I'm still very much on the learning curve of the driver.

…ion to prevent silent no-write operation

fix faulty retryable_writes: fix the issue that prevents replace_one from taking effect on consecutive updates to the same document when a session is checked in and then reused (same document or a different one, it doesn't matter; writes stay silently blocked until the session expires)

zookzook commented Nov 16, 2025

Thank you for providing a detailed description of your issue. Let's check the documentation that I found for the lsid and txnNumber:

The server requires each operation executed within a transaction to provide an lsid and txnNumber in its command
document. Each field is obtained from the ClientSession object passed to the operation from the application. Drivers
will be responsible for maintaining a monotonically increasing transaction number for each ServerSession used by a
ClientSession object. The txnNumber is incremented by the call to startTransaction and remains the same for all
commands in the transaction.

Drivers that pool ServerSessions MUST preserve the transaction number when reusing a server session from the pool with a
new ClientSession (this can be tracked as another property on the driver's object for the server session).

Drivers MUST ensure that each transaction specifies a transaction number larger than any previously used transaction
number for its session ID.

The driver increments the txnNumber when startTransaction is called:

  def handle_call_event(:start_transaction, transaction, %Session{server_session: session} = data)
      when transaction in [:no_transaction, :transaction_aborted, :transaction_committed] do
    {:next_state, :starting_transaction,
     %Session{data | recovery_token: nil, server_session: ServerSession.next_txn_num(session)}, :ok}
  end
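
For context, ServerSession.next_txn_num/1 is what bumps the per-session transaction number. A minimal sketch of the idea (the struct fields here are assumptions for illustration, not the driver's actual definition):

defmodule ServerSessionSketch do
  # Illustrative only: a server session carries a session id and a monotonically
  # increasing transaction number, as required by the spec quoted above.
  defstruct session_id: nil, txn_num: 0

  # Return the session with its transaction number bumped by one.
  def next_txn_num(%__MODULE__{txn_num: n} = session), do: %{session | txn_num: n + 1}
end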

Your merge request would increment the txnNumber twice in the case of startTransaction. Could you provide an example and explain what you expect and what the driver does? It would make it easier for me to understand the issue and to reproduce it, which is very important so that I can add a unit test for it.


mlarribe commented Dec 2, 2025

I managed to reproduce the edge case with a simple setup, without my fix.

## Context: cluster with a replica set of 3
conn = :poll_test_mongo

Mongo.start_link([
  name: conn,
  url: url, # your MongoDB connection URL
  pool_size: 100,
  socket_options: [:inet6]
])

id = 42

## Run example
my-app(78)> Mongo.insert_one(conn, "test", %{_id: id, val: 42, toto: "azerty"}) #insert document first
{:ok, %Mongo.InsertOneResult{acknowledged: true, inserted_id: 42}}
my-app(79)>
nil
my-app(80)> obj = Mongo.find_one(conn, "test", %{_id: id}, retryable_reads: true) #get the document
%{"_id" => 42, "toto" => "azerty", "val" => 42}
my-app(81)> transformed_object = obj |> Map.update("fieldA", 0, fn val -> val + 1 end) #prepare a modification of the document
%{"_id" => 42, "fieldA" => 0, "toto" => "azerty", "val" => 42}
my-app(82)> Mongo.replace_one(conn, "test", %{_id: id}, transformed_object, [upsert: true, retryable_writes: true]) #replace the doc
{:ok,
 %Mongo.UpdateResult{
   acknowledged: true,
   matched_count: 1,
   modified_count: 1,
   upserted_ids: []
 }}
my-app(83)>
nil
my-app(84)> obj = Mongo.find_one(conn, "test", %{_id: id}, retryable_reads: true) #[BREAK] turns out the doc didn't change
%{"_id" => 42, "toto" => "azerty", "val" => 42}
my-app(85)>
nil
my-app(86)>
nil
my-app(87)>
nil
my-app(88)> obj = Mongo.find_one(conn, "test", %{_id: id}, retryable_reads: true)
%{"_id" => 42, "toto" => "azerty", "val" => 42}
my-app(89)> transformed_object = obj |> Map.update("fieldA", 0, fn val -> val + 1 end)
%{"_id" => 42, "fieldA" => 0, "toto" => "azerty", "val" => 42}
my-app(90)> Mongo.replace_one(conn, "test", %{_id: id}, transformed_object, [upsert: true, retryable_writes: true])
{:ok,
 %Mongo.UpdateResult{
   acknowledged: true,
   matched_count: 1,
   modified_count: 1,
   upserted_ids: []
 }}
my-app(91)>
nil
my-app(92)> obj = Mongo.find_one(conn, "test", %{_id: id}, retryable_reads: true) #[BREAK] still broken
%{"_id" => 42, "toto" => "azerty", "val" => 42}

In the test we can see that the document can't be updated at all until the session expires, so no update reaches the DB whenever we run replace_one several times with retryable_writes after a first write.
From my investigation this is due to the reuse of a session whose counter (txnNumber) is not increased on the driver side.

So when a new query shows up it tries to use an active session, but since the session wasn't updated properly (I'm not sure where the increment is supposed to happen in order to avoid the double increment you mention), it re-serves the same session, i.e. the same lsid with the same txnNumber. MongoDB then behaves as specified: it already knows the result for that (lsid, txnNumber) combination, so it returns the cached result without writing anything.

@zookzook
Owner

Thanks for the detailed example. I have checked the specification for retryable_writes, and you are right!!!

Drivers MUST ensure that each retryable write command specifies a transaction number larger than any previously used transaction number for its session ID.

https://specifications.readthedocs.io/en/latest/retryable-writes/retryable-writes/#generating-transaction-ids

I will check the code again, because I believe there might be a better place to increment the transaction number.


zookzook commented Dec 14, 2025

I have checked the code. We already increment the transaction number here:

  def init(topology, conn, address, server_session, wire_version, opts) do
    ## in case of `:retryable_write` we need to inc the transaction id
    server_session =
      case opts[:retryable_write] do
        true -> ServerSession.next_txn_num(server_session)
        _ -> server_session
      end

Now I tried your example and it works:

iex [13:49 :: 2] > id = 42
42
iex [13:49 :: 3] > Mongo.insert_one(conn, "test", %{_id: id, val: 42, toto: "azerty"})
13:49:57.029 [info] CMD insert "test" [documents: [%{val: 42, _id: 42, toto: "azerty"}]] db=48.1ms (module=Mongo function=do_log/5 line=1952 )

{:ok, %Mongo.InsertOneResult{acknowledged: true, inserted_id: 42}}
iex [13:49 :: 4] > obj = Mongo.find_one(conn, "test", %{_id: id}, retryable_reads: true)
13:50:08.015 [info] CMD find "test" [filter: [_id: 42], limit: 1, batchSize: 1] db=0.3ms (module=Mongo function=do_log/5 line=1952 )

%{"_id" => 42, "toto" => "azerty", "val" => 42}
iex [13:49 :: 5] > transformed_object = obj |> Map.update("fieldA", 0, fn val -> val + 1 end)
%{"_id" => 42, "fieldA" => 0, "toto" => "azerty", "val" => 42}
iex [13:49 :: 6] > Mongo.replace_one(conn, "test", %{_id: id}, transformed_object, [upsert: true, retryable_writes: true])
13:50:30.067 [info] CMD update "test" [
  updates: [
    %{
      multi: false,
      q: %{_id: 42},
      upsert: true,
      u: %{"_id" => 42, "fieldA" => 0, "toto" => "azerty", "val" => 42}
    }
  ]
] db=18.1ms (module=Mongo function=do_log/5 line=1952 )

{:ok, %Mongo.UpdateResult{acknowledged: true, matched_count: 1, modified_count: 1, upserted_ids: []}}
iex [13:49 :: 7] > obj = Mongo.find_one(conn, "test", %{_id: id}, retryable_reads: true)
13:50:42.324 [info] CMD find "test" [filter: [_id: 42], limit: 1, batchSize: 1] db=0.9ms (module=Mongo function=do_log/5 line=1952 )

%{"_id" => 42, "fieldA" => 0, "toto" => "azerty", "val" => 42}
iex [13:49 :: 8] >

The result contains fieldA with a value of 0.

What MongoDB version do you use? I used 8.0.10 in a replica set of 3.

@mlarribe
Author

I'm using MongoDB 6.0.24 in a replica set of 3.

(I'll try to add more verbosity around the use of the sessions to see how the txnNumber evolves; last time it wasn't increased properly on reuse, which is basically where I started looking for a workaround.)

Since the logger doesn't show the session information used by the request, it's not super useful for cornering the issue. (The logger was mostly helpful at the beginning, when I plugged the driver into my whole solution without expecting to find any issue.)

I saw that a new version of elixir-mongodb-driver was released; I'll try again with that one, just in case.


mlarribe commented Dec 15, 2025

I've re-run the same test with the latest version of the driver available.

Here are the results. I hot-loaded some dbg() calls at specific spots in order to extract the context.

For the record, I placed 2 dbg() calls in the Mongo module:

  defp update_documents(topology_pid, coll, filter, update, multi, opts) do
    write_concern = write_concern(opts)

    update =
      %{
        q: filter,
        u: update,
        upsert: Keyword.get(opts, :upsert),
        multi: multi,
        collation: Keyword.get(opts, :collation),
        arrayFilters: Keyword.get(opts, :array_filters),
        hint: Keyword.get(opts, :hint)
      }
      |> filter_nils()

    cmd =
      [
        update: coll,
        updates: [update],
        ordered: Keyword.get(opts, :ordered),
        writeConcern: write_concern,
        bypassDocumentValidation: Keyword.get(opts, :bypass_document_validation)
      ]
      |> filter_nils()
    dbg() #ADDED HERE for context
    with {:ok, doc} <- issue_command(topology_pid, cmd, :write, Keyword.put_new(opts, :compression, true)) do
      case doc do
        %{"writeErrors" => _} ->
          {:error, %Mongo.WriteError{n: doc["n"], ok: doc["ok"], write_errors: doc["writeErrors"]}}

        %{"n" => n, "nModified" => n_modified, "upserted" => upserted} ->
          case acknowledged?(write_concern) do
            false ->
              {:ok, %Mongo.UpdateResult{acknowledged: false}}

            true ->
              {:ok,
               %Mongo.UpdateResult{
                 matched_count: n,
                 modified_count: n_modified,
                 upserted_ids: filter_upsert_ids(upserted)
               }}
          end

        %{"n" => n, "nModified" => n_modified} ->
          case acknowledged?(write_concern) do
            false -> {:ok, %Mongo.UpdateResult{acknowledged: false}}
            true -> {:ok, %Mongo.UpdateResult{matched_count: n, modified_count: n_modified}}
          end

        _ ->
          {:ok, nil}
      end
    end
  end

update_documents is called by replace_one in my case.

  def exec_command_session(session, cmd, opts) do
    with {:ok, conn, new_cmd, opts} <- Session.bind_session(session, cmd, opts),
         {:ok, _cmd, response} <- DBConnection.execute(conn, %Query{action: {:command, new_cmd}}, [], opts),
         :ok <- Session.update_session(session, response, opts),
         {:ok, {_flags, doc}} <- check_for_error(response, cmd, opts) do
          dbg() #ADDED HERE for context
      {:ok, doc}
    else
      {:error, error} ->
        {:error, error}
    end
  end

This one is obviously where the magic happens.

Before diving into the bulky log below (sorry in advance for that), it may be friendlier to paste the bindings into an IDE where you can fold them, and only expand them once you are comparing the discrepancies in the end results. (If you have a better way to share/represent this with less information, please let me know.)

Pay attention to lsid and txnNumber: they didn't change over time during the test, while according to the documentation you mentioned this is supposed to be handled on the client/driver side (which sounds to me like a client issue rather than a MongoDB issue, despite the different behaviour you experienced).

During the session binding after the previous write we would expect the txnNumber to get bumped one way or another. In this context at least, that didn't happen on the client side.
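
To make the discrepancy easier to spot, here is the shape of the two update commands condensed from the dbg output below (values taken from the log; txnNumber is shown as the plain integer that the driver encodes as a BSON.LongNumber):

# Condensed, illustrative view of the two update commands in the log below.
first_replace  = [update: "test", lsid: "0524c82a-7f8c-4a83-a82b-d29d32e53acb", txnNumber: 0]
second_replace = [update: "test", lsid: "0524c82a-7f8c-4a83-a82b-d29d32e53acb", txnNumber: 0]
# Same lsid AND same txnNumber: the server treats the second command as a retry of the
# first one and answers from its cache ("retriedStmtIds" => [0]), so nothing is written.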

...(116)> Mongo.delete_one(conn, "test", %{_id: id}) #RESET
[(mongodb_driver 1.6.0) iex:1690: Mongo.exec_command_session/3]
binding() #=> [
  _cmd: %Mongo.Query{
    action: {:command,
     [
       delete: "test",
       deletes: [%{limit: 1, q: %{_id: 42}}],
       lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>}
     ]}
  },
  _flags: 0,
  cmd: [delete: "test", deletes: [%{limit: 1, q: %{_id: 42}}]],
  conn: #PID<0.9567.0>,
  doc: %{
    "$clusterTime" => %{
      "clusterTime" => #BSON.Timestamp<1765804902:1>,
      "signature" => %{
        "hash" => #BSON.Binary<0000000000000000000000000000000000000000>,
        "keyId" => 0
      }
    },
    "electionId" => #BSON.ObjectId<7fffffff0000000000000036>,
    "n" => 1,
    "ok" => 1.0,
    "opTime" => %{"t" => 54, "ts" => #BSON.Timestamp<1765804902:1>},
    "operationTime" => #BSON.Timestamp<1765804902:1>
  },
  new_cmd: [
    delete: "test",
    deletes: [%{limit: 1, q: %{_id: 42}}],
    lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>}
  ],
  opts: [session: #PID<0.9898.0>, compression: true, write_counter: 1],
  response: {%{
     "$clusterTime" => %{
       "clusterTime" => #BSON.Timestamp<1765804902:1>,
       "signature" => %{
         "hash" => #BSON.Binary<0000000000000000000000000000000000000000>,
         "keyId" => 0
       }
     },
     "electionId" => #BSON.ObjectId<7fffffff0000000000000036>,
     "n" => 1,
     "ok" => 1.0,
     "opTime" => %{"t" => 54, "ts" => #BSON.Timestamp<1765804902:1>},
     "operationTime" => #BSON.Timestamp<1765804902:1>
   },
   %Mongo.Events.CommandStartedEvent{
     command: [
       delete: "test",
       deletes: [%{limit: 1, q: %{_id: 42}}],
       lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>}
     ],
     database_name: "tbx-sb",
     command_name: :delete,
     request_id: 0,
     operation_id: nil,
     connection_id: #PID<0.9560.0>
   }, 0, 2459},
  session: #PID<0.9898.0>
]

{:ok, %Mongo.DeleteResult{acknowledged: true, deleted_count: 1}}
...(117)>
nil
...(118)> Mongo.insert_one(conn, "test", %{_id: id, val: 42, toto: "azerty"}) #FIRST INSERT
[(mongodb_driver 1.6.0) iex:1690: Mongo.exec_command_session/3]
binding() #=> [
  _cmd: %Mongo.Query{
    action: {:command,
     [
       insert: "test",
       documents: [%{_id: 42, toto: "azerty", val: 42}],
       lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>}
     ]}
  },
  _flags: 0,
  cmd: [insert: "test", documents: [%{_id: 42, toto: "azerty", val: 42}]],
  conn: #PID<0.9567.0>,
  doc: %{
    "$clusterTime" => %{
      "clusterTime" => #BSON.Timestamp<1765804902:2>,
      "signature" => %{
        "hash" => #BSON.Binary<0000000000000000000000000000000000000000>,
        "keyId" => 0
      }
    },
    "electionId" => #BSON.ObjectId<7fffffff0000000000000036>,
    "n" => 1,
    "ok" => 1.0,
    "opTime" => %{"t" => 54, "ts" => #BSON.Timestamp<1765804902:2>},
    "operationTime" => #BSON.Timestamp<1765804902:2>
  },
  new_cmd: [
    insert: "test",
    documents: [%{_id: 42, toto: "azerty", val: 42}],
    lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>}
  ],
  opts: [session: #PID<0.9899.0>, compression: true, write_counter: 1],
  response: {%{
     "$clusterTime" => %{
       "clusterTime" => #BSON.Timestamp<1765804902:2>,
       "signature" => %{
         "hash" => #BSON.Binary<0000000000000000000000000000000000000000>,
         "keyId" => 0
       }
     },
     "electionId" => #BSON.ObjectId<7fffffff0000000000000036>,
     "n" => 1,
     "ok" => 1.0,
     "opTime" => %{"t" => 54, "ts" => #BSON.Timestamp<1765804902:2>},
     "operationTime" => #BSON.Timestamp<1765804902:2>
   },
   %Mongo.Events.CommandStartedEvent{
     command: [
       insert: "test",
       documents: [%{_id: 42, toto: "azerty", val: 42}],
       lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>}
     ],
     database_name: "tbx-sb",
     command_name: :insert,
     request_id: 0,
     operation_id: nil,
     connection_id: #PID<0.9560.0>
   }, 0, 1776},
  session: #PID<0.9899.0>
]

{:ok, %Mongo.InsertOneResult{acknowledged: true, inserted_id: 42}}
...(119)>
nil
...(120)> obj = Mongo.find_one(conn, "test", %{_id: id}, retryable_reads: true) #CHECK OBJ UPDATE
[(mongodb_driver 1.6.0) iex:1690: Mongo.exec_command_session/3]
binding() #=> [
  _cmd: %Mongo.Query{
    action: {:command,
     [
       find: "test",
       filter: [_id: 42],
       limit: 1,
       batchSize: 1,
       lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>},
       "$readPreference": %{mode: :primary}
     ]}
  },
  _flags: 0,
  cmd: [find: "test", filter: [_id: 42], limit: 1, batchSize: 1],
  conn: #PID<0.9567.0>,
  doc: %{
    "$clusterTime" => %{
      "clusterTime" => #BSON.Timestamp<1765804902:2>,
      "signature" => %{
        "hash" => #BSON.Binary<0000000000000000000000000000000000000000>,
        "keyId" => 0
      }
    },
    "cursor" => %{
      "firstBatch" => [%{"_id" => 42, "toto" => "azerty", "val" => 42}],
      "id" => 0,
      "ns" => "tbx-sb.test"
    },
    "ok" => 1.0,
    "operationTime" => #BSON.Timestamp<1765804902:2>
  },
  new_cmd: [
    find: "test",
    filter: [_id: 42],
    limit: 1,
    batchSize: 1,
    lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>},
    "$readPreference": %{mode: :primary}
  ],
  opts: [
    compression: true,
    batch_size: 1,
    retryable_reads: true,
    read_counter: 1
  ],
  response: {%{
     "$clusterTime" => %{
       "clusterTime" => #BSON.Timestamp<1765804902:2>,
       "signature" => %{
         "hash" => #BSON.Binary<0000000000000000000000000000000000000000>,
         "keyId" => 0
       }
     },
     "cursor" => %{
       "firstBatch" => [%{"_id" => 42, "toto" => "azerty", "val" => 42}],
       "id" => 0,
       "ns" => "tbx-sb.test"
     },
     "ok" => 1.0,
     "operationTime" => #BSON.Timestamp<1765804902:2>
   },
   %Mongo.Events.CommandStartedEvent{
     command: [
       find: "test",
       filter: [_id: 42],
       limit: 1,
       batchSize: 1,
       lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>},
       "$readPreference": %{mode: :primary}
     ],
     database_name: "tbx-sb",
     command_name: :find,
     request_id: 0,
     operation_id: nil,
     connection_id: #PID<0.9560.0>
   }, 0, 755},
  session: #PID<0.9900.0>
]

%{"_id" => 42, "toto" => "azerty", "val" => 42}
...(121)> transformed_object = obj |> Map.update("fieldA", 0, fn val -> val + 1 end)
%{"_id" => 42, "fieldA" => 0, "toto" => "azerty", "val" => 42}
...(122)> Mongo.replace_one(conn, "test", %{_id: id}, transformed_object, [upsert: true, retryable_writes: true]) #FIRST REPLACE OBJ
[(mongodb_driver 1.6.0) iex:1304: Mongo.update_documents/6]
binding() #=> [
  cmd: [
    update: "test",
    updates: [
      %{
        multi: false,
        q: %{_id: 42},
        u: %{"_id" => 42, "fieldA" => 0, "toto" => "azerty", "val" => 42},
        upsert: true
      }
    ]
  ],
  coll: "test",
  filter: %{_id: 42},
  multi: false,
  opts: [upsert: true, retryable_writes: true],
  topology_pid: :poll_test_mongo,
  update: %{
    multi: false,
    q: %{_id: 42},
    u: %{"_id" => 42, "fieldA" => 0, "toto" => "azerty", "val" => 42},
    upsert: true
  },
  write_concern: nil
]

[(mongodb_driver 1.6.0) iex:1690: Mongo.exec_command_session/3]
binding() #=> [
  _cmd: %Mongo.Query{
    action: {:command,
     [
       update: "test",
       updates: [
         %{
           multi: false,
           q: %{_id: 42},
           u: %{"_id" => 42, "fieldA" => 0, "toto" => "azerty", "val" => 42},
           upsert: true
         }
       ],
       lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>},
       txnNumber: #BSON.LongNumber<0>
     ]}
  },
  _flags: 0,
  cmd: [
    update: "test",
    updates: [
      %{
        multi: false,
        q: %{_id: 42},
        u: %{"_id" => 42, "fieldA" => 0, "toto" => "azerty", "val" => 42},
        upsert: true
      }
    ]
  ],
  conn: #PID<0.9567.0>,
  doc: %{
    "$clusterTime" => %{
      "clusterTime" => #BSON.Timestamp<1765804902:3>,
      "signature" => %{
        "hash" => #BSON.Binary<0000000000000000000000000000000000000000>,
        "keyId" => 0
      }
    },
    "electionId" => #BSON.ObjectId<7fffffff0000000000000036>,
    "n" => 1,
    "nModified" => 1,
    "ok" => 1.0,
    "opTime" => %{"t" => 54, "ts" => #BSON.Timestamp<1765804902:3>},
    "operationTime" => #BSON.Timestamp<1765804902:3>
  },
  new_cmd: [
    update: "test",
    updates: [
      %{
        multi: false,
        q: %{_id: 42},
        u: %{"_id" => 42, "fieldA" => 0, "toto" => "azerty", "val" => 42},
        upsert: true
      }
    ],
    lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>},
    txnNumber: #BSON.LongNumber<0>
  ],
  opts: [
    session: #PID<0.9901.0>,
    compression: true,
    upsert: true,
    retryable_writes: true,
    write_counter: 1
  ],
  response: {%{
     "$clusterTime" => %{
       "clusterTime" => #BSON.Timestamp<1765804902:3>,
       "signature" => %{
         "hash" => #BSON.Binary<0000000000000000000000000000000000000000>,
         "keyId" => 0
       }
     },
     "electionId" => #BSON.ObjectId<7fffffff0000000000000036>,
     "n" => 1,
     "nModified" => 1,
     "ok" => 1.0,
     "opTime" => %{"t" => 54, "ts" => #BSON.Timestamp<1765804902:3>},
     "operationTime" => #BSON.Timestamp<1765804902:3>
   },
   %Mongo.Events.CommandStartedEvent{
     command: [
       update: "test",
       updates: [
         %{
           multi: false,
           q: %{_id: 42},
           u: %{"_id" => 42, "fieldA" => 0, "toto" => "azerty", "val" => 42},
           upsert: true
         }
       ],
       lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>},
       txnNumber: #BSON.LongNumber<0>
     ],
     database_name: "tbx-sb",
     command_name: :update,
     request_id: 0,
     operation_id: nil,
     connection_id: #PID<0.9560.0>
   }, 0, 2954},
  session: #PID<0.9901.0>
]

{:ok,
 %Mongo.UpdateResult{
   acknowledged: true,
   matched_count: 1,
   modified_count: 1,
   upserted_ids: []
 }}
...(123)>
nil
...(124)> obj = Mongo.find_one(conn, "test", %{_id: id}, retryable_reads: true) #CHECK OBJ UPDATE
[(mongodb_driver 1.6.0) iex:1690: Mongo.exec_command_session/3]
binding() #=> [
  _cmd: %Mongo.Query{
    action: {:command,
     [
       find: "test",
       filter: [_id: 42],
       limit: 1,
       batchSize: 1,
       lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>},
       "$readPreference": %{mode: :primary}
     ]}
  },
  _flags: 0,
  cmd: [find: "test", filter: [_id: 42], limit: 1, batchSize: 1],
  conn: #PID<0.9567.0>,
  doc: %{
    "$clusterTime" => %{
      "clusterTime" => #BSON.Timestamp<1765804902:3>,
      "signature" => %{
        "hash" => #BSON.Binary<0000000000000000000000000000000000000000>,
        "keyId" => 0
      }
    },
    "cursor" => %{
      "firstBatch" => [
        %{"_id" => 42, "fieldA" => 0, "toto" => "azerty", "val" => 42}
      ],
      "id" => 0,
      "ns" => "tbx-sb.test"
    },
    "ok" => 1.0,
    "operationTime" => #BSON.Timestamp<1765804902:3>
  },
  new_cmd: [
    find: "test",
    filter: [_id: 42],
    limit: 1,
    batchSize: 1,
    lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>},
    "$readPreference": %{mode: :primary}
  ],
  opts: [
    compression: true,
    batch_size: 1,
    retryable_reads: true,
    read_counter: 1
  ],
  response: {%{
     "$clusterTime" => %{
       "clusterTime" => #BSON.Timestamp<1765804902:3>,
       "signature" => %{
         "hash" => #BSON.Binary<0000000000000000000000000000000000000000>,
         "keyId" => 0
       }
     },
     "cursor" => %{
       "firstBatch" => [
         %{"_id" => 42, "fieldA" => 0, "toto" => "azerty", "val" => 42}
       ],
       "id" => 0,
       "ns" => "tbx-sb.test"
     },
     "ok" => 1.0,
     "operationTime" => #BSON.Timestamp<1765804902:3>
   },
   %Mongo.Events.CommandStartedEvent{
     command: [
       find: "test",
       filter: [_id: 42],
       limit: 1,
       batchSize: 1,
       lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>},
       "$readPreference": %{mode: :primary}
     ],
     database_name: "tbx-sb",
     command_name: :find,
     request_id: 0,
     operation_id: nil,
     connection_id: #PID<0.9560.0>
   }, 0, 514},
  session: #PID<0.9902.0>
]

%{"_id" => 42, "fieldA" => 0, "toto" => "azerty", "val" => 42}
...(125)>
nil
...(126)>
nil
...(127)>
nil
...(128)> obj = Mongo.find_one(conn, "test", %{_id: id}, retryable_reads: true) #CHECK OBJ UPDATE
[(mongodb_driver 1.6.0) iex:1690: Mongo.exec_command_session/3]
binding() #=> [
  _cmd: %Mongo.Query{
    action: {:command,
     [
       find: "test",
       filter: [_id: 42],
       limit: 1,
       batchSize: 1,
       lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>},
       "$readPreference": %{mode: :primary}
     ]}
  },
  _flags: 0,
  cmd: [find: "test", filter: [_id: 42], limit: 1, batchSize: 1],
  conn: #PID<0.9567.0>,
  doc: %{
    "$clusterTime" => %{
      "clusterTime" => #BSON.Timestamp<1765804902:3>,
      "signature" => %{
        "hash" => #BSON.Binary<0000000000000000000000000000000000000000>,
        "keyId" => 0
      }
    },
    "cursor" => %{
      "firstBatch" => [
        %{"_id" => 42, "fieldA" => 0, "toto" => "azerty", "val" => 42}
      ],
      "id" => 0,
      "ns" => "tbx-sb.test"
    },
    "ok" => 1.0,
    "operationTime" => #BSON.Timestamp<1765804902:3>
  },
  new_cmd: [
    find: "test",
    filter: [_id: 42],
    limit: 1,
    batchSize: 1,
    lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>},
    "$readPreference": %{mode: :primary}
  ],
  opts: [
    compression: true,
    batch_size: 1,
    retryable_reads: true,
    read_counter: 1
  ],
  response: {%{
     "$clusterTime" => %{
       "clusterTime" => #BSON.Timestamp<1765804902:3>,
       "signature" => %{
         "hash" => #BSON.Binary<0000000000000000000000000000000000000000>,
         "keyId" => 0
       }
     },
     "cursor" => %{
       "firstBatch" => [
         %{"_id" => 42, "fieldA" => 0, "toto" => "azerty", "val" => 42}
       ],
       "id" => 0,
       "ns" => "tbx-sb.test"
     },
     "ok" => 1.0,
     "operationTime" => #BSON.Timestamp<1765804902:3>
   },
   %Mongo.Events.CommandStartedEvent{
     command: [
       find: "test",
       filter: [_id: 42],
       limit: 1,
       batchSize: 1,
       lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>},
       "$readPreference": %{mode: :primary}
     ],
     database_name: "tbx-sb",
     command_name: :find,
     request_id: 0,
     operation_id: nil,
     connection_id: #PID<0.9560.0>
   }, 0, 806},
  session: #PID<0.9903.0>
]

%{"_id" => 42, "fieldA" => 0, "toto" => "azerty", "val" => 42}
...(129)> transformed_object = obj |> Map.update("fieldA", 0, fn val -> val + 1 end)
%{"_id" => 42, "fieldA" => 1, "toto" => "azerty", "val" => 42}
...(130)> Mongo.replace_one(conn, "test", %{_id: id}, transformed_object, [upsert: true, retryable_writes: true]) #SECOND OBJ UPDATE
[(mongodb_driver 1.6.0) iex:1304: Mongo.update_documents/6]
binding() #=> [
  cmd: [
    update: "test",
    updates: [
      %{
        multi: false,
        q: %{_id: 42},
        u: %{"_id" => 42, "fieldA" => 1, "toto" => "azerty", "val" => 42},
        upsert: true
      }
    ]
  ],
  coll: "test",
  filter: %{_id: 42},
  multi: false,
  opts: [upsert: true, retryable_writes: true],
  topology_pid: :poll_test_mongo,
  update: %{
    multi: false,
    q: %{_id: 42},
    u: %{"_id" => 42, "fieldA" => 1, "toto" => "azerty", "val" => 42},
    upsert: true
  },
  write_concern: nil
]

[(mongodb_driver 1.6.0) iex:1690: Mongo.exec_command_session/3]
binding() #=> [
  _cmd: %Mongo.Query{
    action: {:command,
     [
       update: "test",
       updates: [
         %{
           multi: false,
           q: %{_id: 42},
           u: %{"_id" => 42, "fieldA" => 1, "toto" => "azerty", "val" => 42},
           upsert: true
         }
       ],
       lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>},
       txnNumber: #BSON.LongNumber<0>
     ]}
  },
  _flags: 0,
  cmd: [
    update: "test",
    updates: [
      %{
        multi: false,
        q: %{_id: 42},
        u: %{"_id" => 42, "fieldA" => 1, "toto" => "azerty", "val" => 42},
        upsert: true
      }
    ]
  ],
  conn: #PID<0.9567.0>,
  doc: %{
    "$clusterTime" => %{
      "clusterTime" => #BSON.Timestamp<1765804902:3>,
      "signature" => %{
        "hash" => #BSON.Binary<0000000000000000000000000000000000000000>,
        "keyId" => 0
      }
    },
    "electionId" => #BSON.ObjectId<7fffffff0000000000000036>,
    "n" => 1,
    "nModified" => 1,
    "ok" => 1.0,
    "opTime" => %{"t" => 54, "ts" => #BSON.Timestamp<1765804902:3>},
    "operationTime" => #BSON.Timestamp<1765804902:3>,
    "retriedStmtIds" => [0]
  },
  new_cmd: [
    update: "test",
    updates: [
      %{
        multi: false,
        q: %{_id: 42},
        u: %{"_id" => 42, "fieldA" => 1, "toto" => "azerty", "val" => 42},
        upsert: true
      }
    ],
    lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>},
    txnNumber: #BSON.LongNumber<0>
  ],
  opts: [
    session: #PID<0.9904.0>,
    compression: true,
    upsert: true,
    retryable_writes: true,
    write_counter: 1
  ],
  response: {%{
     "$clusterTime" => %{
       "clusterTime" => #BSON.Timestamp<1765804902:3>,
       "signature" => %{
         "hash" => #BSON.Binary<0000000000000000000000000000000000000000>,
         "keyId" => 0
       }
     },
     "electionId" => #BSON.ObjectId<7fffffff0000000000000036>,
     "n" => 1,
     "nModified" => 1,
     "ok" => 1.0,
     "opTime" => %{"t" => 54, "ts" => #BSON.Timestamp<1765804902:3>},
     "operationTime" => #BSON.Timestamp<1765804902:3>,
     "retriedStmtIds" => [0]
   },
   %Mongo.Events.CommandStartedEvent{
     command: [
       update: "test",
       updates: [
         %{
           multi: false,
           q: %{_id: 42},
           u: %{"_id" => 42, "fieldA" => 1, "toto" => "azerty", "val" => 42},
           upsert: true
         }
       ],
       lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>},
       txnNumber: #BSON.LongNumber<0>
     ],
     database_name: "tbx-sb",
     command_name: :update,
     request_id: 0,
     operation_id: nil,
     connection_id: #PID<0.9560.0>
   }, 0, 857},
  session: #PID<0.9904.0>
]

{:ok,
 %Mongo.UpdateResult{
   acknowledged: true,
   matched_count: 1,
   modified_count: 1,
   upserted_ids: []
 }}
...(131)>
nil
...(132)> obj = Mongo.find_one(conn, "test", %{_id: id}, retryable_reads: true)
[(mongodb_driver 1.6.0) iex:1690: Mongo.exec_command_session/3]
binding() #=> [
  _cmd: %Mongo.Query{
    action: {:command,
     [
       find: "test",
       filter: [_id: 42],
       limit: 1,
       batchSize: 1,
       lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>},
       "$readPreference": %{mode: :primary}
     ]}
  },
  _flags: 0,
  cmd: [find: "test", filter: [_id: 42], limit: 1, batchSize: 1],
  conn: #PID<0.9567.0>,
  doc: %{
    "$clusterTime" => %{
      "clusterTime" => #BSON.Timestamp<1765804902:3>,
      "signature" => %{
        "hash" => #BSON.Binary<0000000000000000000000000000000000000000>,
        "keyId" => 0
      }
    },
    "cursor" => %{
      "firstBatch" => [
        %{"_id" => 42, "fieldA" => 0, "toto" => "azerty", "val" => 42}
      ],
      "id" => 0,
      "ns" => "tbx-sb.test"
    },
    "ok" => 1.0,
    "operationTime" => #BSON.Timestamp<1765804902:3>
  },
  new_cmd: [
    find: "test",
    filter: [_id: 42],
    limit: 1,
    batchSize: 1,
    lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>},
    "$readPreference": %{mode: :primary}
  ],
  opts: [
    compression: true,
    batch_size: 1,
    retryable_reads: true,
    read_counter: 1
  ],
  response: {%{
     "$clusterTime" => %{
       "clusterTime" => #BSON.Timestamp<1765804902:3>,
       "signature" => %{
         "hash" => #BSON.Binary<0000000000000000000000000000000000000000>,
         "keyId" => 0
       }
     },
     "cursor" => %{
       "firstBatch" => [
         %{"_id" => 42, "fieldA" => 0, "toto" => "azerty", "val" => 42}
       ],
       "id" => 0,
       "ns" => "tbx-sb.test"
     },
     "ok" => 1.0,
     "operationTime" => #BSON.Timestamp<1765804902:3>
   },
   %Mongo.Events.CommandStartedEvent{
     command: [
       find: "test",
       filter: [_id: 42],
       limit: 1,
       batchSize: 1,
       lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>},
       "$readPreference": %{mode: :primary}
     ],
     database_name: "tbx-sb",
     command_name: :find,
     request_id: 0,
     operation_id: nil,
     connection_id: #PID<0.9560.0>
   }, 0, 492},
  session: #PID<0.9905.0>
]

%{"_id" => 42, "fieldA" => 0, "toto" => "azerty", "val" => 42} #BROKEN SHOULD BE fieldA == "1" check the previous call resultes and bind for comparaison

@zookzook
Owner

Thank you again for investing your time. I found the bug; it was simply a typo in a case statement. I fixed the typo here: b38f2fc

Please feel free to check out the master branch. I added a unit test as well. Thank you for your patience in chasing down this issue.

The "s" was missing here:

    ## in case of `:retryable_writes` we need to inc the transaction id
    server_session =
      case opts[:retryable_writes] do
        true -> ServerSession.next_txn_num(server_session)
        _ -> server_session
      end
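
For reference, a minimal regression-test sketch of the scenario from this thread (illustrative only, not the unit test added on master; the connection URL and collection name are placeholders):

defmodule RetryableReplaceOneRegressionTest do
  use ExUnit.Case, async: false

  test "consecutive replace_one calls with retryable_writes are all applied" do
    # Placeholder URL: point it at a replica set, as in the examples above.
    {:ok, conn} = Mongo.start_link(url: "mongodb://localhost:27017/test?replicaSet=rs0")
    coll = "retryable_replace_one_regression"
    id = 42

    Mongo.delete_one(conn, coll, %{_id: id})
    {:ok, _} = Mongo.insert_one(conn, coll, %{"_id" => id, "counter" => 0})

    for expected <- 1..3 do
      doc = Mongo.find_one(conn, coll, %{_id: id})
      updated = Map.update!(doc, "counter", &(&1 + 1))

      {:ok, _} = Mongo.replace_one(conn, coll, %{_id: id}, updated, upsert: true, retryable_writes: true)

      # Before the fix, the second and third iterations failed silently: the server saw
      # the same lsid/txnNumber again and replayed the cached result instead of writing.
      assert %{"counter" => ^expected} = Mongo.find_one(conn, coll, %{_id: id})
    end
  end
end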
