riak_test/tests/pipe_verify_handoff.erl
Bryan Fink 9a8f6e1b5b update pipe handoff test for new fold req record
2.0 adds _v2, so we need to watch for that record as well
2013-09-30 16:17:49 -04:00

264 lines
9.0 KiB
Erlang

%% -------------------------------------------------------------------
%%
%% Copyright (c) 2012 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%% @doc Verify handoff between riak_pipe vnodes.
%%
%% Important: this test loads this module and {@link rt_pipe} on each
%% Riak node, such that it can reference their functions in pipe
%% workers.
%%
%% This test is a largely rewritten version of riak_pipe:limits_test_/0.
%%
%% Strategy:
%% 1. start two independent (not yet joined) nodes, Primary and Secondary
%% 2. start pipes on Primary
%% 3. put inputs in the pipes
%% 3a. use a worker that waits for a signal from the test process
%%     before processing its input, so that we can ensure active
%%     workers and queue contents
%% 4. join Secondary to Primary
%% 5. wait for agreement on owners
%% 6. give the signal to the workers to process things, then wait for
%%    transfers to complete
%% 7. add more inputs, to start some workers on Secondary
%% 8. count the fold requests and archive commands recorded by the
%%    handoff-command intercept, and compare them with worker movement
-module(pipe_verify_handoff).
-export([
%% riak_test's entry
confirm/0,
%% test machinery
runner_wait/1,
collector/0
]).
-include_lib("eunit/include/eunit.hrl").
%% local copy of riak_pipe.hrl
-include("rt_pipe.hrl").
-define(NODE_COUNT, 2).
-define(ALL_LOG, [{log, sink}, {trace, all}]).
%% @doc riak_test callback: verify that riak_pipe vnodes hand off their
%% workers and queued inputs correctly when a second node joins.
%% Returns `pass' on success; any failed match or assertion crashes the
%% test.
confirm() ->
    lager:info("Start ~b nodes", [?NODE_COUNT]),
    NodeDefs = lists:duplicate(?NODE_COUNT, {current, default}),
    Services = [riak_pipe],
    [Primary,Secondary] = Nodes = rt:deploy_nodes(NodeDefs, Services),
    %% Ensure each node owns 100% of its own ring
    [?assertEqual([Node], rt:owners_according_to(Node)) || Node <- Nodes],

    lager:info("Load useful modules"),
    rt:load_modules_on_nodes([?MODULE, rt_pipe], Nodes),

    lager:info("Start run coordinator"),
    Runner = spawn_link(?MODULE, runner_wait, [[]]),

    P1Spec = [#fitting_spec{name="p1handoff",
                            module=riak_pipe_w_xform,
                            arg=pause_until_signal(Runner)}],
    P2Spec = [#fitting_spec{name="p2handoff",
                            module=riak_pipe_w_xform,
                            arg=pause_until_signal(Runner)}],

    lager:info("Start two pipes on Primary"),
    {ok, Pipe1} =
        rpc:call(Primary, riak_pipe, exec,
                 [P1Spec, [{sink, rt_pipe:self_sink()}|?ALL_LOG]]),
    {ok, Pipe2} =
        rpc:call(Primary, riak_pipe, exec,
                 [P2Spec, [{sink, rt_pipe:self_sink()}|?ALL_LOG]]),

    lager:info("Send some inputs to both pipes"),
    [ok = rpc:call(Primary, riak_pipe, queue_work, [Pipe1, X]) ||
        X <- lists:seq(1, 20)],
    [ok = rpc:call(Primary, riak_pipe, queue_work, [Pipe2, X]) ||
        X <- lists:seq(101, 120)],

    %% snapshot of worker placement while every worker is still paused
    P1Status1 = pipe_status(Primary, Pipe1),
    P2Status1 = pipe_status(Primary, Pipe2),

    lager:info("Start and register intercept log collector"),
    Collector = spawn_link(Primary, ?MODULE, collector, []),
    %% register/2 returns true on success; fail fast on a name clash
    %% or a badrpc instead of continuing with the name unregistered
    true = rpc:call(Primary, erlang, register,
                    [riak_test_collector, Collector]),

    lager:info("Install pipe vnode intercept"),
    Intercept = {riak_pipe_vnode,
                 [{{handle_handoff_command,3}, log_handoff_command}]},
    ok = rt_intercept:add(Primary, Intercept),

    lager:info("Join Secondary to Primary"),
    %% Join, then wait until the cluster has settled on the new
    %% ownership before letting the paused workers proceed.
    rt:join(Secondary, Primary),
    rt:wait_until_no_pending_changes(Nodes),
    rt:wait_until_nodes_agree_about_ownership(Nodes),

    lager:info("Unpause workers"),
    Runner ! go,
    ok = rt:wait_until_transfers_complete(Nodes),

    lager:info("Add more inputs to Pipe2"),
    [ok = rpc:call(Primary, riak_pipe, queue_work, [Pipe2, X]) ||
        X <- lists:seq(121, 140)],

    %% transfers completing takes so long that the pipe is extremely
    %% likely to have finished all of its inputs by now
    P1Status2 = pipe_status(Primary, Pipe1),
    P2Status2 = pipe_status(Primary, Pipe2),

    lager:info("Send eoi and collect results"),
    riak_pipe:eoi(Pipe1),
    riak_pipe:eoi(Pipe2),
    {eoi, Out1, Trace1} = riak_pipe:collect_results(Pipe1, 1000),
    {eoi, Out2, Trace2} = riak_pipe:collect_results(Pipe2, 1000),

    %% no errors on either pipe, all items make it through; if these
    %% are wrong, we dropped things somewhere
    ?assertEqual([], rt_pipe:extract_trace_errors(Trace1)),
    ?assertEqual(20, length(Out1)),
    ?assertEqual([], rt_pipe:extract_trace_errors(Trace2)),
    ?assertEqual(40, length(Out2)),

    %% VM trace verification
    %% NOTE(review): fixed grace period, presumably so that any
    %% in-flight intercept log messages reach the collector; confirm
    timer:sleep(1000),
    lager:info("Collect intercept log"),
    PTraces = get_collection(Collector),

    %% time to compare things
    P1PrimaryWorkers1 = partitions_on_node(Primary, P1Status1),
    P1SecondaryWorkers2 = partitions_on_node(Secondary, P1Status2),
    P2PrimaryWorkers1 = partitions_on_node(Primary, P2Status1),
    P2SecondaryWorkers2 = partitions_on_node(Secondary, P2Status2),

    %% workers moved: partitions with a worker on Primary before the
    %% join and a worker on Secondary afterwards
    P1MovedPrimaryToSecondary = ordsets:intersection(
                                  ordsets:from_list(P1PrimaryWorkers1),
                                  ordsets:from_list(P1SecondaryWorkers2)),
    P2MovedPrimaryToSecondary = ordsets:intersection(
                                  ordsets:from_list(P2PrimaryWorkers1),
                                  ordsets:from_list(P2SecondaryWorkers2)),

    %% vnodes moved
    AllMovedPrimaryToSecondary = ordsets:union(
                                   P1MovedPrimaryToSecondary,
                                   P2MovedPrimaryToSecondary),

    PFoldReqs = [X || X <- PTraces,
                      %% it would be really nice to import ?FOLD_REQ
                      %% from riak_core_vnode.hrl; 2.0 adds the _v2
                      %% record, so accept either version
                      (X =:= riak_core_fold_req_v1 orelse
                       X =:= riak_core_fold_req_v2)],
    PArchives = [X || cmd_archive=X <- PTraces],

    %% number of active vnodes migrating from Primary to Secondary,
    %% should be one fold per move, otherwise inputs were directed
    %% incorrectly after transfers settled
    ?assertEqual(length(AllMovedPrimaryToSecondary),
                 length(PFoldReqs)),

    %% number of workers migrating from Primary to Secondary, should
    %% be one archive per move, otherwise inputs were directed
    %% incorrectly after transfers settled
    ?assertEqual(length(P1MovedPrimaryToSecondary)
                 +length(P2MovedPrimaryToSecondary),
                 length(PArchives)),

    %% A single fold carries more than one archive only when the vnode
    %% that moved hosted workers from BOTH pipes, so look at the moved
    %% sets rather than at the initial placement.
    case ordsets:intersection(P1MovedPrimaryToSecondary,
                              P2MovedPrimaryToSecondary) of
        [] ->
            lager:warning("Multiple archives in a single fold was not tested");
        _ ->
            ok
    end,

    rt_pipe:assert_no_zombies(Nodes),

    lager:info("~s: PASS", [atom_to_list(?MODULE)]),
    pass.
%%% Run pausing bits
%% @doc Build a riak_pipe_w_xform worker function that, for each input,
%% first announces itself to `Runner' with `{wait, self()}', then blocks
%% until it receives `go', and only then passes the input through
%% unchanged as output.
pause_until_signal(Runner) ->
    fun(Input, Partition, FittingDetails) ->
            Runner ! {wait, self()},
            receive
                go -> ok
            end,
            riak_pipe_vnode_worker:send_output(Input, Partition,
                                               FittingDetails)
    end.
%% @doc Phase one of the worker-pausing coordinator. Remembers every
%% worker that asks for permission (`{wait, Pid}') until the test sends
%% `go'; then releases all held workers (most recent first) and switches
%% to phase two, runner_go/0.
runner_wait(Held) ->
    receive
        {wait, Worker} ->
            runner_wait([Worker | Held]);
        go ->
            lists:foreach(fun(Worker) -> Worker ! go end, Held),
            runner_go()
    end.
%% @doc Phase two of the worker-pausing coordinator: answer every
%% `{wait, Pid}' request with `go' immediately. Loops forever.
runner_go() ->
    receive
        {wait, Worker} -> Worker ! go
    end,
    runner_go().
%%% Status filtering bits
%% @doc Given the per-worker proplists from a riak_pipe:status/1
%% response, return the `partition' value of each worker whose `node'
%% equals `Node', preserving the order of `PipeStatus'.
partitions_on_node(Node, PipeStatus) ->
    OnNode = [Worker || Worker <- PipeStatus,
                        proplists:get_value(node, Worker) == Node],
    [proplists:get_value(partition, Worker) || Worker <- OnNode].
%% @doc Run riak_pipe:status/1 for `Pipe' on `Node' via RPC and return
%% the status list of its only fitting. Crashes with badmatch if the
%% pipe does not have exactly one fitting or if the RPC fails.
pipe_status(Node, Pipe) ->
    Reply = rpc:call(Node, riak_pipe, status, [Pipe]),
    [{_FittingName, FittingStatus}] = Reply,
    FittingStatus.
%% @doc Entry point for the intercept-log collector process: start with
%% an empty log.
collector() ->
    collector([]).

%% Receive loop. Every message other than a collection request is
%% recorded (newest at the head of `Seen'). A `{send_collection, Ref,
%% Pid}' request is answered with the log in arrival order, after which
%% the loop returns and the process ends.
collector(Seen) ->
    receive
        {send_collection, Ref, Requester} ->
            Log = lists:reverse(Seen),
            Requester ! {collection, Ref, Log};
        Msg ->
            collector([Msg | Seen])
    end.
%% @doc Ask `Collector' (a process running collector/0) for every
%% message it has accumulated, in arrival order. Waits up to 60 seconds
%% for the reply; see get_collection/2.
get_collection(Collector) ->
    get_collection(Collector, 60000).

%% @doc Like get_collection/1, but with an explicit reply timeout in
%% milliseconds. Raises `error({collection_timeout, Collector})' if no
%% reply arrives in time, instead of blocking the test forever.
get_collection(Collector, Timeout) ->
    Ref = make_ref(),
    Collector ! {send_collection, Ref, self()},
    receive
        {collection, Ref, Collection} ->
            Collection
    after Timeout ->
            error({collection_timeout, Collector})
    end.