Merge pull request #1159 from basho/nem-overload-test-refinements

Fix intermittent failure in w1c part of overload test (and do some misc code cleanups)
This commit is contained in:
Nick Marino 2016-09-21 10:07:56 -04:00 committed by GitHub
commit 95b8747c58

View File

@ -30,13 +30,13 @@
-define(LIST_KEYS_RETRIES, 1000).
-define(GET_RETRIES, 1000).
-define(BUCKET, <<"test">>).
-define(KEY, <<"hotkey">>).
-define(VALUE, <<"overload_test_value">>).
-define(NORMAL_TYPE, <<"normal_type">>).
-define(CONSISTENT_TYPE, <<"consistent_type">>).
-define(WRITE_ONCE_TYPE, <<"write_once_type">>).
-define(NORMAL_BKV, {{?NORMAL_TYPE, ?BUCKET}, ?KEY, <<"test">>}).
-define(CONSISTENT_BKV, {{?CONSISTENT_TYPE, ?BUCKET}, ?KEY, <<"test">>}).
-define(WRITE_ONCE_BKV, {{?WRITE_ONCE_TYPE, ?BUCKET}, ?KEY, <<"test">>}).
-define(NORMAL_BUCKET, {?NORMAL_TYPE, ?BUCKET}).
-define(CONSISTENT_BUCKET, {?CONSISTENT_TYPE, ?BUCKET}).
-define(WRITE_ONCE_BUCKET, {?WRITE_ONCE_TYPE, ?BUCKET}).
%% This record contains the default values for config settings if they were not set
%% in the advanced.config file - because setting something to `undefined` is not the same
@ -81,18 +81,22 @@ default_config(#config{
{riak_api, [{pb_backlog, 1024}]}].
confirm() ->
Nodes = setup(),
[Node1 | _] = Nodes = setup(),
ok = create_bucket_type(Nodes, ?NORMAL_TYPE, [{n_val, 3}]),
ok = create_bucket_type(Nodes, ?CONSISTENT_TYPE, [{consistent, true}, {n_val, 5}]),
ok = create_bucket_type(Nodes, ?WRITE_ONCE_TYPE, [{write_once, true}, {n_val, 1}]),
rt:wait_until(ring_manager_check_fun(hd(Nodes))),
Key = generate_key(),
lager:info("Generated overload test key ~p", [Key]),
Node1 = hd(Nodes),
write_once(Node1, ?NORMAL_BKV),
write_once(Node1, ?CONSISTENT_BKV),
write_once(Node1, ?WRITE_ONCE_BKV),
NormalBKV = {?NORMAL_BUCKET, Key, ?VALUE},
ConsistentBKV = {?CONSISTENT_BUCKET, Key, ?VALUE},
WriteOnceBKV = {?WRITE_ONCE_BUCKET, Key, ?VALUE},
write_once(Node1, NormalBKV),
write_once(Node1, ConsistentBKV),
write_once(Node1, WriteOnceBKV),
Tests = [test_no_overload_protection,
test_vnode_protection,
@ -102,33 +106,41 @@ confirm() ->
lager:info("Starting Test ~p for ~p~n", [Test, BKV]),
ok = erlang:apply(?MODULE, Test, [Nodes, BKV])
end || Test <- Tests,
BKV <- [?NORMAL_BKV,
?CONSISTENT_BKV,
?WRITE_ONCE_BKV]],
BKV <- [NormalBKV,
ConsistentBKV,
WriteOnceBKV]],
%% Test cover queries doesn't depend on bucket/keyvalue, just run it once
test_cover_queries_overload(Nodes),
pass.
generate_key() ->
random:seed(erlang:now()),
N = random:uniform(500),
Part1 = <<"overload_test_key_">>,
Part2 = integer_to_binary(N),
<<Part1/binary, Part2/binary>>.
setup() ->
ensemble_util:build_cluster(5, default_config(), 5).
test_no_overload_protection(_Nodes, ?CONSISTENT_BKV) ->
test_no_overload_protection(_Nodes, {?CONSISTENT_BUCKET, _, _}) ->
ok;
test_no_overload_protection(Nodes, BKV) ->
lager:info("Setting default configuration for no overload protestion test."),
lager:info("Setting default configuration for no overload protection test."),
rt:pmap(fun(Node) ->
rt:update_app_config(Node, default_config())
end, Nodes),
lager:info("Testing with no overload protection"),
ProcFun = build_predicate_eq(test_no_overload_protection, ?NUM_REQUESTS,
"ProcFun", "Procs"),
QueueFun = build_predicate_gte(test_no_overload_protection, ?NUM_REQUESTS,
"QueueFun", "Queue Size"),
QueueFun = build_predicate_eq(test_no_overload_protection, ?NUM_REQUESTS,
"QueueFun", "Queue Size"),
verify_test_results(run_test(Nodes, BKV), BKV, ProcFun, QueueFun).
verify_test_results({_NumProcs, QueueLen}, ?CONSISTENT_BKV, _ProcFun, QueueFun) ->
verify_test_results({_NumProcs, QueueLen}, {?CONSISTENT_BUCKET, _, _}, _ProcFun, QueueFun) ->
?assert(QueueFun(QueueLen));
verify_test_results({NumProcs, QueueLen}, _BKV, ProcFun, QueueFun) ->
?assert(ProcFun(NumProcs)),
@ -171,11 +183,11 @@ test_vnode_protection(Nodes, BKV) ->
%% Don't check consistent gets, as they don't use the FSM
test_fsm_protection(_, ?CONSISTENT_BKV) ->
test_fsm_protection(_, {?CONSISTENT_BUCKET, _, _}) ->
ok;
%% Don't check on fast path either.
test_fsm_protection(_, ?WRITE_ONCE_BKV) ->
test_fsm_protection(_, {?WRITE_ONCE_BUCKET, _, _}) ->
ok;
test_fsm_protection(Nodes, BKV) ->
@ -282,22 +294,22 @@ run_test(Nodes, BKV) ->
NumProcs2 = overload_proxy:get_count(),
lager:info("Final process count on ~p: ~b", [Node1, NumProcs2]),
QueueLen = vnode_queue_len(Victim),
QueueLen = vnode_gets_in_queue(Victim),
lager:info("Final vnode queue length for ~p: ~b",
[Victim, QueueLen]),
resume_vnode(Suspended),
rt:wait_until(fun() ->
vnode_queue_len(Victim) =:= 0
vnode_gets_in_queue(Victim) =:= 0
end),
kill_pids(Reads),
overload_proxy:stop(),
{NumProcs2 - NumProcs1, QueueLen}.
get_victim(ExcludeNode, {Bucket, Key, _}) ->
get_victim(Node, {Bucket, Key, _}) ->
Hash = riak_core_util:chash_std_keyfun({Bucket, Key}),
PL = lists:sublist(riak_core_ring:preflist(Hash, rt:get_ring(ExcludeNode)), 5),
hd([IdxNode || {_, Node}=IdxNode <- PL, Node /= ExcludeNode]).
PL = riak_core_ring:preflist(Hash, rt:get_ring(Node)),
hd(PL).
ring_manager_check_fun(Node) ->
fun() ->
@ -366,7 +378,7 @@ read_until_success(Node) ->
read_until_success(C, 0).
read_until_success(C, Count) ->
case C:get(?BUCKET, ?KEY) of
case C:get(<<"dummy">>, <<"dummy">>) of
{error, mailbox_overload} ->
read_until_success(C, Count+1);
_ ->
@ -532,11 +544,11 @@ resume_vnode(Pid) ->
process_count(Node) ->
rpc:call(Node, erlang, system_info, [process_count]).
vnode_queue_len({Idx, Node}) ->
vnode_queue_len(Node, Idx).
vnode_gets_in_queue({Idx, Node}) ->
vnode_gets_in_queue(Node, Idx).
vnode_queue_len(Node, Idx) ->
rpc:call(Node, ?MODULE, remote_vnode_queue, [Idx]).
vnode_gets_in_queue(Node, Idx) ->
rpc:call(Node, ?MODULE, remote_vnode_gets_in_queue, [Idx]).
dropped_stat(Node) ->
Stats = rpc:call(Node, riak_core_stat, get_stats, []),
@ -551,12 +563,6 @@ run_count(Node) ->
lager:info("fsm count:~p", [get_num_running_gen_fsm(Node)]),
run_count(Node).
run_queue_len({Idx, Node}) ->
timer:sleep(500),
Len = vnode_queue_len(Node, Idx),
lager:info("queue len on ~p is:~p", [Node, Len]),
run_queue_len({Idx, Node}).
get_num_running_gen_fsm(Node) ->
Procs = rpc:call(Node, erlang, processes, []),
ProcInfo = [ rpc:call(Node, erlang, process_info, [P]) || P <- Procs, P /= undefined ],
@ -565,13 +571,27 @@ get_num_running_gen_fsm(Node) ->
FsmList = [ proplists:lookup(riak_kv_get_fsm, Call) || Call <- InitCalls ],
length(proplists:lookup_all(riak_kv_get_fsm, FsmList)).
remote_vnode_queue(Idx) ->
remote_vnode_gets_in_queue(Idx) ->
{ok, Pid} = riak_core_vnode_manager:get_vnode_pid(Idx, riak_kv_vnode),
{messages, AllMessages} = process_info(Pid, messages),
NonPings = lists:filter(fun({'$vnode_proxy_ping', _, _, _}) -> false; (_) -> true end, AllMessages),
Len = length(NonPings),
lager:info("Non-Ping Messages (~p): ~n~p~n", [Len, NonPings]),
Len.
GetMessages = lists:foldl(fun(E, A) ->
case is_get_req(E) of
true -> A + 1;
false -> A
end
end, 0, AllMessages),
lager:info("Get requests (~p): ~p", [Idx, GetMessages]),
GetMessages.
%% This is not the greatest thing ever, since we're coupling this test pretty
%% tightly to the internal representation of get requests in riak_kv...can't
%% really figure out a better way to do this, though.
is_get_req({'$gen_event', {riak_vnode_req_v1, _, _, Req}}) ->
element(1, Req) =:= riak_kv_get_req_v1;
is_get_req(_) ->
false.
%% In tests that do not expect work to be shed, we want to confirm that
%% at least ?NUM_REQUESTS (queue entries) are handled.