From 61c1ea801cb2352644ce6c572e67aac8d02403a3 Mon Sep 17 00:00:00 2001 From: Doug Rohrer Date: Tue, 30 Aug 2016 01:30:40 +0000 Subject: [PATCH] Wait for CRDT search results, rather than failing after the first attempt. --- src/yokozuna_rt.erl | 8 ++++ tests/yz_crdt.erl | 102 ++++++++++++++++++++++++-------------------- 2 files changed, 63 insertions(+), 47 deletions(-) diff --git a/src/yokozuna_rt.erl b/src/yokozuna_rt.erl index 750bf88a..3a9ed096 100644 --- a/src/yokozuna_rt.erl +++ b/src/yokozuna_rt.erl @@ -26,6 +26,7 @@ check_exists/2, clear_trees/1, commit/2, + drain_solrqs/1, expire_trees/1, gen_keys/1, host_entries/1, @@ -454,6 +455,13 @@ commit(Nodes, Index) -> rpc:multicall(Nodes, yz_solr, commit, [Index]), ok. +-spec drain_solrqs(node() | cluster()) -> ok. +drain_solrqs(Cluster) when is_list(Cluster) -> + [drain_solrqs(Node) || Node <- Cluster]; +drain_solrqs(Node) -> + rpc:call(Node, yz_solrq_drain_mgr, drain, []), + ok. + -spec override_schema(pid(), [node()], index_name(), schema_name(), string()) -> {ok, [node()]}. override_schema(Pid, Cluster, Index, Schema, RawUpdate) -> diff --git a/tests/yz_crdt.erl b/tests/yz_crdt.erl index 1a103815..44e37f01 100644 --- a/tests/yz_crdt.erl +++ b/tests/yz_crdt.erl @@ -59,60 +59,68 @@ confirm() -> ?KEY, riakc_map:to_op(Map2)), + yokozuna_rt:drain_solrqs(Nodes), yokozuna_rt:commit(Nodes, ?INDEX), - %% Perform simple queries, check for register, set fields. - {ok, {search_results, Results1a, _, _}} = riakc_pb_socket:search( - Pid, ?INDEX, <<"name_register:Chris*">>), - lager:info("Search name_register:Chris*: ~p~n", [Results1a]), - ?assertEqual(length(Results1a), 1), - ?assertEqual(?GET(<<"name_register">>, ?GET(?INDEX, Results1a)), - list_to_binary(?KEY)), - ?assertEqual(?GET(<<"interests_set">>, ?GET(?INDEX, Results1a)), - <<"thing">>), - - {ok, {search_results, Results2a, _, _}} = riakc_pb_socket:search( - Pid, ?INDEX, <<"interests_set:thing*">>), - lager:info("Search interests_set:thing*: ~p~n", [Results2a]), - ?assertEqual(length(Results2a), 1), - ?assertEqual(?GET(<<"name_register">>, ?GET(?INDEX, Results2a)), - list_to_binary(?KEY)), - ?assertEqual(?GET(<<"interests_set">>, ?GET(?INDEX, Results2a)), - <<"thing">>), - - {ok, {search_results, Results3a, _, _}} = riakc_pb_socket:search( - Pid, ?INDEX, <<"_yz_rb:testbucket">>), - lager:info("Search testbucket: ~p~n", [Results3a]), - ?assertEqual(length(Results3a), 1), - ?assertEqual(?GET(<<"name_register">>, ?GET(?INDEX, Results3a)), - list_to_binary(?KEY)), - ?assertEqual(?GET(<<"interests_set">>, ?GET(?INDEX, Results3a)), - <<"thing">>), - - %% Redo queries and check if results are equal - {ok, {search_results, Results1b, _, _}} = riakc_pb_socket:search( - Pid, ?INDEX, <<"name_register:Chris*">>), - ?assertEqual(number_of_fields(Results1a), - number_of_fields(Results1b)), - - {ok, {search_results, Results2b, _, _}} = riakc_pb_socket:search( - Pid, ?INDEX, <<"interests_set:thing*">>), - ?assertEqual(number_of_fields(Results2a), - number_of_fields(Results2b)), - - {ok, {search_results, Results3b, _, _}} = riakc_pb_socket:search( - Pid, ?INDEX, <<"_yz_rb:testbucket">>), - ?assertEqual(number_of_fields(Results3a), - number_of_fields(Results3b)), - + ok = rt:wait_until( + fun() -> + validate_search_results(Pid) + end), %% Stop PB connection. riakc_pb_socket:stop(Pid), - %% Clean cluster. - rt:clean_cluster(Nodes), - pass. +validate_search_results(Pid) -> + try + {ok, {search_results, Results1a, _, _}} = riakc_pb_socket:search( + Pid, ?INDEX, <<"name_register:Chris*">>), + lager:info("Search name_register:Chris*: ~p~n", [Results1a]), + ?assertEqual(length(Results1a), 1), + ?assertEqual(?GET(<<"name_register">>, ?GET(?INDEX, Results1a)), + list_to_binary(?KEY)), + ?assertEqual(?GET(<<"interests_set">>, ?GET(?INDEX, Results1a)), + <<"thing">>), + + {ok, {search_results, Results2a, _, _}} = riakc_pb_socket:search( + Pid, ?INDEX, <<"interests_set:thing*">>), + lager:info("Search interests_set:thing*: ~p~n", [Results2a]), + ?assertEqual(length(Results2a), 1), + ?assertEqual(?GET(<<"name_register">>, ?GET(?INDEX, Results2a)), + list_to_binary(?KEY)), + ?assertEqual(?GET(<<"interests_set">>, ?GET(?INDEX, Results2a)), + <<"thing">>), + + {ok, {search_results, Results3a, _, _}} = riakc_pb_socket:search( + Pid, ?INDEX, <<"_yz_rb:testbucket">>), + lager:info("Search testbucket: ~p~n", [Results3a]), + ?assertEqual(length(Results3a), 1), + ?assertEqual(?GET(<<"name_register">>, ?GET(?INDEX, Results3a)), + list_to_binary(?KEY)), + ?assertEqual(?GET(<<"interests_set">>, ?GET(?INDEX, Results3a)), + <<"thing">>), + + %% Redo queries and check if results are equal + {ok, {search_results, Results1b, _, _}} = riakc_pb_socket:search( + Pid, ?INDEX, <<"name_register:Chris*">>), + ?assertEqual(number_of_fields(Results1a), + number_of_fields(Results1b)), + + {ok, {search_results, Results2b, _, _}} = riakc_pb_socket:search( + Pid, ?INDEX, <<"interests_set:thing*">>), + ?assertEqual(number_of_fields(Results2a), + number_of_fields(Results2b)), + + {ok, {search_results, Results3b, _, _}} = riakc_pb_socket:search( + Pid, ?INDEX, <<"_yz_rb:testbucket">>), + ?assertEqual(number_of_fields(Results3a), + number_of_fields(Results3b)), + true + catch Err:Reason -> + lager:info("Waiting for CRDT search results to converge. Error was ~p.", [{Err, Reason}]), + false + end. + %% @private number_of_fields(Resp) -> length(?GET(?INDEX, Resp)).