Merge remote-tracking branch 'basho/develop-2.2' into bch-merge-with-2.2

This commit is contained in:
Brett Hazen 2016-03-23 13:12:41 -06:00
commit 9c170fb052
8 changed files with 248 additions and 7 deletions

View File

@ -181,8 +181,8 @@ to tell riak_test about them. The method of choice is to create a
{rtdev_path, [{root, "/home/you/rt/riak"}, {rtdev_path, [{root, "/home/you/rt/riak"},
{current, "/home/you/rt/riak/current"}, {current, "/home/you/rt/riak/current"},
{previous, "/home/you/rt/riak/riak-2.0.6"}, {previous, "/home/you/rt/riak/riak-2.0.6"},
{legacy, "/home/you/rt/riak/riak-1.4.12"} {legacy, "/home/you/rt/riak/riak-1.4.12"},
{'2.0.2', "/home/you/rt/riak/riak-2.0.2"} {'2.0.2', "/home/you/rt/riak/riak-2.0.2"},
{'2.0.4', "/home/you/rt/riak/riak-2.0.4"} {'2.0.4', "/home/you/rt/riak/riak-2.0.4"}
]} ]}
]}. ]}.

View File

@ -22,7 +22,7 @@
%% Export explicit API but also send compile directive to export all %% Export explicit API but also send compile directive to export all
%% because some of these private functions are useful in their own %% because some of these private functions are useful in their own
%% right. %% right.
-export([add/3, add/4]). -export([add/3, add/4, clean/1]).
-compile(export_all). -compile(export_all).
-type abstract_code() :: term(). -type abstract_code() :: term().
@ -54,7 +54,7 @@
%% functions. %% functions.
%% %%
%% E.g. `[{{update_perform,2}, sleep_update_perform}]' %% E.g. `[{{update_perform,2}, sleep_update_perform}]'
-spec add(module(), module(), mapping()) -> ok. -spec add(module(), module(), mapping(), string()) -> ok.
add(Target, Intercept, Mapping, OutDir) -> add(Target, Intercept, Mapping, OutDir) ->
Original = ?ORIGINAL(Target), Original = ?ORIGINAL(Target),
TargetAC = get_abstract_code(Target), TargetAC = get_abstract_code(Target),
@ -66,9 +66,22 @@ add(Target, Intercept, Mapping, OutDir) ->
ok = compile_and_load(Original, OrigAC, OutDir), ok = compile_and_load(Original, OrigAC, OutDir),
ok = compile_and_load(Target, ProxyAC, OutDir). ok = compile_and_load(Target, ProxyAC, OutDir).
-spec add(module(), module(), mapping()) -> ok.
add(Target, Intercept, Mapping) -> add(Target, Intercept, Mapping) ->
add(Target, Intercept, Mapping, undefined). add(Target, Intercept, Mapping, undefined).
%% @doc Cleanup proxy and backuped original module
-spec clean(module()) -> ok|{error, term()}.
clean(Target) ->
_ = code:purge(Target),
_ = code:purge(?ORIGINAL(Target)),
case code:load_file(Target) of
{module, Target} ->
ok;
{error, Reason} ->
{error, Reason}
end.
%% @private %% @private
%% %%
%% @doc Compile the abstract code `AC' and load it into the code server. %% @doc Compile the abstract code `AC' and load it into the code server.

View File

@ -159,3 +159,9 @@ corrupting_handle_handoff_data(BinObj0, State) ->
corrupt_binary(O) -> corrupt_binary(O) ->
crypto:rand_bytes(byte_size(O)). crypto:rand_bytes(byte_size(O)).
put_as_readrepair(Preflist, BKey, Obj, ReqId, StartTime, Options) ->
?M:put_orig(Preflist, BKey, Obj, ReqId, StartTime, [rr | Options]).
coord_put_as_readrepair(Preflist, BKey, Obj, ReqId, StartTime, Options) ->
?M:coord_put_orig(Preflist, BKey, Obj, ReqId, StartTime, [rr | Options]).

View File

@ -78,6 +78,12 @@ add(Node, {Target, Intercept, Mapping}, OutDir) ->
NMapping = [transform_anon_fun(M) || M <- Mapping], NMapping = [transform_anon_fun(M) || M <- Mapping],
ok = rpc:call(Node, intercept, add, [Target, Intercept, NMapping, OutDir]). ok = rpc:call(Node, intercept, add, [Target, Intercept, NMapping, OutDir]).
clean(Node, Targets) when is_list(Targets) ->
[ok = clean(Node, T) || T <- Targets],
ok;
clean(Node, Target) ->
ok = rpc:call(Node, intercept, clean, [Target]).
%% The following function transforms anonymous function mappings passed %% The following function transforms anonymous function mappings passed
%% from an Erlang shell. Anonymous intercept functions from compiled code %% from an Erlang shell. Anonymous intercept functions from compiled code
%% require the developer to supply free variables themselves, and also %% require the developer to supply free variables themselves, and also

143
tests/repl_process_leak.erl Normal file
View File

@ -0,0 +1,143 @@
%% @doc The purpose of thie test is to ensure the realtime helpers on both
%% the source and sink sides properly exit when a connection is flakey; ie
%% then there are errors and not out-right closes of the connection.
-module(repl_process_leak).
-behavior(riak_test).
-export([confirm/0]).
-include_lib("eunit/include/eunit.hrl").
-define(SEND_ERROR_INTERVAL, 500).
confirm() ->
Conf = [
{riak_repl, [
{fullsync_on_connect, false},
{fullsync_interval, disabled}
]}
],
lager:info("deploying 2 nodes"),
Nodes = rt:deploy_nodes(2, Conf, [riak_kv, riak_repl]),
[SourceNode, SinkNode] = Nodes,
lager:info("nameing clusters"),
repl_util:name_cluster(SourceNode, "source"),
repl_util:name_cluster(SinkNode, "sink"),
{ok, {_IP, Port}} = rpc:call(SinkNode, application, get_env, [riak_core, cluster_mgr]),
lager:info("connecting clusters using port ~p", [Port]),
repl_util:connect_cluster(SourceNode, "127.0.0.1", Port),
repl_util:wait_for_connection(SourceNode, "sink"),
lager:info("enabling and starting realtime"),
repl_util:enable_realtime(SourceNode, "sink"),
repl_util:start_realtime(SourceNode, "sink"),
lager:info("testing for leaks on flakey sink"),
flakey_sink(SourceNode, SinkNode),
lager:info("testing for leaks on flakey source"),
flakey_source(SourceNode, SinkNode),
pass.
flakey_sink(_SourceNode, SinkNode) ->
InitialCount = rpc:call(SinkNode, erlang, system_info, [process_count]),
ProcCounts = send_sink_tcp_errors(SinkNode, 20, [InitialCount]),
Smallest = lists:min(ProcCounts),
Biggest = lists:max(ProcCounts),
?assert(2 =< Biggest - Smallest),
%?assertEqual(InitialProcCount, PostProcCount),
% the process count is increasing, but the helper did die
true.
send_sink_tcp_errors(_SinkNode, 0, Acc) ->
Acc;
send_sink_tcp_errors(SinkNode, N, Acc) ->
case rpc:call(SinkNode, riak_repl2_rtsink_conn_sup, started, []) of
[] ->
timer:sleep(?SEND_ERROR_INTERVAL),
send_sink_tcp_errors(SinkNode, N, Acc);
[P | _] ->
SysStatus = sys:get_status(P),
{status, P, _Modul, [_PDict, _Status, _, _, Data]} = SysStatus,
[_Header, _Data1, Data2] = Data,
{data, [{"State", StateRec}]} = Data2,
[Helper | _] = lists:filter(fun(E) ->
is_pid(E)
end, tuple_to_list(StateRec)),
HelpMon = erlang:monitor(process, Helper),
P ! {tcp_error, <<>>, test},
Mon = erlang:monitor(process, P),
receive {'DOWN', Mon, process, P, _} -> ok end,
receive
{'DOWN', HelpMon, process, Helper, _} ->
ok
after 10000 ->
throw("helper didn't die")
end,
timer:sleep(?SEND_ERROR_INTERVAL),
Procs = rpc:call(SinkNode, erlang, system_info, [process_count]),
send_sink_tcp_errors(SinkNode, N - 1, [Procs | Acc])
end.
flakey_source(SourceNode, _SinkNode) ->
InitialProcCount = rpc:call(SourceNode, erlang, system_info, [process_count]),
ProcCounts = send_source_tcp_errors(SourceNode, 20, [InitialProcCount]),
Biggest = lists:max(ProcCounts),
Smallest = lists:min(ProcCounts),
%lager:info("initial: ~p; post: ~p", [InitialProcCount, PostProcCount]),
%?assertEqual(InitialProcCount, PostProcCount).
?assert(2 =< Biggest - Smallest),
true.
send_source_tcp_errors(_SourceNode, 0, Acc) ->
Acc;
send_source_tcp_errors(SourceNode, N, Acc) ->
List = rpc:call(SourceNode, riak_repl2_rtsource_conn_sup, enabled, []),
case proplists:get_value("sink", List) of
undefined ->
timer:sleep(?SEND_ERROR_INTERVAL),
send_source_tcp_errors(SourceNode, N, Acc);
Pid ->
lager:debug("Get the status"),
SysStatus = try sys:get_status(Pid) of
S -> S
catch
W:Y ->
lager:info("Sys failed due to ~p:~p", [W,Y]),
{status, Pid, undefined, [undefined, undefined, undefined, undefined, [undefined, undefined, {data, [{"State", {Pid}}]}]]}
end,
{status, Pid, _Module, [_PDict, _Status, _, _, Data]} = SysStatus,
[_Header, _Data1, Data2] = Data,
{data, [{"State", StateRec}]} = Data2,
[Helper | _] = lists:filter(fun(E) ->
is_pid(E)
end, tuple_to_list(StateRec)),
lager:debug("mon the hlepr"),
HelperMon = erlang:monitor(process, Helper),
lager:debug("Send the murder"),
Pid ! {tcp_error, <<>>, test},
Mon = erlang:monitor(process, Pid),
lager:debug("Wait for deaths"),
receive
{'DOWN', Mon, process, Pid, _} -> ok
end,
receive
{'DOWN', HelperMon, process, Helper, _} ->
ok
after 10000 ->
throw("Helper didn't die")
end,
timer:sleep(?SEND_ERROR_INTERVAL),
Count = rpc:call(SourceNode, erlang, system_info, [process_count]),
send_source_tcp_errors(SourceNode, N - 1, [Count | Acc])
end.

View File

@ -65,8 +65,14 @@ confirm() ->
suspend_heartbeat_messages(LeaderA), suspend_heartbeat_messages(LeaderA),
%% sleep longer than the HB timeout interval to force re-connection; %% sleep longer than the HB timeout interval to force re-connection;
%% and give it time to restart the RT connection. Wait an extra 2 seconds. %% and give it time to restart the RT connection.
timer:sleep(timer:seconds(?HB_TIMEOUT) + 2000), %% Since it's possible we may disable heartbeats right after a heartbeat has been fired,
%% it can take up to 2*?HB_TIMEOUT seconds to detect a missed heartbeat. The extra second
%% is to avoid rare race conditions due to the timeouts lining up exactly. Not the prettiest
%% solution, but it failed so rarely at 2*HB_TIMEOUT, that this should be good enough
%% in practice, and it beats having to write a bunch of fancy intercepts to verify that
%% the timeout has been hit internally.
timer:sleep(timer:seconds(?HB_TIMEOUT*2) + 1000),
%% Verify that RT connection has restarted by noting that it's Pid has changed %% Verify that RT connection has restarted by noting that it's Pid has changed
RTConnPid2 = get_rt_conn_pid(LeaderA), RTConnPid2 = get_rt_conn_pid(LeaderA),

View File

@ -73,6 +73,7 @@
-behaviour(riak_test). -behaviour(riak_test).
-export([confirm/0]). -export([confirm/0]).
-define(NEG_LIMIT, -1000000). %% Due to sext encoding bug, do not permit negative numbers less than this to be generated.
-define(MAX_CLUSTER_SIZE, 1). -define(MAX_CLUSTER_SIZE, 1).
-define(MAX_FIELDS, 1). -define(MAX_FIELDS, 1).
-define(FIELDS, ["i" ++ integer_to_list(N) || N <- lists:seq(1, ?MAX_FIELDS)]). -define(FIELDS, ["i" ++ integer_to_list(N) || N <- lists:seq(1, ?MAX_FIELDS)]).
@ -113,6 +114,7 @@ confirm() ->
TestingTime = rt_config:get(eqc_testing_time, 120), TestingTime = rt_config:get(eqc_testing_time, 120),
lager:info("Will run in cluster of size ~p for ~p seconds.", lager:info("Will run in cluster of size ~p for ~p seconds.",
[Size, TestingTime]), [Size, TestingTime]),
rt:set_backend(eleveldb),
Nodes = rt:build_cluster(Size), Nodes = rt:build_cluster(Size),
?assert(eqc:quickcheck( ?assert(eqc:quickcheck(
eqc:testing_time(TestingTime, ?MODULE:prop_test(Nodes)))), eqc:testing_time(TestingTime, ?MODULE:prop_test(Nodes)))),
@ -171,7 +173,24 @@ gen_term() ->
%% Generates, with equal likelihood, either a smallish or a largish integer. %% Generates, with equal likelihood, either a smallish or a largish integer.
gen_int_term() -> gen_int_term() ->
oneof([int(), largeint()]). oneof([int(), gen_large_int()]).
%% XXX FIXME
%% Ensure that very large negative numbers are not used as values.
%%
%% This is because the version of sext used in riak has an encoding
%% bug for these values. Until we can fix this, this test needs to
%% deal with and work around the sext encoding bug.
%%
%% When that bug is fixed, this commit should be reverted/code
%% deleted.
gen_large_int() ->
?LET(I, largeint(), no_large_neg_ints(I)).
no_large_neg_ints(I) when I < ?NEG_LIMIT ->
abs(I);
no_large_neg_ints(I) ->
I.
%% Generates a random binary. %% Generates a random binary.
gen_bin_term() -> gen_bin_term() ->

View File

@ -36,6 +36,7 @@
confirm() -> confirm() ->
[Node1] = rt:build_cluster(1, [{riak_kv, [ [Node1] = rt:build_cluster(1, [{riak_kv, [
{ring_creation_size, 8}, {ring_creation_size, 8},
{anti_entropy, {off,[]}},
{max_object_size, ?MAX_SIZE}, {max_object_size, ?MAX_SIZE},
{warn_object_size, ?WARN_SIZE}, {warn_object_size, ?WARN_SIZE},
{max_siblings, ?MAX_SIBLINGS}, {max_siblings, ?MAX_SIBLINGS},
@ -51,6 +52,9 @@ confirm() ->
[{allow_mult, true}])), [{allow_mult, true}])),
verify_size_limits(C, Node1), verify_size_limits(C, Node1),
verify_sibling_limits(C, Node1), verify_sibling_limits(C, Node1),
lager:notice("Starting readrepair section of test"),
verify_readrepair_ignore_max_size(C, Node1),
verify_readrepair_ignore_max_sib(C, Node1),
pass. pass.
verify_size_limits(C, Node1) -> verify_size_limits(C, Node1) ->
@ -128,3 +132,47 @@ verify_sibling_limits(C, Node1) ->
lager:info("Result when too many siblings : ~p", [Res]), lager:info("Result when too many siblings : ~p", [Res]),
?assertMatch({error,_}, Res), ?assertMatch({error,_}, Res),
ok. ok.
verify_readrepair_ignore_max_size(C, Node1) ->
% Add intercept to treat all vnode puts as readrepairs
Intercept = {riak_kv_vnode, [{{put, 6}, put_as_readrepair},{{coord_put,6}, coord_put_as_readrepair}]},
ok = rt_intercept:add(Node1, Intercept),
% Do put with value greater than max size and confirm warning
lager:info("Checking readrepair put of size ~p, expecting ok result and log warning", [?MAX_SIZE*2]),
K = <<"rrsizetest">>,
V = <<0:(?MAX_SIZE*2)/integer-unit:8>>,
O = riakc_obj:new(?BUCKET, K, V),
?assertMatch(ok, riakc_pb_socket:put(C, O)),
verify_size_write_warning(Node1, K, ?MAX_SIZE*2),
% Clean intercept
ok = rt_intercept:clean(Node1, riak_kv_vnode),
ok.
verify_readrepair_ignore_max_sib(C, Node1) ->
lager:info("Checking sibling warning on readrepair above max siblings=~p", [?MAX_SIBLINGS]),
K = <<"rrsibtest">>,
V = <<"sibtest">>,
O = riakc_obj:new(?BUCKET, K, V),
% Force sibling error
[?assertMatch(ok, riakc_pb_socket:put(C, O))
|| _ <- lists:seq(1, ?MAX_SIBLINGS)],
Res = riakc_pb_socket:put(C, O),
lager:info("Result when too many siblings : ~p", [Res]),
?assertMatch({error,_}, Res),
% Add intercept to spoof writes as readrepair
Intercept = {riak_kv_vnode, [{{put, 6}, put_as_readrepair},{{coord_put,6}, coord_put_as_readrepair}]},
ok = rt_intercept:add(Node1, Intercept),
% Verify readrepair writes return ok and log warning
lager:info("Verifying succesful put above max_siblings with readrepair"),
?assertMatch(ok, riakc_pb_socket:put(C, O)),
P = io_lib:format("warning.*siblings.*~p.*~p.*(~p)",
[?BUCKET, K, ?MAX_SIBLINGS+1]),
Found = rt:expect_in_log(Node1, P),
lager:info("Looking for sibling warning: ~p", [Found]),
?assertEqual(true, Found),
% Clean intercept
ok = rt_intercept:clean(Node1, riak_kv_vnode),
ok.