diff --git a/src/couch/priv/couch_ejson_compare/couch_ejson_compare.c b/src/couch/priv/couch_ejson_compare/couch_ejson_compare.c index 09608451c8..650ac84507 100644 --- a/src/couch/priv/couch_ejson_compare/couch_ejson_compare.c +++ b/src/couch/priv/couch_ejson_compare/couch_ejson_compare.c @@ -72,6 +72,7 @@ static ErlNifMutex* collMutex = NULL; static ERL_NIF_TERM less_json_nif(ErlNifEnv*, int, const ERL_NIF_TERM []); static ERL_NIF_TERM compare_strings_nif(ErlNifEnv*, int, const ERL_NIF_TERM []); +static ERL_NIF_TERM get_sort_key_nif(ErlNifEnv*, int, const ERL_NIF_TERM []); static ERL_NIF_TERM get_icu_version(ErlNifEnv*, int, const ERL_NIF_TERM []); static ERL_NIF_TERM get_uca_version(ErlNifEnv*, int, const ERL_NIF_TERM []); static ERL_NIF_TERM get_collator_version(ErlNifEnv*, int, const ERL_NIF_TERM []); @@ -174,6 +175,81 @@ compare_strings_nif(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) } +/* + * Return an ICU collation sort key as an Erlang binary. Two keys that collate + * equally produce identical sort keys. Sort keys can be used to keep a bunch + * of ICU key-value pairs in collation order (in a sorted KV data structure for + * example) and minimize the times ICU pair-wise comparison function would be + * called when keeping that data structure sorted. The only caveat is not to + * compare sort keys generated by different major versions of libicu, so use + * them on the same node in memory and don't store them on disk. + */ +ERL_NIF_TERM +get_sort_key_nif(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + ErlNifBinary bin; + UCollator* coll; + UErrorCode status = U_ZERO_ERROR; + UCharIterator iter; + /* Part of libicu "Between calls to the API you need to save a 64-bit state" + * see https://unicode-org.github.io/icu/userguide/collation/api.html */ + uint32_t state[2] = {0, 0}; + /* This our stack cache */ + uint8_t keystack[256]; + uint8_t* key = keystack; + int32_t keycap = (int32_t) sizeof(keystack); + int32_t keylen = 0; + unsigned char* out; + ERL_NIF_TERM result; + + if (!enif_inspect_binary(env, argv[0], &bin)) { + return enif_make_badarg(env); + } + + coll = get_collator(); + if (coll == NULL) { + return enif_make_badarg(env); + } + + uiter_setUTF8(&iter, (const char*) bin.data, (uint32_t) bin.size); + + /* At first use a short 256 stack cache to fill the key in. If that gets + * too small start allocating memory. If we get less than our buffer size + * it means we're done. */ + for (;;) { + int32_t want = keycap - keylen; + int32_t got = ucol_nextSortKeyPart(coll, &iter, state, key + keylen, want, &status); + if (U_FAILURE(status)) { + if (key != keystack) { + enif_free(key); + } + return enif_make_badarg(env); + } + keylen += got; + if (got < want) { + break; + } + int32_t newcap = keycap * 2; + if (key == keystack) { + key = enif_alloc(newcap); + memcpy(key, keystack, keylen); + } else { + key = enif_realloc(key, newcap); + } + keycap = newcap; + } + /* Note: this crashes when out of memory */ + out = enif_make_new_binary(env, keylen, &result); + memcpy(out, key, keylen); + + if (key != keystack) { + enif_free(key); + } + + return result; +} + + ERL_NIF_TERM get_icu_version(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) { @@ -555,6 +631,7 @@ on_unload(ErlNifEnv* env, void* priv_data) static ErlNifFunc nif_functions[] = { {"less_nif", 2, less_json_nif}, {"compare_strings_nif", 2, compare_strings_nif}, + {"get_sort_key_nif", 1, get_sort_key_nif}, {"get_icu_version", 0, get_icu_version}, {"get_uca_version", 0, get_uca_version}, {"get_collator_version", 0, get_collator_version} diff --git a/src/couch/src/couch_ejson_compare.erl b/src/couch/src/couch_ejson_compare.erl index 669f413644..79ab1b065d 100644 --- a/src/couch/src/couch_ejson_compare.erl +++ b/src/couch/src/couch_ejson_compare.erl @@ -16,6 +16,7 @@ less/2, less_json_ids/2, less_json/2, + sort_key/1, get_icu_version/0, get_uca_version/0, get_collator_version/0 @@ -25,7 +26,8 @@ -export([ less_nif/2, less_erl/2, - compare_strings_nif/2 + compare_strings_nif/2, + get_sort_key_nif/1 ]). -on_load(init/0). @@ -58,6 +60,35 @@ less_json_ids({JsonA, IdA}, {JsonB, IdB}) -> less_json(A, B) -> less(A, B) < 0. +% Encode ejson to terms with native `less` ordering matching less/2 ICU +% collation order. The leading integer is the ejson object rank see [1]. The +% collection order is: +% +% null < false < true < num < str < array < object +% +% Sort keys should not be stored or compared against sort key generated by +% other major libicu version. The intent so to use these only at runtime, in +% memory on the same node (on the coordinator mostly likely when merge-sorting +% incoming view rows). +% +% [1] https://docs.couchdb.org/en/stable/ddocs/views/collation.html#collation-specification +% +sort_key(null) -> {0}; +sort_key(false) -> {1}; +sort_key(true) -> {2}; +sort_key(N) when is_number(N) -> {3, N}; +sort_key(B) when is_binary(B) -> {4, get_sort_key_nif(B)}; +sort_key(L) when is_list(L) -> {5, [sort_key(E) || E <- L]}; +sort_key({P}) when is_list(P) -> {6, [{sort_key(K), sort_key(V)} || {K, V} <- P]}; +% {<<255,255,255,255>>} sentinel (?MAX_JSON_OBJ max key) sorts above all json +% so give it the highest value. This may be a belt-and-suspenders just in case. +% In practice we should never emit that object but might see it as part of a +% query. +sort_key({<<255, 255, 255, 255>>}) -> {7, []}. + +get_sort_key_nif(_A) -> + erlang:nif_error(get_sort_key_nif_load_error). + get_icu_version() -> erlang:nif_error(get_icu_version). diff --git a/src/couch/test/eunit/couch_ejson_compare_tests.erl b/src/couch/test/eunit/couch_ejson_compare_tests.erl index 85b01aaaaf..e4e08be190 100644 --- a/src/couch/test/eunit/couch_ejson_compare_tests.erl +++ b/src/couch/test/eunit/couch_ejson_compare_tests.erl @@ -104,6 +104,18 @@ prop_any_json_is_less_than_max_json() -> less(V, ?MAX_JSON_OBJ) =:= -1 end). +% ?MAX_JSON_OBJ (used in Mango) sorts the highest +prop_sort_key_test_values_less_than_max() -> + ?FORALL(V, oneof(?TEST_VALUES), begin + sort_key_cmp(V, ?MAX_JSON_OBJ) =:= -1 + end). + +% Any json value sorts lower than ?MAX_JSON_OBJ +prop_sort_key_any_json_less_than_max() -> + ?FORALL(V, json(), begin + sort_key_cmp(V, ?MAX_JSON_OBJ) =:= -1 + end). + % In general, for any json, the nif collator matches the erlang collator prop_nif_matches_erlang() -> ?FORALL( @@ -114,6 +126,48 @@ prop_nif_matches_erlang() -> end) ). +% Check sort key orders json values same as as less +prop_sort_key_matches_less() -> + ?FORALL( + A, + json(), + ?FORALL(B, json(), begin + sort_key_cmp(A, B) =:= less(A, B) + end) + ). + +% Sorting a list with sort key is the same as sorting it with less +prop_sort_key_sorts_like_less() -> + ?FORALL(L, list(json()), begin + ByLess = lists:sort(fun(A, B) -> couch_ejson_compare:less(A, B) =< 0 end, L), + BySortKey = lists:sort( + fun(A, B) -> couch_ejson_compare:sort_key(A) =< couch_ejson_compare:sort_key(B) end, L + ), + ByLess =:= BySortKey + end). + +% Specifically check unicode strings. (The general idea here is we we'd like to +% spend our "randomizaton budget" exploring unicode strings more than various +% term shapes). +prop_sort_key_nif_matches_less() -> + ?FORALL( + A, + sort_key_string(), + ?FORALL(B, sort_key_string(), begin + sort_key_nif_cmp(A, B) =:= less(A, B) + end) + ). + +% Extra check that grouping works. Surround the value various zero-width +% characters and ensure sort string is the same as without them. In other words +% we'd group things like this together. +prop_sort_key_equivalent_strings() -> + ?FORALL({Prefix, Suffix}, {zero_width_list(), zero_width_list()}, begin + Binary = unicode:characters_to_binary(Prefix ++ [$a] ++ Suffix), + SortKey = couch_ejson_compare:get_sort_key_nif(<<"a">>), + SortKey =:= couch_ejson_compare:get_sort_key_nif(Binary) + end). + % Generators json() -> @@ -164,6 +218,24 @@ zero_width_list() -> zero_width_chars() -> oneof([16#200B, 16#200C, 16#200D]). +% Besides handling json string we also handle ?MAX_UNICODE_STRING (the +% <<255,255,255,255>> to sorting values so make we mix that top sorting value +% into our values we pass into the ICU library. It should handle them as of ICU +% version >= 59 +sort_key_string() -> + oneof([json_string(), ?MAX_UNICODE_STRING]). + +sort_key_cmp(A, B) -> + term_cmp(couch_ejson_compare:sort_key(A), couch_ejson_compare:sort_key(B)). + +sort_key_nif_cmp(A, B) -> + term_cmp(couch_ejson_compare:get_sort_key_nif(A), couch_ejson_compare:get_sort_key_nif(B)). + +% Helper to return the same shape as less/2 +term_cmp(A, B) when A < B -> -1; +term_cmp(A, B) when A > B -> 1; +term_cmp(_, _) -> 0. + -else. -include_lib("couch/include/couch_eunit.hrl"). @@ -238,6 +310,36 @@ compare_strings_nif_test() -> ?assertError(badarg, compare_strings(<<"a">>, 42)), ?assertError(badarg, compare_strings(42, 42)). +% Here we test sort key can handle keys larger than the internal 256 stack +% buffer just so we can get some coverage there. +get_sort_key_nif_large_test() -> + Small = binary:copy(<<"a">>, 16), + Large = binary:copy(<<"a">>, 1000), + SmallKey = couch_ejson_compare:get_sort_key_nif(Small), + LargeKey = couch_ejson_compare:get_sort_key_nif(Large), + + ?assert(byte_size(SmallKey) =< 256), + ?assert(byte_size(LargeKey) > 512), + + % Check the large heap path against less/2 just for belt and suspenders + Larger = <>, + ?assertEqual(-1, less(Large, Larger)), + ?assert(LargeKey < couch_ejson_compare:get_sort_key_nif(Larger)), + + % Adding a lot of zero width junk to a large string still works + ZeroWidth = binary:copy(<<16#E2, 16#80, 16#8B>>, 300), + Equiv = <>, + ?assertEqual(0, less(Large, Equiv)), + ?assertEqual(LargeKey, couch_ejson_compare:get_sort_key_nif(Equiv)). + +sort_key_max_json_obj_test() -> + Max = couch_ejson_compare:sort_key(?MAX_JSON_OBJ), + ?assertEqual(Max, couch_ejson_compare:sort_key(?MAX_JSON_OBJ)), + lists:foreach( + fun(V) -> ?assertEqual(-1, sort_key_cmp(V, ?MAX_JSON_OBJ)) end, + [null, false, true, 42, <<"z">>, ?MAX_UNICODE_STRING, [1, 2], {[{<<"a">>, 1}]}] + ). + % Helper functions less(A, B) -> diff --git a/src/fabric/src/fabric_view.erl b/src/fabric/src/fabric_view.erl index 6d715e4e2f..83c8250890 100644 --- a/src/fabric/src/fabric_view.erl +++ b/src/fabric/src/fabric_view.erl @@ -24,6 +24,7 @@ maybe_update_others/5 ]). -export([fix_skip_and_limit/1]). +-export([reduce_buffer_new/0, reduce_buffer_add/4, buf_key/2]). -include_lib("fabric/include/fabric.hrl"). -include_lib("mem3/include/mem3.hrl"). @@ -262,14 +263,16 @@ get_next_row(#collector{reducer = RedSrc} = State0) when RedSrc =/= undefined -> #collector{ query_args = #mrargs{direction = Dir, extra = Options}, keys = Keys, - rows = RowDict0, + rows = Buffer0, lang = Lang, counters = Counters0, collation = Collation } = State0, - {Key, RestKeys} = find_next_key(Keys, Dir, Collation, RowDict0), - case reduce_row_dict_take(Key, RowDict0, Collation) of - {Records, RowDict} -> + case reduce_take(Keys, Dir, Collation, Buffer0) of + {skip, RestKeys} -> + % explicit key not present in the buffer, move on to the next one + get_next_row(State0#collector{keys = RestKeys}); + {Key, Rows, RestKeys, Buffer} -> Counters = lists:foldl( fun(Row, CntrsAcc) -> {Worker, From} = fabric_view_row:get_worker(Row), @@ -282,9 +285,9 @@ get_next_row(#collector{reducer = RedSrc} = State0) when RedSrc =/= undefined -> fabric_dict:update_counter(Worker, -1, CntrsAcc) end, Counters0, - Records + Rows ), - Wrapped = [[fabric_view_row:get_value(R)] || R <- Records], + Wrapped = [[fabric_view_row:get_value(R)] || R <- Rows], ReduceCtx = ?l2b( io_lib:format("~s/~s/_view/~s", [ State0#collector.db_name, @@ -294,66 +297,86 @@ get_next_row(#collector{reducer = RedSrc} = State0) when RedSrc =/= undefined -> ), {ok, [Reduced]} = couch_query_servers:rereduce(Lang, [RedSrc], Wrapped, ReduceCtx), {ok, Finalized} = couch_query_servers:finalize(RedSrc, Reduced), - State = State0#collector{keys = RestKeys, rows = RowDict, counters = Counters}, + State = State0#collector{keys = RestKeys, rows = Buffer, counters = Counters}, ViewRow = fabric_view_row:from_props( [{key, Key}, {id, reduced}, {value, Finalized}], Options ), - {ViewRow, State}; - error -> - get_next_row(State0#collector{keys = RestKeys}) + {ViewRow, State} end; get_next_row(State) -> - #collector{rows = [Row | Rest], counters = Counters0} = State, + #collector{rows = [Head | Rest], counters = Counters0} = State, + % map rows may be wrapped as {BufKey, Row} by fabric_view_map:merge_row; + % a plain row is tagged view_row, a wrapped one has a sort-key tuple first. + Row = + case element(1, Head) of + view_row -> Head; + _ -> element(2, Head) + end, {Worker, From} = fabric_view_row:get_worker(Row), rexi:stream_ack(From), Counters1 = fabric_dict:update_counter(Worker, -1, Counters0), {Row, State#collector{rows = Rest, counters = Counters1}}. -reduce_row_dict_take(Key, RowMap, <<"raw">>) -> - maps:take(Key, RowMap); -reduce_row_dict_take(Key, RowMap, _Collation) -> - EqKeys = [K || K <- maps:keys(RowMap), couch_ejson_compare:less(K, Key) =:= 0], - case EqKeys of - [] -> - error; - [_ | _] -> - Vals = [map_get(K, RowMap) || K <- EqKeys], - {lists:flatten(Vals), maps:without(EqKeys, RowMap)} +% The reduce buffer is a gb_tree keyed by buf_key. A buf_key can be either a +% raw key (when using the "raw" collation) or an ICU sort term. Under ICU keys +% that collate equal will group under the same buf_key. Then take_smallest of a +% gb_tree will yield the next row to be emitted with an O(log N) complexity. +% (As opposed O(n) if just search through all the entries with a pair-wise ICU +% compare(A,B) function. +reduce_buffer_new() -> + gb_trees:empty(). + +reduce_buffer_add(Collation, Key, Row, Buffer) -> + BufKey = buf_key(Collation, Key), + case gb_trees:lookup(BufKey, Buffer) of + none -> + gb_trees:insert(BufKey, {Key, [Row]}, Buffer); + {value, {Key0, Rows}} -> + gb_trees:update(BufKey, {Key0, Rows ++ [Row]}, Buffer) end. -%% TODO: rectify nil <-> undefined discrepancies -find_next_key(nil, Dir, Collation, RowDict) -> - find_next_key(undefined, Dir, Collation, RowDict); -find_next_key(undefined, Dir, Collation, RowDict) -> - % Note: we need the smallest key only here. Before this used to effectively - % be hd(lists:sort()). With a potentially expensive collator comparison - % function we'd like to avoid resorting instead we just get the minimum. - case maps:keys(RowDict) of - [] -> +% The buffer key for views. For a "raw" collator it's just the key as is. For +% the default ICU collation it's sort key generated by ICU We also handle the +% partition wrapped rows and unwrap them before comparing (see detach_partition/1). +buf_key(Collation, {p, Partition, Key}) -> + {p, Partition, buf_key(Collation, Key)}; +buf_key(<<"raw">>, Key) -> + Key; +buf_key(_Collation, Key) -> + couch_ejson_compare:sort_key(Key). + +% Get the next key from the reduce buffer. This would be called from +% get_next_row() and and only after we've checked fabric_dict:any(0, Counters), +% so that when a group key is removed, all shards would have stream passed it. +% The rows we return is the complete set and we can then re-reduce them. +% +reduce_take(undefined, Dir, _Collation, Buffer) -> + % Get the next key in collation order or throw(complete) when no more keys + case gb_trees:is_empty(Buffer) of + true -> throw(complete); - [First | Rest] -> - Less = fun(A, B) -> compare(Dir, Collation, A, B) end, - MinKey = lists:foldl( - fun(Key, Min) -> - case Less(Key, Min) of - true -> Key; - false -> Min - end + false -> + {_BufKey, {Key, Rows}, Buffer1} = + case Dir of + rev -> gb_trees:take_largest(Buffer); + _ -> gb_trees:take_smallest(Buffer) end, - First, - Rest - ), - {MinKey, nil} + {Key, Rows, undefined, Buffer1} end; -find_next_key([], _, _, _) -> +reduce_take([], _Dir, _Collation, _Buffer) -> + % For an explicit list of user provided keys: + % - Get the next matching key or throw(complete) if no more keys remain + % - Return {skip, RestKeys} if we have keys but this key is not found throw(complete); -find_next_key([Key | Rest], _, _, _) -> - {Key, Rest}. - -compare(fwd, <<"raw">>, A, B) -> A < B; -compare(rev, <<"raw">>, A, B) -> B < A; -compare(fwd, _, A, B) -> couch_ejson_compare:less_json(A, B); -compare(rev, _, A, B) -> couch_ejson_compare:less_json(B, A). +reduce_take([Key | RestKeys], _Dir, Collation, Buffer) -> + % Explicit keys are emitted in the requested order + BufKey = buf_key(Collation, Key), + case gb_trees:lookup(BufKey, Buffer) of + none -> + {skip, RestKeys}; + {value, {Key0, Rows}} -> + {Key0, Rows, RestKeys, gb_trees:delete(BufKey, Buffer)} + end. extract_view(Pid, ViewName, [], _ViewType) -> couch_log:error("missing_named_view ~p", [ViewName]), @@ -494,6 +517,15 @@ remove_finalizer(Args) -> -include_lib("couch/include/couch_eunit.hrl"). +% These used to be our main collation comparison before switching to sort keys. +% Move them to the testing section to test before and after and that our sort +% keys work the same as pair-wise comparisons +% +compare(fwd, <<"raw">>, A, B) -> A < B; +compare(rev, <<"raw">>, A, B) -> B < A; +compare(fwd, _, A, B) -> couch_ejson_compare:less_json(A, B); +compare(rev, _, A, B) -> couch_ejson_compare:less_json(B, A). + remove_overlapping_shards_test() -> Cb = undefined, @@ -817,9 +849,13 @@ t_get_next_row_reduce(_) -> {view_row, #{value => value3, worker => {worker2, W2From}}} ], Values = [[value1], [value2], [value3]], - RowDict1 = maps:from_list([{key1, ViewRows1}, {key2, undefined}, {key3, undefined}]), - RowDict2 = maps:from_list([{key1, ViewRows2}, {key2, undefined}, {key3, undefined}]), - RowDict3 = maps:from_list([{key2, undefined}, {key3, undefined}]), + BuildBuffer = fun(Pairs) -> + lists:foldl(fun({K, V}, T) -> gb_trees:insert(K, {K, V}, T) end, gb_trees:empty(), Pairs) + end, + Buffer1 = BuildBuffer([{key1, ViewRows1}, {key2, undefined}, {key3, undefined}]), + Buffer2 = BuildBuffer([{key1, ViewRows2}, {key2, undefined}, {key3, undefined}]), + Buffer1Rest = gb_trees:delete(key1, Buffer1), + Buffer2Rest = gb_trees:delete(key1, Buffer2), Language = <<"language">>, Collation = <<"raw">>, Counters1 = [{worker1, 3}, {worker2, 5}], @@ -827,7 +863,7 @@ t_get_next_row_reduce(_) -> State1 = #collector{ query_args = QueryArgs1, keys = Keys, - rows = RowDict1, + rows = Buffer1, lang = Language, counters = Counters1, collation = Collation, @@ -836,7 +872,7 @@ t_get_next_row_reduce(_) -> State2 = #collector{ query_args = QueryArgs2, keys = Keys, - rows = RowDict2, + rows = Buffer2, lang = Language, counters = Counters1, collation = Collation, @@ -845,7 +881,7 @@ t_get_next_row_reduce(_) -> State3 = #collector{ query_args = QueryArgs1, keys = KeysRest, - rows = RowDict3, + rows = Buffer1Rest, lang = Language, collation = Collation, counters = Counters2, @@ -854,7 +890,7 @@ t_get_next_row_reduce(_) -> State4 = #collector{ query_args = QueryArgs2, keys = KeysRest, - rows = RowDict3, + rows = Buffer2Rest, lang = Language, collation = Collation, counters = Counters2, @@ -870,91 +906,115 @@ t_get_next_row_reduce(_) -> ?assertEqual({Row1, State3}, get_next_row(State1)), ?assertEqual({Row2, State4}, get_next_row(State2)). -find_next_key_empty_test() -> - ?assertThrow(complete, find_next_key(undefined, fwd, <<"raw">>, #{})), - ?assertThrow(complete, find_next_key(nil, fwd, <<"raw">>, #{})). - -find_next_key_min_raw_fwd_test() -> - RowDict = maps:from_list([{3, a}, {1, b}, {2, c}]), - ?assertEqual({1, nil}, find_next_key(undefined, fwd, <<"raw">>, RowDict)), - % a nil key list delegates to the undefined clause - ?assertEqual({1, nil}, find_next_key(nil, fwd, <<"raw">>, RowDict)). - -find_next_key_min_raw_rev_test() -> - RowDict = maps:from_list([{3, a}, {1, b}, {2, c}]), - ?assertEqual({3, nil}, find_next_key(undefined, rev, <<"raw">>, RowDict)). - -% Check ICU collator. Assert that sort and find_next_key does the same thing -% -find_next_key_min_collation_test() -> - % Not these are different under ICU vs raw Erlang - % M, a, z (Erlang) - % a, M, z (ICU) - Keys = [<<"z">>, <<"M">>, <<"a">>], - RowDict = maps:from_list([{K, v} || K <- Keys]), - lists:foreach( - fun(Dir) -> - Cmp = fun(A, B) -> compare(Dir, <<"icu">>, A, B) end, - [Expected | _] = lists:sort(Cmp, maps:keys(RowDict)), - ?assertEqual({Expected, nil}, find_next_key(undefined, Dir, <<"icu">>, RowDict)) +% Build reduce buffer as handle_row may do it, one row at a time +red_buffer(Collation, KeyRows) -> + lists:foldl( + fun({Key, Rows}, Buffer) -> + lists:foldl( + fun(Row, Acc) -> reduce_buffer_add(Collation, Key, Row, Acc) end, + Buffer, + Rows + ) end, - [fwd, rev] - ), - % Sanity check - IcuMin = element(1, find_next_key(undefined, fwd, <<"icu">>, RowDict)), - RawMin = element(1, find_next_key(undefined, fwd, <<"raw">>, RowDict)), + reduce_buffer_new(), + KeyRows + ). + +reduce_take_empty_test() -> + Empty = reduce_buffer_new(), + ?assertThrow(complete, reduce_take(undefined, fwd, <<"raw">>, Empty)), + ?assertThrow(complete, reduce_take([], fwd, <<"raw">>, Empty)). + +reduce_take_min_raw_fwd_test() -> + Buffer = red_buffer(<<"raw">>, [{3, [a]}, {1, [b]}, {2, [c]}]), + {Key, Rows, RestKeys, Buffer1} = reduce_take(undefined, fwd, <<"raw">>, Buffer), + ?assertEqual(1, Key), + ?assertEqual([b], Rows), + ?assertEqual(undefined, RestKeys), + % the next smallest is 2 + ?assertEqual(2, element(1, reduce_take(undefined, fwd, <<"raw">>, Buffer1))). + +reduce_take_min_raw_rev_test() -> + Buffer = red_buffer(<<"raw">>, [{3, [a]}, {1, [b]}, {2, [c]}]), + ?assertEqual(3, element(1, reduce_take(undefined, rev, <<"raw">>, Buffer))). + +% With ICU "a" < "M" < "z". With raw "M" < "a" < "z" +reduce_take_collation_order_test() -> + Pairs = [{K, [v]} || K <- [<<"z">>, <<"M">>, <<"a">>]], + IcuMin = element(1, reduce_take(undefined, fwd, <<"icu">>, red_buffer(<<"icu">>, Pairs))), + RawMin = element(1, reduce_take(undefined, fwd, <<"raw">>, red_buffer(<<"raw">>, Pairs))), ?assertEqual(<<"a">>, IcuMin), ?assertEqual(<<"M">>, RawMin), - % They do different things ?assertNotEqual(IcuMin, RawMin). -% Check min scan against sorting and taking the head -find_next_key_matches_sort_test() -> +% Sorting and taking the head returns th same as our sortkey consturct +reduce_take_matches_sort_test() -> Keys = [<<"k5">>, <<"k1">>, <<"k3">>, <<"k2">>, <<"k4">>], - RowDict = maps:from_list([{K, v} || K <- Keys]), + Pairs = [{K, [v]} || K <- Keys], lists:foreach( - fun(Dir) -> - CmpFun = fun(A, B) -> compare(Dir, <<"raw">>, A, B) end, - [Expected | _] = lists:sort(CmpFun, maps:keys(RowDict)), - ?assertEqual({Expected, nil}, find_next_key(undefined, Dir, <<"raw">>, RowDict)) + fun(Col) -> + Buffer = red_buffer(Col, Pairs), + lists:foreach( + fun(Dir) -> + Cmp = fun(A, B) -> compare(Dir, Col, A, B) end, + [Expected | _] = lists:sort(Cmp, Keys), + ?assertEqual( + Expected, element(1, reduce_take(undefined, Dir, Col, Buffer)) + ) + end, + [fwd, rev] + ) end, - [fwd, rev] + [<<"raw">>, <<"icu">>] ). -% A key list is returned head-first and untouched. -find_next_key_explicit_keys_test() -> - ?assertEqual({k1, [k2, k3]}, find_next_key([k1, k2, k3], fwd, <<"raw">>, #{})), - ?assertThrow(complete, find_next_key([], fwd, <<"raw">>, #{})). - -% Raw collation: keys match exactly -reduce_row_dict_take_raw_test() -> - RowMap = #{<<"a">> => [r1, r2], <<"b">> => [r3]}, - ?assertEqual({[r1, r2], #{<<"b">> => [r3]}}, reduce_row_dict_take(<<"a">>, RowMap, <<"raw">>)). - -reduce_row_dict_take_raw_missing_test() -> - RowMap = #{<<"a">> => [r1]}, - ?assertEqual(error, reduce_row_dict_take(<<"x">>, RowMap, <<"raw">>)). - -% With ICU use nfc and nfd forms of "é", they should match as equivalent but -% not equal in raw. We're making sure ICU is doing ICU things here. -reduce_row_dict_take_collation_test() -> +% Check explicit keys (user passes keys=...). +% - They should emit in that order +% - If any missing they should be skipped +% - Empty keys throws `complete` +reduce_take_explicit_keys_test() -> + Buffer = red_buffer(<<"raw">>, [{<<"a">>, [r1]}, {<<"b">>, [r2]}]), + {Key, Rows, RestKeys, Buffer1} = reduce_take([<<"a">>, <<"b">>], fwd, <<"raw">>, Buffer), + ?assertEqual(<<"a">>, Key), + ?assertEqual([r1], Rows), + ?assertEqual([<<"b">>], RestKeys), + ?assertEqual({skip, [<<"b">>]}, reduce_take([<<"x">>, <<"b">>], fwd, <<"raw">>, Buffer)), + ?assertThrow(complete, reduce_take([], fwd, <<"raw">>, Buffer1)). + +% A proper ICU check. "é" under NFC and NFD collate equal but have different +% bytes +reduce_take_collation_grouping_test() -> Nfc = <<195, 169>>, Nfd = <<101, 204, 129>>, - % Sanity check ?assertEqual(0, couch_ejson_compare:less(Nfc, Nfd)), - RowMap = #{Nfc => [r1], Nfd => [r2], <<"z">> => [r3]}, - {Vals, Rest} = reduce_row_dict_take(Nfc, RowMap, <<"icu">>), - ?assertEqual([r1, r2], lists:sort(Vals)), - ?assertEqual(#{<<"z">> => [r3]}, Rest). - -% With ICU and key that only equals itself -reduce_row_dict_take_collation_single_test() -> - RowMap = #{<<"a">> => [r1], <<"b">> => [r2]}, - ?assertEqual({[r1], #{<<"b">> => [r2]}}, reduce_row_dict_take(<<"a">>, RowMap, <<"icu">>)). - -% With ICU but key is missing altogether -reduce_row_dict_take_collation_missing_test() -> - RowMap = #{<<"a">> => [r1], <<"b">> => [r2]}, - ?assertEqual(error, reduce_row_dict_take(<<"zzz">>, RowMap, <<"icu">>)). + Buffer = red_buffer(<<"icu">>, [{Nfc, [r1]}, {Nfd, [r2]}, {<<"z">>, [r3]}]), + {Key, Rows, _, Buffer1} = reduce_take(undefined, fwd, <<"icu">>, Buffer), + ?assertEqual(Nfc, Key), + ?assertEqual([r1, r2], lists:sort(Rows)), + % We should have taken both r1 and r2 and only z should be left + {Key2, Rows2, _, _} = reduce_take(undefined, fwd, <<"icu">>, Buffer1), + ?assertEqual(<<"z">>, Key2), + ?assertEqual([r3], Rows2). + +% With ICU and a key that only equals itself +reduce_take_collation_single_test() -> + Buffer = red_buffer(<<"icu">>, [{<<"a">>, [r1]}, {<<"b">>, [r2]}]), + {Key, Rows, _, _} = reduce_take(undefined, fwd, <<"icu">>, Buffer), + ?assertEqual(<<"a">>, Key), + ?assertEqual([r1], Rows), + ?assertEqual({skip, []}, reduce_take([<<"zzz">>], fwd, <<"icu">>, Buffer)). + +% Partitioned view keys look like {p, Partition, Key}. Our buf_key should handle that. +buf_key_partition_test() -> + P = <<"foo">>, + ?assertEqual({p, P, 2}, buf_key(<<"raw">>, {p, P, 2})), + ?assertEqual({p, P, couch_ejson_compare:sort_key(2)}, buf_key(<<"icu">>, {p, P, 2})), + lists:foreach( + fun(Collation) -> + ?assert(buf_key(Collation, {p, P, 2}) < buf_key(Collation, {p, P, 4})), + ?assert(buf_key(Collation, {p, P, <<"a">>}) < buf_key(Collation, {p, P, <<"b">>})) + end, + [<<"raw">>, <<"icu">>] + ). -endif. diff --git a/src/fabric/src/fabric_view_map.erl b/src/fabric/src/fabric_view_map.erl index 56389e7df1..c6604dd8df 100644 --- a/src/fabric/src/fabric_view_map.erl +++ b/src/fabric/src/fabric_view_map.erl @@ -224,15 +224,21 @@ handle_message(insufficient_storage, _Worker, State) -> {stop, State}. merge_row(Dir, Collation, undefined, Row, Rows0) -> + % Use a sort key as the buffer key. When using ICU for collation (as we do + % by default) we spend some CPU time once to generate a sort key and then + % subsequently compare rows in Erlang during merging, as opposed to what we + % did previously, when we used ICU pair-wise comparisons NIF calls against + % all the existing rows in the buffer. + RowKey = fabric_view_row:get_key(Row), + BufKey = {fabric_view:buf_key(Collation, RowKey), fabric_view_row:get_id(Row)}, Rows1 = lists:merge( - fun(RowA, RowB) -> - KeyA = fabric_view_row:get_key(RowA), - KeyB = fabric_view_row:get_key(RowB), - IdA = fabric_view_row:get_id(RowA), - IdB = fabric_view_row:get_id(RowB), - compare(Dir, Collation, {KeyA, IdA}, {KeyB, IdB}) + fun({BKa, _}, {BKb, _}) -> + case Dir of + fwd -> BKa < BKb; + rev -> BKa > BKb + end end, - [Row], + [{BufKey, Row}], Rows0 ), {Rows1, undefined}; @@ -282,11 +288,6 @@ merge_row(Dir, Collation, KeyDict0, Row, Rows0) -> {Rows1, KeyDict1} end. -compare(fwd, <<"raw">>, A, B) -> A < B; -compare(rev, <<"raw">>, A, B) -> B < A; -compare(fwd, _, A, B) -> couch_ejson_compare:less_json_ids(A, B); -compare(rev, _, A, B) -> couch_ejson_compare:less_json_ids(B, A). - % KeyDict captures the user-supplied ordering of keys POSTed by the user by % mapping to integers (see fabric_view:keydict/1). It's possible that these keys % do not compare equal (i.e., =:=, used by dict) to those returned by the view @@ -323,7 +324,7 @@ handle_message_test_() -> foreach, fun() -> meck:new(foo, [non_strict]), - meck:new(fabric_view) + meck:new(fabric_view, [passthrough]) end, fun(_) -> meck:unload() end, [ @@ -465,10 +466,12 @@ t_handle_message_sorted(_) -> Rows21 = {view_row, #{id => id1, key => key1}}, Rows22 = {view_row, #{id => id2, key => key2, doc => doc2, worker => Worker}}, Rows23 = {view_row, #{id => id3, key => key3}}, - Rows1 = [Rows11, Rows13], - Rows2 = [Rows21, Rows23], - Rows3 = [Rows11, Rows12, Rows13], - Rows4 = [Rows21, Rows22, Rows23], + % with no POSTed keys the buffer holds rows wrapped as {{BufKey, Id}, Row} + Wrap = fun(Rows) -> [wrap(<<"raw">>, R) || R <- Rows] end, + Rows1 = Wrap([Rows11, Rows13]), + Rows2 = Wrap([Rows21, Rows23]), + Rows3 = Wrap([Rows11, Rows12, Rows13]), + Rows4 = Wrap([Rows21, Rows22, Rows23]), State1 = #collector{ sorted = true, limit = 10, @@ -552,6 +555,7 @@ merge_row_test_() -> fun(_) -> ok end, [ ?TDEF_FE(t_merge_row_no_keys), + ?TDEF_FE(t_merge_row_partition_keys), ?TDEF_FE(t_merge_row_raw), ?TDEF_FE(t_merge_row) ] @@ -561,17 +565,46 @@ t_merge_row_no_keys(_) -> Row1 = #view_row{id = id2, key = <<"key2">>}, Rows11 = #view_row{id = id1, key = <<"key1">>}, Rows13 = #view_row{id = id3, key = <<"key3">>}, - Rows1 = [Rows11, Rows13], - Rows3 = [Rows11, Row1, Rows13], Row2 = {view_row, #{id => id2, key => <<"key2">>}}, Rows21 = {view_row, #{id => id1, key => <<"key1">>}}, Rows23 = {view_row, #{id => id3, key => <<"key3">>}}, - Rows2 = [Rows23, Rows21], - Rows4 = [Rows23, Row2, Rows21], - ?assertEqual({Rows3, undefined}, merge_row(fwd, <<"raw">>, undefined, Row1, Rows1)), - ?assertEqual({Rows3, undefined}, merge_row(fwd, <<"collation">>, undefined, Row1, Rows1)), - ?assertEqual({Rows4, undefined}, merge_row(rev, <<"raw">>, undefined, Row2, Rows2)), - ?assertEqual({Rows4, undefined}, merge_row(rev, <<"collation">>, undefined, Row2, Rows2)). + % check merged row order for row/icu and fwd/reverse cases + lists:foreach( + fun(Col) -> + Wrap = fun(Rows) -> [wrap(Col, R) || R <- Rows] end, + {Fwd, undefined} = merge_row(fwd, Col, undefined, Row1, Wrap([Rows11, Rows13])), + ?assertEqual([Rows11, Row1, Rows13], unwrap(Fwd)), + {Rev, undefined} = merge_row(rev, Col, undefined, Row2, Wrap([Rows23, Rows21])), + ?assertEqual([Rows23, Row2, Rows21], unwrap(Rev)) + end, + [<<"raw">>, <<"icu">>] + ). + +% Partitioned views attach the partition to the key as {p, Partition, Key}; +% merge_row must handle that (regression: buf_key -> sort_key/1 crashed on the +% 3-tuple, 500ing partitioned view queries). +t_merge_row_partition_keys(_) -> + P = <<"foo">>, + Row1 = #view_row{id = id4, key = {p, P, 4}}, + Rows12 = #view_row{id = id2, key = {p, P, 2}}, + Rows16 = #view_row{id = id6, key = {p, P, 6}}, + lists:foreach( + fun(Col) -> + Wrap = fun(Rows) -> [wrap(Col, R) || R <- Rows] end, + {Merged, undefined} = merge_row(fwd, Col, undefined, Row1, Wrap([Rows12, Rows16])), + ?assertEqual([Rows12, Row1, Rows16], unwrap(Merged)) + end, + [<<"raw">>, <<"icu">>] + ). + +% This mimicks clause 1 of merge_row buffer +wrap(Collation, Row) -> + Key = fabric_view_row:get_key(Row), + Id = fabric_view_row:get_id(Row), + {{fabric_view:buf_key(Collation, Key), Id}, Row}. + +unwrap(Buffer) -> + [Row || {_BufKey, Row} <- Buffer]. t_merge_row_raw(_) -> Keys1 = dict:from_list([{key1, id1}, {key2, id2}, {key3, id3}]), diff --git a/src/fabric/src/fabric_view_reduce.erl b/src/fabric/src/fabric_view_reduce.erl index 93838fe0b1..24d7717482 100644 --- a/src/fabric/src/fabric_view_reduce.erl +++ b/src/fabric/src/fabric_view_reduce.erl @@ -89,7 +89,7 @@ go2(DbName, DDocId, VName, Workers, {red, {_, Lang, View}, _} = VInfo, Args, Cal os_proc = OsProc, reducer = RedSrc, collation = couch_util:get_value(<<"collation">>, View#mrview.options), - rows = #{}, + rows = fabric_view:reduce_buffer_new(), user_acc = Acc0, update_seq = case UpdateSeq of @@ -121,13 +121,13 @@ go2(DbName, DDocId, VName, Workers, {red, {_, Lang, View}, _} = VInfo, Args, Cal end. handle_row(Row0, {Worker, _} = Source, State) -> - #collector{counters = Counters0, rows = Rows0} = State, + #collector{counters = Counters0, rows = Buffer0, collation = Collation} = State, true = fabric_dict:is_key(Worker, Counters0), Row = fabric_view_row:set_worker(Row0, Source), Key = fabric_view_row:get_key(Row), - Rows = maps:update_with(Key, fun(Rs) -> Rs ++ [Row] end, [Row], Rows0), + Buffer = fabric_view:reduce_buffer_add(Collation, Key, Row, Buffer0), C1 = fabric_dict:update_counter(Worker, 1, Counters0), - State1 = State#collector{rows = Rows, counters = C1}, + State1 = State#collector{rows = Buffer, counters = C1}, fabric_view:maybe_send_row(State1). handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) -> @@ -197,7 +197,7 @@ handle_message_test_() -> foreach, fun() -> meck:new(foo, [non_strict]), - meck:new(fabric_view) + meck:new(fabric_view, [passthrough]) end, fun(_) -> meck:unload() end, [ @@ -282,18 +282,24 @@ t_handle_message_row(_) -> Worker = {worker, from}, Counters1 = [{worker, 3}], Counters2 = [{worker, 4}], + Collation = <<"raw">>, Row1 = #view_row{key = key1}, Row11 = #view_row{key = key1, worker = Worker}, - Rows1 = maps:from_list([{key1, []}, {key2, []}, {key3, []}]), - Rows3 = maps:from_list([{key1, [Row11]}, {key2, []}, {key3, []}]), Row2 = {view_row, #{key => key1}}, Row21 = {view_row, #{key => key1, worker => Worker}}, - Rows2 = maps:from_list([{key1, []}, {key2, []}, {key3, []}]), - Rows4 = maps:from_list([{key1, [Row21]}, {key2, []}, {key3, []}]), - State1 = #collector{counters = Counters1, rows = Rows1}, - State2 = #collector{counters = Counters1, rows = Rows2}, - State3 = #collector{counters = Counters2, rows = Rows3}, - State4 = #collector{counters = Counters2, rows = Rows4}, + % Feed the same seed buffer to the input and expected to get the resulting + % trees to be the same + Seed = lists:foldl( + fun(K, T) -> gb_trees:insert(K, {K, []}, T) end, + fabric_view:reduce_buffer_new(), + [key1, key2, key3] + ), + Buffer3 = fabric_view:reduce_buffer_add(Collation, key1, Row11, Seed), + Buffer4 = fabric_view:reduce_buffer_add(Collation, key1, Row21, Seed), + State1 = #collector{counters = Counters1, rows = Seed, collation = Collation}, + State2 = #collector{counters = Counters1, rows = Seed, collation = Collation}, + State3 = #collector{counters = Counters2, rows = Buffer3, collation = Collation}, + State4 = #collector{counters = Counters2, rows = Buffer4, collation = Collation}, meck:expect(fabric_view, maybe_send_row, [ {[State3], meck:val(maybe_row1)}, {[State4], meck:val(maybe_row2)} ]),