Merge branch 'master' of github.com:basho/riak_test

This commit is contained in:
Russell Brown 2013-10-11 13:07:36 +01:00
commit a81949dac8
7 changed files with 237 additions and 32 deletions

1
.edts
View File

@ -1 +1,2 @@
:node-sname "rt"
:lib-dirs "deps"

View File

@ -817,7 +817,8 @@ build_cluster(NumNodes, Versions, InitialConfig) ->
%% ok do a staged join and then commit it, this eliminates the
%% large amount of redundant handoff done in a sequential join
[staged_join(Node, Node1) || Node <- OtherNodes],
plan_and_commit(Node1)
plan_and_commit(Node1),
try_nodes_ready(Nodes, 3, 500)
end,
?assertEqual(ok, wait_until_nodes_ready(Nodes)),
@ -829,6 +830,19 @@ build_cluster(NumNodes, Versions, InitialConfig) ->
lager:info("Cluster built: ~p", [Nodes]),
Nodes.
%% @doc Poll `Nodes' until every one of them reports ready, sleeping
%% `SleepMs' milliseconds between polls. Once the retry counter `N'
%% reaches zero, stop polling and issue one more plan/commit on the
%% first node of the list.
try_nodes_ready([FirstNode | _Rest], 0, _SleepMs) ->
    lager:info("Nodes not ready after initial plan/commit, retrying"),
    plan_and_commit(FirstNode);
try_nodes_ready(Nodes, N, SleepMs) ->
    %% every node is queried on each pass; only the stragglers are kept
    NotReady = [Node || Node <- Nodes, is_ready(Node) =/= true],
    case NotReady of
        [] ->
            ok;
        [_ | _] ->
            timer:sleep(SleepMs),
            try_nodes_ready(Nodes, N - 1, SleepMs)
    end.
%% @doc Stop nodes and wipe out their data directories
clean_cluster(Nodes) when is_list(Nodes) ->
[stop_and_wait(Node) || Node <- Nodes],

View File

@ -14,7 +14,7 @@
-prereq("virtualenv").
confirm() ->
prereqs(),
{ok, TestCommand} = prereqs(),
Config = [{riak_search, [{enabled, true}]}],
[Node] = rt:deploy_nodes(1, Config),
rt:wait_for_service(Node, riak_search),
@ -28,7 +28,7 @@ confirm() ->
lager:info("Enabling search hook on 'searchbucket'"),
rt:enable_search_hook(Node, <<"searchbucket">>),
{ExitCode, PythonLog} = rt_local:stream_cmd("bin/python setup.py develop test",
{ExitCode, PythonLog} = rt_local:stream_cmd(TestCommand,
[{cd, ?PYTHON_CHECKOUT},
{env,[{"RIAK_TEST_PB_HOST", PB_Host},
{"RIAK_TEST_PB_PORT", integer_to_list(PB_Port)},
@ -72,4 +72,14 @@ prereqs() ->
lager:info("[PREREQ] Installing an isolated environment with virtualenv in ~s", [?PYTHON_CHECKOUT]),
rt_local:stream_cmd("virtualenv --clear --no-site-packages .", [{cd, ?PYTHON_CHECKOUT}]),
ok.
lager:info("[PREREQ] Installing dependencies"),
rt_local:stream_cmd("bin/python setup.py develop", [{cd, ?PYTHON_CHECKOUT}]),
case Minor of
$6 ->
lager:info("[PREREQ] Installing unittest2 for python 2.6"),
rt_local:stream_cmd("bin/easy_install unittest2", [{cd, ?PYTHON_CHECKOUT}]),
{ok, "bin/unit2 riak.tests.test_all"};
_ ->
{ok, "bin/python setup.py test"}
end.

View File

@ -99,8 +99,8 @@ verify_fsm_timeout([RN|_]) ->
Opts = [{log, sink},
{trace, [error]},
{sink, Sink},
%% a very short timeout, to fit eunit
{sink_type, {fsm, 0, 10}}],
%% a short timeout, to fit eunit
{sink_type, {fsm, 0, 1000}}],
{ok, P} = rpc:call(RN, riak_pipe, exec, [Spec, Opts]),
rpc:call(RN, riak_pipe, queue_work, [P, {sync, 1}]),
rpc:call(RN, riak_pipe, queue_work, [P, {sync, 2}]),

View File

@ -202,23 +202,28 @@ start_http_stream(Ref) ->
receive
{http, {Ref, stream_start, Headers}} ->
Boundary = get_boundary(proplists:get_value("content-type", Headers)),
http_stream_loop(Ref, orddict:new(), Boundary)
http_stream_loop(Ref, <<>>, Boundary);
Other -> lager:error("Unexpected message ~p", [Other]),
{error, unknown_message}
after 60000 ->
{error, timeout_local}
end.
http_stream_loop(Ref, Acc, {Boundary, BLen}=B) ->
receive
{http, {Ref, stream, Chunk}} ->
http_stream_loop(Ref, <<Acc/binary,Chunk/binary>>, B);
{http, {Ref, stream_end, _Headers}} ->
orddict:to_list(Acc);
{http, {Ref, stream, <<"\r\n--", Boundary:BLen/bytes, "\r\nContent-Type: application/json\r\n\r\n", Body/binary>>}} ->
ReverseBoundary = reverse_bin(<<"\r\n--", Boundary:BLen/binary, "--\r\n">>),
Message = get_message(ReverseBoundary, reverse_bin(Body)),
{struct, Result} = mochijson2:decode(Message),
Acc2 = lists:foldl(fun({K, V}, A) -> orddict:update(K, fun(Existing) -> Existing++V end, V, A) end,
Acc,
Result),
http_stream_loop(Ref, Acc2, B);
{http, {Ref, stream, <<"\r\n--", Boundary:BLen/bytes, "--\r\n">>}} ->
http_stream_loop(Ref, Acc, B);
Parts = binary:split(Acc,[
<<"\r\n--", Boundary:BLen/bytes, "\r\nContent-Type: application/json\r\n\r\n">>,
<<"\r\n--", Boundary:BLen/bytes,"--\r\n">>
], [global, trim]),
lists:foldl(fun(<<>>, Results) -> Results;
(Part, Results) ->
{struct, Result} = mochijson2:decode(Part),
orddict:merge(fun(_K, V1, V2) -> V1 ++ V2 end,
Results, Result)
end, [], Parts);
Other -> lager:error("Unexpected message ~p", [Other]),
{error, unknown_message}
after 60000 ->
@ -231,15 +236,3 @@ get_boundary("multipart/mixed;boundary=" ++ Boundary) ->
get_boundary(_) ->
undefined.
%% @doc Return a binary holding the bytes of `Bin' in reverse order.
reverse_bin(Bin) ->
    Reversed = lists:foldl(fun(Byte, Acc) -> [Byte | Acc] end,
                           [],
                           binary_to_list(Bin)),
    list_to_binary(Reversed).
%% @doc Strip `Boundary' from the front of `Body' when it is present,
%% then return the remaining bytes reversed. When `Body' does not
%% start with `Boundary', the whole of `Body' is reversed instead.
get_message(Boundary, Body) ->
    PrefixLen = byte_size(Boundary),
    Payload = case Body of
                  <<Boundary:PrefixLen/binary, Rest/binary>> -> Rest;
                  _ -> Body
              end,
    reverse_bin(Payload).

View File

@ -47,10 +47,11 @@ confirm() ->
{ok, Res} = stream_pb(PBPid, Query, [{timeout, 5000}]),
?assertEqual(ExpectedKeys, proplists:get_value(keys, Res, [])),
{ok, {{_, 503, _}, _, Body}} = httpc:request(url("~s/buckets/~s/index/~s/~s~s",
{ok, {{_, ErrCode, _}, _, Body}} = httpc:request(url("~s/buckets/~s/index/~s/~s~s",
[Http, ?BUCKET, <<"$bucket">>, ?BUCKET, []])),
?assertMatch({match, _}, re:run(Body, "request timed out")), %% shows the app.config timeout
?assertEqual(true, ErrCode >= 500),
?assertMatch({match, _}, re:run(Body, "request timed out|{error,timeout}")), %% shows the app.config timeout
HttpRes = http_query(Http, Query, [{timeout, 5000}]),
?assertEqual(ExpectedKeys, proplists:get_value(<<"keys">>, HttpRes, [])),

View File

@ -0,0 +1,186 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2013 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%% @doc Test basic handoff in mixed-version clusters. This was born
%% out of a bug found in the upgrade of vnode fold requests:
%% https://github.com/basho/riak/issues/407
%%
%% Basic test:
%% - load data into a new node
%% - join an old node to it
%% - wait for handoff to finish
%%
%% Node versions used are `current' and whatever the test runner has
%% set the `upgrade_version' metadata to (`previous' by default).
%%
%% Handoff uses riak_core_fold_req_v* commands, and riak issue #407
%% tracked a problem with upgrading that command from 1.4.2 format to
%% 2.0.0pre3 format.
-module(verify_handoff_mixed).
-behavior(riak_test).
-export([confirm/0]).
-include_lib("eunit/include/eunit.hrl").
-include("rt_pipe.hrl").
-define(KV_BUCKET, <<"vhm_kv">>).
-define(KV_COUNT, 1000).
-define(SEARCH_BUCKET, <<"vhm_search">>).
-define(SEARCH_COUNT, 1000).
-define(PIPE_COUNT, 100).
-define(FOLD_CAPABILITY, {riak_core,fold_req_version}).
%% @doc riak_test entry point. Deploys one `current' node and one
%% node of the upgrade version, prepares vnodes on the current node,
%% joins the old node to it, waits for handoff to finish and then
%% checks the logs for completed handoff. Returns `pass'.
confirm() ->
    %% same `upgrade_version' lookup as loaded_upgrade; falls back to
    %% `previous' when the test runner metadata does not set it
    PreviousVsn = proplists:get_value(upgrade_version,
                                      riak_test_runner:metadata(),
                                      previous),
    SearchConfig = [{riak_search, [{enabled, true}]}],
    [Current, Old] = Cluster =
        rt:deploy_nodes([{current, SearchConfig},
                         {PreviousVsn, SearchConfig}],
                        [riak_kv, riak_search, riak_pipe]),

    prepare_vnodes(Current),

    %% record the fold request version the old node uses *before* the
    %% join, so we can tell when the cluster has negotiated down to it
    LegacyFold = rt:capability(Old, ?FOLD_CAPABILITY, v1),

    %% join the two nodes and let the ring settle
    ok = rt:join(Old, Current),
    ok = rt:wait_until_all_members(Cluster),
    ok = rt:wait_until_ring_converged(Cluster),

    %% The ..._no_pending_changes and ..._transfers_complete waits
    %% below speed handoff up enough that it would run before
    %% capability renegotiation unless we pause here first. This is
    %% still technically racy, but at normal timing negotiation
    %% usually happens *much* sooner than handoff.
    lager:info("Wait for fold_req_version == ~p", [LegacyFold]),
    ok = rt:wait_until_capability(Current, ?FOLD_CAPABILITY, LegacyFold),

    %% These waits time out when the wrong fix is in place
    %% (riak_kv_vnode would infinite-loop v1 fold requests) or when
    %% no fix is in place (riak_pipe_vnode would drop v1 fold
    %% requests on the floor).
    ok = rt:wait_until_no_pending_changes(Cluster),
    ok = rt:wait_until_transfers_complete(Cluster),

    %% This errors when the wrong fix is in place
    %% (riak_search forward v1 fold requests).
    ok = check_logs(),
    pass.
%% @doc Start KV, Search and Pipe vnodes on `Node' (in that order) so
%% that they are ready to hand off once the other node is joined.
prepare_vnodes(Node) ->
    Steps = [fun prepare_kv_vnodes/1,
             fun prepare_search_vnodes/1,
             fun prepare_pipe_vnodes/1],
    lists:foreach(fun(Prepare) -> Prepare(Node) end, Steps).
%% @doc Store ?KV_COUNT objects in ?KV_BUCKET through a protocol
%% buffers client connected to `Node'. Each object's key and value
%% are both the decimal text of an integer from 1 to ?KV_COUNT.
prepare_kv_vnodes(Node) ->
    lager:info("Preparing KV vnodes with keys 1-~b in bucket ~s",
               [?KV_COUNT, ?KV_BUCKET]),
    Client = rt:pbc(Node),
    Store = fun(N) ->
                    Key = list_to_binary(integer_to_list(N)),
                    Obj = riakc_obj:new(?KV_BUCKET, Key, Key),
                    ok = riakc_pb_socket:put(Client, Obj)
            end,
    lists:foreach(Store, lists:seq(1, ?KV_COUNT)),
    riakc_pb_socket:stop(Client).
%% @doc Enable the search hook on ?SEARCH_BUCKET, then store
%% "text/plain" objects in it through a protocol buffers client
%% connected to `Node'. Keys (and values) are the decimal text of the
%% integers 1000 through 1000+?SEARCH_COUNT, inclusive.
prepare_search_vnodes(Node) ->
    lager:info("Preparing Search vnodes with keys 1000-~b in bucket ~s",
               [1000+?SEARCH_COUNT, ?SEARCH_BUCKET]),
    %% the hook is enabled before any write is issued
    rt:enable_search_hook(Node, ?SEARCH_BUCKET),
    C = rt:pbc(Node),
    lists:foreach(
      fun(KV) ->
              O = riakc_obj:new(?SEARCH_BUCKET, KV, KV, "text/plain"),
              %% crash right away if any write is not acknowledged
              ok = riakc_pb_socket:put(C, O)
      end,
      [ list_to_binary(integer_to_list(N))
        || N <- lists:seq(1000, 1000+?SEARCH_COUNT) ]),
    riakc_pb_socket:stop(C).
%% @doc Start a single-fitting pipe on `Node' and queue ?PIPE_COUNT
%% integer inputs into it. Returns `ok'.
prepare_pipe_vnodes(Node) ->
    %% a linked process that only waits for a `never' message; it is
    %% used as the sink so pipe outputs stay out of our own mailbox
    DummySink = spawn_link(fun() -> receive never -> ok end end),
    %% riak_pipe_w_pass workers produce no archive, but the vnode
    %% still sends its queue (even if empty) through handoff
    Spec = [#fitting_spec{name=vhm, module=riak_pipe_w_pass}],
    Options = [{sink, #fitting{pid=DummySink}}],
    lager:info("Filling a pipe with ~b inputs", [?PIPE_COUNT]),
    {ok, Pipe} = rpc:call(Node, riak_pipe, exec, [Spec, Options]),
    [ ok = rpc:call(Node, riak_pipe, queue_work, [Pipe, Input])
      || Input <- lists:seq(1, ?PIPE_COUNT) ],
    ok.
%% @doc Assert that the handoff counts summed from the node logs
%% cover exactly riak_kv_vnode, riak_search_vnode and
%% riak_pipe_vnode, and that none of those counts is zero.
%% Returns `ok' when both assertions hold.
check_logs() ->
    AppCounts = sum_app_handoff(),
    lager:info("Found handoff counts in logs: ~p", [AppCounts]),

    %% every one of our apps must have completed some handoff
    ExpectedApps = lists:sort([riak_kv_vnode,
                               riak_search_vnode,
                               riak_pipe_vnode]),
    FoundApps = lists:sort(orddict:fetch_keys(AppCounts)),
    ?assertEqual(ExpectedApps, FoundApps),

    %% and no app may report zero objects handed off
    ZeroHandoff = lists:filter(fun({_App, Count}) -> Count == 0 end,
                               AppCounts),
    ?assertEqual([], ZeroHandoff),
    ok.
%% @doc Gather the `{App, Count}' handoff notes found in all node
%% logs and sum the counts per app. The result is an orddict keyed
%% by app name.
sum_app_handoff() ->
    lager:info("Combing logs for handoff notes"),
    Notes = lists:flatmap(fun find_app_handoff/1, rt:get_node_logs()),
    AddNote = fun({App, Count}, Totals) ->
                      orddict:update_counter(App, Count, Totals)
              end,
    lists:foldl(AddNote, orddict:new(), Notes).
%% @doc Extract handoff notes from one `{Path, Port}' log entry.
%% Only logs whose path ends in "console.log" are read, line by line
%% from `Port'; any other log yields `[]' without being read.
find_app_handoff({Path, Port}) ->
    %% The backslash is doubled so that the regex engine receives
    %% `\.' and matches a literal dot. A single backslash is consumed
    %% by the Erlang string literal, leaving a `.' that matches any
    %% character.
    case re:run(Path, "console\\.log$") of
        {match, _} ->
            find_line(Port, file:read_line(Port));
        nomatch ->
            %% save time not looking through other logs
            []
    end.
%% @doc Walk the lines read from `Port', starting with the
%% already-read result given as the second argument, and return a
%% list of `{App, Count}' tuples, one per line that reports a
%% completed ownership transfer. Reading stops at the first result
%% that is not `{ok, Data}' (end of file or a read error).
find_line(Port, {ok, Data}) ->
    %% `.*?' is deliberately lazy: a greedy `.*' would swallow the
    %% leading digits of the count and leave only its last digit for
    %% the capture group (250 would be read as 0).
    Re = "ownership_transfer transfer of ([a-z_]+).*"
         "completed.*?([0-9]+) objects",
    case re:run(Data, Re, [{capture, all_but_first, list}]) of
        {match, [App, Count]} ->
            [{list_to_atom(App), list_to_integer(Count)}
             |find_line(Port, file:read_line(Port))];
        nomatch ->
            find_line(Port, file:read_line(Port))
    end;
find_line(_, _) ->
    [].