added overload_drop checking

Jon Anderson 2013-08-15 21:18:37 -04:00
parent 6714fb21f0
commit b77588f2b5
2 changed files with 13 additions and 25 deletions
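The new assertion in verify_overload_writes (second file below) pulls the overload_drops counter out of the realtime queue's status on the source node and requires it to be non-zero. A minimal sketch of that check in isolation, assuming the test module already includes the eunit assert macros; the helper name is hypothetical, while the calls and the status key are the ones visible in the diff:

%% Assert that the realtime queue on SourceNode recorded at least one overload drop.
assert_overload_drops(SourceNode) ->
    Status = rpc:call(SourceNode, riak_repl2_rtq, status, []),
    %% status/0 is treated here as a proplist, matching the keyfind in the diff.
    {_, OverloadDrops} = lists:keyfind(overload_drops, 1, Status),
    lager:info("OverloadDrops: ~p", [OverloadDrops]),
    ?assert(OverloadDrops > 0).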

View File

@ -8,8 +8,9 @@
%% @doc Drop the heartbeat messages from the rt source.
slow_trim_q(State) ->
%% ?I_INFO("slow_trim_q"),
%% This hideousness is neccesary in order to have this intercept sleep only
%% on the first iteraction. With hope, it causes the message queue of the
%% This hideousness is necessary in order to have this intercept sleep only
%% on the first iteration. With hope, it causes the message queue of the
%% RTQ to spike enough to initiate overload handling, then subsequently
%% allow the queue to drain, overload flipped off, and the writes to complete.
case get(hosed) of
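The hunk above cuts off at the case head, so the branch bodies are not shown. Based on the comment, a plausible sketch of the one-shot sleep the intercept performs, keyed on the process dictionary; the delay value and the plain pass-through of State are assumptions:

%% Sleep only on the very first call so the RTQ mailbox backs up enough to
%% trip overload handling; later calls return immediately and let it drain.
slow_trim_q_sketch(State) ->
    case get(hosed) of
        undefined ->
            put(hosed, true),   %% remember that the first iteration has happened
            timer:sleep(1000);  %% assumed delay, long enough to spike the queue
        _ ->
            ok                  %% no further delay; overload flips off, writes complete
    end,
    %% the real intercept would presumably forward to the original trim_q here
    State.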

View File

@ -26,11 +26,6 @@ confirm() ->
lager:info("Enabling realtime between ~p and ~p", [LeaderA, LeaderB]),
enable_rt(LeaderA, ANodes),
%% Verify that heartbeats are being acknowledged by the sink (B) back to source (A)
%% ?assertEqual(verify_heartbeat_messages(LeaderA), true),
%%rpc:call(LeaderA, lager, trace_file, ["./log/console.log", [{module, riak_repl2_rtq}], debug]),
%% Verify RT repl of objects
verify_rt(LeaderA, LeaderB),
@ -44,8 +39,6 @@ confirm() ->
verify_overload_writes(LeaderA, LeaderB),
%% load_rt(LeaderA, LeaderB),
pass.
%% @doc Turn on Realtime replication on the cluster led by LeaderA.
@ -78,7 +71,7 @@ verify_rt(LeaderA, LeaderB) ->
?assertEqual(0, repl_util:wait_for_reads(LeaderB, First, Last, TestBucket, 2)).
verify_overload_writes(LeaderA, LeaderB) ->
TestHash = list_to_binary([io_lib:format("~2.16.0b", [X]) ||
<<X>> <= erlang:md5(term_to_binary(os:timestamp()))]),
TestBucket = <<TestHash/binary, "-rt_test_overload">>,
First = 101,
@ -90,23 +83,17 @@ verify_overload_writes(LeaderA, LeaderB) ->
?assertEqual([], repl_util:do_write(LeaderA, First, Last, TestBucket, 2)),
lager:info("Reading ~p keys from ~p", [Last-First+1, LeaderB]),
NumReads = repl_util:wait_for_reads(LeaderB, First, Last, TestBucket, 2),
NumReads = rt:systest_read(LeaderB, First, Last, TestBucket, 2),
%%NumReads = repl_util:wait_for_reads(LeaderB, First, Last, TestBucket, 2),
lager:info("Received ~p reads, so there were ~p drops", [NumReads, ((Last-First) - NumReads)]).
lager:info("Received ~p drops", [length(NumReads)]),
Status = rpc:call(LeaderA, riak_repl2_rtq, status, []),
{_, OverloadDrops} = lists:keyfind(overload_drops, 1, Status),
%% @doc put some load on RealTime replication
%load_rt(LeaderA, LeaderB) ->
% TestHash = list_to_binary([io_lib:format("~2.16.0b", [X]) ||
% <<X>> <= erlang:md5(term_to_binary(os:timestamp()))]),
% TestBucket = <<TestHash/binary, "-rt_test_a">>,
% First = 101,
% Last = 2000100,%
%
% %% Write some objects to the source cluster (A),
% lager:info("Writing ~p keys to ~p, which should RT repl to ~p",
% [Last-First+1, LeaderA, LeaderB]),
% ?assertEqual([], repl_util:do_write(LeaderA, First, Last, TestBucket, 2)).
lager:info("OverloadDrops: ~p", [OverloadDrops]),
?assert(OverloadDrops > 0).
%% @doc Connect two clusters for replication using their respective leader nodes.
connect_clusters(LeaderA, LeaderB) ->
@ -196,5 +183,5 @@ check_size(Node) ->
%%Status = rpc:call(Node, riak_repl2_rtq, status, []),
%%io:format("status: ~p", [Status]),
timer:sleep(5000),
timer:sleep(2000),
check_size(Node).