riak_test/tests/pipe_verify_handoff_blocking.erl

%% -------------------------------------------------------------------
%%
%% Copyright (c) 2012 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%% @doc Verify handoff of the blocking queue between riak_pipe vnodes.
%%
%% Important: this test loads this module and {@link rt_pipe} on each
%% Riak node, such that it can reference their functions in pipe
%% workers.
%%
%% This test is similar to pipe_verify_handoff, but specifically tests
%% handoff while input-senders are waiting for responses. Mostly this
%% is tested in the case where the input is put into a pipe worker's
%% blocking queue, but because message timing is not tightly
%% controlled, it may also trigger other cases.
%%
%% While an input is in-flight to the vnode, or in a worker's blocking
%% queue, the sender of that input is waiting on a response, while
%% also monitoring the vnode. During handoff, the vnode that should
%% respond to the sender changes, but the sender's monitor does not
%% currently follow it. This is a bug.
%%
%% The testing strategy is to block workers while filling their queues
%% to the point they begin blocking, then add a node to the cluster
%% and watch their handoff progress. The slightly tricky point is that
%% we have to allow workers to process some inputs, or they won't
%% hand off (because the vnode waits for them to be between inputs to
%% archive), but we don't want them to process so many inputs that
%% they consume their blocking queues before handing off.
%%
%% Please note: under rare circumstances this test may fail with a
%% "{badmatch,{error,[{vnode_down,noproc}]}}" error. This is not a
%% failure of the test itself, but a side effect of a race condition
%% in riak_core_vnode_proxy. It manifests because the test attempts
%% to send a command to a vnode that is in fact down, while monitoring
%% works by issuing a command and getting a pid back. In some
%% instances, get_vnode_pid fails because the vnode shutdown message
%% is queued in the proxy's mailbox ahead of the monitor request.
%% Unfortunately, a fix would require a fundamental shift in the
%% architecture of riak_core, which at the time of this writing is not
%% feasible for this rare failure case.
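%%
%% Typically run via the riak_test harness, for example (assuming a
%% devrel config named rtdev in ~/.riak_test.config):
%%   ./riak_test -c rtdev -t pipe_verify_handoff_blocking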
-module(pipe_verify_handoff_blocking).

-export([
         %% riak_test's entry
         confirm/0,
         %% test machinery
         runner_wait/1,
         queue_filler/3
        ]).

-include_lib("eunit/include/eunit.hrl").
%% local copy of riak_pipe.hrl
-include("rt_pipe.hrl").

-define(NODE_COUNT, 2).
-define(ALL_LOG, [{log, sink}, {trace, all}]).
-define(FILLER_COUNT, 5).

%% @doc riak_test callback
confirm() ->
    %% static list of inputs, so we keep hitting the same partitions
    Inputs = lists:seq(1, 20),

    lager:info("Start ~b nodes", [?NODE_COUNT]),
    NodeDefs = lists:duplicate(?NODE_COUNT, {current, default}),
    Services = [riak_pipe],
    [Primary, Secondary] = Nodes = rt:deploy_nodes(NodeDefs, Services),
    %% Ensure each node owns 100% of its own ring
    [?assertEqual([Node], rt:owners_according_to(Node)) || Node <- Nodes],

    lager:info("Load useful modules"),
    rt:load_modules_on_nodes([?MODULE, rt_pipe], Nodes),

    lager:info("Start run coordinator"),
    Runner = spawn_link(?MODULE, runner_wait, [[]]),

    Spec = [#fitting_spec{name="blockhandoff",
                          module=riak_pipe_w_xform,
                          arg=pause_until_signal(Runner)}],
    lager:info("Start pipe on Primary"),
    {ok, Pipe} =
        rpc:call(Primary, riak_pipe, exec,
                 [Spec, [{sink, rt_pipe:self_sink()}|?ALL_LOG]]),

    InitialInputCount = fill_queues(Primary, Pipe, Inputs),
    Fillers = keep_queues_full(Primary, Pipe, Inputs),
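    %% snapshot the pipe status before handoff; the value is not
    %% asserted on (presumably kept as a debugging aid; see also
    %% _Status2 below)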
    _Status1 = pipe_status(Primary, Pipe),

    lager:info("Join Secondary to Primary"),
    %% Give Secondary a chance to start and Primary to notice it.
    rt:join(Secondary, Primary),
    rt:wait_until_no_pending_changes(Nodes),
    rt:wait_until_nodes_agree_about_ownership(Nodes),

    lager:info("Unpause workers"),
    Runner ! go,

    ok = rt:wait_until_transfers_complete(Nodes),

    FillerInputCount = stop_fillers(Fillers),
    %% if we make it this far, then no filler ever saw the vnode_down
    %% error message; otherwise badmatches in queue_filler/4 would have
    %% halted the test

    _Status2 = pipe_status(Primary, Pipe),

    lager:info("Send eoi and collect results"),
    riak_pipe:eoi(Pipe),
    {eoi, Out, Trace} = riak_pipe:collect_results(Pipe, 1000),

    %% no errors on the pipe, and all items make it through; if these
    %% checks fail, we dropped things somewhere
    ?assertEqual([], rt_pipe:extract_trace_errors(Trace)),
    ?assertEqual(InitialInputCount+FillerInputCount, length(Out)),

    rt_pipe:assert_no_zombies(Nodes),

    lager:info("~s: PASS", [atom_to_list(?MODULE)]),
    pass.

%%% queue filling

%% @doc fill pipe vnode queues by repeatedly sending each input in the
%% input list until the queue reports timeout.
fill_queues(Node, Pipe, Inputs) ->
    lists:sum([ fill_queue(Node, Pipe, I, 0) || I <- Inputs ]).

%% @doc fill one vnode queue by repeatedly sending the same input
%% until it reports timeout
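%% (the noblock option makes queue_work return {error,[timeout]} once
%% the worker's queue is full, instead of waiting in the blocking
%% queue)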
fill_queue(Node, Pipe, Input, Count) ->
    case rpc:call(Node, riak_pipe, queue_work, [Pipe, Input, noblock]) of
        {error, [timeout]} ->
            %% This queue is now full
            Count;
        ok ->
            %% not full yet; add more
            fill_queue(Node, Pipe, Input, Count+1)
    end.

%% @doc spawn workers that will keep sending inputs to the pipe, in
%% order to keep the queues full, to test handoff of blocking queues
keep_queues_full(Node, Pipe, Inputs) ->
    [ spawn_link(?MODULE, queue_filler, [Node, Pipe, shuffle(Inputs)])
      || _ <- lists:seq(1, ?FILLER_COUNT) ].

%% @doc Send each element of Inputs into Pipe, until told not to
queue_filler(Node, Pipe, Inputs) ->
    %% putting Inputs in a queue means we don't have to track our
    %% progress through them separately
    queue_filler(Node, Pipe, queue:from_list(Inputs), 0).

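%% Filler loop: take the next input off the queue, send it with the
%% default (blocking) queue_work call, put it back at the end of the
%% queue, and bump the count; stop and report the count when asked.
%% The blocking send is the point: it leaves the caller waiting in the
%% worker's blocking queue, which is what handoff must preserve.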
queue_filler(Node, Pipe, Inputs, Count) ->
    receive
        {stop, Owner} -> Owner ! {done, Count}
    after 0 ->
        {{value, I}, Q} = queue:out(Inputs),
        ok = rpc:call(Node, riak_pipe, queue_work, [Pipe, I]),
        queue_filler(Node, Pipe, queue:in(I, Q), Count+1)
    end.

%% @doc tell all fillers to stop, then collect and sum their send counts
stop_fillers(Fillers) ->
    lists:sum([ receive {done, Count} -> Count end
                || _ <- [ F ! {stop, self()} || F <- Fillers ] ]).

%%% Run pausing bits
%% @doc Create a worker function that asks the specified process for
%% permission before sending its input as output.
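%% The fun follows the riak_pipe_w_xform worker contract: it is called
%% as fun(Input, Partition, FittingDetails) and emits via
%% riak_pipe_vnode_worker:send_output/3.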
pause_until_signal(Runner) ->
    fun(I, P, D) ->
            Runner ! {wait, self()},
            receive
                go ->
                    riak_pipe_vnode_worker:send_output(I, P, D)
            end
    end.

%% @doc Phase one of the worker-pausing process: just collect requests,
%% waiting for the overall signal that allows processing to happen.
runner_wait(Waiting) ->
    receive
        go ->
            [ W ! go || W <- Waiting ],
            runner_go();
        {wait, W} ->
            runner_wait([W|Waiting])
    end.

%% @doc Phase two of the worker-pausing process: just let workers do
%% their processing as soon as they ask.
runner_go() ->
    receive
        {wait, W} ->
            W ! go,
            runner_go()
    end.

%%% Status filtering bits

%% @doc Call riak_pipe:status/1 on the given node, and extract the
%% status list from it. It is expected that the given pipe has exactly
%% one fitting.
pipe_status(Node, Pipe) ->
    [{_Name, Status}] = rpc:call(Node, riak_pipe, status, [Pipe]),
    Status.

%% @doc Shuffle the elements of a list. (Thanks Micah)
shuffle([]) ->
    [];
shuffle([E]) ->
    [E];
shuffle(List) ->
    Max = length(List),
    Keyed = [{random:uniform(Max), E} || E <- List],
    Sorted = lists:sort(Keyed),
    [N || {_, N} <- Sorted].