mirror of
https://github.com/valitydev/riak_test.git
synced 2024-11-06 08:35:22 +00:00
refactor test to include wait-for around validation due to yz changes to draining, settings
This commit is contained in:
parent
9564ab5823
commit
49344a0422
@ -8,7 +8,7 @@
|
||||
-define(HARNESS, (rt_config:get(rt_harness))).
|
||||
-define(INDEX, <<"maps">>).
|
||||
-define(TYPE, <<"maps">>).
|
||||
-define(KEY, "Chris Meiklejohn").
|
||||
-define(KEY, <<"Chris Meiklejohn">>).
|
||||
-define(BUCKET, {?TYPE, <<"testbucket">>}).
|
||||
-define(GET(K,L), proplists:get_value(K, L)).
|
||||
-define(N, 3).
|
||||
@ -49,115 +49,59 @@ confirm() ->
|
||||
{n_val, ?N},
|
||||
{search_index, ?INDEX}]),
|
||||
|
||||
%% Write some sample data.
|
||||
lager:info("Write some sample data"),
|
||||
test_sample_data(Pid, Nodes, ?BUCKET, ?KEY, ?INDEX),
|
||||
lager:info("Search and Validate our CRDT writes/updates."),
|
||||
ok = rt:wait_until(fun() -> validate_sample_data(Pid, ?KEY, ?INDEX)
|
||||
end),
|
||||
|
||||
lager:info("Test setting the register of a map twice to different values."
|
||||
"\nThe # of results should still be 1"),
|
||||
test_repeat_sets(Pid, Nodes, ?BUCKET, ?INDEX, ?KEY),
|
||||
ok = rt:wait_until(fun() -> validate_test_repeat_set(Pid, ?INDEX)
|
||||
end),
|
||||
|
||||
lager:info("FYI: delete_mode is on keep here to make sure YZ handles"
|
||||
" deletes correctly throughout."),
|
||||
lager:info("Test varying deletes operations"),
|
||||
test_and_validate_delete(Pid, Nodes, ?BUCKET, ?INDEX, ?KEY),
|
||||
|
||||
lager:info("Test to make sure yz AAE handles deletes/removes correctly"),
|
||||
test_and_validate_delete_aae(Pid, Nodes, ?BUCKET, ?INDEX),
|
||||
|
||||
lager:info("Test to make sure siblings don't exist after partition"),
|
||||
test_siblings(Nodes, ?BUCKET, ?INDEX),
|
||||
lager:info("Verify counts and operations after heal + transfers + commits"),
|
||||
ok = rt:wait_until(fun() -> validate_test_siblings(Pid, ?BUCKET, ?INDEX)
|
||||
end),
|
||||
|
||||
%% Stop PB connection
|
||||
riakc_pb_socket:stop(Pid),
|
||||
|
||||
pass.
|
||||
|
||||
test_sample_data(Pid, Cluster, Bucket, Key, Index) ->
|
||||
Map1 = riakc_map:update(
|
||||
{<<"name">>, register},
|
||||
fun(R) ->
|
||||
riakc_register:set(list_to_binary(?KEY), R)
|
||||
riakc_register:set(Key, R)
|
||||
end, riakc_map:new()),
|
||||
Map2 = riakc_map:update(
|
||||
{<<"interests">>, set},
|
||||
fun(S) ->
|
||||
riakc_set:add_element(<<"thing">>, S) end,
|
||||
Map1),
|
||||
|
||||
ok = riakc_pb_socket:update_type(
|
||||
Pid,
|
||||
?BUCKET,
|
||||
?KEY,
|
||||
Bucket,
|
||||
Key,
|
||||
riakc_map:to_op(Map2)),
|
||||
|
||||
drain_and_commit(Cluster, Index).
|
||||
|
||||
yokozuna_rt:drain_solrqs(Nodes),
|
||||
yokozuna_rt:commit(Nodes, ?INDEX),
|
||||
|
||||
%% Wait for yokozuna index to trigger.
|
||||
yokozuna_rt:commit(Nodes, ?INDEX),
|
||||
|
||||
%% Perform simple queries, check for register, set fields.
|
||||
ok = rt:wait_until(
|
||||
fun() ->
|
||||
validate_search_results(Pid)
|
||||
end),
|
||||
%% Stop PB connection.
|
||||
riakc_pb_socket:stop(Pid),
|
||||
|
||||
%% Clean cluster.
|
||||
rt:clean_cluster(Nodes),
|
||||
|
||||
pass.
|
||||
|
||||
validate_search_results(Pid) ->
|
||||
try
|
||||
{ok, {search_results, Results1a, _, _}} = riakc_pb_socket:search(
|
||||
Pid, ?INDEX, <<"name_register:Chris*">>),
|
||||
lager:info("Search name_register:Chris*: ~p~n", [Results1a]),
|
||||
?assertEqual(length(Results1a), 1),
|
||||
?assertEqual(?GET(<<"name_register">>, ?GET(?INDEX, Results1a)),
|
||||
list_to_binary(?KEY)),
|
||||
?assertEqual(?GET(<<"interests_set">>, ?GET(?INDEX, Results1a)),
|
||||
<<"thing">>),
|
||||
|
||||
{ok, {search_results, Results2a, _, _}} = riakc_pb_socket:search(
|
||||
Pid, ?INDEX, <<"interests_set:thing*">>),
|
||||
lager:info("Search interests_set:thing*: ~p~n", [Results2a]),
|
||||
?assertEqual(length(Results2a), 1),
|
||||
?assertEqual(?GET(<<"name_register">>, ?GET(?INDEX, Results2a)),
|
||||
list_to_binary(?KEY)),
|
||||
?assertEqual(?GET(<<"interests_set">>, ?GET(?INDEX, Results2a)),
|
||||
<<"thing">>),
|
||||
|
||||
{ok, {search_results, Results3a, _, _}} = riakc_pb_socket:search(
|
||||
Pid, ?INDEX, <<"_yz_rb:testbucket">>),
|
||||
lager:info("Search testbucket: ~p~n", [Results3a]),
|
||||
?assertEqual(length(Results3a), 1),
|
||||
?assertEqual(?GET(<<"name_register">>, ?GET(?INDEX, Results3a)),
|
||||
list_to_binary(?KEY)),
|
||||
?assertEqual(?GET(<<"interests_set">>, ?GET(?INDEX, Results3a)),
|
||||
<<"thing">>),
|
||||
|
||||
%% Redo queries and check if results are equal
|
||||
{ok, {search_results, Results1b, _, _}} = riakc_pb_socket:search(
|
||||
Pid, ?INDEX, <<"name_register:Chris*">>),
|
||||
?assertEqual(number_of_fields(Results1a, ?INDEX),
|
||||
number_of_fields(Results1b, ?INDEX)),
|
||||
|
||||
{ok, {search_results, Results2b, _, _}} = riakc_pb_socket:search(
|
||||
Pid, ?INDEX, <<"interests_set:thing*">>),
|
||||
?assertEqual(number_of_fields(Results2a, ?INDEX),
|
||||
number_of_fields(Results2b, ?INDEX)),
|
||||
|
||||
{ok, {search_results, Results3b, _, _}} = riakc_pb_socket:search(
|
||||
Pid, ?INDEX, <<"_yz_rb:testbucket">>),
|
||||
?assertEqual(number_of_fields(Results3a, ?INDEX),
|
||||
number_of_fields(Results3b, ?INDEX)),
|
||||
true
|
||||
catch Err:Reason ->
|
||||
lager:info("Waiting for CRDT search results to converge. Error was ~p.", [{Err, Reason}]),
|
||||
false
|
||||
end,
|
||||
|
||||
|
||||
|
||||
lager:info("Test setting the register of a map twice to different values."
|
||||
"\nThe # of results should still be 1"),
|
||||
test_repeat_sets(Pid, Nodes, ?BUCKET, ?INDEX, ?KEY),
|
||||
|
||||
lager:info("FYI: delete_mode is on keep here to make sure YZ handles"
|
||||
" deletes correctly throughout."),
|
||||
|
||||
lager:info("Test varying deletes operations"),
|
||||
test_delete(Pid, Nodes, ?BUCKET, ?INDEX, ?KEY),
|
||||
|
||||
lager:info("Test to make sure yz AAE handles deletes/removes correctly"),
|
||||
test_delete_aae(Pid, Nodes, ?BUCKET, ?INDEX),
|
||||
|
||||
lager:info("Test to make sure siblings don't exist after partition"
|
||||
" occurrs and we heal the cluster"),
|
||||
test_siblings(Pid, Nodes, ?BUCKET, ?INDEX),
|
||||
|
||||
pass.
|
||||
|
||||
%% @doc Test setting the register of a map twice to different values.
|
||||
%% The # of results should still be 1.
|
||||
test_repeat_sets(Pid, Cluster, Bucket, Index, Key) ->
|
||||
{ok, M1} = riakc_pb_socket:fetch_type(Pid, Bucket, Key),
|
||||
M2 = riakc_map:update(
|
||||
@ -181,26 +125,19 @@ test_repeat_sets(Pid, Cluster, Bucket, Index, Key) ->
|
||||
Key,
|
||||
riakc_map:to_op(M3)),
|
||||
|
||||
yokozuna_rt:commit(Cluster, Index),
|
||||
|
||||
{ok, {search_results, Results, _, _}} = riakc_pb_socket:search(
|
||||
Pid, Index,
|
||||
<<"update_register:*">>),
|
||||
|
||||
lager:info("Search update_register:*: ~p~n", [Results]),
|
||||
?assertEqual(1, length(Results)).
|
||||
drain_and_commit(Cluster, Index).
|
||||
|
||||
%% @doc Tests varying deletes of within a CRDT map and checks for correct counts
|
||||
%% - Remove registers, remove and add elements within a set
|
||||
%% - Delete the map (associated w/ a key)
|
||||
%% - Recreate objects in the map and delete the map again
|
||||
test_delete(Pid, Cluster, Bucket, Index, Key) ->
|
||||
test_and_validate_delete(Pid, Cluster, Bucket, Index, Key) ->
|
||||
{ok, M1} = riakc_pb_socket:fetch_type(Pid, Bucket, Key),
|
||||
|
||||
lager:info("Remove register from map"),
|
||||
M2 = riakc_map:erase({<<"name">>, register}, M1),
|
||||
|
||||
lager:info("Delete element from set (in map) & Add element to set (in map)"),
|
||||
lager:info("Delete element from set (in map) & Add element to set"),
|
||||
M3 = riakc_map:update(
|
||||
{<<"interests">>, set},
|
||||
fun(S) ->
|
||||
@ -214,13 +151,10 @@ test_delete(Pid, Cluster, Bucket, Index, Key) ->
|
||||
Key,
|
||||
riakc_map:to_op(M3)),
|
||||
|
||||
yokozuna_rt:commit(Cluster, Index),
|
||||
drain_and_commit(Cluster, Index),
|
||||
|
||||
{ok, {search_results, Results1, _, _}} = riakc_pb_socket:search(
|
||||
Pid, Index,
|
||||
<<"name_register:*">>),
|
||||
lager:info("Search deleted/erased name_register:*: ~p~n", [Results1]),
|
||||
?assertEqual(0, length(Results1)),
|
||||
lager:info("Search deleted/erased name_register:*"),
|
||||
search_and_validate_found(Pid, Index, <<"name_register:*">>, 0),
|
||||
|
||||
lager:info("Add another element to set (in map)"),
|
||||
M4 = riakc_map:update(
|
||||
@ -235,25 +169,21 @@ test_delete(Pid, Cluster, Bucket, Index, Key) ->
|
||||
Key,
|
||||
riakc_map:to_op(M4)),
|
||||
|
||||
yokozuna_rt:commit(Cluster, Index),
|
||||
drain_and_commit(Cluster, Index),
|
||||
|
||||
{ok, {search_results, Results2, _, _}} = riakc_pb_socket:search(
|
||||
Pid, Index,
|
||||
<<"interests_set:thing*">>),
|
||||
lager:info("Search deleted interests_set:thing*: ~p~n", [Results2]),
|
||||
?assertEqual(0, length(Results2)),
|
||||
lager:info("Search deleted interests_set:thing*"),
|
||||
search_and_validate_found(Pid, Index, <<"interests_set:thing*">>, 0),
|
||||
|
||||
lager:info("Delete key for map"),
|
||||
?assertEqual(ok, riakc_pb_socket:delete(Pid, Bucket, Key)),
|
||||
yokozuna_rt:commit(Cluster, Index),
|
||||
|
||||
drain_and_commit(Cluster, Index),
|
||||
|
||||
?assertEqual({error, {notfound, map}},
|
||||
riakc_pb_socket:fetch_type(Pid, Bucket, Key)),
|
||||
|
||||
{ok, {search_results, Results3, _, _}} = riakc_pb_socket:search(
|
||||
Pid, Index,
|
||||
<<"*:*">>),
|
||||
lager:info("Search deleted map *:*: ~p~n", [Results3]),
|
||||
?assertEqual(0, length(Results3)),
|
||||
lager:info("Search deleted map: *:*"),
|
||||
search_and_validate_found(Pid, Index, <<"*:*">>, 0),
|
||||
|
||||
lager:info("Recreate object and check counts..."),
|
||||
|
||||
@ -270,36 +200,32 @@ test_delete(Pid, Cluster, Bucket, Index, Key) ->
|
||||
Key,
|
||||
riakc_map:to_op(M5)),
|
||||
|
||||
yokozuna_rt:commit(Cluster, Index),
|
||||
drain_and_commit(Cluster, Index),
|
||||
|
||||
{ok, M6} = riakc_pb_socket:fetch_type(Pid, Bucket, Key),
|
||||
Keys = riakc_map:fetch_keys(M6),
|
||||
?assertEqual(1, length(Keys)),
|
||||
?assert(riakc_map:is_key({<<"name">>, register}, M6)),
|
||||
{ok, {search_results, Results4, _, _}} = riakc_pb_socket:search(
|
||||
Pid, Index,
|
||||
<<"*:*">>),
|
||||
lager:info("Search recreated map *:*: ~p~n", [Results4]),
|
||||
?assertEqual(1, length(Results4)),
|
||||
|
||||
lager:info("Search recreated map: *:*"),
|
||||
search_and_validate_found(Pid, Index, <<"*:*">>, 1),
|
||||
|
||||
lager:info("Delete key for map again"),
|
||||
?assertEqual(ok, riakc_pb_socket:delete(Pid, Bucket, Key)),
|
||||
yokozuna_rt:commit(Cluster, Index),
|
||||
|
||||
drain_and_commit(Cluster, Index),
|
||||
|
||||
?assertEqual({error, {notfound, map}},
|
||||
riakc_pb_socket:fetch_type(Pid, Bucket, Key)),
|
||||
|
||||
{ok, {search_results, Results5, _, _}} = riakc_pb_socket:search(
|
||||
Pid, Index,
|
||||
<<"*:*">>),
|
||||
lager:info("Search ~p deleted map *:*: ~p~n", [Key, Results5]),
|
||||
?assertEqual(0, length(Results5)).
|
||||
lager:info("Search ~p deleted map: *:*", [Key]),
|
||||
search_and_validate_found(Pid, Index, <<"*:*">>, 0).
|
||||
|
||||
%% @doc Tests key/map delete and AAE
|
||||
%% - Use intercept to trap yz_kv:delete_operation to skip over
|
||||
%% - Makes sure that yz AAE handles tombstone on expire/exchange
|
||||
%% - Recreate objects and check
|
||||
test_delete_aae(Pid, Cluster, Bucket, Index) ->
|
||||
test_and_validate_delete_aae(Pid, Cluster, Bucket, Index) ->
|
||||
Key1 = <<"ohyokozuna">>,
|
||||
M1 = riakc_map:update(
|
||||
{<<"name">>, register},
|
||||
@ -324,6 +250,8 @@ test_delete_aae(Pid, Cluster, Bucket, Index) ->
|
||||
Key2,
|
||||
riakc_map:to_op(M2)),
|
||||
|
||||
drain_and_commit(Cluster, Index),
|
||||
|
||||
lager:info("Add and load handle_delete_operation intercept"),
|
||||
|
||||
[make_intercepts_tab(ANode) || ANode <- Cluster],
|
||||
@ -339,28 +267,21 @@ test_delete_aae(Pid, Cluster, Bucket, Index) ->
|
||||
?assertEqual(ok, riakc_pb_socket:delete(Pid, Bucket, Key2)),
|
||||
?assertEqual({error, {notfound, map}},
|
||||
riakc_pb_socket:fetch_type(Pid, Bucket, Key2)),
|
||||
yokozuna_rt:commit(Cluster, Index),
|
||||
|
||||
{ok, {search_results, Results1, _, _}} = riakc_pb_socket:search(
|
||||
Pid, Index,
|
||||
<<"*:*">>),
|
||||
drain_and_commit(Cluster, Index),
|
||||
|
||||
lager:info("Search all results, expect extra b/c tombstone"
|
||||
" and we've modified the delete op... *:*: ~p~n",
|
||||
[length(Results1)]),
|
||||
?assertEqual(2, length(Results1)),
|
||||
" and we've modified the delete op : *:*"),
|
||||
search_and_validate_found(Pid, Index, <<"*:*">>, 2),
|
||||
|
||||
lager:info("Expire and re-check"),
|
||||
yokozuna_rt:expire_trees(Cluster),
|
||||
yokozuna_rt:wait_for_full_exchange_round(Cluster, erlang:now()),
|
||||
drain_and_commit(Cluster, Index),
|
||||
|
||||
yokozuna_rt:commit(Cluster, Index),
|
||||
|
||||
{ok, {search_results, Results2, _, _}} = riakc_pb_socket:search(
|
||||
Pid, Index,
|
||||
<<"*:*">>),
|
||||
lager:info("Search all results, expect removed tombstone b/c AAE"
|
||||
" should clean it up ... *:*: ~p~n", [length(Results2)]),
|
||||
?assertEqual(1, length(Results2)),
|
||||
" should clean it up: *:*"),
|
||||
search_and_validate_found(Pid, Index, <<"*:*">>, 1),
|
||||
|
||||
lager:info("Recreate object and check counts"),
|
||||
|
||||
@ -377,24 +298,22 @@ test_delete_aae(Pid, Cluster, Bucket, Index) ->
|
||||
Key2,
|
||||
riakc_map:to_op(M3)),
|
||||
|
||||
yokozuna_rt:commit(Cluster, Index),
|
||||
drain_and_commit(Cluster, Index),
|
||||
|
||||
{ok, M4} = riakc_pb_socket:fetch_type(Pid, Bucket, Key2),
|
||||
Keys = riakc_map:fetch_keys(M4),
|
||||
?assertEqual(1, length(Keys)),
|
||||
?assert(riakc_map:is_key({<<"name">>, register}, M4)),
|
||||
{ok, {search_results, Results3, _, _}} = riakc_pb_socket:search(
|
||||
Pid, Index,
|
||||
<<"*:*">>),
|
||||
lager:info("Search recreated map *:*: ~p~n", [Results3]),
|
||||
?assertEqual(2, length(Results3)).
|
||||
|
||||
lager:info("Search recreated map: *:*"),
|
||||
search_and_validate_found(Pid, Index, <<"*:*">>, 2).
|
||||
|
||||
%% @doc Tests sibling handling/merge when there's a partition
|
||||
%% - Write/remove from separate partitions
|
||||
%% - Verify counts and that CRDTs have no siblings, vtags,
|
||||
%% after healing partitions. The CRDT map merges so the search
|
||||
%% results be consistent.
|
||||
test_siblings(Pid, Cluster, Bucket, Index) ->
|
||||
test_siblings(Cluster, Bucket, Index) ->
|
||||
Key1 = <<"Movies">>,
|
||||
Key2 = <<"Games">>,
|
||||
Set1 = <<"directors">>,
|
||||
@ -403,135 +322,214 @@ test_siblings(Pid, Cluster, Bucket, Index) ->
|
||||
%% make siblings
|
||||
{P1, P2} = lists:split(1, Cluster),
|
||||
|
||||
%% Create an object in Partition 1 and siblings in Partition 2
|
||||
lager:info("Create partition: ~p | ~p", [P1, P2]),
|
||||
Partition = rt:partition(P1, P2),
|
||||
|
||||
%% PB connections for accessing each side
|
||||
Pid1 = rt:pbc(hd(P1)),
|
||||
Pid2 = rt:pbc(hd(P2)),
|
||||
|
||||
%% Create an object in Partition 1 and siblings in Partition 2
|
||||
lager:info("Create partition: ~p | ~p", [P1, P2]),
|
||||
Partition = rt:partition(P1, P2),
|
||||
try
|
||||
%% P1 writes
|
||||
lager:info("Writing to Partition 1 Set 1: Key ~p | Director ~p",
|
||||
[Key1, <<"Kubrick">>]),
|
||||
M1 = riakc_map:update(
|
||||
{Set1, set},
|
||||
fun(S) ->
|
||||
riakc_set:add_element(<<"Kubrick">>, S)
|
||||
end, riakc_map:new()),
|
||||
ok = riakc_pb_socket:update_type(
|
||||
Pid1,
|
||||
Bucket,
|
||||
Key1,
|
||||
riakc_map:to_op(M1)),
|
||||
riakc_pb_socket:set_options(Pid1, [queue_if_disconnected, auto_reconnect]),
|
||||
riakc_pb_socket:set_options(Pid2, [queue_if_disconnected, auto_reconnect]),
|
||||
|
||||
lager:info("Writing to Partition 1 Set 1: Key ~p | Director ~p",
|
||||
[Key1, <<"Demme">>]),
|
||||
M2 = riakc_map:update(
|
||||
{Set1, set},
|
||||
fun(S) ->
|
||||
riakc_set:add_element(<<"Demme">>, S)
|
||||
end, riakc_map:new()),
|
||||
ok = riakc_pb_socket:update_type(
|
||||
Pid1,
|
||||
Bucket,
|
||||
Key1,
|
||||
riakc_map:to_op(M2)),
|
||||
%% P1 writes
|
||||
lager:info("Writing to Partition 1 Set 1: Key ~p | Director ~p",
|
||||
[Key1, <<"Kubrick">>]),
|
||||
M1 = riakc_map:update(
|
||||
{Set1, set},
|
||||
fun(S) ->
|
||||
riakc_set:add_element(<<"Kubrick">>, S)
|
||||
end, riakc_map:new()),
|
||||
ok = riakc_pb_socket:update_type(
|
||||
Pid1,
|
||||
Bucket,
|
||||
Key1,
|
||||
riakc_map:to_op(M1)),
|
||||
|
||||
%% P2 Siblings
|
||||
lager:info("Writing to Partition 2 Set 2: Key ~p | Char ~p",
|
||||
[Key2, <<"Sonic">>]),
|
||||
M3 = riakc_map:update(
|
||||
{Set2, set},
|
||||
fun(S) ->
|
||||
riakc_set:add_element(<<"Sonic">>, S)
|
||||
end, riakc_map:new()),
|
||||
ok = riakc_pb_socket:update_type(
|
||||
Pid2,
|
||||
Bucket,
|
||||
Key2,
|
||||
riakc_map:to_op(M3)),
|
||||
lager:info("Writing to Partition 1 Set 1: Key ~p | Director ~p",
|
||||
[Key1, <<"Demme">>]),
|
||||
M2 = riakc_map:update(
|
||||
{Set1, set},
|
||||
fun(S) ->
|
||||
riakc_set:add_element(<<"Demme">>, S)
|
||||
end, riakc_map:new()),
|
||||
ok = riakc_pb_socket:update_type(
|
||||
Pid1,
|
||||
Bucket,
|
||||
Key1,
|
||||
riakc_map:to_op(M2)),
|
||||
|
||||
lager:info("Delete key from Partition 2: Key ~p", [Key2]),
|
||||
ok = riakc_pb_socket:delete(Pid2, Bucket, Key2),
|
||||
%% P2 Siblings
|
||||
lager:info("Writing to Partition 2 Set 2: Key ~p | Char ~p",
|
||||
[Key2, <<"Sonic">>]),
|
||||
M3 = riakc_map:update(
|
||||
{Set2, set},
|
||||
fun(S) ->
|
||||
riakc_set:add_element(<<"Sonic">>, S)
|
||||
end, riakc_map:new()),
|
||||
ok = riakc_pb_socket:update_type(
|
||||
Pid2,
|
||||
Bucket,
|
||||
Key2,
|
||||
riakc_map:to_op(M3)),
|
||||
|
||||
lager:info("Writing to Partition 2 Set 2: after delete: Key ~p | Char"
|
||||
" ~p", [Key2, <<"Crash">>]),
|
||||
M4 = riakc_map:update(
|
||||
{Set2, set},
|
||||
fun(S) ->
|
||||
riakc_set:add_element(<<"Crash">>, S)
|
||||
end, riakc_map:new()),
|
||||
ok = riakc_pb_socket:update_type(
|
||||
Pid2,
|
||||
Bucket,
|
||||
Key2,
|
||||
riakc_map:to_op(M4)),
|
||||
lager:info("Delete key from Partition 2: Key ~p", [Key2]),
|
||||
ok = riakc_pb_socket:delete(Pid2, Bucket, Key2),
|
||||
|
||||
lager:info("Writing to Partition 2 Set 2: Key ~p | Char ~p",
|
||||
[Key2, <<"Mario">>]),
|
||||
M5 = riakc_map:update(
|
||||
{Set2, set},
|
||||
fun(S) ->
|
||||
riakc_set:add_element(<<"Mario">>, S)
|
||||
end, riakc_map:new()),
|
||||
ok = riakc_pb_socket:update_type(
|
||||
Pid2,
|
||||
Bucket,
|
||||
Key2,
|
||||
riakc_map:to_op(M5)),
|
||||
lager:info("Writing to Partition 2 Set 2: after delete: Key ~p | Char"
|
||||
" ~p", [Key2, <<"Crash">>]),
|
||||
M4 = riakc_map:update(
|
||||
{Set2, set},
|
||||
fun(S) ->
|
||||
riakc_set:add_element(<<"Crash">>, S)
|
||||
end, riakc_map:new()),
|
||||
ok = riakc_pb_socket:update_type(
|
||||
Pid2,
|
||||
Bucket,
|
||||
Key2,
|
||||
riakc_map:to_op(M4)),
|
||||
|
||||
lager:info("Writing to Partition 2 Set 1: Key ~p | Director ~p",
|
||||
[Key1, <<"Klimov">>]),
|
||||
M6 = riakc_map:update(
|
||||
{Set1, set},
|
||||
fun(S) ->
|
||||
riakc_set:add_element(<<"Klimov">>, S)
|
||||
end, riakc_map:new()),
|
||||
ok = riakc_pb_socket:update_type(
|
||||
Pid2,
|
||||
Bucket,
|
||||
Key1,
|
||||
riakc_map:to_op(M6)),
|
||||
ok
|
||||
after
|
||||
rt:heal(Partition)
|
||||
end,
|
||||
lager:info("Writing to Partition 2 Set 2: Key ~p | Char ~p",
|
||||
[Key2, <<"Mario">>]),
|
||||
M5 = riakc_map:update(
|
||||
{Set2, set},
|
||||
fun(S) ->
|
||||
riakc_set:add_element(<<"Mario">>, S)
|
||||
end, riakc_map:new()),
|
||||
ok = riakc_pb_socket:update_type(
|
||||
Pid2,
|
||||
Bucket,
|
||||
Key2,
|
||||
riakc_map:to_op(M5)),
|
||||
|
||||
lager:info("Writing to Partition 2 Set 1: Key ~p | Director ~p",
|
||||
[Key1, <<"Klimov">>]),
|
||||
M6 = riakc_map:update(
|
||||
{Set1, set},
|
||||
fun(S) ->
|
||||
riakc_set:add_element(<<"Klimov">>, S)
|
||||
end, riakc_map:new()),
|
||||
ok = riakc_pb_socket:update_type(
|
||||
Pid2,
|
||||
Bucket,
|
||||
Key1,
|
||||
riakc_map:to_op(M6)),
|
||||
|
||||
rt:heal(Partition),
|
||||
rt:wait_until_transfers_complete(Cluster),
|
||||
yokozuna_rt:commit(Cluster, ?INDEX),
|
||||
|
||||
%% Verify Counts
|
||||
lager:info("Verify counts and operations after heal + transfers + commits"),
|
||||
drain_and_commit(Cluster, Index).
|
||||
|
||||
{ok, MF1} = riakc_pb_socket:fetch_type(Pid, Bucket, Key1),
|
||||
Keys = riakc_map:fetch_keys(MF1),
|
||||
?assertEqual(1, length(Keys)),
|
||||
?assert(riakc_map:is_key({<<"directors">>, set}, MF1)),
|
||||
{ok, {search_results, Results1, _, _}} = riakc_pb_socket:search(
|
||||
Pid, Index,
|
||||
<<"directors_set:*">>),
|
||||
lager:info("Search movies map directors_set:*: ~p~n", [Results1]),
|
||||
?assertEqual(3, length(proplists:lookup_all(<<"directors_set">>,
|
||||
?GET(?INDEX, Results1)))),
|
||||
validate_sample_data(Pid, Key, Index) ->
|
||||
try
|
||||
Thing = <<"thing">>,
|
||||
|
||||
{ok, MF2} = riakc_pb_socket:fetch_type(Pid, Bucket, Key2),
|
||||
Keys2 = riakc_map:fetch_keys(MF2),
|
||||
?assertEqual(1, length(Keys2)),
|
||||
?assert(riakc_map:is_key({<<"characters">>, set}, MF2)),
|
||||
{ok, {search_results, Results2, _, _}} = riakc_pb_socket:search(
|
||||
Pid, Index,
|
||||
<<"characters_set:*">>),
|
||||
lager:info("Search games map characters_set:*: ~p~n", [Results2]),
|
||||
?assertEqual(2, length(proplists:lookup_all(<<"characters_set">>,
|
||||
?GET(?INDEX, Results2)))),
|
||||
{ok, {search_results, Results1a, _, Found1}} = riakc_pb_socket:search(
|
||||
Pid, Index, <<"name_register:Chris*">>),
|
||||
?assertEqual(1, Found1),
|
||||
|
||||
{ok, {search_results, Results3, _, _}} = riakc_pb_socket:search(
|
||||
Pid, Index,
|
||||
<<"_yz_vtag:*">>),
|
||||
lager:info("Search vtags in search *:*: ~p~n", [Results3]),
|
||||
?assertEqual(0, length(Results3)).
|
||||
?assertEqual(?GET(<<"name_register">>, ?GET(Index, Results1a)),
|
||||
Key),
|
||||
?assertEqual(?GET(<<"interests_set">>, ?GET(Index, Results1a)),
|
||||
Thing),
|
||||
|
||||
{ok, {search_results, Results2a, _, Found2}} = riakc_pb_socket:search(
|
||||
Pid, Index, <<"interests_set:thing*">>),
|
||||
?assertEqual(1, Found2),
|
||||
?assertEqual(?GET(<<"name_register">>, ?GET(Index, Results2a)),
|
||||
Key),
|
||||
?assertEqual(?GET(<<"interests_set">>, ?GET(Index, Results2a)),
|
||||
Thing),
|
||||
|
||||
{ok, {search_results, Results3a, _, Found3}} = riakc_pb_socket:search(
|
||||
Pid, Index, <<"_yz_rb:testbucket">>),
|
||||
?assertEqual(1, Found3),
|
||||
?assertEqual(?GET(<<"name_register">>, ?GET(Index, Results3a)),
|
||||
Key),
|
||||
?assertEqual(?GET(<<"interests_set">>, ?GET(Index, Results3a)),
|
||||
Thing),
|
||||
|
||||
%% Redo queries and check if results are equal
|
||||
{ok, {search_results, Results1b, _, _}} = riakc_pb_socket:search(
|
||||
Pid, Index, <<"name_register:Chris*">>),
|
||||
?assertEqual(number_of_fields(Results1a, Index),
|
||||
number_of_fields(Results1b, Index)),
|
||||
|
||||
{ok, {search_results, Results2b, _, _}} = riakc_pb_socket:search(
|
||||
Pid, Index, <<"interests_set:thing*">>),
|
||||
?assertEqual(number_of_fields(Results2a, Index),
|
||||
number_of_fields(Results2b, Index)),
|
||||
|
||||
{ok, {search_results, Results3b, _, _}} = riakc_pb_socket:search(
|
||||
Pid, Index, <<"_yz_rb:testbucket">>),
|
||||
?assertEqual(number_of_fields(Results3a, Index),
|
||||
number_of_fields(Results3b, Index)),
|
||||
|
||||
true
|
||||
catch Err:Reason ->
|
||||
lager:info("Waiting for CRDT search results to converge. Error"
|
||||
" was ~p.", [{Err, Reason}]),
|
||||
false
|
||||
end.
|
||||
|
||||
validate_test_repeat_set(Pid, Index) ->
|
||||
try
|
||||
{ok, {search_results, _R, _, Found}} = riakc_pb_socket:search(
|
||||
Pid, Index,
|
||||
<<"update_register:*">>),
|
||||
?assertEqual(1, Found),
|
||||
|
||||
true
|
||||
catch Err:Reason ->
|
||||
lager:info("Waiting for CRDT search results to converge. Error"
|
||||
" was ~p.", [{Err, Reason}]),
|
||||
false
|
||||
end.
|
||||
|
||||
validate_test_siblings(Pid, Bucket, Index) ->
|
||||
try
|
||||
Key1 = <<"Movies">>,
|
||||
Key2 = <<"Games">>,
|
||||
|
||||
{ok, MF1} = riakc_pb_socket:fetch_type(Pid, Bucket, Key1),
|
||||
Keys = riakc_map:fetch_keys(MF1),
|
||||
?assertEqual(1, length(Keys)),
|
||||
?assert(riakc_map:is_key({<<"directors">>, set}, MF1)),
|
||||
{ok, {search_results, Results1, _, _}} = riakc_pb_socket:search(
|
||||
Pid, Index,
|
||||
<<"directors_set:*">>),
|
||||
lager:info("Search movies map directors_set:*: ~p~n", [Results1]),
|
||||
?assertEqual(3, length(proplists:lookup_all(<<"directors_set">>,
|
||||
?GET(?INDEX, Results1)))),
|
||||
|
||||
{ok, MF2} = riakc_pb_socket:fetch_type(Pid, Bucket, Key2),
|
||||
Keys2 = riakc_map:fetch_keys(MF2),
|
||||
?assertEqual(1, length(Keys2)),
|
||||
?assert(riakc_map:is_key({<<"characters">>, set}, MF2)),
|
||||
{ok, {search_results, Results2, _, _}} = riakc_pb_socket:search(
|
||||
Pid, Index,
|
||||
<<"characters_set:*">>),
|
||||
lager:info("Search games map characters_set:*: ~p~n", [Results2]),
|
||||
?assertEqual(2, length(proplists:lookup_all(<<"characters_set">>,
|
||||
?GET(?INDEX, Results2)))),
|
||||
|
||||
{ok, {search_results, Results3, _, Found}} = riakc_pb_socket:search(
|
||||
Pid, Index,
|
||||
<<"_yz_vtag:*">>),
|
||||
lager:info("Search vtags in search *:*: ~p~n", [Results3]),
|
||||
?assertEqual(0, Found),
|
||||
true
|
||||
catch Err:Reason ->
|
||||
lager:info("Waiting for CRDT search results to converge. Error"
|
||||
" was ~p.", [{Err, Reason}]),
|
||||
false
|
||||
end.
|
||||
|
||||
%% @private
|
||||
drain_and_commit(Cluster, Index) ->
|
||||
yokozuna_rt:drain_solrqs(Cluster),
|
||||
yokozuna_rt:commit(Cluster, Index).
|
||||
|
||||
%% @private
|
||||
number_of_fields(Resp, Index) ->
|
||||
@ -542,3 +540,21 @@ make_intercepts_tab(Node) ->
|
||||
SupPid = rpc:call(Node, erlang, whereis, [sasl_safe_sup]),
|
||||
intercepts_tab = rpc:call(Node, ets, new, [intercepts_tab, [named_table,
|
||||
public, set, {heir, SupPid, {}}]]).
|
||||
|
||||
%% @private
|
||||
search_and_validate_found(Pid, Index, Search, ExpectedCount) ->
|
||||
ok = rt:wait_until(
|
||||
fun() ->
|
||||
try
|
||||
{ok, {search_results, Results2, _, F}} =
|
||||
riakc_pb_socket:search(Pid, Index, Search),
|
||||
?assertEqual(ExpectedCount, F),
|
||||
true
|
||||
catch Err:Reason ->
|
||||
lager:info("Waiting for CRDT search results to"
|
||||
" converge. Error was ~p.",
|
||||
[{Err, Reason}]),
|
||||
false
|
||||
end
|
||||
end).
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user