test deletions w/ yz_crdt + tombstones + delete_mode -> keep

This commit is contained in:
Zeeshan Lakhani 2015-09-11 09:09:54 -04:00
parent f37f69007b
commit ecae32da10
2 changed files with 132 additions and 2 deletions

View File

@ -2187,6 +2187,11 @@ stop_tracing() ->
dbg:stop_clear(),
ok.
get_primary_preflist(Node, Bucket, Key, NVal) ->
DocIdx = rpc:call(Node, riak_core_util, chash_std_keyfun, [{Bucket, Key}]),
PL = rpc:call(Node, riak_core_apl, get_primary_apl, [DocIdx, NVal, riak_kv]),
{ok, PL}.
%% @doc Trace fun calls and store their count state into an ETS table.
-spec trace_count({trace, pid(), call|return_from,
{atom(), atom(), non_neg_integer()}}, {node(), [node()]}) ->

View File

@ -10,12 +10,15 @@
-define(KEY, "Chris Meiklejohn").
-define(BUCKET, {?TYPE, <<"testbucket">>}).
-define(GET(K,L), proplists:get_value(K, L)).
-define(N, 3).
-define(CONF,
[
{riak_core,
[{ring_creation_size, 8}]
},
{riak_kv,
[{delete_mode, keep}]},
{yokozuna,
[{enabled, true}]
}]).
@ -39,10 +42,10 @@ confirm() ->
rt:create_and_activate_bucket_type(Node,
?TYPE,
[{datatype, map},
{n_val, ?N},
{search_index, ?INDEX}]),
%% Write some sample data.
Map1 = riakc_map:update(
{<<"name">>, register},
fun(R) ->
@ -59,8 +62,13 @@ confirm() ->
?KEY,
riakc_map:to_op(Map2)),
yokozuna_rt:drain_solrqs(Nodes),
yokozuna_rt:commit(Nodes, ?INDEX),
%% Wait for yokozuna index to trigger.
timer:sleep(1100),
%% Perform simple queries, check for register, set fields.
ok = rt:wait_until(
fun() ->
@ -69,6 +77,9 @@ confirm() ->
%% Stop PB connection.
riakc_pb_socket:stop(Pid),
%% Clean cluster.
rt:clean_cluster(Nodes),
pass.
validate_search_results(Pid) ->
@ -119,7 +130,121 @@ validate_search_results(Pid) ->
catch Err:Reason ->
lager:info("Waiting for CRDT search results to converge. Error was ~p.", [{Err, Reason}]),
false
end.
end,
test_repeat_sets(Pid, ?BUCKET, ?KEY),
test_delete(Pid, ?BUCKET, ?KEY),
pass.
test_repeat_sets(Pid, Bucket, Key) ->
{ok, M1} = riakc_pb_socket:fetch_type(Pid, Bucket, Key),
M2 = riakc_map:update(
{<<"update">>, register},
fun(R) ->
riakc_register:set(<<"foo">>, R)
end, M1),
ok = riakc_pb_socket:update_type(
Pid,
Bucket,
Key,
riakc_map:to_op(M2)),
M3 = riakc_map:update(
{<<"update">>, register},
fun(R) ->
riakc_register:set(<<"bar">>, R)
end, M1),
ok = riakc_pb_socket:update_type(
Pid,
Bucket,
Key,
riakc_map:to_op(M3)),
timer:sleep(1100),
{ok, {search_results, Results, _, _}} = riakc_pb_socket:search(
Pid, ?INDEX,
<<"update_register:*">>),
lager:info("Search update_register:*: ~p~n", [Results]),
?assertEqual(1, length(Results)).
test_delete(Pid, Bucket, Key) ->
{ok, M1} = riakc_pb_socket:fetch_type(Pid, Bucket, Key),
M2 = riakc_map:erase({<<"name">>, register}, M1),
M3 = riakc_map:update(
{<<"interests">>, set},
fun(S) ->
riakc_set:del_element(<<"thing">>,
riakc_set:add_element(<<"roses">>, S))
end, M2),
ok = riakc_pb_socket:update_type(
Pid,
Bucket,
Key,
riakc_map:to_op(M3)),
timer:sleep(1100),
{ok, {search_results, Results1, _, _}} = riakc_pb_socket:search(
Pid, ?INDEX,
<<"name_register:*">>),
lager:info("Search deleted/erased name_register:*: ~p~n", [Results1]),
?assertEqual(0, length(Results1)),
M4 = riakc_map:update(
{<<"interests">>, set},
fun(S) ->
riakc_set:add_element(<<"pans">>, S)
end, M3),
ok = riakc_pb_socket:update_type(
Pid,
Bucket,
Key,
riakc_map:to_op(M4)),
timer:sleep(1100),
{ok, {search_results, Results2, _, _}} = riakc_pb_socket:search(
Pid, ?INDEX,
<<"interests_set:thing*">>),
lager:info("Search deleted interests_set:thing*: ~p~n", [Results2]),
?assertEqual(0, length(Results2)),
lager:info("Delete key for map"),
?assertEqual(ok, riakc_pb_socket:delete(Pid, Bucket, Key)),
timer:sleep(1100),
?assertEqual({error, {notfound, map}}, riakc_pb_socket:fetch_type(Pid, Bucket, Key)),
{ok, {search_results, Results3, _, _}} = riakc_pb_socket:search(
Pid, ?INDEX,
<<"*:*">>),
lager:info("Search deleted map *:*: ~p~n", [Results3]),
?assertEqual(0, length(Results3)),
M5 = riakc_map:update(
{<<"name">>, register},
fun(R) ->
riakc_register:set(<<"hello">>, R)
end, riakc_map:new()),
ok = riakc_pb_socket:update_type(
Pid,
Bucket,
Key,
riakc_map:to_op(M5)),
timer:sleep(1100),
{ok, _} = riakc_pb_socket:fetch_type(Pid, Bucket, Key),
{ok, {search_results, Results4, _, _}} = riakc_pb_socket:search(
Pid, ?INDEX,
<<"*:*">>),
lager:info("Search recreated map *:*: ~p~n", [Results4]),
?assertEqual(1, length(Results4)).
%% @private
number_of_fields(Resp) ->