From 9bca3c2c6bda395334a08df6c7db88ad3e62fc89 Mon Sep 17 00:00:00 2001 From: Nick Vatamaniuc Date: Wed, 24 Jun 2026 16:40:24 -0400 Subject: [PATCH 1/2] Sort key libicu NIF Add a sort key libicu NIF function. A sort key is an opaque binary representation generated by libicu from a key, which can then be compared directly against other sort keys to produce the same collation order as calling the pair-wise comparison libicu function. The idea is to use sort keys in the fabric view row "merge head" structure, where we merge together streaming rows from multiple workers. When we do that we either keep a sorted list (for map-only views) and then do an insertion sort step and take the minimum, or we keep the rows in a key/value structure for reduce views and find the minimum key and its grouped values. In either case we can reduce the number of libicu compare(a,b) calls from O(K^2) to just O(K) sort key generating calls, and since libicu calls are not cheap, it is worth adding an extra NIF call just for it. As a side note: we've actually implemented this once during the now abandoned FoundationDB-backed CouchDB 4.0 attempt; there we stored sort keys in the database, which the libicu maintainers do not recommend doing. Here we're planning on using them in memory only, on the coordinator.
https://unicode-org.github.io/icu/userguide/collation/concepts#sortkeys-vs-comparison --- .../couch_ejson_compare/couch_ejson_compare.c | 77 +++++++++++++ src/couch/src/couch_ejson_compare.erl | 33 +++++- .../test/eunit/couch_ejson_compare_tests.erl | 102 ++++++++++++++++++ 3 files changed, 211 insertions(+), 1 deletion(-) diff --git a/src/couch/priv/couch_ejson_compare/couch_ejson_compare.c b/src/couch/priv/couch_ejson_compare/couch_ejson_compare.c index 09608451c81..650ac845074 100644 --- a/src/couch/priv/couch_ejson_compare/couch_ejson_compare.c +++ b/src/couch/priv/couch_ejson_compare/couch_ejson_compare.c @@ -72,6 +72,7 @@ static ErlNifMutex* collMutex = NULL; static ERL_NIF_TERM less_json_nif(ErlNifEnv*, int, const ERL_NIF_TERM []); static ERL_NIF_TERM compare_strings_nif(ErlNifEnv*, int, const ERL_NIF_TERM []); +static ERL_NIF_TERM get_sort_key_nif(ErlNifEnv*, int, const ERL_NIF_TERM []); static ERL_NIF_TERM get_icu_version(ErlNifEnv*, int, const ERL_NIF_TERM []); static ERL_NIF_TERM get_uca_version(ErlNifEnv*, int, const ERL_NIF_TERM []); static ERL_NIF_TERM get_collator_version(ErlNifEnv*, int, const ERL_NIF_TERM []); @@ -174,6 +175,81 @@ compare_strings_nif(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) } +/* + * Return an ICU collation sort key as an Erlang binary. Two keys that collate + * equally produce identical sort keys. Sort keys can be used to keep a bunch + * of ICU key-value pairs in collation order (in a sorted KV data structure for + * example) and minimize the times ICU pair-wise comparison function would be + * called when keeping that data structure sorted. The only caveat is not to + * compare sort keys generated by different major versions of libicu, so use + * them on the same node in memory and don't store them on disk. 
+ */ +ERL_NIF_TERM +get_sort_key_nif(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + ErlNifBinary bin; + UCollator* coll; + UErrorCode status = U_ZERO_ERROR; + UCharIterator iter; + /* Part of libicu "Between calls to the API you need to save a 64-bit state" + * see https://unicode-org.github.io/icu/userguide/collation/api.html */ + uint32_t state[2] = {0, 0}; + /* This our stack cache */ + uint8_t keystack[256]; + uint8_t* key = keystack; + int32_t keycap = (int32_t) sizeof(keystack); + int32_t keylen = 0; + unsigned char* out; + ERL_NIF_TERM result; + + if (!enif_inspect_binary(env, argv[0], &bin)) { + return enif_make_badarg(env); + } + + coll = get_collator(); + if (coll == NULL) { + return enif_make_badarg(env); + } + + uiter_setUTF8(&iter, (const char*) bin.data, (uint32_t) bin.size); + + /* At first use a short 256 stack cache to fill the key in. If that gets + * too small start allocating memory. If we get less than our buffer size + * it means we're done. */ + for (;;) { + int32_t want = keycap - keylen; + int32_t got = ucol_nextSortKeyPart(coll, &iter, state, key + keylen, want, &status); + if (U_FAILURE(status)) { + if (key != keystack) { + enif_free(key); + } + return enif_make_badarg(env); + } + keylen += got; + if (got < want) { + break; + } + int32_t newcap = keycap * 2; + if (key == keystack) { + key = enif_alloc(newcap); + memcpy(key, keystack, keylen); + } else { + key = enif_realloc(key, newcap); + } + keycap = newcap; + } + /* Note: this crashes when out of memory */ + out = enif_make_new_binary(env, keylen, &result); + memcpy(out, key, keylen); + + if (key != keystack) { + enif_free(key); + } + + return result; +} + + ERL_NIF_TERM get_icu_version(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) { @@ -555,6 +631,7 @@ on_unload(ErlNifEnv* env, void* priv_data) static ErlNifFunc nif_functions[] = { {"less_nif", 2, less_json_nif}, {"compare_strings_nif", 2, compare_strings_nif}, + {"get_sort_key_nif", 1, get_sort_key_nif}, 
{"get_icu_version", 0, get_icu_version}, {"get_uca_version", 0, get_uca_version}, {"get_collator_version", 0, get_collator_version} diff --git a/src/couch/src/couch_ejson_compare.erl b/src/couch/src/couch_ejson_compare.erl index 669f4136445..79ab1b065da 100644 --- a/src/couch/src/couch_ejson_compare.erl +++ b/src/couch/src/couch_ejson_compare.erl @@ -16,6 +16,7 @@ less/2, less_json_ids/2, less_json/2, + sort_key/1, get_icu_version/0, get_uca_version/0, get_collator_version/0 @@ -25,7 +26,8 @@ -export([ less_nif/2, less_erl/2, - compare_strings_nif/2 + compare_strings_nif/2, + get_sort_key_nif/1 ]). -on_load(init/0). @@ -58,6 +60,35 @@ less_json_ids({JsonA, IdA}, {JsonB, IdB}) -> less_json(A, B) -> less(A, B) < 0. +% Encode ejson to terms with native `less` ordering matching less/2 ICU +% collation order. The leading integer is the ejson object rank see [1]. The +% collection order is: +% +% null < false < true < num < str < array < object +% +% Sort keys should not be stored or compared against sort key generated by +% other major libicu version. The intent so to use these only at runtime, in +% memory on the same node (on the coordinator mostly likely when merge-sorting +% incoming view rows). +% +% [1] https://docs.couchdb.org/en/stable/ddocs/views/collation.html#collation-specification +% +sort_key(null) -> {0}; +sort_key(false) -> {1}; +sort_key(true) -> {2}; +sort_key(N) when is_number(N) -> {3, N}; +sort_key(B) when is_binary(B) -> {4, get_sort_key_nif(B)}; +sort_key(L) when is_list(L) -> {5, [sort_key(E) || E <- L]}; +sort_key({P}) when is_list(P) -> {6, [{sort_key(K), sort_key(V)} || {K, V} <- P]}; +% {<<255,255,255,255>>} sentinel (?MAX_JSON_OBJ max key) sorts above all json +% so give it the highest value. This may be a belt-and-suspenders just in case. +% In practice we should never emit that object but might see it as part of a +% query. +sort_key({<<255, 255, 255, 255>>}) -> {7, []}. 
+ +get_sort_key_nif(_A) -> + erlang:nif_error(get_sort_key_nif_load_error). + get_icu_version() -> erlang:nif_error(get_icu_version). diff --git a/src/couch/test/eunit/couch_ejson_compare_tests.erl b/src/couch/test/eunit/couch_ejson_compare_tests.erl index 85b01aaaafa..e4e08be1907 100644 --- a/src/couch/test/eunit/couch_ejson_compare_tests.erl +++ b/src/couch/test/eunit/couch_ejson_compare_tests.erl @@ -104,6 +104,18 @@ prop_any_json_is_less_than_max_json() -> less(V, ?MAX_JSON_OBJ) =:= -1 end). +% ?MAX_JSON_OBJ (used in Mango) sorts the highest +prop_sort_key_test_values_less_than_max() -> + ?FORALL(V, oneof(?TEST_VALUES), begin + sort_key_cmp(V, ?MAX_JSON_OBJ) =:= -1 + end). + +% Any json value sorts lower than ?MAX_JSON_OBJ +prop_sort_key_any_json_less_than_max() -> + ?FORALL(V, json(), begin + sort_key_cmp(V, ?MAX_JSON_OBJ) =:= -1 + end). + % In general, for any json, the nif collator matches the erlang collator prop_nif_matches_erlang() -> ?FORALL( @@ -114,6 +126,48 @@ prop_nif_matches_erlang() -> end) ). +% Check sort key orders json values same as as less +prop_sort_key_matches_less() -> + ?FORALL( + A, + json(), + ?FORALL(B, json(), begin + sort_key_cmp(A, B) =:= less(A, B) + end) + ). + +% Sorting a list with sort key is the same as sorting it with less +prop_sort_key_sorts_like_less() -> + ?FORALL(L, list(json()), begin + ByLess = lists:sort(fun(A, B) -> couch_ejson_compare:less(A, B) =< 0 end, L), + BySortKey = lists:sort( + fun(A, B) -> couch_ejson_compare:sort_key(A) =< couch_ejson_compare:sort_key(B) end, L + ), + ByLess =:= BySortKey + end). + +% Specifically check unicode strings. (The general idea here is we we'd like to +% spend our "randomizaton budget" exploring unicode strings more than various +% term shapes). +prop_sort_key_nif_matches_less() -> + ?FORALL( + A, + sort_key_string(), + ?FORALL(B, sort_key_string(), begin + sort_key_nif_cmp(A, B) =:= less(A, B) + end) + ). + +% Extra check that grouping works. 
Surround the value various zero-width +% characters and ensure sort string is the same as without them. In other words +% we'd group things like this together. +prop_sort_key_equivalent_strings() -> + ?FORALL({Prefix, Suffix}, {zero_width_list(), zero_width_list()}, begin + Binary = unicode:characters_to_binary(Prefix ++ [$a] ++ Suffix), + SortKey = couch_ejson_compare:get_sort_key_nif(<<"a">>), + SortKey =:= couch_ejson_compare:get_sort_key_nif(Binary) + end). + % Generators json() -> @@ -164,6 +218,24 @@ zero_width_list() -> zero_width_chars() -> oneof([16#200B, 16#200C, 16#200D]). +% Besides handling json string we also handle ?MAX_UNICODE_STRING (the +% <<255,255,255,255>> to sorting values so make we mix that top sorting value +% into our values we pass into the ICU library. It should handle them as of ICU +% version >= 59 +sort_key_string() -> + oneof([json_string(), ?MAX_UNICODE_STRING]). + +sort_key_cmp(A, B) -> + term_cmp(couch_ejson_compare:sort_key(A), couch_ejson_compare:sort_key(B)). + +sort_key_nif_cmp(A, B) -> + term_cmp(couch_ejson_compare:get_sort_key_nif(A), couch_ejson_compare:get_sort_key_nif(B)). + +% Helper to return the same shape as less/2 +term_cmp(A, B) when A < B -> -1; +term_cmp(A, B) when A > B -> 1; +term_cmp(_, _) -> 0. + -else. -include_lib("couch/include/couch_eunit.hrl"). @@ -238,6 +310,36 @@ compare_strings_nif_test() -> ?assertError(badarg, compare_strings(<<"a">>, 42)), ?assertError(badarg, compare_strings(42, 42)). +% Here we test sort key can handle keys larger than the internal 256 stack +% buffer just so we can get some coverage there. 
+get_sort_key_nif_large_test() -> + Small = binary:copy(<<"a">>, 16), + Large = binary:copy(<<"a">>, 1000), + SmallKey = couch_ejson_compare:get_sort_key_nif(Small), + LargeKey = couch_ejson_compare:get_sort_key_nif(Large), + + ?assert(byte_size(SmallKey) =< 256), + ?assert(byte_size(LargeKey) > 512), + + % Check the large heap path against less/2 just for belt and suspenders + Larger = <>, + ?assertEqual(-1, less(Large, Larger)), + ?assert(LargeKey < couch_ejson_compare:get_sort_key_nif(Larger)), + + % Adding a lot of zero width junk to a large string still works + ZeroWidth = binary:copy(<<16#E2, 16#80, 16#8B>>, 300), + Equiv = <>, + ?assertEqual(0, less(Large, Equiv)), + ?assertEqual(LargeKey, couch_ejson_compare:get_sort_key_nif(Equiv)). + +sort_key_max_json_obj_test() -> + Max = couch_ejson_compare:sort_key(?MAX_JSON_OBJ), + ?assertEqual(Max, couch_ejson_compare:sort_key(?MAX_JSON_OBJ)), + lists:foreach( + fun(V) -> ?assertEqual(-1, sort_key_cmp(V, ?MAX_JSON_OBJ)) end, + [null, false, true, 42, <<"z">>, ?MAX_UNICODE_STRING, [1, 2], {[{<<"a">>, 1}]}] + ). + % Helper functions less(A, B) -> From 93d2666f860d119bfcc4002e12159404d9b79275 Mon Sep 17 00:00:00 2001 From: Nick Vatamaniuc Date: Fri, 26 Jun 2026 16:30:49 -0400 Subject: [PATCH 2/2] Speed up views with ICU sort keys In the previous commit we implemented sort keys and here is where we're using them to optimize views. On coordinators there are two separate places we optimize: the reduce views and map-only views. They are implemented somewhat differently. For both cases we win by generating the sort key once per row as it comes in, pay the CPU price once, and then when we merge sort it or insert it into the gb_tree when reducing. After that we only do Erlang comparisons, avoiding expensive repeated ICU pair-wise calls. For both map and reduce views use a common buf_key/2 function to generate the sort key or a raw key, depending on the user's collator setting. 
The map-only change is relatively straightforward. We just use `{{buf_key, Id}, Row}` as the sortable rows and keep the same merge-sort behavior. For reduce views we actually get a nice simplification. Previously, we had a map keyed by ejson key and searched over it (order O(N)) on every emit. There were two distinct steps: 1) find the lowest key 2) find any other keys collating equal to it. We don't have to do that any longer: with gb_trees we use the buf_key as the key and simply take the smallest (or greatest) key. On a quick benchmark of 100k docs with Q=8 we saw a decent speedup: ``` reduce (group level=3) : 5974ms -> 3294ms (1.8x) maps : 2699ms -> 1917ms (1.4x) ``` --- src/fabric/src/fabric_view.erl | 320 +++++++++++++++----------- src/fabric/src/fabric_view_map.erl | 83 +++++-- src/fabric/src/fabric_view_reduce.erl | 32 +-- 3 files changed, 267 insertions(+), 168 deletions(-) diff --git a/src/fabric/src/fabric_view.erl b/src/fabric/src/fabric_view.erl index 6d715e4e2fd..83c82508900 100644 --- a/src/fabric/src/fabric_view.erl +++ b/src/fabric/src/fabric_view.erl @@ -24,6 +24,7 @@ maybe_update_others/5 ]). -export([fix_skip_and_limit/1]). +-export([reduce_buffer_new/0, reduce_buffer_add/4, buf_key/2]). -include_lib("fabric/include/fabric.hrl"). -include_lib("mem3/include/mem3.hrl"). 
@@ -262,14 +263,16 @@ get_next_row(#collector{reducer = RedSrc} = State0) when RedSrc =/= undefined -> #collector{ query_args = #mrargs{direction = Dir, extra = Options}, keys = Keys, - rows = RowDict0, + rows = Buffer0, lang = Lang, counters = Counters0, collation = Collation } = State0, - {Key, RestKeys} = find_next_key(Keys, Dir, Collation, RowDict0), - case reduce_row_dict_take(Key, RowDict0, Collation) of - {Records, RowDict} -> + case reduce_take(Keys, Dir, Collation, Buffer0) of + {skip, RestKeys} -> + % explicit key not present in the buffer, move on to the next one + get_next_row(State0#collector{keys = RestKeys}); + {Key, Rows, RestKeys, Buffer} -> Counters = lists:foldl( fun(Row, CntrsAcc) -> {Worker, From} = fabric_view_row:get_worker(Row), @@ -282,9 +285,9 @@ get_next_row(#collector{reducer = RedSrc} = State0) when RedSrc =/= undefined -> fabric_dict:update_counter(Worker, -1, CntrsAcc) end, Counters0, - Records + Rows ), - Wrapped = [[fabric_view_row:get_value(R)] || R <- Records], + Wrapped = [[fabric_view_row:get_value(R)] || R <- Rows], ReduceCtx = ?l2b( io_lib:format("~s/~s/_view/~s", [ State0#collector.db_name, @@ -294,66 +297,86 @@ get_next_row(#collector{reducer = RedSrc} = State0) when RedSrc =/= undefined -> ), {ok, [Reduced]} = couch_query_servers:rereduce(Lang, [RedSrc], Wrapped, ReduceCtx), {ok, Finalized} = couch_query_servers:finalize(RedSrc, Reduced), - State = State0#collector{keys = RestKeys, rows = RowDict, counters = Counters}, + State = State0#collector{keys = RestKeys, rows = Buffer, counters = Counters}, ViewRow = fabric_view_row:from_props( [{key, Key}, {id, reduced}, {value, Finalized}], Options ), - {ViewRow, State}; - error -> - get_next_row(State0#collector{keys = RestKeys}) + {ViewRow, State} end; get_next_row(State) -> - #collector{rows = [Row | Rest], counters = Counters0} = State, + #collector{rows = [Head | Rest], counters = Counters0} = State, + % map rows may be wrapped as {BufKey, Row} by fabric_view_map:merge_row; + 
% a plain row is tagged view_row, a wrapped one has a sort-key tuple first. + Row = + case element(1, Head) of + view_row -> Head; + _ -> element(2, Head) + end, {Worker, From} = fabric_view_row:get_worker(Row), rexi:stream_ack(From), Counters1 = fabric_dict:update_counter(Worker, -1, Counters0), {Row, State#collector{rows = Rest, counters = Counters1}}. -reduce_row_dict_take(Key, RowMap, <<"raw">>) -> - maps:take(Key, RowMap); -reduce_row_dict_take(Key, RowMap, _Collation) -> - EqKeys = [K || K <- maps:keys(RowMap), couch_ejson_compare:less(K, Key) =:= 0], - case EqKeys of - [] -> - error; - [_ | _] -> - Vals = [map_get(K, RowMap) || K <- EqKeys], - {lists:flatten(Vals), maps:without(EqKeys, RowMap)} +% The reduce buffer is a gb_tree keyed by buf_key. A buf_key can be either a +% raw key (when using the "raw" collation) or an ICU sort term. Under ICU keys +% that collate equal will group under the same buf_key. Then take_smallest of a +% gb_tree will yield the next row to be emitted with an O(log N) complexity. +% (As opposed O(n) if just search through all the entries with a pair-wise ICU +% compare(A,B) function. +reduce_buffer_new() -> + gb_trees:empty(). + +reduce_buffer_add(Collation, Key, Row, Buffer) -> + BufKey = buf_key(Collation, Key), + case gb_trees:lookup(BufKey, Buffer) of + none -> + gb_trees:insert(BufKey, {Key, [Row]}, Buffer); + {value, {Key0, Rows}} -> + gb_trees:update(BufKey, {Key0, Rows ++ [Row]}, Buffer) end. -%% TODO: rectify nil <-> undefined discrepancies -find_next_key(nil, Dir, Collation, RowDict) -> - find_next_key(undefined, Dir, Collation, RowDict); -find_next_key(undefined, Dir, Collation, RowDict) -> - % Note: we need the smallest key only here. Before this used to effectively - % be hd(lists:sort()). With a potentially expensive collator comparison - % function we'd like to avoid resorting instead we just get the minimum. - case maps:keys(RowDict) of - [] -> +% The buffer key for views. 
For a "raw" collator it's just the key as is. For +% the default ICU collation it's sort key generated by ICU We also handle the +% partition wrapped rows and unwrap them before comparing (see detach_partition/1). +buf_key(Collation, {p, Partition, Key}) -> + {p, Partition, buf_key(Collation, Key)}; +buf_key(<<"raw">>, Key) -> + Key; +buf_key(_Collation, Key) -> + couch_ejson_compare:sort_key(Key). + +% Get the next key from the reduce buffer. This would be called from +% get_next_row() and and only after we've checked fabric_dict:any(0, Counters), +% so that when a group key is removed, all shards would have stream passed it. +% The rows we return is the complete set and we can then re-reduce them. +% +reduce_take(undefined, Dir, _Collation, Buffer) -> + % Get the next key in collation order or throw(complete) when no more keys + case gb_trees:is_empty(Buffer) of + true -> throw(complete); - [First | Rest] -> - Less = fun(A, B) -> compare(Dir, Collation, A, B) end, - MinKey = lists:foldl( - fun(Key, Min) -> - case Less(Key, Min) of - true -> Key; - false -> Min - end + false -> + {_BufKey, {Key, Rows}, Buffer1} = + case Dir of + rev -> gb_trees:take_largest(Buffer); + _ -> gb_trees:take_smallest(Buffer) end, - First, - Rest - ), - {MinKey, nil} + {Key, Rows, undefined, Buffer1} end; -find_next_key([], _, _, _) -> +reduce_take([], _Dir, _Collation, _Buffer) -> + % For an explicit list of user provided keys: + % - Get the next matching key or throw(complete) if no more keys remain + % - Return {skip, RestKeys} if we have keys but this key is not found throw(complete); -find_next_key([Key | Rest], _, _, _) -> - {Key, Rest}. - -compare(fwd, <<"raw">>, A, B) -> A < B; -compare(rev, <<"raw">>, A, B) -> B < A; -compare(fwd, _, A, B) -> couch_ejson_compare:less_json(A, B); -compare(rev, _, A, B) -> couch_ejson_compare:less_json(B, A). 
+reduce_take([Key | RestKeys], _Dir, Collation, Buffer) -> + % Explicit keys are emitted in the requested order + BufKey = buf_key(Collation, Key), + case gb_trees:lookup(BufKey, Buffer) of + none -> + {skip, RestKeys}; + {value, {Key0, Rows}} -> + {Key0, Rows, RestKeys, gb_trees:delete(BufKey, Buffer)} + end. extract_view(Pid, ViewName, [], _ViewType) -> couch_log:error("missing_named_view ~p", [ViewName]), @@ -494,6 +517,15 @@ remove_finalizer(Args) -> -include_lib("couch/include/couch_eunit.hrl"). +% These used to be our main collation comparison before switching to sort keys. +% Move them to the testing section to test before and after and that our sort +% keys work the same as pair-wise comparisons +% +compare(fwd, <<"raw">>, A, B) -> A < B; +compare(rev, <<"raw">>, A, B) -> B < A; +compare(fwd, _, A, B) -> couch_ejson_compare:less_json(A, B); +compare(rev, _, A, B) -> couch_ejson_compare:less_json(B, A). + remove_overlapping_shards_test() -> Cb = undefined, @@ -817,9 +849,13 @@ t_get_next_row_reduce(_) -> {view_row, #{value => value3, worker => {worker2, W2From}}} ], Values = [[value1], [value2], [value3]], - RowDict1 = maps:from_list([{key1, ViewRows1}, {key2, undefined}, {key3, undefined}]), - RowDict2 = maps:from_list([{key1, ViewRows2}, {key2, undefined}, {key3, undefined}]), - RowDict3 = maps:from_list([{key2, undefined}, {key3, undefined}]), + BuildBuffer = fun(Pairs) -> + lists:foldl(fun({K, V}, T) -> gb_trees:insert(K, {K, V}, T) end, gb_trees:empty(), Pairs) + end, + Buffer1 = BuildBuffer([{key1, ViewRows1}, {key2, undefined}, {key3, undefined}]), + Buffer2 = BuildBuffer([{key1, ViewRows2}, {key2, undefined}, {key3, undefined}]), + Buffer1Rest = gb_trees:delete(key1, Buffer1), + Buffer2Rest = gb_trees:delete(key1, Buffer2), Language = <<"language">>, Collation = <<"raw">>, Counters1 = [{worker1, 3}, {worker2, 5}], @@ -827,7 +863,7 @@ t_get_next_row_reduce(_) -> State1 = #collector{ query_args = QueryArgs1, keys = Keys, - rows = RowDict1, + rows = 
Buffer1, lang = Language, counters = Counters1, collation = Collation, @@ -836,7 +872,7 @@ t_get_next_row_reduce(_) -> State2 = #collector{ query_args = QueryArgs2, keys = Keys, - rows = RowDict2, + rows = Buffer2, lang = Language, counters = Counters1, collation = Collation, @@ -845,7 +881,7 @@ t_get_next_row_reduce(_) -> State3 = #collector{ query_args = QueryArgs1, keys = KeysRest, - rows = RowDict3, + rows = Buffer1Rest, lang = Language, collation = Collation, counters = Counters2, @@ -854,7 +890,7 @@ t_get_next_row_reduce(_) -> State4 = #collector{ query_args = QueryArgs2, keys = KeysRest, - rows = RowDict3, + rows = Buffer2Rest, lang = Language, collation = Collation, counters = Counters2, @@ -870,91 +906,115 @@ t_get_next_row_reduce(_) -> ?assertEqual({Row1, State3}, get_next_row(State1)), ?assertEqual({Row2, State4}, get_next_row(State2)). -find_next_key_empty_test() -> - ?assertThrow(complete, find_next_key(undefined, fwd, <<"raw">>, #{})), - ?assertThrow(complete, find_next_key(nil, fwd, <<"raw">>, #{})). - -find_next_key_min_raw_fwd_test() -> - RowDict = maps:from_list([{3, a}, {1, b}, {2, c}]), - ?assertEqual({1, nil}, find_next_key(undefined, fwd, <<"raw">>, RowDict)), - % a nil key list delegates to the undefined clause - ?assertEqual({1, nil}, find_next_key(nil, fwd, <<"raw">>, RowDict)). - -find_next_key_min_raw_rev_test() -> - RowDict = maps:from_list([{3, a}, {1, b}, {2, c}]), - ?assertEqual({3, nil}, find_next_key(undefined, rev, <<"raw">>, RowDict)). - -% Check ICU collator. 
Assert that sort and find_next_key does the same thing -% -find_next_key_min_collation_test() -> - % Not these are different under ICU vs raw Erlang - % M, a, z (Erlang) - % a, M, z (ICU) - Keys = [<<"z">>, <<"M">>, <<"a">>], - RowDict = maps:from_list([{K, v} || K <- Keys]), - lists:foreach( - fun(Dir) -> - Cmp = fun(A, B) -> compare(Dir, <<"icu">>, A, B) end, - [Expected | _] = lists:sort(Cmp, maps:keys(RowDict)), - ?assertEqual({Expected, nil}, find_next_key(undefined, Dir, <<"icu">>, RowDict)) +% Build reduce buffer as handle_row may do it, one row at a time +red_buffer(Collation, KeyRows) -> + lists:foldl( + fun({Key, Rows}, Buffer) -> + lists:foldl( + fun(Row, Acc) -> reduce_buffer_add(Collation, Key, Row, Acc) end, + Buffer, + Rows + ) end, - [fwd, rev] - ), - % Sanity check - IcuMin = element(1, find_next_key(undefined, fwd, <<"icu">>, RowDict)), - RawMin = element(1, find_next_key(undefined, fwd, <<"raw">>, RowDict)), + reduce_buffer_new(), + KeyRows + ). + +reduce_take_empty_test() -> + Empty = reduce_buffer_new(), + ?assertThrow(complete, reduce_take(undefined, fwd, <<"raw">>, Empty)), + ?assertThrow(complete, reduce_take([], fwd, <<"raw">>, Empty)). + +reduce_take_min_raw_fwd_test() -> + Buffer = red_buffer(<<"raw">>, [{3, [a]}, {1, [b]}, {2, [c]}]), + {Key, Rows, RestKeys, Buffer1} = reduce_take(undefined, fwd, <<"raw">>, Buffer), + ?assertEqual(1, Key), + ?assertEqual([b], Rows), + ?assertEqual(undefined, RestKeys), + % the next smallest is 2 + ?assertEqual(2, element(1, reduce_take(undefined, fwd, <<"raw">>, Buffer1))). + +reduce_take_min_raw_rev_test() -> + Buffer = red_buffer(<<"raw">>, [{3, [a]}, {1, [b]}, {2, [c]}]), + ?assertEqual(3, element(1, reduce_take(undefined, rev, <<"raw">>, Buffer))). + +% With ICU "a" < "M" < "z". 
With raw "M" < "a" < "z" +reduce_take_collation_order_test() -> + Pairs = [{K, [v]} || K <- [<<"z">>, <<"M">>, <<"a">>]], + IcuMin = element(1, reduce_take(undefined, fwd, <<"icu">>, red_buffer(<<"icu">>, Pairs))), + RawMin = element(1, reduce_take(undefined, fwd, <<"raw">>, red_buffer(<<"raw">>, Pairs))), ?assertEqual(<<"a">>, IcuMin), ?assertEqual(<<"M">>, RawMin), - % They do different things ?assertNotEqual(IcuMin, RawMin). -% Check min scan against sorting and taking the head -find_next_key_matches_sort_test() -> +% Sorting and taking the head returns th same as our sortkey consturct +reduce_take_matches_sort_test() -> Keys = [<<"k5">>, <<"k1">>, <<"k3">>, <<"k2">>, <<"k4">>], - RowDict = maps:from_list([{K, v} || K <- Keys]), + Pairs = [{K, [v]} || K <- Keys], lists:foreach( - fun(Dir) -> - CmpFun = fun(A, B) -> compare(Dir, <<"raw">>, A, B) end, - [Expected | _] = lists:sort(CmpFun, maps:keys(RowDict)), - ?assertEqual({Expected, nil}, find_next_key(undefined, Dir, <<"raw">>, RowDict)) + fun(Col) -> + Buffer = red_buffer(Col, Pairs), + lists:foreach( + fun(Dir) -> + Cmp = fun(A, B) -> compare(Dir, Col, A, B) end, + [Expected | _] = lists:sort(Cmp, Keys), + ?assertEqual( + Expected, element(1, reduce_take(undefined, Dir, Col, Buffer)) + ) + end, + [fwd, rev] + ) end, - [fwd, rev] + [<<"raw">>, <<"icu">>] ). -% A key list is returned head-first and untouched. -find_next_key_explicit_keys_test() -> - ?assertEqual({k1, [k2, k3]}, find_next_key([k1, k2, k3], fwd, <<"raw">>, #{})), - ?assertThrow(complete, find_next_key([], fwd, <<"raw">>, #{})). - -% Raw collation: keys match exactly -reduce_row_dict_take_raw_test() -> - RowMap = #{<<"a">> => [r1, r2], <<"b">> => [r3]}, - ?assertEqual({[r1, r2], #{<<"b">> => [r3]}}, reduce_row_dict_take(<<"a">>, RowMap, <<"raw">>)). - -reduce_row_dict_take_raw_missing_test() -> - RowMap = #{<<"a">> => [r1]}, - ?assertEqual(error, reduce_row_dict_take(<<"x">>, RowMap, <<"raw">>)). 
- -% With ICU use nfc and nfd forms of "é", they should match as equivalent but -% not equal in raw. We're making sure ICU is doing ICU things here. -reduce_row_dict_take_collation_test() -> +% Check explicit keys (user passes keys=...). +% - They should emit in that order +% - If any missing they should be skipped +% - Empty keys throws `complete` +reduce_take_explicit_keys_test() -> + Buffer = red_buffer(<<"raw">>, [{<<"a">>, [r1]}, {<<"b">>, [r2]}]), + {Key, Rows, RestKeys, Buffer1} = reduce_take([<<"a">>, <<"b">>], fwd, <<"raw">>, Buffer), + ?assertEqual(<<"a">>, Key), + ?assertEqual([r1], Rows), + ?assertEqual([<<"b">>], RestKeys), + ?assertEqual({skip, [<<"b">>]}, reduce_take([<<"x">>, <<"b">>], fwd, <<"raw">>, Buffer)), + ?assertThrow(complete, reduce_take([], fwd, <<"raw">>, Buffer1)). + +% A proper ICU check. "é" under NFC and NFD collate equal but have different +% bytes +reduce_take_collation_grouping_test() -> Nfc = <<195, 169>>, Nfd = <<101, 204, 129>>, - % Sanity check ?assertEqual(0, couch_ejson_compare:less(Nfc, Nfd)), - RowMap = #{Nfc => [r1], Nfd => [r2], <<"z">> => [r3]}, - {Vals, Rest} = reduce_row_dict_take(Nfc, RowMap, <<"icu">>), - ?assertEqual([r1, r2], lists:sort(Vals)), - ?assertEqual(#{<<"z">> => [r3]}, Rest). - -% With ICU and key that only equals itself -reduce_row_dict_take_collation_single_test() -> - RowMap = #{<<"a">> => [r1], <<"b">> => [r2]}, - ?assertEqual({[r1], #{<<"b">> => [r2]}}, reduce_row_dict_take(<<"a">>, RowMap, <<"icu">>)). - -% With ICU but key is missing altogether -reduce_row_dict_take_collation_missing_test() -> - RowMap = #{<<"a">> => [r1], <<"b">> => [r2]}, - ?assertEqual(error, reduce_row_dict_take(<<"zzz">>, RowMap, <<"icu">>)). 
+ Buffer = red_buffer(<<"icu">>, [{Nfc, [r1]}, {Nfd, [r2]}, {<<"z">>, [r3]}]), + {Key, Rows, _, Buffer1} = reduce_take(undefined, fwd, <<"icu">>, Buffer), + ?assertEqual(Nfc, Key), + ?assertEqual([r1, r2], lists:sort(Rows)), + % We should have taken both r1 and r2 and only z should be left + {Key2, Rows2, _, _} = reduce_take(undefined, fwd, <<"icu">>, Buffer1), + ?assertEqual(<<"z">>, Key2), + ?assertEqual([r3], Rows2). + +% With ICU and a key that only equals itself +reduce_take_collation_single_test() -> + Buffer = red_buffer(<<"icu">>, [{<<"a">>, [r1]}, {<<"b">>, [r2]}]), + {Key, Rows, _, _} = reduce_take(undefined, fwd, <<"icu">>, Buffer), + ?assertEqual(<<"a">>, Key), + ?assertEqual([r1], Rows), + ?assertEqual({skip, []}, reduce_take([<<"zzz">>], fwd, <<"icu">>, Buffer)). + +% Partitioned view keys look like {p, Partition, Key}. Our buf_key should handle that. +buf_key_partition_test() -> + P = <<"foo">>, + ?assertEqual({p, P, 2}, buf_key(<<"raw">>, {p, P, 2})), + ?assertEqual({p, P, couch_ejson_compare:sort_key(2)}, buf_key(<<"icu">>, {p, P, 2})), + lists:foreach( + fun(Collation) -> + ?assert(buf_key(Collation, {p, P, 2}) < buf_key(Collation, {p, P, 4})), + ?assert(buf_key(Collation, {p, P, <<"a">>}) < buf_key(Collation, {p, P, <<"b">>})) + end, + [<<"raw">>, <<"icu">>] + ). -endif. diff --git a/src/fabric/src/fabric_view_map.erl b/src/fabric/src/fabric_view_map.erl index 56389e7df1f..c6604dd8df3 100644 --- a/src/fabric/src/fabric_view_map.erl +++ b/src/fabric/src/fabric_view_map.erl @@ -224,15 +224,21 @@ handle_message(insufficient_storage, _Worker, State) -> {stop, State}. merge_row(Dir, Collation, undefined, Row, Rows0) -> + % Use a sort key as the buffer key. 
When using ICU for collation (as we do + % by default) we spend some CPU time once to generate a sort key and then + % subsequently compare rows in Erlang during merging, as opposed to what we + % did previously, when we used ICU pair-wise comparisons NIF calls against + % all the existing rows in the buffer. + RowKey = fabric_view_row:get_key(Row), + BufKey = {fabric_view:buf_key(Collation, RowKey), fabric_view_row:get_id(Row)}, Rows1 = lists:merge( - fun(RowA, RowB) -> - KeyA = fabric_view_row:get_key(RowA), - KeyB = fabric_view_row:get_key(RowB), - IdA = fabric_view_row:get_id(RowA), - IdB = fabric_view_row:get_id(RowB), - compare(Dir, Collation, {KeyA, IdA}, {KeyB, IdB}) + fun({BKa, _}, {BKb, _}) -> + case Dir of + fwd -> BKa < BKb; + rev -> BKa > BKb + end end, - [Row], + [{BufKey, Row}], Rows0 ), {Rows1, undefined}; @@ -282,11 +288,6 @@ merge_row(Dir, Collation, KeyDict0, Row, Rows0) -> {Rows1, KeyDict1} end. -compare(fwd, <<"raw">>, A, B) -> A < B; -compare(rev, <<"raw">>, A, B) -> B < A; -compare(fwd, _, A, B) -> couch_ejson_compare:less_json_ids(A, B); -compare(rev, _, A, B) -> couch_ejson_compare:less_json_ids(B, A). - % KeyDict captures the user-supplied ordering of keys POSTed by the user by % mapping to integers (see fabric_view:keydict/1). 
It's possible that these keys % do not compare equal (i.e., =:=, used by dict) to those returned by the view @@ -323,7 +324,7 @@ handle_message_test_() -> foreach, fun() -> meck:new(foo, [non_strict]), - meck:new(fabric_view) + meck:new(fabric_view, [passthrough]) end, fun(_) -> meck:unload() end, [ @@ -465,10 +466,12 @@ t_handle_message_sorted(_) -> Rows21 = {view_row, #{id => id1, key => key1}}, Rows22 = {view_row, #{id => id2, key => key2, doc => doc2, worker => Worker}}, Rows23 = {view_row, #{id => id3, key => key3}}, - Rows1 = [Rows11, Rows13], - Rows2 = [Rows21, Rows23], - Rows3 = [Rows11, Rows12, Rows13], - Rows4 = [Rows21, Rows22, Rows23], + % with no POSTed keys the buffer holds rows wrapped as {{BufKey, Id}, Row} + Wrap = fun(Rows) -> [wrap(<<"raw">>, R) || R <- Rows] end, + Rows1 = Wrap([Rows11, Rows13]), + Rows2 = Wrap([Rows21, Rows23]), + Rows3 = Wrap([Rows11, Rows12, Rows13]), + Rows4 = Wrap([Rows21, Rows22, Rows23]), State1 = #collector{ sorted = true, limit = 10, @@ -552,6 +555,7 @@ merge_row_test_() -> fun(_) -> ok end, [ ?TDEF_FE(t_merge_row_no_keys), + ?TDEF_FE(t_merge_row_partition_keys), ?TDEF_FE(t_merge_row_raw), ?TDEF_FE(t_merge_row) ] @@ -561,17 +565,46 @@ t_merge_row_no_keys(_) -> Row1 = #view_row{id = id2, key = <<"key2">>}, Rows11 = #view_row{id = id1, key = <<"key1">>}, Rows13 = #view_row{id = id3, key = <<"key3">>}, - Rows1 = [Rows11, Rows13], - Rows3 = [Rows11, Row1, Rows13], Row2 = {view_row, #{id => id2, key => <<"key2">>}}, Rows21 = {view_row, #{id => id1, key => <<"key1">>}}, Rows23 = {view_row, #{id => id3, key => <<"key3">>}}, - Rows2 = [Rows23, Rows21], - Rows4 = [Rows23, Row2, Rows21], - ?assertEqual({Rows3, undefined}, merge_row(fwd, <<"raw">>, undefined, Row1, Rows1)), - ?assertEqual({Rows3, undefined}, merge_row(fwd, <<"collation">>, undefined, Row1, Rows1)), - ?assertEqual({Rows4, undefined}, merge_row(rev, <<"raw">>, undefined, Row2, Rows2)), - ?assertEqual({Rows4, undefined}, merge_row(rev, <<"collation">>, undefined, 
Row2, Rows2)). + % check merged row order for raw/icu and fwd/reverse cases + lists:foreach( + fun(Col) -> + Wrap = fun(Rows) -> [wrap(Col, R) || R <- Rows] end, + {Fwd, undefined} = merge_row(fwd, Col, undefined, Row1, Wrap([Rows11, Rows13])), + ?assertEqual([Rows11, Row1, Rows13], unwrap(Fwd)), + {Rev, undefined} = merge_row(rev, Col, undefined, Row2, Wrap([Rows23, Rows21])), + ?assertEqual([Rows23, Row2, Rows21], unwrap(Rev)) + end, + [<<"raw">>, <<"icu">>] + ). + +% Partitioned views attach the partition to the key as {p, Partition, Key}; +% merge_row must handle that (regression: buf_key -> sort_key/1 crashed on the +% 3-tuple, 500ing partitioned view queries). +t_merge_row_partition_keys(_) -> + P = <<"foo">>, + Row1 = #view_row{id = id4, key = {p, P, 4}}, + Rows12 = #view_row{id = id2, key = {p, P, 2}}, + Rows16 = #view_row{id = id6, key = {p, P, 6}}, + lists:foreach( + fun(Col) -> + Wrap = fun(Rows) -> [wrap(Col, R) || R <- Rows] end, + {Merged, undefined} = merge_row(fwd, Col, undefined, Row1, Wrap([Rows12, Rows16])), + ?assertEqual([Rows12, Row1, Rows16], unwrap(Merged)) + end, + [<<"raw">>, <<"icu">>] + ). + +% This mimics how clause 1 of merge_row wraps rows in the buffer +wrap(Collation, Row) -> + Key = fabric_view_row:get_key(Row), + Id = fabric_view_row:get_id(Row), + {{fabric_view:buf_key(Collation, Key), Id}, Row}. + +unwrap(Buffer) -> + [Row || {_BufKey, Row} <- Buffer]. 
t_merge_row_raw(_) -> Keys1 = dict:from_list([{key1, id1}, {key2, id2}, {key3, id3}]), diff --git a/src/fabric/src/fabric_view_reduce.erl b/src/fabric/src/fabric_view_reduce.erl index 93838fe0b18..24d77174827 100644 --- a/src/fabric/src/fabric_view_reduce.erl +++ b/src/fabric/src/fabric_view_reduce.erl @@ -89,7 +89,7 @@ go2(DbName, DDocId, VName, Workers, {red, {_, Lang, View}, _} = VInfo, Args, Cal os_proc = OsProc, reducer = RedSrc, collation = couch_util:get_value(<<"collation">>, View#mrview.options), - rows = #{}, + rows = fabric_view:reduce_buffer_new(), user_acc = Acc0, update_seq = case UpdateSeq of @@ -121,13 +121,13 @@ go2(DbName, DDocId, VName, Workers, {red, {_, Lang, View}, _} = VInfo, Args, Cal end. handle_row(Row0, {Worker, _} = Source, State) -> - #collector{counters = Counters0, rows = Rows0} = State, + #collector{counters = Counters0, rows = Buffer0, collation = Collation} = State, true = fabric_dict:is_key(Worker, Counters0), Row = fabric_view_row:set_worker(Row0, Source), Key = fabric_view_row:get_key(Row), - Rows = maps:update_with(Key, fun(Rs) -> Rs ++ [Row] end, [Row], Rows0), + Buffer = fabric_view:reduce_buffer_add(Collation, Key, Row, Buffer0), C1 = fabric_dict:update_counter(Worker, 1, Counters0), - State1 = State#collector{rows = Rows, counters = C1}, + State1 = State#collector{rows = Buffer, counters = C1}, fabric_view:maybe_send_row(State1). 
handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) -> @@ -197,7 +197,7 @@ handle_message_test_() -> foreach, fun() -> meck:new(foo, [non_strict]), - meck:new(fabric_view) + meck:new(fabric_view, [passthrough]) end, fun(_) -> meck:unload() end, [ @@ -282,18 +282,24 @@ t_handle_message_row(_) -> Worker = {worker, from}, Counters1 = [{worker, 3}], Counters2 = [{worker, 4}], + Collation = <<"raw">>, Row1 = #view_row{key = key1}, Row11 = #view_row{key = key1, worker = Worker}, - Rows1 = maps:from_list([{key1, []}, {key2, []}, {key3, []}]), - Rows3 = maps:from_list([{key1, [Row11]}, {key2, []}, {key3, []}]), Row2 = {view_row, #{key => key1}}, Row21 = {view_row, #{key => key1, worker => Worker}}, - Rows2 = maps:from_list([{key1, []}, {key2, []}, {key3, []}]), - Rows4 = maps:from_list([{key1, [Row21]}, {key2, []}, {key3, []}]), - State1 = #collector{counters = Counters1, rows = Rows1}, - State2 = #collector{counters = Counters1, rows = Rows2}, - State3 = #collector{counters = Counters2, rows = Rows3}, - State4 = #collector{counters = Counters2, rows = Rows4}, + % Build the input and the expected buffers from the same seed so that the + % resulting trees are the same + Seed = lists:foldl( + fun(K, T) -> gb_trees:insert(K, {K, []}, T) end, + fabric_view:reduce_buffer_new(), + [key1, key2, key3] + ), + Buffer3 = fabric_view:reduce_buffer_add(Collation, key1, Row11, Seed), + Buffer4 = fabric_view:reduce_buffer_add(Collation, key1, Row21, Seed), + State1 = #collector{counters = Counters1, rows = Seed, collation = Collation}, + State2 = #collector{counters = Counters1, rows = Seed, collation = Collation}, + State3 = #collector{counters = Counters2, rows = Buffer3, collation = Collation}, + State4 = #collector{counters = Counters2, rows = Buffer4, collation = Collation}, meck:expect(fabric_view, maybe_send_row, [ {[State3], meck:val(maybe_row1)}, {[State4], meck:val(maybe_row2)} ]),