-
Notifications
You must be signed in to change notification settings - Fork 74
Update session.ex add ServerSession.next_txn_num when ending the session to prevent silent no-write operation #287
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
…ion to prevent silent no-write operation fix retryable_write faulty, fix the issue that prevent replace_one to occures on concecutive update on the same document when sessions is checkin and then reused again (same document or different does matter, it's stays silently blocked until the session expires)
|
Thank you for providing a detailed description of your issue. Let's check the documentation that I found for the
The driver increments the txNumber when call startTransation: def handle_call_event(:start_transaction, transaction, %Session{server_session: session} = data) when transaction in [:no_transaction, :transaction_aborted, :transaction_committed] do
{:next_state, :starting_transaction, %Session{data | recovery_token: nil, server_session: ServerSession.next_txn_num(session)}, :ok}
endYour merge request would increment the |
|
I managed to reproduce the edge case in a simple without my fix ##Context: Cluster with Replication set to 3
conn = :poll_test_mongo
Mongo.start_link([
name: conn,
url: url,
pool_size: 100,
socket_options: [:inet6]
])
id = 42
##Run example
my-app(78)> Mongo.insert_one(conn, "test", %{_id: id, val: 42, toto: "azerty"}) #insert document first
{:ok, %Mongo.InsertOneResult{acknowledged: true, inserted_id: 42}}
my-app(79)>
nil
my-app(80)> obj = Mongo.find_one(conn, "test", %{_id: id}, retryable_reads: true) #get the document
%{"_id" => 42, "toto" => "azerty", "val" => 42}
my-app(81)> transformed_object = obj |> Map.update("fieldA", 0, fn val -> val + 1 end) #prepare a modification of the document
%{"_id" => 42, "fieldA" => 0, "toto" => "azerty", "val" => 42}
my-app(82)> Mongo.replace_one(conn, "test", %{_id: id}, transformed_object, [upsert: true, retryable_writes: true]) #replace the doc
{:ok,
%Mongo.UpdateResult{
acknowledged: true,
matched_count: 1,
modified_count: 1,
upserted_ids: []
}}
my-app(83)>
nil
my-app(84)> obj = Mongo.find_one(conn, "test", %{_id: id}, retryable_reads: true) #[BREAK] turn out the doc didn't changed
%{"_id" => 42, "toto" => "azerty", "val" => 42}
my-app(85)>
nil
my-app(86)>
nil
my-app(87)>
nil
my-app(88)> obj = Mongo.find_one(conn, "test", %{_id: id}, retryable_reads: true)
%{"_id" => 42, "toto" => "azerty", "val" => 42}
my-app(89)> transformed_object = obj |> Map.update("fieldA", 0, fn val -> val + 1 end)
%{"_id" => 42, "fieldA" => 0, "toto" => "azerty", "val" => 42}
my-app(90)> Mongo.replace_one(conn, "test", %{_id: id}, transformed_object, [upsert: true, retryable_writes: true])
{:ok,
%Mongo.UpdateResult{
acknowledged: true,
matched_count: 1,
modified_count: 1,
upserted_ids: []
}}
my-app(91)>
nil
my-app(92)> obj = Mongo.find_one(conn, "test", %{_id: id}, retryable_reads: true) #[BREAK] still broken
%{"_id" => 42, "toto" => "azerty", "val" => 42}On the test we can see that the document can't be updated what so ever till the sessions expires causing no update on the DB when ever we conduct replace_one several times with retry_writes after a first entrie. So when a new query show-up it tries to use an active session but since it wasn't updated properly (not sure about the best spot for the increase were supposed to be done to eventually avoid this double increase that you mention) it re-serves the same sessions so basically same |
|
Thanks for the detailed example. I have checked the specification for retryable_writes, and you are right!!!
I will check the code again, because I believe there is maybe a better place to increment the transaction number. |
|
I have checked the code. We already increment the transaction number here: def init(topology, conn, address, server_session, wire_version, opts) do
## in case of `:retryable_write` we need to inc the transaction id
server_session =
case opts[:retryable_write] do
true -> ServerSession.next_txn_num(server_session)
_ -> server_session
end
Now I tried your example and it works: iex [13:49 :: 2] > id = 42
42
iex [13:49 :: 3] > Mongo.insert_one(conn, "test", %{_id: id, val: 42, toto: "azerty"})
13:49:57.029 [info] CMD insert "test" [documents: [%{val: 42, _id: 42, toto: "azerty"}]] db=48.1ms (module=Mongo function=do_log/5 line=1952 )
{:ok, %Mongo.InsertOneResult{acknowledged: true, inserted_id: 42}}
iex [13:49 :: 4] > obj = Mongo.find_one(conn, "test", %{_id: id}, retryable_reads: true)
13:50:08.015 [info] CMD find "test" [filter: [_id: 42], limit: 1, batchSize: 1] db=0.3ms (module=Mongo function=do_log/5 line=1952 )
%{"_id" => 42, "toto" => "azerty", "val" => 42}
iex [13:49 :: 5] > transformed_object = obj |> Map.update("fieldA", 0, fn val -> val + 1 end)
%{"_id" => 42, "fieldA" => 0, "toto" => "azerty", "val" => 42}
iex [13:49 :: 6] > Mongo.replace_one(conn, "test", %{_id: id}, transformed_object, [upsert: true, retryable_writes: true])
13:50:30.067 [info] CMD update "test" [
updates: [
%{
multi: false,
q: %{_id: 42},
upsert: true,
u: %{"_id" => 42, "fieldA" => 0, "toto" => "azerty", "val" => 42}
}
]
] db=18.1ms (module=Mongo function=do_log/5 line=1952 )
{:ok, %Mongo.UpdateResult{acknowledged: true, matched_count: 1, modified_count: 1, upserted_ids: []}}
iex [13:49 :: 7] > obj = Mongo.find_one(conn, "test", %{_id: id}, retryable_reads: true)
13:50:42.324 [info] CMD find "test" [filter: [_id: 42], limit: 1, batchSize: 1] db=0.9ms (module=Mongo function=do_log/5 line=1952 )
%{"_id" => 42, "fieldA" => 0, "toto" => "azerty", "val" => 42}
iex [13:49 :: 8] >
The result contains fieldA with value of 0. What MongoDB version do you use? I used 8.0.10 in a replica set of 3. |
|
I'm using a MongoDB 6.0.24 in a replicat set of 3. (I'll try to add more verbose around the use of the sessions to see how it jump up, last time it wasn't increased properperly at the re-use that basically where I started to look for a workaround) since the logger doesn't show the session information used by the request i's not super usefull to cornerise* the issue, (the logger was mostly helpfull at begging when I plugged the driver to my whole solution without expectacted to find any issue) I saw that use version of elixir-mongodb-driver was relesese I'll try again with this one just in case |
|
I've re-run the same test, with the lastest version available of the driver. here the results I've hotloaded on a specific spots some dbg() in order to extract the context. For the record: I have placed 2 dbg on 1 module defp update_documents(topology_pid, coll, filter, update, multi, opts) do
write_concern = write_concern(opts)
update =
%{
q: filter,
u: update,
upsert: Keyword.get(opts, :upsert),
multi: multi,
collation: Keyword.get(opts, :collation),
arrayFilters: Keyword.get(opts, :array_filters),
hint: Keyword.get(opts, :hint)
}
|> filter_nils()
cmd =
[
update: coll,
updates: [update],
ordered: Keyword.get(opts, :ordered),
writeConcern: write_concern,
bypassDocumentValidation: Keyword.get(opts, :bypass_document_validation)
]
|> filter_nils()
dbg() #ADDED HERE for context
with {:ok, doc} <- issue_command(topology_pid, cmd, :write, Keyword.put_new(opts, :compression, true)) do
case doc do
%{"writeErrors" => _} ->
{:error, %Mongo.WriteError{n: doc["n"], ok: doc["ok"], write_errors: doc["writeErrors"]}}
%{"n" => n, "nModified" => n_modified, "upserted" => upserted} ->
case acknowledged?(write_concern) do
false ->
{:ok, %Mongo.UpdateResult{acknowledged: false}}
true ->
{:ok,
%Mongo.UpdateResult{
matched_count: n,
modified_count: n_modified,
upserted_ids: filter_upsert_ids(upserted)
}}
end
%{"n" => n, "nModified" => n_modified} ->
case acknowledged?(write_concern) do
false -> {:ok, %Mongo.UpdateResult{acknowledged: false}}
true -> {:ok, %Mongo.UpdateResult{matched_count: n, modified_count: n_modified}}
end
_ ->
{:ok, nil}
end
end
end
def exec_command_session(session, cmd, opts) do
with {:ok, conn, new_cmd, opts} <- Session.bind_session(session, cmd, opts),
{:ok, _cmd, response} <- DBConnection.execute(conn, %Query{action: {:command, new_cmd}}, [], opts),
:ok <- Session.update_session(session, response, opts),
{:ok, {_flags, doc}} <- check_for_error(response, cmd, opts) do
dbg() #ADDED HERE for context
{:ok, doc}
else
{:error, error} ->
{:error, error}
end
endthis one is obviously where the magic happened Before diving into the bulky log below(sorry in advance for that), It will be more friendly to wrap the binding on an IDE and then open after when you find the discrepencies in the end results. (If you have a better way to share/represent with less info required please let me know) pay attention to during the binding session after the previous we should kind of expect the ...(116)> Mongo.delete_one(conn, "test", %{_id: id}) #RESET
[(mongodb_driver 1.6.0) iex:1690: Mongo.exec_command_session/3]
binding() #=> [
_cmd: %Mongo.Query{
action: {:command,
[
delete: "test",
deletes: [%{limit: 1, q: %{_id: 42}}],
lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>}
]}
},
_flags: 0,
cmd: [delete: "test", deletes: [%{limit: 1, q: %{_id: 42}}]],
conn: #PID<0.9567.0>,
doc: %{
"$clusterTime" => %{
"clusterTime" => #BSON.Timestamp<1765804902:1>,
"signature" => %{
"hash" => #BSON.Binary<0000000000000000000000000000000000000000>,
"keyId" => 0
}
},
"electionId" => #BSON.ObjectId<7fffffff0000000000000036>,
"n" => 1,
"ok" => 1.0,
"opTime" => %{"t" => 54, "ts" => #BSON.Timestamp<1765804902:1>},
"operationTime" => #BSON.Timestamp<1765804902:1>
},
new_cmd: [
delete: "test",
deletes: [%{limit: 1, q: %{_id: 42}}],
lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>}
],
opts: [session: #PID<0.9898.0>, compression: true, write_counter: 1],
response: {%{
"$clusterTime" => %{
"clusterTime" => #BSON.Timestamp<1765804902:1>,
"signature" => %{
"hash" => #BSON.Binary<0000000000000000000000000000000000000000>,
"keyId" => 0
}
},
"electionId" => #BSON.ObjectId<7fffffff0000000000000036>,
"n" => 1,
"ok" => 1.0,
"opTime" => %{"t" => 54, "ts" => #BSON.Timestamp<1765804902:1>},
"operationTime" => #BSON.Timestamp<1765804902:1>
},
%Mongo.Events.CommandStartedEvent{
command: [
delete: "test",
deletes: [%{limit: 1, q: %{_id: 42}}],
lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>}
],
database_name: "tbx-sb",
command_name: :delete,
request_id: 0,
operation_id: nil,
connection_id: #PID<0.9560.0>
}, 0, 2459},
session: #PID<0.9898.0>
]
{:ok, %Mongo.DeleteResult{acknowledged: true, deleted_count: 1}}
...(117)>
nil
...(118)> Mongo.insert_one(conn, "test", %{_id: id, val: 42, toto: "azerty"}) #FIRST INSERT
[(mongodb_driver 1.6.0) iex:1690: Mongo.exec_command_session/3]
binding() #=> [
_cmd: %Mongo.Query{
action: {:command,
[
insert: "test",
documents: [%{_id: 42, toto: "azerty", val: 42}],
lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>}
]}
},
_flags: 0,
cmd: [insert: "test", documents: [%{_id: 42, toto: "azerty", val: 42}]],
conn: #PID<0.9567.0>,
doc: %{
"$clusterTime" => %{
"clusterTime" => #BSON.Timestamp<1765804902:2>,
"signature" => %{
"hash" => #BSON.Binary<0000000000000000000000000000000000000000>,
"keyId" => 0
}
},
"electionId" => #BSON.ObjectId<7fffffff0000000000000036>,
"n" => 1,
"ok" => 1.0,
"opTime" => %{"t" => 54, "ts" => #BSON.Timestamp<1765804902:2>},
"operationTime" => #BSON.Timestamp<1765804902:2>
},
new_cmd: [
insert: "test",
documents: [%{_id: 42, toto: "azerty", val: 42}],
lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>}
],
opts: [session: #PID<0.9899.0>, compression: true, write_counter: 1],
response: {%{
"$clusterTime" => %{
"clusterTime" => #BSON.Timestamp<1765804902:2>,
"signature" => %{
"hash" => #BSON.Binary<0000000000000000000000000000000000000000>,
"keyId" => 0
}
},
"electionId" => #BSON.ObjectId<7fffffff0000000000000036>,
"n" => 1,
"ok" => 1.0,
"opTime" => %{"t" => 54, "ts" => #BSON.Timestamp<1765804902:2>},
"operationTime" => #BSON.Timestamp<1765804902:2>
},
%Mongo.Events.CommandStartedEvent{
command: [
insert: "test",
documents: [%{_id: 42, toto: "azerty", val: 42}],
lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>}
],
database_name: "tbx-sb",
command_name: :insert,
request_id: 0,
operation_id: nil,
connection_id: #PID<0.9560.0>
}, 0, 1776},
session: #PID<0.9899.0>
]
{:ok, %Mongo.InsertOneResult{acknowledged: true, inserted_id: 42}}
...(119)>
nil
...(120)> obj = Mongo.find_one(conn, "test", %{_id: id}, retryable_reads: true) #CHECK OBJ UPDATE
[(mongodb_driver 1.6.0) iex:1690: Mongo.exec_command_session/3]
binding() #=> [
_cmd: %Mongo.Query{
action: {:command,
[
find: "test",
filter: [_id: 42],
limit: 1,
batchSize: 1,
lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>},
"$readPreference": %{mode: :primary}
]}
},
_flags: 0,
cmd: [find: "test", filter: [_id: 42], limit: 1, batchSize: 1],
conn: #PID<0.9567.0>,
doc: %{
"$clusterTime" => %{
"clusterTime" => #BSON.Timestamp<1765804902:2>,
"signature" => %{
"hash" => #BSON.Binary<0000000000000000000000000000000000000000>,
"keyId" => 0
}
},
"cursor" => %{
"firstBatch" => [%{"_id" => 42, "toto" => "azerty", "val" => 42}],
"id" => 0,
"ns" => "tbx-sb.test"
},
"ok" => 1.0,
"operationTime" => #BSON.Timestamp<1765804902:2>
},
new_cmd: [
find: "test",
filter: [_id: 42],
limit: 1,
batchSize: 1,
lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>},
"$readPreference": %{mode: :primary}
],
opts: [
compression: true,
batch_size: 1,
retryable_reads: true,
read_counter: 1
],
response: {%{
"$clusterTime" => %{
"clusterTime" => #BSON.Timestamp<1765804902:2>,
"signature" => %{
"hash" => #BSON.Binary<0000000000000000000000000000000000000000>,
"keyId" => 0
}
},
"cursor" => %{
"firstBatch" => [%{"_id" => 42, "toto" => "azerty", "val" => 42}],
"id" => 0,
"ns" => "tbx-sb.test"
},
"ok" => 1.0,
"operationTime" => #BSON.Timestamp<1765804902:2>
},
%Mongo.Events.CommandStartedEvent{
command: [
find: "test",
filter: [_id: 42],
limit: 1,
batchSize: 1,
lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>},
"$readPreference": %{mode: :primary}
],
database_name: "tbx-sb",
command_name: :find,
request_id: 0,
operation_id: nil,
connection_id: #PID<0.9560.0>
}, 0, 755},
session: #PID<0.9900.0>
]
%{"_id" => 42, "toto" => "azerty", "val" => 42}
...(121)> transformed_object = obj |> Map.update("fieldA", 0, fn val -> val + 1 end)
%{"_id" => 42, "fieldA" => 0, "toto" => "azerty", "val" => 42}
...(122)> Mongo.replace_one(conn, "test", %{_id: id}, transformed_object, [upsert: true, retryable_writes: true]) #FIRST REPLACE OBJ
[(mongodb_driver 1.6.0) iex:1304: Mongo.update_documents/6]
binding() #=> [
cmd: [
update: "test",
updates: [
%{
multi: false,
q: %{_id: 42},
u: %{"_id" => 42, "fieldA" => 0, "toto" => "azerty", "val" => 42},
upsert: true
}
]
],
coll: "test",
filter: %{_id: 42},
multi: false,
opts: [upsert: true, retryable_writes: true],
topology_pid: :poll_test_mongo,
update: %{
multi: false,
q: %{_id: 42},
u: %{"_id" => 42, "fieldA" => 0, "toto" => "azerty", "val" => 42},
upsert: true
},
write_concern: nil
]
[(mongodb_driver 1.6.0) iex:1690: Mongo.exec_command_session/3]
binding() #=> [
_cmd: %Mongo.Query{
action: {:command,
[
update: "test",
updates: [
%{
multi: false,
q: %{_id: 42},
u: %{"_id" => 42, "fieldA" => 0, "toto" => "azerty", "val" => 42},
upsert: true
}
],
lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>},
txnNumber: #BSON.LongNumber<0>
]}
},
_flags: 0,
cmd: [
update: "test",
updates: [
%{
multi: false,
q: %{_id: 42},
u: %{"_id" => 42, "fieldA" => 0, "toto" => "azerty", "val" => 42},
upsert: true
}
]
],
conn: #PID<0.9567.0>,
doc: %{
"$clusterTime" => %{
"clusterTime" => #BSON.Timestamp<1765804902:3>,
"signature" => %{
"hash" => #BSON.Binary<0000000000000000000000000000000000000000>,
"keyId" => 0
}
},
"electionId" => #BSON.ObjectId<7fffffff0000000000000036>,
"n" => 1,
"nModified" => 1,
"ok" => 1.0,
"opTime" => %{"t" => 54, "ts" => #BSON.Timestamp<1765804902:3>},
"operationTime" => #BSON.Timestamp<1765804902:3>
},
new_cmd: [
update: "test",
updates: [
%{
multi: false,
q: %{_id: 42},
u: %{"_id" => 42, "fieldA" => 0, "toto" => "azerty", "val" => 42},
upsert: true
}
],
lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>},
txnNumber: #BSON.LongNumber<0>
],
opts: [
session: #PID<0.9901.0>,
compression: true,
upsert: true,
retryable_writes: true,
write_counter: 1
],
response: {%{
"$clusterTime" => %{
"clusterTime" => #BSON.Timestamp<1765804902:3>,
"signature" => %{
"hash" => #BSON.Binary<0000000000000000000000000000000000000000>,
"keyId" => 0
}
},
"electionId" => #BSON.ObjectId<7fffffff0000000000000036>,
"n" => 1,
"nModified" => 1,
"ok" => 1.0,
"opTime" => %{"t" => 54, "ts" => #BSON.Timestamp<1765804902:3>},
"operationTime" => #BSON.Timestamp<1765804902:3>
},
%Mongo.Events.CommandStartedEvent{
command: [
update: "test",
updates: [
%{
multi: false,
q: %{_id: 42},
u: %{"_id" => 42, "fieldA" => 0, "toto" => "azerty", "val" => 42},
upsert: true
}
],
lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>},
txnNumber: #BSON.LongNumber<0>
],
database_name: "tbx-sb",
command_name: :update,
request_id: 0,
operation_id: nil,
connection_id: #PID<0.9560.0>
}, 0, 2954},
session: #PID<0.9901.0>
]
{:ok,
%Mongo.UpdateResult{
acknowledged: true,
matched_count: 1,
modified_count: 1,
upserted_ids: []
}}
...(123)>
nil
...(124)> obj = Mongo.find_one(conn, "test", %{_id: id}, retryable_reads: true) #CHECK OBJ UPDATE
[(mongodb_driver 1.6.0) iex:1690: Mongo.exec_command_session/3]
binding() #=> [
_cmd: %Mongo.Query{
action: {:command,
[
find: "test",
filter: [_id: 42],
limit: 1,
batchSize: 1,
lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>},
"$readPreference": %{mode: :primary}
]}
},
_flags: 0,
cmd: [find: "test", filter: [_id: 42], limit: 1, batchSize: 1],
conn: #PID<0.9567.0>,
doc: %{
"$clusterTime" => %{
"clusterTime" => #BSON.Timestamp<1765804902:3>,
"signature" => %{
"hash" => #BSON.Binary<0000000000000000000000000000000000000000>,
"keyId" => 0
}
},
"cursor" => %{
"firstBatch" => [
%{"_id" => 42, "fieldA" => 0, "toto" => "azerty", "val" => 42}
],
"id" => 0,
"ns" => "tbx-sb.test"
},
"ok" => 1.0,
"operationTime" => #BSON.Timestamp<1765804902:3>
},
new_cmd: [
find: "test",
filter: [_id: 42],
limit: 1,
batchSize: 1,
lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>},
"$readPreference": %{mode: :primary}
],
opts: [
compression: true,
batch_size: 1,
retryable_reads: true,
read_counter: 1
],
response: {%{
"$clusterTime" => %{
"clusterTime" => #BSON.Timestamp<1765804902:3>,
"signature" => %{
"hash" => #BSON.Binary<0000000000000000000000000000000000000000>,
"keyId" => 0
}
},
"cursor" => %{
"firstBatch" => [
%{"_id" => 42, "fieldA" => 0, "toto" => "azerty", "val" => 42}
],
"id" => 0,
"ns" => "tbx-sb.test"
},
"ok" => 1.0,
"operationTime" => #BSON.Timestamp<1765804902:3>
},
%Mongo.Events.CommandStartedEvent{
command: [
find: "test",
filter: [_id: 42],
limit: 1,
batchSize: 1,
lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>},
"$readPreference": %{mode: :primary}
],
database_name: "tbx-sb",
command_name: :find,
request_id: 0,
operation_id: nil,
connection_id: #PID<0.9560.0>
}, 0, 514},
session: #PID<0.9902.0>
]
%{"_id" => 42, "fieldA" => 0, "toto" => "azerty", "val" => 42}
...(125)>
nil
...(126)>
nil
...(127)>
nil
...(128)> obj = Mongo.find_one(conn, "test", %{_id: id}, retryable_reads: true) #CHECK OBJ UPDATE
[(mongodb_driver 1.6.0) iex:1690: Mongo.exec_command_session/3]
binding() #=> [
_cmd: %Mongo.Query{
action: {:command,
[
find: "test",
filter: [_id: 42],
limit: 1,
batchSize: 1,
lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>},
"$readPreference": %{mode: :primary}
]}
},
_flags: 0,
cmd: [find: "test", filter: [_id: 42], limit: 1, batchSize: 1],
conn: #PID<0.9567.0>,
doc: %{
"$clusterTime" => %{
"clusterTime" => #BSON.Timestamp<1765804902:3>,
"signature" => %{
"hash" => #BSON.Binary<0000000000000000000000000000000000000000>,
"keyId" => 0
}
},
"cursor" => %{
"firstBatch" => [
%{"_id" => 42, "fieldA" => 0, "toto" => "azerty", "val" => 42}
],
"id" => 0,
"ns" => "tbx-sb.test"
},
"ok" => 1.0,
"operationTime" => #BSON.Timestamp<1765804902:3>
},
new_cmd: [
find: "test",
filter: [_id: 42],
limit: 1,
batchSize: 1,
lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>},
"$readPreference": %{mode: :primary}
],
opts: [
compression: true,
batch_size: 1,
retryable_reads: true,
read_counter: 1
],
response: {%{
"$clusterTime" => %{
"clusterTime" => #BSON.Timestamp<1765804902:3>,
"signature" => %{
"hash" => #BSON.Binary<0000000000000000000000000000000000000000>,
"keyId" => 0
}
},
"cursor" => %{
"firstBatch" => [
%{"_id" => 42, "fieldA" => 0, "toto" => "azerty", "val" => 42}
],
"id" => 0,
"ns" => "tbx-sb.test"
},
"ok" => 1.0,
"operationTime" => #BSON.Timestamp<1765804902:3>
},
%Mongo.Events.CommandStartedEvent{
command: [
find: "test",
filter: [_id: 42],
limit: 1,
batchSize: 1,
lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>},
"$readPreference": %{mode: :primary}
],
database_name: "tbx-sb",
command_name: :find,
request_id: 0,
operation_id: nil,
connection_id: #PID<0.9560.0>
}, 0, 806},
session: #PID<0.9903.0>
]
%{"_id" => 42, "fieldA" => 0, "toto" => "azerty", "val" => 42}
...(129)> transformed_object = obj |> Map.update("fieldA", 0, fn val -> val + 1 end)
%{"_id" => 42, "fieldA" => 1, "toto" => "azerty", "val" => 42}
...(130)> Mongo.replace_one(conn, "test", %{_id: id}, transformed_object, [upsert: true, retryable_writes: true]) #SECONDE OBJ UPDATE
[(mongodb_driver 1.6.0) iex:1304: Mongo.update_documents/6]
binding() #=> [
cmd: [
update: "test",
updates: [
%{
multi: false,
q: %{_id: 42},
u: %{"_id" => 42, "fieldA" => 1, "toto" => "azerty", "val" => 42},
upsert: true
}
]
],
coll: "test",
filter: %{_id: 42},
multi: false,
opts: [upsert: true, retryable_writes: true],
topology_pid: :poll_test_mongo,
update: %{
multi: false,
q: %{_id: 42},
u: %{"_id" => 42, "fieldA" => 1, "toto" => "azerty", "val" => 42},
upsert: true
},
write_concern: nil
]
[(mongodb_driver 1.6.0) iex:1690: Mongo.exec_command_session/3]
binding() #=> [
_cmd: %Mongo.Query{
action: {:command,
[
update: "test",
updates: [
%{
multi: false,
q: %{_id: 42},
u: %{"_id" => 42, "fieldA" => 1, "toto" => "azerty", "val" => 42},
upsert: true
}
],
lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>},
txnNumber: #BSON.LongNumber<0>
]}
},
_flags: 0,
cmd: [
update: "test",
updates: [
%{
multi: false,
q: %{_id: 42},
u: %{"_id" => 42, "fieldA" => 1, "toto" => "azerty", "val" => 42},
upsert: true
}
]
],
conn: #PID<0.9567.0>,
doc: %{
"$clusterTime" => %{
"clusterTime" => #BSON.Timestamp<1765804902:3>,
"signature" => %{
"hash" => #BSON.Binary<0000000000000000000000000000000000000000>,
"keyId" => 0
}
},
"electionId" => #BSON.ObjectId<7fffffff0000000000000036>,
"n" => 1,
"nModified" => 1,
"ok" => 1.0,
"opTime" => %{"t" => 54, "ts" => #BSON.Timestamp<1765804902:3>},
"operationTime" => #BSON.Timestamp<1765804902:3>,
"retriedStmtIds" => [0]
},
new_cmd: [
update: "test",
updates: [
%{
multi: false,
q: %{_id: 42},
u: %{"_id" => 42, "fieldA" => 1, "toto" => "azerty", "val" => 42},
upsert: true
}
],
lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>},
txnNumber: #BSON.LongNumber<0>
],
opts: [
session: #PID<0.9904.0>,
compression: true,
upsert: true,
retryable_writes: true,
write_counter: 1
],
response: {%{
"$clusterTime" => %{
"clusterTime" => #BSON.Timestamp<1765804902:3>,
"signature" => %{
"hash" => #BSON.Binary<0000000000000000000000000000000000000000>,
"keyId" => 0
}
},
"electionId" => #BSON.ObjectId<7fffffff0000000000000036>,
"n" => 1,
"nModified" => 1,
"ok" => 1.0,
"opTime" => %{"t" => 54, "ts" => #BSON.Timestamp<1765804902:3>},
"operationTime" => #BSON.Timestamp<1765804902:3>,
"retriedStmtIds" => [0]
},
%Mongo.Events.CommandStartedEvent{
command: [
update: "test",
updates: [
%{
multi: false,
q: %{_id: 42},
u: %{"_id" => 42, "fieldA" => 1, "toto" => "azerty", "val" => 42},
upsert: true
}
],
lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>},
txnNumber: #BSON.LongNumber<0>
],
database_name: "tbx-sb",
command_name: :update,
request_id: 0,
operation_id: nil,
connection_id: #PID<0.9560.0>
}, 0, 857},
session: #PID<0.9904.0>
]
{:ok,
%Mongo.UpdateResult{
acknowledged: true,
matched_count: 1,
modified_count: 1,
upserted_ids: []
}}
...(131)>
nil
...(132)> obj = Mongo.find_one(conn, "test", %{_id: id}, retryable_reads: true)
[(mongodb_driver 1.6.0) iex:1690: Mongo.exec_command_session/3]
binding() #=> [
_cmd: %Mongo.Query{
action: {:command,
[
find: "test",
filter: [_id: 42],
limit: 1,
batchSize: 1,
lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>},
"$readPreference": %{mode: :primary}
]}
},
_flags: 0,
cmd: [find: "test", filter: [_id: 42], limit: 1, batchSize: 1],
conn: #PID<0.9567.0>,
doc: %{
"$clusterTime" => %{
"clusterTime" => #BSON.Timestamp<1765804902:3>,
"signature" => %{
"hash" => #BSON.Binary<0000000000000000000000000000000000000000>,
"keyId" => 0
}
},
"cursor" => %{
"firstBatch" => [
%{"_id" => 42, "fieldA" => 0, "toto" => "azerty", "val" => 42}
],
"id" => 0,
"ns" => "tbx-sb.test"
},
"ok" => 1.0,
"operationTime" => #BSON.Timestamp<1765804902:3>
},
new_cmd: [
find: "test",
filter: [_id: 42],
limit: 1,
batchSize: 1,
lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>},
"$readPreference": %{mode: :primary}
],
opts: [
compression: true,
batch_size: 1,
retryable_reads: true,
read_counter: 1
],
response: {%{
"$clusterTime" => %{
"clusterTime" => #BSON.Timestamp<1765804902:3>,
"signature" => %{
"hash" => #BSON.Binary<0000000000000000000000000000000000000000>,
"keyId" => 0
}
},
"cursor" => %{
"firstBatch" => [
%{"_id" => 42, "fieldA" => 0, "toto" => "azerty", "val" => 42}
],
"id" => 0,
"ns" => "tbx-sb.test"
},
"ok" => 1.0,
"operationTime" => #BSON.Timestamp<1765804902:3>
},
%Mongo.Events.CommandStartedEvent{
command: [
find: "test",
filter: [_id: 42],
limit: 1,
batchSize: 1,
lsid: %{id: #BSON.UUID<0524c82a-7f8c-4a83-a82b-d29d32e53acb>},
"$readPreference": %{mode: :primary}
],
database_name: "tbx-sb",
command_name: :find,
request_id: 0,
operation_id: nil,
connection_id: #PID<0.9560.0>
}, 0, 492},
session: #PID<0.9905.0>
]
%{"_id" => 42, "fieldA" => 0, "toto" => "azerty", "val" => 42} #BROKEN SHOULD BE fieldA == "1" check the previous call resultes and bind for comparaison |
|
Thank you again for investing your time. I found the bug; it was simply a typo in a case statement. I fixed the typo here: b38f2fc Please feel free to check out the master branch. I added a unit test as well. Thank you for your patience to chase this issue. The "s" was missing here: ## in case of `:retryable_writes` we need to inc the transaction id
server_session =
case opts[:retryable_writes] do
true -> ServerSession.next_txn_num(server_session)
_ -> server_session
end |
Context:
I was testing the feature
Mongo.replace_onewith the enabled params :upsert: true, retryable_writes: truewhen I found out that none of my document was getting updated despite the change on the documents every time from an extract to next the very first attempt to replace_one.The digging
After some investigation I found that the issue was linked to the
retryable_writesbehaviour that was "using/re-using" the same sessionslsidandtxnNumber.From the documentation reusing the same
lsidwas supposed to be ok but not fortxnNumber, I was surprised to discover that thelsidandtxnNumberwas staying exactly the same after multiple find_one followed by some minor change on the doc followed by an attempt to beMongo.replace_oneagain. (the session_server was reused without increasing thetxnNumber)Thanks
Big heads up to log feature:
log: &IO.inspect/1that helped a start diagnose it even from a novice experience on the driver it selfI did some dummy test on own, and it solved my issue.
I may still have miss edge case so, Peers review will be very appreciated, I'm still very on the learning curve of the driver