mirror of
https://github.com/valitydev/riak_test.git
synced 2024-11-06 08:35:22 +00:00
Merge pull request #1135 from basho/dr-wait-for-crdt-results
Wait for CRDT search results, rather than failing after the first attempt.
This commit is contained in:
commit
facf2c71a0
@ -26,6 +26,7 @@
|
||||
check_exists/2,
|
||||
clear_trees/1,
|
||||
commit/2,
|
||||
drain_solrqs/1,
|
||||
expire_trees/1,
|
||||
gen_keys/1,
|
||||
host_entries/1,
|
||||
@ -455,6 +456,13 @@ commit(Nodes, Index) ->
|
||||
rpc:multicall(Nodes, yz_solr, commit, [Index]),
|
||||
ok.
|
||||
|
||||
-spec drain_solrqs(node() | cluster()) -> ok.
|
||||
drain_solrqs(Cluster) when is_list(Cluster) ->
|
||||
[drain_solrqs(Node) || Node <- Cluster];
|
||||
drain_solrqs(Node) ->
|
||||
rpc:call(Node, yz_solrq_drain_mgr, drain, []),
|
||||
ok.
|
||||
|
||||
-spec override_schema(pid(), [node()], index_name(), schema_name(), string()) ->
|
||||
{ok, [node()]}.
|
||||
override_schema(Pid, Cluster, Index, Schema, RawUpdate) ->
|
||||
|
@ -59,60 +59,68 @@ confirm() ->
|
||||
?KEY,
|
||||
riakc_map:to_op(Map2)),
|
||||
|
||||
yokozuna_rt:drain_solrqs(Nodes),
|
||||
yokozuna_rt:commit(Nodes, ?INDEX),
|
||||
|
||||
%% Perform simple queries, check for register, set fields.
|
||||
{ok, {search_results, Results1a, _, _}} = riakc_pb_socket:search(
|
||||
Pid, ?INDEX, <<"name_register:Chris*">>),
|
||||
lager:info("Search name_register:Chris*: ~p~n", [Results1a]),
|
||||
?assertEqual(length(Results1a), 1),
|
||||
?assertEqual(?GET(<<"name_register">>, ?GET(?INDEX, Results1a)),
|
||||
list_to_binary(?KEY)),
|
||||
?assertEqual(?GET(<<"interests_set">>, ?GET(?INDEX, Results1a)),
|
||||
<<"thing">>),
|
||||
|
||||
{ok, {search_results, Results2a, _, _}} = riakc_pb_socket:search(
|
||||
Pid, ?INDEX, <<"interests_set:thing*">>),
|
||||
lager:info("Search interests_set:thing*: ~p~n", [Results2a]),
|
||||
?assertEqual(length(Results2a), 1),
|
||||
?assertEqual(?GET(<<"name_register">>, ?GET(?INDEX, Results2a)),
|
||||
list_to_binary(?KEY)),
|
||||
?assertEqual(?GET(<<"interests_set">>, ?GET(?INDEX, Results2a)),
|
||||
<<"thing">>),
|
||||
|
||||
{ok, {search_results, Results3a, _, _}} = riakc_pb_socket:search(
|
||||
Pid, ?INDEX, <<"_yz_rb:testbucket">>),
|
||||
lager:info("Search testbucket: ~p~n", [Results3a]),
|
||||
?assertEqual(length(Results3a), 1),
|
||||
?assertEqual(?GET(<<"name_register">>, ?GET(?INDEX, Results3a)),
|
||||
list_to_binary(?KEY)),
|
||||
?assertEqual(?GET(<<"interests_set">>, ?GET(?INDEX, Results3a)),
|
||||
<<"thing">>),
|
||||
|
||||
%% Redo queries and check if results are equal
|
||||
{ok, {search_results, Results1b, _, _}} = riakc_pb_socket:search(
|
||||
Pid, ?INDEX, <<"name_register:Chris*">>),
|
||||
?assertEqual(number_of_fields(Results1a),
|
||||
number_of_fields(Results1b)),
|
||||
|
||||
{ok, {search_results, Results2b, _, _}} = riakc_pb_socket:search(
|
||||
Pid, ?INDEX, <<"interests_set:thing*">>),
|
||||
?assertEqual(number_of_fields(Results2a),
|
||||
number_of_fields(Results2b)),
|
||||
|
||||
{ok, {search_results, Results3b, _, _}} = riakc_pb_socket:search(
|
||||
Pid, ?INDEX, <<"_yz_rb:testbucket">>),
|
||||
?assertEqual(number_of_fields(Results3a),
|
||||
number_of_fields(Results3b)),
|
||||
|
||||
ok = rt:wait_until(
|
||||
fun() ->
|
||||
validate_search_results(Pid)
|
||||
end),
|
||||
%% Stop PB connection.
|
||||
riakc_pb_socket:stop(Pid),
|
||||
|
||||
%% Clean cluster.
|
||||
rt:clean_cluster(Nodes),
|
||||
|
||||
pass.
|
||||
|
||||
validate_search_results(Pid) ->
|
||||
try
|
||||
{ok, {search_results, Results1a, _, _}} = riakc_pb_socket:search(
|
||||
Pid, ?INDEX, <<"name_register:Chris*">>),
|
||||
lager:info("Search name_register:Chris*: ~p~n", [Results1a]),
|
||||
?assertEqual(length(Results1a), 1),
|
||||
?assertEqual(?GET(<<"name_register">>, ?GET(?INDEX, Results1a)),
|
||||
list_to_binary(?KEY)),
|
||||
?assertEqual(?GET(<<"interests_set">>, ?GET(?INDEX, Results1a)),
|
||||
<<"thing">>),
|
||||
|
||||
{ok, {search_results, Results2a, _, _}} = riakc_pb_socket:search(
|
||||
Pid, ?INDEX, <<"interests_set:thing*">>),
|
||||
lager:info("Search interests_set:thing*: ~p~n", [Results2a]),
|
||||
?assertEqual(length(Results2a), 1),
|
||||
?assertEqual(?GET(<<"name_register">>, ?GET(?INDEX, Results2a)),
|
||||
list_to_binary(?KEY)),
|
||||
?assertEqual(?GET(<<"interests_set">>, ?GET(?INDEX, Results2a)),
|
||||
<<"thing">>),
|
||||
|
||||
{ok, {search_results, Results3a, _, _}} = riakc_pb_socket:search(
|
||||
Pid, ?INDEX, <<"_yz_rb:testbucket">>),
|
||||
lager:info("Search testbucket: ~p~n", [Results3a]),
|
||||
?assertEqual(length(Results3a), 1),
|
||||
?assertEqual(?GET(<<"name_register">>, ?GET(?INDEX, Results3a)),
|
||||
list_to_binary(?KEY)),
|
||||
?assertEqual(?GET(<<"interests_set">>, ?GET(?INDEX, Results3a)),
|
||||
<<"thing">>),
|
||||
|
||||
%% Redo queries and check if results are equal
|
||||
{ok, {search_results, Results1b, _, _}} = riakc_pb_socket:search(
|
||||
Pid, ?INDEX, <<"name_register:Chris*">>),
|
||||
?assertEqual(number_of_fields(Results1a),
|
||||
number_of_fields(Results1b)),
|
||||
|
||||
{ok, {search_results, Results2b, _, _}} = riakc_pb_socket:search(
|
||||
Pid, ?INDEX, <<"interests_set:thing*">>),
|
||||
?assertEqual(number_of_fields(Results2a),
|
||||
number_of_fields(Results2b)),
|
||||
|
||||
{ok, {search_results, Results3b, _, _}} = riakc_pb_socket:search(
|
||||
Pid, ?INDEX, <<"_yz_rb:testbucket">>),
|
||||
?assertEqual(number_of_fields(Results3a),
|
||||
number_of_fields(Results3b)),
|
||||
true
|
||||
catch Err:Reason ->
|
||||
lager:info("Waiting for CRDT search results to converge. Error was ~p.", [{Err, Reason}]),
|
||||
false
|
||||
end.
|
||||
|
||||
%% @private
|
||||
number_of_fields(Resp) ->
|
||||
length(?GET(?INDEX, Resp)).
|
||||
|
Loading…
Reference in New Issue
Block a user