Merge pull request #263 from basho/pevm-stream-timeouts

test for bucket and key listing timeouts
This commit is contained in:
Evan Vigil-McClanahan 2013-05-24 14:33:45 -07:00
commit bb545fc29c
3 changed files with 207 additions and 26 deletions

View File

@ -17,6 +17,14 @@ slow_handle_command(Req, Sender, State) ->
timer:sleep(500),
?M:handle_command_orig(Req, Sender, State).
%% @doc Make all KV vnode coverage commands take abnormally long.
slow_handle_coverage(Req, Filter, Sender, State) ->
random:seed(erlang:now()),
Rand = random:uniform(5000),
error_logger:info_msg("coverage sleeping ~p", [Rand]),
timer:sleep(Rand),
?M:handle_coverage_orig(Req, Filter, Sender, State).
%% @doc Simulate dropped gets/network partitions byresponding with
%% noreply during get requests.
drop_do_get(Sender, BKey, ReqId, State) ->

View File

@ -3,6 +3,10 @@
-compile(export_all).
-include_lib("eunit/include/eunit.hrl").
-define(BUCKET, <<"listkeys_bucket">>).
-define(NUM_BUCKETS, 1200).
-define(NUM_KEYS, 1000).
confirm() ->
[Node] = rt:build_cluster(1),
rt:wait_until_pingable(Node),
@ -12,10 +16,16 @@ confirm() ->
rt:httpc_write(HC, <<"foo">>, <<"bar">>, <<"foobarbaz\n">>),
rt:httpc_write(HC, <<"foo">>, <<"bar2">>, <<"foobarbaz2\n">>),
put_keys(Node, ?BUCKET, ?NUM_KEYS),
put_buckets(Node, ?NUM_BUCKETS),
timer:sleep(2000),
rt_intercept:add(Node, {riak_kv_get_fsm,
[{{prepare,2}, slow_prepare}]}),
rt_intercept:add(Node, {riak_kv_put_fsm,
[{{prepare,2}, slow_prepare}]}),
rt_intercept:add(Node, {riak_kv_vnode,
[{{handle_coverage,4}, slow_handle_coverage}]}),
lager:info("testing HTTP API"),
@ -97,4 +107,128 @@ confirm() ->
V2 -> ?assertEqual({object_value, <<"get2get2get2get2get\n">>},
{object_value, V2})
end,
pass.
Long = 1000000,
Short = 1000,
lager:info("Checking List timeouts"),
lager:info("Checking PBC"),
Pid = rt:pbc(Node),
lager:info("Checking keys timeout"),
?assertMatch({error, <<"timeout">>},
riakc_pb_socket:list_keys(Pid, ?BUCKET, Short)),
lager:info("Checking keys w/ long timeout"),
?assertMatch({ok, _},
riakc_pb_socket:list_keys(Pid, ?BUCKET, Long)),
lager:info("Checking stream keys timeout"),
{ok, ReqId0} = riakc_pb_socket:stream_list_keys(Pid, ?BUCKET, Short),
wait_for_error(ReqId0),
lager:info("Checking stream keys works w/ long timeout"),
{ok, ReqId8} = riakc_pb_socket:stream_list_keys(Pid, ?BUCKET, Long),
wait_for_end(ReqId8),
lager:info("Checking buckets timeout"),
?assertMatch({error, <<"timeout">>},
riakc_pb_socket:list_buckets(Pid, Short)),
lager:info("Checking buckets w/ long timeout"),
?assertMatch({ok, _},
riakc_pb_socket:list_buckets(Pid, Long)),
lager:info("Checking stream buckets timeout"),
{ok, ReqId1} = riakc_pb_socket:stream_list_buckets(Pid, Short),
wait_for_error(ReqId1),
lager:info("Checking stream buckets works w/ long timeout"),
{ok, ReqId7} = riakc_pb_socket:stream_list_buckets(Pid, Long),
wait_for_end(ReqId7),
lager:info("Checking HTTP"),
LHC = rt:httpc(Node),
lager:info("Checking keys timeout"),
?assertMatch({error, <<"timeout">>},
rhc:list_keys(LHC, ?BUCKET, Short)),
lager:info("Checking keys w/ long timeout"),
?assertMatch({ok, _},
rhc:list_keys(LHC, ?BUCKET, Long)),
lager:info("Checking stream keys timeout"),
{ok, ReqId2} = rhc:stream_list_keys(LHC, ?BUCKET, Short),
wait_for_error(ReqId2),
lager:info("Checking stream keys works w/ long timeout"),
{ok, ReqId4} = rhc:stream_list_keys(LHC, ?BUCKET, Long),
wait_for_end(ReqId4),
lager:info("Checking buckets timeout"),
?assertMatch({error, <<"timeout">>},
rhc:list_buckets(LHC, Short)),
lager:info("Checking buckets w/ long timeout"),
?assertMatch({ok, _},
rhc:list_buckets(LHC, Long)),
lager:info("Checking stream buckets timeout"),
{ok, ReqId3} = rhc:stream_list_buckets(LHC, Short),
wait_for_error(ReqId3),
lager:info("Checking stream buckets works w/ long timeout"),
{ok, ReqId5} = rhc:stream_list_buckets(LHC, Long),
wait_for_end(ReqId5),
pass.
wait_for_error(ReqId) ->
receive
{ReqId, done} ->
lager:error("stream incorrectly finished"),
error(stream_finished);
{ReqId, {error, <<"timeout">>}} ->
lager:info("stream correctly timed out"),
ok;
{ReqId, {_Key, _Vals}} ->
%% the line below is spammy but nice for debugging
%%{ReqId, {Key, Vals}} ->
%%lager:info("Got some values ~p, ~p", [Key, Vals]),
wait_for_error(ReqId);
{ReqId, Other} ->
error({unexpected_message, Other})
after 10000 ->
error(error_stream_recv_timed_out)
end.
wait_for_end(ReqId) ->
receive
{ReqId, done} ->
lager:info("stream correctly finished"),
ok;
{ReqId, {error, <<"timeout">>}} ->
lager:error("stream incorrectly timed out"),
error(stream_timed_out);
{ReqId, {_Key, _Vals}} ->
%% the line below is spammy but nice for debugging
%%{ReqId, {Key, Vals}} ->
%%lager:info("Got some values ~p, ~p", [Key, Vals]),
wait_for_end(ReqId);
{ReqId, Other} ->
error({unexpected_message, Other})
after 10000 ->
error(error_stream_recv_timed_out)
end.
put_buckets(Node, Num) ->
Pid = rt:pbc(Node),
Buckets = [list_to_binary(["", integer_to_list(Ki)])
|| Ki <- lists:seq(0, Num - 1)],
{Key, Val} = {<<"test_key">>, <<"test_value">>},
[riakc_pb_socket:put(Pid, riakc_obj:new(Bucket, Key, Val))
|| Bucket <- Buckets],
riakc_pb_socket:stop(Pid).
put_keys(Node, Bucket, Num) ->
Pid = rt:pbc(Node),
Keys = [list_to_binary(["", integer_to_list(Ki)]) || Ki <- lists:seq(0, Num - 1)],
Vals = [list_to_binary(["", integer_to_list(Ki)]) || Ki <- lists:seq(0, Num - 1)],
[riakc_pb_socket:put(Pid, riakc_obj:new(Bucket, Key, Val)) || {Key, Val} <- lists:zip(Keys, Vals)],
riakc_pb_socket:stop(Pid).

View File

@ -53,6 +53,9 @@ confirm() ->
Ns
end, [Node1], [Node2, Node3, Node4]),
lager:info("Checking basic HTTP"),
check_it_all(Nodes, http),
lager:info("Stopping Node1"),
rt:stop(Node1),
rt:wait_until_unpingable(Node1),
@ -73,7 +76,7 @@ confirm() ->
check_it_all(UpNodes),
Node
end, Node1, [Node2, Node3, Node4]),
lager:info("Stopping Node2"),
rt:stop(Node2),
rt:wait_until_unpingable(Node2),
@ -84,7 +87,7 @@ confirm() ->
lager:info("Only Node1 is up, so test should fail!"),
check_it_all([Node1], false),
check_it_all([Node1], pbc, false),
pass.
put_keys(Node, Bucket, Num) ->
@ -94,37 +97,64 @@ put_keys(Node, Bucket, Num) ->
[riakc_pb_socket:put(Pid, riakc_obj:new(Bucket, Key, Val)) || {Key, Val} <- lists:zip(Keys, Vals)],
riakc_pb_socket:stop(Pid).
list_keys(Node, Bucket, Attempt, Num, ShouldPass) ->
Pid = rt:pbc(Node),
lager:info("Listing keys on ~p. Attempt #~p", [Node, Attempt]),
list_keys(Node, Interface, Bucket, Attempt, Num, ShouldPass) ->
case Interface of
pbc ->
Pid = rt:pbc(Node),
Mod = riakc_pb_socket;
http ->
Pid = rt:httpc(Node),
Mod = rhc
end,
lager:info("Listing keys on ~p using ~p. Attempt #~p",
[Node, Interface, Attempt]),
case ShouldPass of
true ->
{ok, Keys} = riakc_pb_socket:list_keys(Pid, Bucket),
{ok, Keys} = Mod:list_keys(Pid, Bucket),
ActualKeys = lists:usort(Keys),
ExpectedKeys = lists:usort([list_to_binary(["", integer_to_list(Ki)]) || Ki <- lists:seq(0, Num - 1)]),
ExpectedKeys = lists:usort([list_to_binary(["", integer_to_list(Ki)])
|| Ki <- lists:seq(0, Num - 1)]),
assert_equal(ExpectedKeys, ActualKeys);
_ ->
{Status, Message} = riakc_pb_socket:list_keys(Pid, Bucket),
{Status, Message} = Mod:list_keys(Pid, Bucket),
?assertEqual(error, Status),
?assertEqual(<<"insufficient_vnodes_available">>, Message)
end,
riakc_pb_socket:stop(Pid).
case Interface of
pbc -> riakc_pb_socket:stop(Pid);
_ -> ok
end.
put_buckets(Node, Num) ->
put_buckets(Node, Num) ->
Pid = rt:pbc(Node),
Buckets = [list_to_binary(["", integer_to_list(Ki)]) || Ki <- lists:seq(0, Num - 1)],
Buckets = [list_to_binary(["", integer_to_list(Ki)])
|| Ki <- lists:seq(0, Num - 1)],
{Key, Val} = {<<"test_key">>, <<"test_value">>},
[riakc_pb_socket:put(Pid, riakc_obj:new(Bucket, Key, Val)) || Bucket <- Buckets],
[riakc_pb_socket:put(Pid, riakc_obj:new(Bucket, Key, Val))
|| Bucket <- Buckets],
riakc_pb_socket:stop(Pid).
list_buckets(Node, Attempt, Num, ShouldPass) ->
Pid = rt:pbc(Node),
lager:info("Listing buckets on ~p. Attempt #~p", [Node, Attempt]),
list_buckets(Node, Interface, Attempt, Num, ShouldPass) ->
case Interface of
pbc ->
Pid = rt:pbc(Node),
Mod = riakc_pb_socket;
http ->
Pid = rt:httpc(Node),
Mod = rhc
end,
lager:info("Listing buckets on ~p using ~p. Attempt #~p",
[Node, Interface, Attempt]),
{Status, Buckets} = riakc_pb_socket:list_buckets(Pid),
{Status, Buckets} = Mod:list_buckets(Pid),
case Status of
error -> lager:info("list buckets error ~p", [Buckets]);
_ -> ok
end,
?assertEqual(ok, Status),
ExpectedBuckets= lists:usort([?BUCKET | [list_to_binary(["", integer_to_list(Ki)]) || Ki <- lists:seq(0, Num - 1)]]),
ExpectedBuckets= lists:usort([?BUCKET |
[list_to_binary(["", integer_to_list(Ki)])
|| Ki <- lists:seq(0, Num - 1)]]),
ActualBuckets = lists:usort(Buckets),
case ShouldPass of
true ->
@ -133,7 +163,10 @@ list_buckets(Node, Attempt, Num, ShouldPass) ->
?assert(length(ActualBuckets) < length(ExpectedBuckets)),
lager:info("This case expects inconsistent bucket lists")
end,
riakc_pb_socket:stop(Pid).
case Interface of
pbc -> riakc_pb_socket:stop(Pid);
_ -> ok
end.
assert_equal(Expected, Actual) ->
@ -145,10 +178,16 @@ assert_equal(Expected, Actual) ->
?assertEqual(Actual, Expected).
check_it_all(Nodes) ->
check_it_all(Nodes, true).
check_it_all(Nodes, ShouldPass) ->
[check_a_node(N, ShouldPass) || N <- Nodes].
check_it_all(Nodes, pbc).
check_it_all(Nodes, Interface) ->
check_it_all(Nodes, Interface, true).
check_it_all(Nodes, Interface, ShouldPass) ->
[check_a_node(N, Interface, ShouldPass) || N <- Nodes].
check_a_node(Node, ShouldPass) ->
[list_keys(Node, ?BUCKET, Attempt, ?NUM_KEYS, ShouldPass) || Attempt <- [1,2,3] ],
[list_buckets(Node, Attempt, ?NUM_BUCKETS, ShouldPass) || Attempt <- [1,2,3] ].
check_a_node(Node, Interface, ShouldPass) ->
[list_keys(Node, Interface, ?BUCKET, Attempt, ?NUM_KEYS, ShouldPass)
|| Attempt <- [1,2,3] ],
[list_buckets(Node, Interface, Attempt, ?NUM_BUCKETS, ShouldPass)
|| Attempt <- [1,2,3] ].