Added test for reduced n/read repair interaction.

The added tests ensure that even when read repair is forced, reduced objects
end up in the proper places and the correct values are returned to clients.
Micah Warren 2013-12-12 16:24:53 -06:00 committed by Christopher Meiklejohn
parent b32d3852bb
commit 49c78e3926
2 changed files with 360 additions and 0 deletions
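
In outline, the new tests force read repair against the reduced replicas roughly like this (a condensed sketch of the calls used in the diff below; the function name and the SinkNode variable are illustrative):

force_read_repair_sketch(SinkNode) ->
    % keep only one full copy of each object on the sink cluster; the other
    % replicas stay reduced
    rpc:call(SinkNode, riak_repl_console, full_objects, [["1"]]),
    Client = rt:pbc(SinkNode),
    % raise n_val above the number of vnodes that actually hold data, so a
    % get with pr=3 forces read repair, which rewrites the reduced copies
    riakc_pb_socket:set_bucket(Client, <<"rrit_objects">>, [{n_val, 5}]),
    {ok, _Obj} = riakc_pb_socket:get(Client, <<"rrit_objects">>, <<"rrit">>, [{pr, 3}]),
    riakc_pb_socket:stop(Client).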


@ -0,0 +1,89 @@
-module(riak_repl_reduced_intercepts).
-include("intercept.hrl").
-export([recv_get_report/0, recv_get_report/1, report_mutate_get/1,
register_as_target/0, get_all_reports/0, get_all_reports/1]).
-export([report_mutate_put/5, recv_put_report/0, recv_put_report/1,
put_all_reports/0, put_all_reports/1]).
-define(M, riak_repl_reduced_orig).
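% ?M is the copy of the intercepted module that rt_intercept generates; calling
% ?M:mutate_get_orig/1 and ?M:mutate_put_orig/5 forwards to the original,
% un-intercepted implementations.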
report_mutate_get(InObject) ->
Node = node(),
Pid = self(),
TargetNode = 'riak_test@127.0.0.1',
TargetProcess = reduced_intercept_target,
{TargetProcess, TargetNode} ! {report_mutate_get, Node, Pid, InObject},
?M:mutate_get_orig(InObject).
report_mutate_put(InMeta, InVal, RevMeta, Obj, Props) ->
Node = node(),
Pid = self(),
TargetNode = 'riak_test@127.0.0.1',
TargetProcess = reduced_intercept_target,
{TargetProcess, TargetNode} ! {report_mutate_put, Node, Pid, InMeta, InVal},
?M:mutate_put_orig(InMeta, InVal, RevMeta, Obj, Props).
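% The intercepts above run on the riak nodes and report to a process registered
% as reduced_intercept_target on the riak_test node; the test process calls
% register_as_target/0 to become that process.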
register_as_target() ->
Self = self(),
case whereis(reduced_intercept_target) of
Self ->
true;
undefined ->
register(reduced_intercept_target, Self);
_NotSelf ->
unregister(reduced_intercept_target),
register(reduced_intercept_target, Self)
end.
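% The recv_* and *_all_reports helpers drain the reports the intercepts sent to
% this process, collecting everything received until Timeout milliseconds pass
% with no further messages.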
recv_get_report() ->
recv_get_report(5000).
recv_get_report(Timeout) ->
receive
{report_mutate_get, Node, Pid, InObject} ->
{Node, Pid, InObject}
after Timeout ->
{error, timeout}
end.
get_all_reports() ->
get_all_reports(5000).
get_all_reports(Timeout) ->
get_all_reports(Timeout, []).
get_all_reports(Timeout, Acc) ->
case recv_get_report(Timeout) of
{error, timeout} ->
lists:reverse(Acc);
Report ->
get_all_reports(Timeout, [Report | Acc])
end.
recv_put_report() ->
recv_put_report(5000).
recv_put_report(Timeout) ->
receive
{report_mutate_put, Node, Pid, InMeta, InVal} ->
{Node, Pid, InMeta, InVal}
after Timeout ->
{error, timeout}
end.
put_all_reports() ->
put_all_reports(5000).
put_all_reports(Timeout) ->
put_all_reports(Timeout, []).
put_all_reports(Timeout, Acc) ->
case recv_put_report(Timeout) of
{error, timeout} ->
lists:reverse(Acc);
Report ->
put_all_reports(Timeout, [Report | Acc])
end.


@ -185,6 +185,226 @@ data_push_test_() ->
] end}}.
read_repair_interaction() ->
Tests = read_repair_interaction_test_(),
case eunit:test(Tests, [verbose]) of
ok -> pass;
error -> exit(error), fail
end.
read_repair_interaction_test_() ->
{timeout, rt_cascading:timeout(100000), {setup, fun() ->
Nodes = rt:deploy_nodes(6, conf()),
{[N1 | _] = C123, [N4 | _] = C456} = lists:split(3, Nodes),
repl_util:make_cluster(C123),
repl_util:name_cluster(N1, "c123"),
repl_util:make_cluster(C456),
repl_util:name_cluster(N4, "c456"),
Port = repl_util:get_cluster_mgr_port(N4),
lager:info("attempting to connect ~p to c456 on port ~p", [N1, Port]),
repl_util:connect_rt(N1, Port, "c456"),
#data_push_test{nodes = Nodes, c123 = C123, c456 = C456}
end,
fun(State) ->
case rt_config:config_or_os_env(skip_teardown, false) of
"false" ->
rt:clean_cluster(State#data_push_test.nodes);
false ->
rt:clean_cluster(State#data_push_test.nodes);
_ ->
ok
end
end,
fun(State) -> [
{"load up sink cluster", timeout, rt_cascading:timeout(100000), fun() ->
#data_push_test{c123 = [N1 | _], c456 = [N4 | _]} = State,
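% full_objects controls how many replicas on the sink keep the full object;
% with it set to 1 the remaining replicas only hold reduced stand-ins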
lager:info("setting full objects to 1"),
rpc:call(N4, riak_repl_console, full_objects, [["1"]]),
WaitFun = fun(Node) ->
case rpc:call(Node, riak_repl_console, full_objects, [[]]) of
1 ->
true;
Uh ->
lager:info("~p got ~p", [Node, Uh]),
false
end
end,
[rt:wait_until(Node, WaitFun) || Node <- State#data_push_test.c456],
lager:info("putting an object on ~p", [N1]),
Client123 = rt:pbc(N1),
Bin = <<"before repl reduction, this is a binary">>,
Key = <<"rrit">>,
Bucket = <<"rrit_objects">>,
Obj = riakc_obj:new(Bucket, Key, Bin),
riakc_pb_socket:put(Client123, Obj, [{w,3}]),
riakc_pb_socket:stop(Client123),
lager:info("Checking object on sink cluster"),
Got = lists:map(fun(Node) ->
lager:info("maybe eventualliy exists on ~p", [Node]),
maybe_eventually_exists(Node, Bucket, Key)
end, State#data_push_test.c456),
?assertMatch([{ok, _}, {ok, _}, {ok, _}], Got),
lists:map(fun({ok, GotObj}) ->
Value = riakc_obj:get_value(GotObj),
?assertEqual(Bin, Value)
end, Got)
end},
{"read repair in the small", timeout, rt_cascading:timeout(1000000), fun() ->
Key = <<"rrit">>,
Bucket = <<"rrit_objects">>,
Bin = <<"before repl reduction, this is a binary">>,
riak_repl_reduced_intercepts:register_as_target(),
lists:map(fun(Node) ->
rt_intercept:add(Node, {riak_repl_reduced, [
{{mutate_get, 1}, report_mutate_get},
{{mutate_put, 5}, report_mutate_put}
]})
end, State#data_push_test.c456),
[N4 | _] = State#data_push_test.c456,
Client456 = rt:pbc(N4),
% set the n_val higher, which forces read repair on the get below
riakc_pb_socket:set_bucket(Client456, Bucket, [{n_val, 5}]),
% getting the value once with a smaller pr should trigger the
% read repair, which should issue puts.
riakc_pb_socket:get(Client456, Bucket, Key, [{pr, 3}]),
% the n_val is 5, but only 3 vnodes actually hold data. This means
% there should be 5 total mutate_gets (1 real, 2 promise,
% 2 from the promises fetching the real object)
MutateGets1 = riak_repl_reduced_intercepts:get_all_reports(),
?assertEqual(5, length(MutateGets1)),
% 3 of those gets are from the primary, so they are exactly
% the same
?assertEqual(3, length(lists:usort(MutateGets1))),
% and since 3 of those are the real object, we should have 3 copies
% of the full bin if we concatenate all the values together.
ExpectedValue1 = <<Bin/binary, Bin/binary, Bin/binary>>,
GotValue1 = lists:foldl(fun({_Node, _Pid, Object}, Acc) ->
Value = riak_object:get_value(Object),
<<Acc/binary, Value/binary>>
end, <<>>, MutateGets1),
?assertEqual(ExpectedValue1, GotValue1),
% and then 2 mutate puts due to the read repair writing the
% previously-missing replicas
MutatePuts1 = riak_repl_reduced_intercepts:put_all_reports(),
?assertEqual(2, length(MutatePuts1)),
% next time a get is done, 5 vnodes will reply with found
% data, 4 of those being reduced. This means there are 9
% total get requests.
riakc_pb_socket:get(Client456, Bucket, Key),
MutateGets2 = riak_repl_reduced_intercepts:get_all_reports(),
?assertEqual(9, length(MutateGets2)),
% five of those (1 from request directly, 4 from promise objs)
% are get requests to the primary.
?assertEqual(5, length(lists:usort(MutateGets2))),
ExpectedValue2 = lists:foldl(fun(_, Acc) ->
<<Acc/binary, Bin/binary>>
end, <<>>, lists:seq(1, 5)),
GotValue2 = lists:foldl(fun({_Node, _Pid, Object}, Acc) ->
Value = riak_object:get_value(Object),
<<Acc/binary, Value/binary>>
end, <<>>, MutateGets2),
?assertEqual(ExpectedValue2, GotValue2)
end},
{"read repair interacton in the large", timeout, rt_cascading:timeout(1000000), fun() ->
Bucket = <<"rrit_objects_lots">>,
Bin = <<"datadatadata">>,
% to avoid overloading the message queue here, we're going
% to reset the intercepts
lists:map(fun(Node) ->
rt_intercept:add(Node, {riak_repl_reduced, []})
end, State#data_push_test.c456),
FillSize = case rt_config:config_or_os_env(fill_size, 1000) of
FS when is_integer(FS) ->
FS;
FS when is_list(FS) ->
list_to_integer(FS)
end,
% Fill the source node and let the sink get it all
lager:info("filling c123 cluster"),
write(State#data_push_test.c123, 1, FillSize, Bucket, Bin),
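% wait for the realtime repl queues on the source cluster to drain so the
% sink has received everything before we start reading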
WaitForRtqEmptyFun = fun(Node) ->
rpc:call(Node, riak_repl2_rtq, all_queues_empty, [])
end,
lists:map(fun(N) ->
rt:wait_until(N, WaitForRtqEmptyFun)
end, State#data_push_test.c123),
% read all the things, ensuring read-repair happens
% as expected
lists:map(fun(Node) ->
rt_intercept:add(Node, {riak_repl_reduced, [
{{mutate_get, 1}, report_mutate_get},
{{mutate_put, 5}, report_mutate_put}
]})
end, State#data_push_test.c456),
riak_repl_reduced_intercepts:register_as_target(),
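% drain any reports left over from the earlier tests so the per-key
% assertions below only see reports produced by their own get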
riak_repl_reduced_intercepts:put_all_reports(),
riak_repl_reduced_intercepts:get_all_reports(),
AssertFirstRead = fun
(Key, {ok, _Obj}) ->
MutateGets = riak_repl_reduced_intercepts:get_all_reports(),
?assertEqual(5, length(MutateGets)),
?assertEqual(3, length(lists:usort(MutateGets))),
ExpectValue = <<Bin/binary, Bin/binary, Bin/binary>>,
GotValue = lists:foldl(fun({_Node, _Pid, Object}, Acc) ->
Value = case riak_object:get_values(Object) of
[V] -> V;
Values ->
lager:warning("key ~p has mutltiple values in the object: ~p", [Key, Object]),
lists:foldl(fun(B,Ba) ->
<<B/binary, Ba/binary>>
end, <<>>, Values)
end,
<<Acc/binary, Value/binary>>
end, <<>>, MutateGets),
?assertEqual(ExpectValue, GotValue),
MutatePuts = riak_repl_reduced_intercepts:put_all_reports(),
?assertEqual(2, length(MutatePuts));
(Key, Error) ->
lager:warning("Didn't get ~p as expected: ~p", [Key, Error]),
?assertMatch({ok, _}, Error)
end,
AssertSecondRead = fun
(_Key, {ok, _Obj}) ->
MutateGets = riak_repl_reduced_intercepts:get_all_reports(),
?assertEqual(9, length(MutateGets)),
?assertEqual(5, length(lists:usort(MutateGets))),
ExpectValue = <<Bin/binary, Bin/binary, Bin/binary, Bin/binary, Bin/binary>>,
GotValue = lists:foldl(fun({_Node, _Pid, Object}, Acc) ->
Value = riak_object:get_value(Object),
<<Acc/binary, Value/binary>>
end, <<>>, MutateGets),
?assertEqual(ExpectValue, GotValue),
MutatePuts = riak_repl_reduced_intercepts:put_all_reports(),
?assertEqual(0, length(MutatePuts));
(Key, Error) ->
lager:warning("Didn't get ~p as expected: ~p", [Key, Error]),
?assertMatch({ok, _}, Error)
end,
Client456 = rt:pbc(hd(State#data_push_test.c456)),
riakc_pb_socket:set_bucket(Client456, Bucket, [{n_val, 5}]),
riakc_pb_socket:stop(Client456),
lager:info("assert first read group"),
read(State#data_push_test.c456, 1, FillSize, Bucket, AssertFirstRead),
lager:info("assert second read group"),
read(State#data_push_test.c456, 1, FillSize, Bucket, AssertSecondRead)
end}
] end}}.
conf() ->
[{lager, [
{handlers, [
@ -254,3 +474,54 @@ maybe_eventually_exists({ok, _RiakObj} = Out, _Nodes, _Bucket, _Key, _Timeout, _
maybe_eventually_exists(Got, _Nodes, _Bucket, _Key, _Timeout, _WaitMs) ->
Got.
write(NodeList, Start, Stop, Bucket, Data) when is_list(NodeList) ->
SocketQueue = make_socket_queue(NodeList),
write(SocketQueue, Start, Stop, Bucket, Data);
write(SocketQueue, N, Stop, _Bucket, _Data) when N > Stop ->
stop_socket_queue(SocketQueue),
ok;
write(SocketQueue, N, Stop, Bucket, Data) ->
{Socket, NextQueue} = next_socket(SocketQueue),
Key = list_to_binary(integer_to_list(N)),
Obj = riakc_obj:new(Bucket, Key, Data),
riakc_pb_socket:put(Socket, Obj, [{w, 3}]),
NextN = N + 1,
write(NextQueue, NextN, Stop, Bucket, Data).
read(NodeList, Start, Stop, Bucket, AssertFun) when is_list(NodeList) ->
SocketQueue = make_socket_queue(NodeList),
read(SocketQueue, Start, Stop, Bucket, AssertFun);
read(SocketQueue, N, Stop, _Bucket, _AssertFun) when N > Stop ->
stop_socket_queue(SocketQueue),
ok;
read(SocketQueue, N, Stop, Bucket, AssertFun) ->
{Socket, SocketQueue2} = next_socket(SocketQueue),
Key = list_to_binary(integer_to_list(N)),
Got = riakc_pb_socket:get(Socket, Bucket, Key, [{pr, 3}]),
AssertFun(Key, Got),
NextN = N + 1,
read(SocketQueue2, NextN, Stop, Bucket, AssertFun).
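% the socket queue round-robins protobuf connections across the given nodes,
% spreading the reads and writes over the cluster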
make_socket_queue(Nodes) ->
Sockets = lists:map(fun(Node) ->
rt:pbc(Node)
end, Nodes),
queue:from_list(Sockets).
stop_socket_queue(SocketQueue) ->
Sockets = queue:to_list(SocketQueue),
lists:map(fun(Socket) ->
riakc_pb_socket:stop(Socket)
end, Sockets).
next_socket(SocketQueue) ->
{{value, Socket}, ShorterQueue} = queue:out(SocketQueue),
NextQueue = queue:in(Socket, ShorterQueue),
{Socket, NextQueue}.