From 49344a042287614e57b58c90007b603c76820d93 Mon Sep 17 00:00:00 2001 From: Zeeshan Lakhani Date: Thu, 8 Sep 2016 15:46:03 -0400 Subject: [PATCH] refactor test to include wait-for around validation due to yz changes to draining, settings --- tests/yz_crdt.erl | 554 ++++++++++++++++++++++++---------------------- 1 file changed, 285 insertions(+), 269 deletions(-) diff --git a/tests/yz_crdt.erl b/tests/yz_crdt.erl index 4ddb7e4d..47610137 100644 --- a/tests/yz_crdt.erl +++ b/tests/yz_crdt.erl @@ -8,7 +8,7 @@ -define(HARNESS, (rt_config:get(rt_harness))). -define(INDEX, <<"maps">>). -define(TYPE, <<"maps">>). --define(KEY, "Chris Meiklejohn"). +-define(KEY, <<"Chris Meiklejohn">>). -define(BUCKET, {?TYPE, <<"testbucket">>}). -define(GET(K,L), proplists:get_value(K, L)). -define(N, 3). @@ -49,115 +49,59 @@ confirm() -> {n_val, ?N}, {search_index, ?INDEX}]), - %% Write some sample data. + lager:info("Write some sample data"), + test_sample_data(Pid, Nodes, ?BUCKET, ?KEY, ?INDEX), + lager:info("Search and Validate our CRDT writes/updates."), + ok = rt:wait_until(fun() -> validate_sample_data(Pid, ?KEY, ?INDEX) + end), + lager:info("Test setting the register of a map twice to different values." + "\nThe # of results should still be 1"), + test_repeat_sets(Pid, Nodes, ?BUCKET, ?INDEX, ?KEY), + ok = rt:wait_until(fun() -> validate_test_repeat_set(Pid, ?INDEX) + end), + + lager:info("FYI: delete_mode is on keep here to make sure YZ handles" + " deletes correctly throughout."), + lager:info("Test varying deletes operations"), + test_and_validate_delete(Pid, Nodes, ?BUCKET, ?INDEX, ?KEY), + + lager:info("Test to make sure yz AAE handles deletes/removes correctly"), + test_and_validate_delete_aae(Pid, Nodes, ?BUCKET, ?INDEX), + + lager:info("Test to make sure siblings don't exist after partition"), + test_siblings(Nodes, ?BUCKET, ?INDEX), + lager:info("Verify counts and operations after heal + transfers + commits"), + ok = rt:wait_until(fun() -> validate_test_siblings(Pid, ?BUCKET, ?INDEX) + end), + + %% Stop PB connection + riakc_pb_socket:stop(Pid), + + pass. + +test_sample_data(Pid, Cluster, Bucket, Key, Index) -> Map1 = riakc_map:update( {<<"name">>, register}, fun(R) -> - riakc_register:set(list_to_binary(?KEY), R) + riakc_register:set(Key, R) end, riakc_map:new()), Map2 = riakc_map:update( {<<"interests">>, set}, fun(S) -> riakc_set:add_element(<<"thing">>, S) end, Map1), + ok = riakc_pb_socket:update_type( Pid, - ?BUCKET, - ?KEY, + Bucket, + Key, riakc_map:to_op(Map2)), + drain_and_commit(Cluster, Index). - yokozuna_rt:drain_solrqs(Nodes), - yokozuna_rt:commit(Nodes, ?INDEX), - - %% Wait for yokozuna index to trigger. - yokozuna_rt:commit(Nodes, ?INDEX), - - %% Perform simple queries, check for register, set fields. - ok = rt:wait_until( - fun() -> - validate_search_results(Pid) - end), - %% Stop PB connection. - riakc_pb_socket:stop(Pid), - - %% Clean cluster. - rt:clean_cluster(Nodes), - - pass. - -validate_search_results(Pid) -> - try - {ok, {search_results, Results1a, _, _}} = riakc_pb_socket:search( - Pid, ?INDEX, <<"name_register:Chris*">>), - lager:info("Search name_register:Chris*: ~p~n", [Results1a]), - ?assertEqual(length(Results1a), 1), - ?assertEqual(?GET(<<"name_register">>, ?GET(?INDEX, Results1a)), - list_to_binary(?KEY)), - ?assertEqual(?GET(<<"interests_set">>, ?GET(?INDEX, Results1a)), - <<"thing">>), - - {ok, {search_results, Results2a, _, _}} = riakc_pb_socket:search( - Pid, ?INDEX, <<"interests_set:thing*">>), - lager:info("Search interests_set:thing*: ~p~n", [Results2a]), - ?assertEqual(length(Results2a), 1), - ?assertEqual(?GET(<<"name_register">>, ?GET(?INDEX, Results2a)), - list_to_binary(?KEY)), - ?assertEqual(?GET(<<"interests_set">>, ?GET(?INDEX, Results2a)), - <<"thing">>), - - {ok, {search_results, Results3a, _, _}} = riakc_pb_socket:search( - Pid, ?INDEX, <<"_yz_rb:testbucket">>), - lager:info("Search testbucket: ~p~n", [Results3a]), - ?assertEqual(length(Results3a), 1), - ?assertEqual(?GET(<<"name_register">>, ?GET(?INDEX, Results3a)), - list_to_binary(?KEY)), - ?assertEqual(?GET(<<"interests_set">>, ?GET(?INDEX, Results3a)), - <<"thing">>), - - %% Redo queries and check if results are equal - {ok, {search_results, Results1b, _, _}} = riakc_pb_socket:search( - Pid, ?INDEX, <<"name_register:Chris*">>), - ?assertEqual(number_of_fields(Results1a, ?INDEX), - number_of_fields(Results1b, ?INDEX)), - - {ok, {search_results, Results2b, _, _}} = riakc_pb_socket:search( - Pid, ?INDEX, <<"interests_set:thing*">>), - ?assertEqual(number_of_fields(Results2a, ?INDEX), - number_of_fields(Results2b, ?INDEX)), - - {ok, {search_results, Results3b, _, _}} = riakc_pb_socket:search( - Pid, ?INDEX, <<"_yz_rb:testbucket">>), - ?assertEqual(number_of_fields(Results3a, ?INDEX), - number_of_fields(Results3b, ?INDEX)), - true - catch Err:Reason -> - lager:info("Waiting for CRDT search results to converge. Error was ~p.", [{Err, Reason}]), - false - end, - - - - lager:info("Test setting the register of a map twice to different values." - "\nThe # of results should still be 1"), - test_repeat_sets(Pid, Nodes, ?BUCKET, ?INDEX, ?KEY), - - lager:info("FYI: delete_mode is on keep here to make sure YZ handles" - " deletes correctly throughout."), - - lager:info("Test varying deletes operations"), - test_delete(Pid, Nodes, ?BUCKET, ?INDEX, ?KEY), - - lager:info("Test to make sure yz AAE handles deletes/removes correctly"), - test_delete_aae(Pid, Nodes, ?BUCKET, ?INDEX), - - lager:info("Test to make sure siblings don't exist after partition" - " occurrs and we heal the cluster"), - test_siblings(Pid, Nodes, ?BUCKET, ?INDEX), - - pass. - +%% @doc Test setting the register of a map twice to different values. +%% The # of results should still be 1. test_repeat_sets(Pid, Cluster, Bucket, Index, Key) -> {ok, M1} = riakc_pb_socket:fetch_type(Pid, Bucket, Key), M2 = riakc_map:update( @@ -181,26 +125,19 @@ test_repeat_sets(Pid, Cluster, Bucket, Index, Key) -> Key, riakc_map:to_op(M3)), - yokozuna_rt:commit(Cluster, Index), - - {ok, {search_results, Results, _, _}} = riakc_pb_socket:search( - Pid, Index, - <<"update_register:*">>), - - lager:info("Search update_register:*: ~p~n", [Results]), - ?assertEqual(1, length(Results)). + drain_and_commit(Cluster, Index). %% @doc Tests varying deletes of within a CRDT map and checks for correct counts %% - Remove registers, remove and add elements within a set %% - Delete the map (associated w/ a key) %% - Recreate objects in the map and delete the map again -test_delete(Pid, Cluster, Bucket, Index, Key) -> +test_and_validate_delete(Pid, Cluster, Bucket, Index, Key) -> {ok, M1} = riakc_pb_socket:fetch_type(Pid, Bucket, Key), lager:info("Remove register from map"), M2 = riakc_map:erase({<<"name">>, register}, M1), - lager:info("Delete element from set (in map) & Add element to set (in map)"), + lager:info("Delete element from set (in map) & Add element to set"), M3 = riakc_map:update( {<<"interests">>, set}, fun(S) -> @@ -214,13 +151,10 @@ test_delete(Pid, Cluster, Bucket, Index, Key) -> Key, riakc_map:to_op(M3)), - yokozuna_rt:commit(Cluster, Index), + drain_and_commit(Cluster, Index), - {ok, {search_results, Results1, _, _}} = riakc_pb_socket:search( - Pid, Index, - <<"name_register:*">>), - lager:info("Search deleted/erased name_register:*: ~p~n", [Results1]), - ?assertEqual(0, length(Results1)), + lager:info("Search deleted/erased name_register:*"), + search_and_validate_found(Pid, Index, <<"name_register:*">>, 0), lager:info("Add another element to set (in map)"), M4 = riakc_map:update( @@ -235,25 +169,21 @@ test_delete(Pid, Cluster, Bucket, Index, Key) -> Key, riakc_map:to_op(M4)), - yokozuna_rt:commit(Cluster, Index), + drain_and_commit(Cluster, Index), - {ok, {search_results, Results2, _, _}} = riakc_pb_socket:search( - Pid, Index, - <<"interests_set:thing*">>), - lager:info("Search deleted interests_set:thing*: ~p~n", [Results2]), - ?assertEqual(0, length(Results2)), + lager:info("Search deleted interests_set:thing*"), + search_and_validate_found(Pid, Index, <<"interests_set:thing*">>, 0), lager:info("Delete key for map"), ?assertEqual(ok, riakc_pb_socket:delete(Pid, Bucket, Key)), - yokozuna_rt:commit(Cluster, Index), + + drain_and_commit(Cluster, Index), + ?assertEqual({error, {notfound, map}}, riakc_pb_socket:fetch_type(Pid, Bucket, Key)), - {ok, {search_results, Results3, _, _}} = riakc_pb_socket:search( - Pid, Index, - <<"*:*">>), - lager:info("Search deleted map *:*: ~p~n", [Results3]), - ?assertEqual(0, length(Results3)), + lager:info("Search deleted map: *:*"), + search_and_validate_found(Pid, Index, <<"*:*">>, 0), lager:info("Recreate object and check counts..."), @@ -270,36 +200,32 @@ test_delete(Pid, Cluster, Bucket, Index, Key) -> Key, riakc_map:to_op(M5)), - yokozuna_rt:commit(Cluster, Index), + drain_and_commit(Cluster, Index), {ok, M6} = riakc_pb_socket:fetch_type(Pid, Bucket, Key), Keys = riakc_map:fetch_keys(M6), ?assertEqual(1, length(Keys)), ?assert(riakc_map:is_key({<<"name">>, register}, M6)), - {ok, {search_results, Results4, _, _}} = riakc_pb_socket:search( - Pid, Index, - <<"*:*">>), - lager:info("Search recreated map *:*: ~p~n", [Results4]), - ?assertEqual(1, length(Results4)), + + lager:info("Search recreated map: *:*"), + search_and_validate_found(Pid, Index, <<"*:*">>, 1), lager:info("Delete key for map again"), ?assertEqual(ok, riakc_pb_socket:delete(Pid, Bucket, Key)), - yokozuna_rt:commit(Cluster, Index), + + drain_and_commit(Cluster, Index), ?assertEqual({error, {notfound, map}}, riakc_pb_socket:fetch_type(Pid, Bucket, Key)), - {ok, {search_results, Results5, _, _}} = riakc_pb_socket:search( - Pid, Index, - <<"*:*">>), - lager:info("Search ~p deleted map *:*: ~p~n", [Key, Results5]), - ?assertEqual(0, length(Results5)). + lager:info("Search ~p deleted map: *:*", [Key]), + search_and_validate_found(Pid, Index, <<"*:*">>, 0). %% @doc Tests key/map delete and AAE %% - Use intercept to trap yz_kv:delete_operation to skip over %% - Makes sure that yz AAE handles tombstone on expire/exchange %% - Recreate objects and check -test_delete_aae(Pid, Cluster, Bucket, Index) -> +test_and_validate_delete_aae(Pid, Cluster, Bucket, Index) -> Key1 = <<"ohyokozuna">>, M1 = riakc_map:update( {<<"name">>, register}, @@ -324,6 +250,8 @@ test_delete_aae(Pid, Cluster, Bucket, Index) -> Key2, riakc_map:to_op(M2)), + drain_and_commit(Cluster, Index), + lager:info("Add and load handle_delete_operation intercept"), [make_intercepts_tab(ANode) || ANode <- Cluster], @@ -339,28 +267,21 @@ test_delete_aae(Pid, Cluster, Bucket, Index) -> ?assertEqual(ok, riakc_pb_socket:delete(Pid, Bucket, Key2)), ?assertEqual({error, {notfound, map}}, riakc_pb_socket:fetch_type(Pid, Bucket, Key2)), - yokozuna_rt:commit(Cluster, Index), - {ok, {search_results, Results1, _, _}} = riakc_pb_socket:search( - Pid, Index, - <<"*:*">>), + drain_and_commit(Cluster, Index), + lager:info("Search all results, expect extra b/c tombstone" - " and we've modified the delete op... *:*: ~p~n", - [length(Results1)]), - ?assertEqual(2, length(Results1)), + " and we've modified the delete op : *:*"), + search_and_validate_found(Pid, Index, <<"*:*">>, 2), lager:info("Expire and re-check"), yokozuna_rt:expire_trees(Cluster), yokozuna_rt:wait_for_full_exchange_round(Cluster, erlang:now()), + drain_and_commit(Cluster, Index), - yokozuna_rt:commit(Cluster, Index), - - {ok, {search_results, Results2, _, _}} = riakc_pb_socket:search( - Pid, Index, - <<"*:*">>), lager:info("Search all results, expect removed tombstone b/c AAE" - " should clean it up ... *:*: ~p~n", [length(Results2)]), - ?assertEqual(1, length(Results2)), + " should clean it up: *:*"), + search_and_validate_found(Pid, Index, <<"*:*">>, 1), lager:info("Recreate object and check counts"), @@ -377,24 +298,22 @@ test_delete_aae(Pid, Cluster, Bucket, Index) -> Key2, riakc_map:to_op(M3)), - yokozuna_rt:commit(Cluster, Index), + drain_and_commit(Cluster, Index), {ok, M4} = riakc_pb_socket:fetch_type(Pid, Bucket, Key2), Keys = riakc_map:fetch_keys(M4), ?assertEqual(1, length(Keys)), ?assert(riakc_map:is_key({<<"name">>, register}, M4)), - {ok, {search_results, Results3, _, _}} = riakc_pb_socket:search( - Pid, Index, - <<"*:*">>), - lager:info("Search recreated map *:*: ~p~n", [Results3]), - ?assertEqual(2, length(Results3)). + + lager:info("Search recreated map: *:*"), + search_and_validate_found(Pid, Index, <<"*:*">>, 2). %% @doc Tests sibling handling/merge when there's a partition %% - Write/remove from separate partitions %% - Verify counts and that CRDTs have no siblings, vtags, %% after healing partitions. The CRDT map merges so the search %% results be consistent. -test_siblings(Pid, Cluster, Bucket, Index) -> +test_siblings(Cluster, Bucket, Index) -> Key1 = <<"Movies">>, Key2 = <<"Games">>, Set1 = <<"directors">>, @@ -403,135 +322,214 @@ test_siblings(Pid, Cluster, Bucket, Index) -> %% make siblings {P1, P2} = lists:split(1, Cluster), + %% Create an object in Partition 1 and siblings in Partition 2 + lager:info("Create partition: ~p | ~p", [P1, P2]), + Partition = rt:partition(P1, P2), + %% PB connections for accessing each side Pid1 = rt:pbc(hd(P1)), Pid2 = rt:pbc(hd(P2)), - %% Create an object in Partition 1 and siblings in Partition 2 - lager:info("Create partition: ~p | ~p", [P1, P2]), - Partition = rt:partition(P1, P2), - try - %% P1 writes - lager:info("Writing to Partition 1 Set 1: Key ~p | Director ~p", - [Key1, <<"Kubrick">>]), - M1 = riakc_map:update( - {Set1, set}, - fun(S) -> - riakc_set:add_element(<<"Kubrick">>, S) - end, riakc_map:new()), - ok = riakc_pb_socket:update_type( - Pid1, - Bucket, - Key1, - riakc_map:to_op(M1)), + riakc_pb_socket:set_options(Pid1, [queue_if_disconnected, auto_reconnect]), + riakc_pb_socket:set_options(Pid2, [queue_if_disconnected, auto_reconnect]), - lager:info("Writing to Partition 1 Set 1: Key ~p | Director ~p", - [Key1, <<"Demme">>]), - M2 = riakc_map:update( - {Set1, set}, - fun(S) -> - riakc_set:add_element(<<"Demme">>, S) - end, riakc_map:new()), - ok = riakc_pb_socket:update_type( - Pid1, - Bucket, - Key1, - riakc_map:to_op(M2)), + %% P1 writes + lager:info("Writing to Partition 1 Set 1: Key ~p | Director ~p", + [Key1, <<"Kubrick">>]), + M1 = riakc_map:update( + {Set1, set}, + fun(S) -> + riakc_set:add_element(<<"Kubrick">>, S) + end, riakc_map:new()), + ok = riakc_pb_socket:update_type( + Pid1, + Bucket, + Key1, + riakc_map:to_op(M1)), - %% P2 Siblings - lager:info("Writing to Partition 2 Set 2: Key ~p | Char ~p", - [Key2, <<"Sonic">>]), - M3 = riakc_map:update( - {Set2, set}, - fun(S) -> - riakc_set:add_element(<<"Sonic">>, S) - end, riakc_map:new()), - ok = riakc_pb_socket:update_type( - Pid2, - Bucket, - Key2, - riakc_map:to_op(M3)), + lager:info("Writing to Partition 1 Set 1: Key ~p | Director ~p", + [Key1, <<"Demme">>]), + M2 = riakc_map:update( + {Set1, set}, + fun(S) -> + riakc_set:add_element(<<"Demme">>, S) + end, riakc_map:new()), + ok = riakc_pb_socket:update_type( + Pid1, + Bucket, + Key1, + riakc_map:to_op(M2)), - lager:info("Delete key from Partition 2: Key ~p", [Key2]), - ok = riakc_pb_socket:delete(Pid2, Bucket, Key2), + %% P2 Siblings + lager:info("Writing to Partition 2 Set 2: Key ~p | Char ~p", + [Key2, <<"Sonic">>]), + M3 = riakc_map:update( + {Set2, set}, + fun(S) -> + riakc_set:add_element(<<"Sonic">>, S) + end, riakc_map:new()), + ok = riakc_pb_socket:update_type( + Pid2, + Bucket, + Key2, + riakc_map:to_op(M3)), - lager:info("Writing to Partition 2 Set 2: after delete: Key ~p | Char" - " ~p", [Key2, <<"Crash">>]), - M4 = riakc_map:update( - {Set2, set}, - fun(S) -> - riakc_set:add_element(<<"Crash">>, S) - end, riakc_map:new()), - ok = riakc_pb_socket:update_type( - Pid2, - Bucket, - Key2, - riakc_map:to_op(M4)), + lager:info("Delete key from Partition 2: Key ~p", [Key2]), + ok = riakc_pb_socket:delete(Pid2, Bucket, Key2), - lager:info("Writing to Partition 2 Set 2: Key ~p | Char ~p", - [Key2, <<"Mario">>]), - M5 = riakc_map:update( - {Set2, set}, - fun(S) -> - riakc_set:add_element(<<"Mario">>, S) - end, riakc_map:new()), - ok = riakc_pb_socket:update_type( - Pid2, - Bucket, - Key2, - riakc_map:to_op(M5)), + lager:info("Writing to Partition 2 Set 2: after delete: Key ~p | Char" + " ~p", [Key2, <<"Crash">>]), + M4 = riakc_map:update( + {Set2, set}, + fun(S) -> + riakc_set:add_element(<<"Crash">>, S) + end, riakc_map:new()), + ok = riakc_pb_socket:update_type( + Pid2, + Bucket, + Key2, + riakc_map:to_op(M4)), - lager:info("Writing to Partition 2 Set 1: Key ~p | Director ~p", - [Key1, <<"Klimov">>]), - M6 = riakc_map:update( - {Set1, set}, - fun(S) -> - riakc_set:add_element(<<"Klimov">>, S) - end, riakc_map:new()), - ok = riakc_pb_socket:update_type( - Pid2, - Bucket, - Key1, - riakc_map:to_op(M6)), - ok - after - rt:heal(Partition) - end, + lager:info("Writing to Partition 2 Set 2: Key ~p | Char ~p", + [Key2, <<"Mario">>]), + M5 = riakc_map:update( + {Set2, set}, + fun(S) -> + riakc_set:add_element(<<"Mario">>, S) + end, riakc_map:new()), + ok = riakc_pb_socket:update_type( + Pid2, + Bucket, + Key2, + riakc_map:to_op(M5)), + lager:info("Writing to Partition 2 Set 1: Key ~p | Director ~p", + [Key1, <<"Klimov">>]), + M6 = riakc_map:update( + {Set1, set}, + fun(S) -> + riakc_set:add_element(<<"Klimov">>, S) + end, riakc_map:new()), + ok = riakc_pb_socket:update_type( + Pid2, + Bucket, + Key1, + riakc_map:to_op(M6)), + + rt:heal(Partition), rt:wait_until_transfers_complete(Cluster), - yokozuna_rt:commit(Cluster, ?INDEX), - %% Verify Counts - lager:info("Verify counts and operations after heal + transfers + commits"), + drain_and_commit(Cluster, Index). - {ok, MF1} = riakc_pb_socket:fetch_type(Pid, Bucket, Key1), - Keys = riakc_map:fetch_keys(MF1), - ?assertEqual(1, length(Keys)), - ?assert(riakc_map:is_key({<<"directors">>, set}, MF1)), - {ok, {search_results, Results1, _, _}} = riakc_pb_socket:search( - Pid, Index, - <<"directors_set:*">>), - lager:info("Search movies map directors_set:*: ~p~n", [Results1]), - ?assertEqual(3, length(proplists:lookup_all(<<"directors_set">>, - ?GET(?INDEX, Results1)))), +validate_sample_data(Pid, Key, Index) -> + try + Thing = <<"thing">>, - {ok, MF2} = riakc_pb_socket:fetch_type(Pid, Bucket, Key2), - Keys2 = riakc_map:fetch_keys(MF2), - ?assertEqual(1, length(Keys2)), - ?assert(riakc_map:is_key({<<"characters">>, set}, MF2)), - {ok, {search_results, Results2, _, _}} = riakc_pb_socket:search( - Pid, Index, - <<"characters_set:*">>), - lager:info("Search games map characters_set:*: ~p~n", [Results2]), - ?assertEqual(2, length(proplists:lookup_all(<<"characters_set">>, - ?GET(?INDEX, Results2)))), + {ok, {search_results, Results1a, _, Found1}} = riakc_pb_socket:search( + Pid, Index, <<"name_register:Chris*">>), + ?assertEqual(1, Found1), - {ok, {search_results, Results3, _, _}} = riakc_pb_socket:search( - Pid, Index, - <<"_yz_vtag:*">>), - lager:info("Search vtags in search *:*: ~p~n", [Results3]), - ?assertEqual(0, length(Results3)). + ?assertEqual(?GET(<<"name_register">>, ?GET(Index, Results1a)), + Key), + ?assertEqual(?GET(<<"interests_set">>, ?GET(Index, Results1a)), + Thing), + {ok, {search_results, Results2a, _, Found2}} = riakc_pb_socket:search( + Pid, Index, <<"interests_set:thing*">>), + ?assertEqual(1, Found2), + ?assertEqual(?GET(<<"name_register">>, ?GET(Index, Results2a)), + Key), + ?assertEqual(?GET(<<"interests_set">>, ?GET(Index, Results2a)), + Thing), + + {ok, {search_results, Results3a, _, Found3}} = riakc_pb_socket:search( + Pid, Index, <<"_yz_rb:testbucket">>), + ?assertEqual(1, Found3), + ?assertEqual(?GET(<<"name_register">>, ?GET(Index, Results3a)), + Key), + ?assertEqual(?GET(<<"interests_set">>, ?GET(Index, Results3a)), + Thing), + + %% Redo queries and check if results are equal + {ok, {search_results, Results1b, _, _}} = riakc_pb_socket:search( + Pid, Index, <<"name_register:Chris*">>), + ?assertEqual(number_of_fields(Results1a, Index), + number_of_fields(Results1b, Index)), + + {ok, {search_results, Results2b, _, _}} = riakc_pb_socket:search( + Pid, Index, <<"interests_set:thing*">>), + ?assertEqual(number_of_fields(Results2a, Index), + number_of_fields(Results2b, Index)), + + {ok, {search_results, Results3b, _, _}} = riakc_pb_socket:search( + Pid, Index, <<"_yz_rb:testbucket">>), + ?assertEqual(number_of_fields(Results3a, Index), + number_of_fields(Results3b, Index)), + + true + catch Err:Reason -> + lager:info("Waiting for CRDT search results to converge. Error" + " was ~p.", [{Err, Reason}]), + false + end. + +validate_test_repeat_set(Pid, Index) -> + try + {ok, {search_results, _R, _, Found}} = riakc_pb_socket:search( + Pid, Index, + <<"update_register:*">>), + ?assertEqual(1, Found), + + true + catch Err:Reason -> + lager:info("Waiting for CRDT search results to converge. Error" + " was ~p.", [{Err, Reason}]), + false + end. + +validate_test_siblings(Pid, Bucket, Index) -> + try + Key1 = <<"Movies">>, + Key2 = <<"Games">>, + + {ok, MF1} = riakc_pb_socket:fetch_type(Pid, Bucket, Key1), + Keys = riakc_map:fetch_keys(MF1), + ?assertEqual(1, length(Keys)), + ?assert(riakc_map:is_key({<<"directors">>, set}, MF1)), + {ok, {search_results, Results1, _, _}} = riakc_pb_socket:search( + Pid, Index, + <<"directors_set:*">>), + lager:info("Search movies map directors_set:*: ~p~n", [Results1]), + ?assertEqual(3, length(proplists:lookup_all(<<"directors_set">>, + ?GET(?INDEX, Results1)))), + + {ok, MF2} = riakc_pb_socket:fetch_type(Pid, Bucket, Key2), + Keys2 = riakc_map:fetch_keys(MF2), + ?assertEqual(1, length(Keys2)), + ?assert(riakc_map:is_key({<<"characters">>, set}, MF2)), + {ok, {search_results, Results2, _, _}} = riakc_pb_socket:search( + Pid, Index, + <<"characters_set:*">>), + lager:info("Search games map characters_set:*: ~p~n", [Results2]), + ?assertEqual(2, length(proplists:lookup_all(<<"characters_set">>, + ?GET(?INDEX, Results2)))), + + {ok, {search_results, Results3, _, Found}} = riakc_pb_socket:search( + Pid, Index, + <<"_yz_vtag:*">>), + lager:info("Search vtags in search *:*: ~p~n", [Results3]), + ?assertEqual(0, Found), + true + catch Err:Reason -> + lager:info("Waiting for CRDT search results to converge. Error" + " was ~p.", [{Err, Reason}]), + false + end. + +%% @private +drain_and_commit(Cluster, Index) -> + yokozuna_rt:drain_solrqs(Cluster), + yokozuna_rt:commit(Cluster, Index). %% @private number_of_fields(Resp, Index) -> @@ -542,3 +540,21 @@ make_intercepts_tab(Node) -> SupPid = rpc:call(Node, erlang, whereis, [sasl_safe_sup]), intercepts_tab = rpc:call(Node, ets, new, [intercepts_tab, [named_table, public, set, {heir, SupPid, {}}]]). + +%% @private +search_and_validate_found(Pid, Index, Search, ExpectedCount) -> + ok = rt:wait_until( + fun() -> + try + {ok, {search_results, Results2, _, F}} = + riakc_pb_socket:search(Pid, Index, Search), + ?assertEqual(ExpectedCount, F), + true + catch Err:Reason -> + lager:info("Waiting for CRDT search results to" + " converge. Error was ~p.", + [{Err, Reason}]), + false + end + end). +