diff --git a/intercepts/riak_repl_reduced_intercepts.erl b/intercepts/riak_repl_reduced_intercepts.erl new file mode 100644 index 00000000..f40b6a4a --- /dev/null +++ b/intercepts/riak_repl_reduced_intercepts.erl @@ -0,0 +1,89 @@ +-module(riak_repl_reduced_intercepts). + +-include("intercept.hrl"). + +-export([recv_get_report/0, recv_get_report/1, report_mutate_get/1, + register_as_target/0, get_all_reports/0, get_all_reports/1]). +-export([report_mutate_put/5, recv_put_report/0, recv_put_report/1, + put_all_reports/0, put_all_reports/1]). + +-define(M, riak_repl_reduced_orig). + +report_mutate_get(InObject) -> + Node = node(), + Pid = self(), + TargetNode = 'riak_test@127.0.0.1', + TargetProcess = reduced_intercept_target, + {TargetProcess, TargetNode} ! {report_mutate_get, Node, Pid, InObject}, + ?M:mutate_get_orig(InObject). + +report_mutate_put(InMeta, InVal, RevMeta, Obj, Props) -> + Node = node(), + Pid = self(), + TargetNode = 'riak_test@127.0.0.1', + TargetProcess = reduced_intercept_target, + {TargetProcess, TargetNode} ! {report_mutate_put, Node, Pid, InMeta, InVal}, + ?M:mutate_put_orig(InMeta, InVal, RevMeta, Obj, Props). + +register_as_target() -> + Self = self(), + case whereis(reduced_intercept_target) of + Self -> + true; + undefined -> + register(reduced_intercept_target, Self); + _NotSelf -> + unregister(reduced_intercept_target), + register(reduced_intercept_target, Self) + end. + +recv_get_report() -> + recv_get_report(5000). + +recv_get_report(Timeout) -> + receive + {report_mutate_get, Node, Pid, InObject} -> + {Node, Pid, InObject} + after Timeout -> + {error, timeout} + end. + +get_all_reports() -> + get_all_reports(5000). + +get_all_reports(Timeout) -> + get_all_reports(Timeout, []). + +get_all_reports(Timeout, Acc) -> + case recv_get_report(Timeout) of + {error, timeout} -> + lists:reverse(Acc); + Report -> + get_all_reports(Timeout, [Report | Acc]) + end. + +recv_put_report() -> + recv_put_report(5000). + +recv_put_report(Timeout) -> + receive + {report_mutate_put, Node, Pid, InMeta, InVal} -> + {Node, Pid, InMeta, InVal} + after Timeout -> + {error, timeout} + end. + +put_all_reports() -> + put_all_reports(5000). + +put_all_reports(Timeout) -> + put_all_reports(Timeout, []). + +put_all_reports(Timeout, Acc) -> + case recv_put_report(Timeout) of + {error, timeout} -> + lists:reverse(Acc); + Report -> + put_all_reports(Timeout, [Report | Acc]) + end. + diff --git a/tests/repl_reduced.erl b/tests/repl_reduced.erl index 2cb0f996..f2212fe4 100644 --- a/tests/repl_reduced.erl +++ b/tests/repl_reduced.erl @@ -185,6 +185,226 @@ data_push_test_() -> ] end}}. +read_repair_interaction() -> + Tests = read_repair_interaction_test_(), + case eunit:test(Tests, [verbose]) of + ok -> pass; + error -> exit(error), fail + end. + +read_repair_interaction_test_() -> + {timeout, rt_cascading:timeout(100000), {setup, fun() -> + Nodes = rt:deploy_nodes(6, conf()), + {[N1 | _] = C123, [N4 | _] = C456} = lists:split(3, Nodes), + repl_util:make_cluster(C123), + repl_util:name_cluster(N1, "c123"), + repl_util:make_cluster(C456), + repl_util:name_cluster(N4, "c456"), + Port = repl_util:get_cluster_mgr_port(N4), + lager:info("attempting to connect ~p to c456 on port ~p", [N1, Port]), + repl_util:connect_rt(N1, Port, "c456"), + #data_push_test{nodes = Nodes, c123 = C123, c456 = C456} + end, + fun(State) -> + case rt_config:config_or_os_env(skip_teardown, false) of + "false" -> + rt:clean_cluster(State#data_push_test.nodes); + false -> + rt:clean_cluster(State#data_push_test.nodes); + _ -> + ok + end + end, + fun(State) -> [ + + {"load up sink cluster", timeout, rt_cascading:timeout(100000), fun() -> + #data_push_test{c123 = [N1 | _], c456 = [N4 | _]} = State, + lager:info("setting full objects to 1"), + rpc:call(N4, riak_repl_console, full_objects, [["1"]]), + WaitFun = fun(Node) -> + case rpc:call(Node, riak_repl_console, full_objects, [[]]) of + 1 -> + true; + Uh -> + lager:info("~p got ~p", [Node, Uh]), + false + end + end, + [rt:wait_until(Node, WaitFun) || Node <- State#data_push_test.c456], + lager:info("putting an object on ~p", [N1]), + Client123 = rt:pbc(N1), + Bin = <<"before repl reduction, this is a binary">>, + Key = <<"rrit">>, + Bucket = <<"rrit_objects">>, + Obj = riakc_obj:new(Bucket, Key, Bin), + riakc_pb_socket:put(Client123, Obj, [{w,3}]), + riakc_pb_socket:stop(Client123), + lager:info("Checking object on sink cluster"), + Got = lists:map(fun(Node) -> + lager:info("maybe eventualliy exists on ~p", [Node]), + maybe_eventually_exists(Node, Bucket, Key) + end, State#data_push_test.c456), + ?assertMatch([{ok, _}, {ok, _}, {ok, _}], Got), + lists:map(fun({ok, GotObj}) -> + Value = riakc_obj:get_value(GotObj), + ?assertEqual(Bin, Value) + end, Got) + end}, + + {"read repair in the small", timeout, rt_cascading:timeout(1000000), fun() -> + Key = <<"rrit">>, + Bucket = <<"rrit_objects">>, + Bin = <<"before repl reduction, this is a binary">>, + riak_repl_reduced_intercepts:register_as_target(), + lists:map(fun(Node) -> + rt_intercept:add(Node, {riak_repl_reduced, [ + {{mutate_get, 1}, report_mutate_get}, + {{mutate_put, 5}, report_mutate_put} + ]}) + end, State#data_push_test.c456), + [N4 | _] = State#data_push_test.c456, + Client456 = rt:pbc(N4), + + % set the nval higher, which make the below have read repair + % end up being forced + riakc_pb_socket:set_bucket(Client456, Bucket, [{n_val, 5}]), + + % getting the vlaud once with a smaller pr should trigger the + % read repair, which should have puts. + riakc_pb_socket:get(Client456, Bucket, Key, [{pr, 3}]), + + % the n_val is 5, but there's only 3 actual data's. This means + % there should be 5 total mutate_gets (1 real, 2 promoise, + % 2 from promise getting real) + MutateGets1 = riak_repl_reduced_intercepts:get_all_reports(), + ?assertEqual(5, length(MutateGets1)), + % 3 of those gets are from the primary, so they are exactly + % the same + ?assertEqual(3, length(lists:usort(MutateGets1))), + % and since 3 of those are the real, we should have 3 copies + % of the full bin if we concat all the values together. + ExpectedValue1 = <>, + GotValue1 = lists:foldl(fun({_Node, _Pid, Object}, Acc) -> + Value = riak_object:get_value(Object), + <> + end, <<>>, MutateGets1), + ?assertEqual(ExpectedValue1, GotValue1), + + % and then 2 mutate puts due t + MutatePuts1 = riak_repl_reduced_intercepts:put_all_reports(), + ?assertEqual(2, length(MutatePuts1)), + + % next time a get is done, 5 vnodes will reply with found + % data, 4 of those being reduced. This means there are 9 + % totoal get requests. + riakc_pb_socket:get(Client456, Bucket, Key), + MutateGets2 = riak_repl_reduced_intercepts:get_all_reports(), + ?assertEqual(9, length(MutateGets2)), + % five of those (1 from request directly, 4 from promise objs) + % are get requests to the primary. + ?assertEqual(5, length(lists:usort(MutateGets2))), + ExpectedValue2 = lists:foldl(fun(_, Acc) -> + <> + end, <<>>, lists:seq(1, 5)), + GotValue2 = lists:foldl(fun({_Node, _Pid, Object}, Acc) -> + Value = riak_object:get_value(Object), + <> + end, <<>>, MutateGets2), + ?assertEqual(ExpectedValue2, GotValue2) + end}, + + {"read repair interacton in the large", timeout, rt_cascading:timeout(1000000), fun() -> + Bucket = <<"rrit_objects_lots">>, + Bin = <<"datadatadata">>, + % to avoid overloading the message queue here, we're going + % to reset the intercepts + lists:map(fun(Node) -> + rt_intercept:add(Node, {riak_repl_reduced, []}) + end, State#data_push_test.c456), + + FillSize = case rt_config:config_or_os_env(fill_size, 1000) of + FS when is_integer(FS) -> + FS; + FS when is_list(FS) -> + list_to_integer(FS) + end, + % Fill the source node and let the sink get it all + lager:info("filling c123 cluster"), + write(State#data_push_test.c123, 1, FillSize, Bucket, Bin), + WaitForRtqEmptyFun = fun(Node) -> + rpc:call(Node, riak_repl2_rtq, all_queues_empty, []) + end, + lists:map(fun(N) -> + rt:wait_until(N, WaitForRtqEmptyFun) + end, State#data_push_test.c123), + + % read all the things, ensuring read-repair happens + % as expected + lists:map(fun(Node) -> + rt_intercept:add(Node, {riak_repl_reduced, [ + {{mutate_get, 1}, report_mutate_get}, + {{mutate_put, 5}, report_mutate_put} + ]}) + end, State#data_push_test.c456), + riak_repl_reduced_intercepts:register_as_target(), + riak_repl_reduced_intercepts:put_all_reports(), + riak_repl_reduced_intercepts:get_all_reports(), + + AssertFirstRead = fun + (Key, {ok, _Obj}) -> + MutateGets = riak_repl_reduced_intercepts:get_all_reports(), + ?assertEqual(5, length(MutateGets)), + ?assertEqual(3, length(lists:usort(MutateGets))), + ExpectValue = <>, + GotValue = lists:foldl(fun({_Node, _Pid, Object}, Acc) -> + Value = case riak_object:get_values(Object) of + [V] -> V; + Values -> + lager:warning("key ~p has mutltiple values in the object: ~p", [Key, Object]), + lists:foldl(fun(B,Ba) -> + <> + end, <<>>, Values) + end, + <> + end, <<>>, MutateGets), + ?assertEqual(ExpectValue, GotValue), + MutatePuts = riak_repl_reduced_intercepts:put_all_reports(), + ?assertEqual(2, length(MutatePuts)); + (Key, Error) -> + lager:warning("Didn't get ~p as expected: ~p", [Key, Error]), + ?assertMatch({ok, _}, Error) + end, + + AssertSecondRead = fun + (_Key, {ok, _Obj}) -> + MutateGets = riak_repl_reduced_intercepts:get_all_reports(), + ?assertEqual(9, length(MutateGets)), + ?assertEqual(5, length(lists:usort(MutateGets))), + ExpectValue = <>, + GotValue = lists:foldl(fun({_Node, _Pid, Object}, Acc) -> + Value = riak_object:get_value(Object), + <> + end, <<>>, MutateGets), + ?assertEqual(ExpectValue, GotValue), + MutatePuts = riak_repl_reduced_intercepts:put_all_reports(), + ?assertEqual(0, length(MutatePuts)); + (Key, Error) -> + lager:warning("Didn't get ~p as expected: ~p", [Key, Error]), + ?assertMatch({ok, _}, Error) + end, + + Client456 = rt:pbc(hd(State#data_push_test.c456)), + riakc_pb_socket:set_bucket(Client456, Bucket, [{n_val, 5}]), + riakc_pb_socket:stop(Client456), + + lager:info("assert first read group"), + read(State#data_push_test.c456, 1, FillSize, Bucket, AssertFirstRead), + lager:info("assert second read group"), + read(State#data_push_test.c456, 1, FillSize, Bucket, AssertSecondRead) + end} + + ] end}}. + conf() -> [{lager, [ {handlers, [ @@ -254,3 +474,54 @@ maybe_eventually_exists({ok, _RiakObj} = Out, _Nodes, _Bucket, _Key, _Timeout, _ maybe_eventually_exists(Got, _Nodes, _Bucket, _Key, _Timeout, _WaitMs) -> Got. + +write(NodeList, Start, Stop, Bucket, Data) when is_list(NodeList) -> + SocketQueue = make_socket_queue(NodeList), + write(SocketQueue, Start, Stop, Bucket, Data); + +write(SocketQueue, N, Stop, _Bucket, _Data) when N > Stop -> + stop_socket_queue(SocketQueue), + ok; + +write(SocketQueue, N, Stop, Bucket, Data) -> + {Socket, NextQueue} = next_socket(SocketQueue), + Key = list_to_binary(integer_to_list(N)), + Obj = riakc_obj:new(Bucket, Key, Data), + riakc_pb_socket:put(Socket, Obj, [{w, 3}]), + NextN = N + 1, + write(NextQueue, NextN, Stop, Bucket, Data). + +read(NodeList, Start, Stop, Bucket, AssertFun) when is_list(NodeList) -> + SocketQueue = make_socket_queue(NodeList), + read(SocketQueue, Start, Stop, Bucket, AssertFun); + +read(SocketQueue, N, Stop, _Bucket, _AssertFun) when N > Stop -> + stop_socket_queue(SocketQueue), + ok; + +read(SocketQueue, N, Stop, Bucket, AssertFun) -> + {Socket, SocketQueue2} = next_socket(SocketQueue), + Key = list_to_binary(integer_to_list(N)), + Got = riakc_pb_socket:get(Socket, Bucket, Key, [{pr, 3}]), + AssertFun(Key, Got), + NextN = N + 1, + read(SocketQueue2, NextN, Stop, Bucket, AssertFun). + + +make_socket_queue(Nodes) -> + Sockets = lists:map(fun(Node) -> + rt:pbc(Node) + end, Nodes), + queue:from_list(Sockets). + +stop_socket_queue(SocketQueue) -> + Sockets = queue:to_list(SocketQueue), + lists:map(fun(Socket) -> + riakc_pb_socket:stop(Socket) + end, Sockets). + +next_socket(SocketQueue) -> + {{value, Socket}, ShorterQueue} = queue:out(SocketQueue), + NextQueue = queue:in(Socket, ShorterQueue), + {Socket, NextQueue}. +