2013-05-10 20:54:50 +00:00
|
|
|
%% -------------------------------------------------------------------
|
|
|
|
%%
|
|
|
|
%% Copyright (c) 2013 Basho Technologies, Inc.
|
|
|
|
%%
|
|
|
|
%% This file is provided to you under the Apache License,
|
|
|
|
%% Version 2.0 (the "License"); you may not use this file
|
|
|
|
%% except in compliance with the License. You may obtain
|
|
|
|
%% a copy of the License at
|
|
|
|
%%
|
|
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
%%
|
|
|
|
%% Unless required by applicable law or agreed to in writing,
|
|
|
|
%% software distributed under the License is distributed on an
|
|
|
|
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
|
|
%% KIND, either express or implied. See the License for the
|
|
|
|
%% specific language governing permissions and limitations
|
|
|
|
%% under the License.
|
|
|
|
%%
|
|
|
|
%% -------------------------------------------------------------------
|
|
|
|
-module(overload).
|
|
|
|
-compile(export_all).
|
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
|
|
|
2014-07-01 16:46:42 +00:00
|
|
|
-cover_modules([riak_kv_vnode,
|
|
|
|
riak_kv_ensemble_backend,
|
|
|
|
riak_core_vnode_proxy]).
|
|
|
|
|
2014-01-24 20:13:24 +00:00
|
|
|
-define(NUM_REQUESTS, 200).
|
|
|
|
-define(THRESHOLD, 100).
|
2013-05-10 20:54:50 +00:00
|
|
|
-define(BUCKET, <<"test">>).
|
|
|
|
-define(KEY, <<"hotkey">>).
|
|
|
|
|
|
|
|
%% @doc riak_test entry point. Builds the cluster, creates one ordinary and
%% one consistent bucket type, then runs every overload scenario against a
%% key in each of them. Scenarios are the outer loop, bucket types the inner
%% one. Returns `pass' once every scenario has returned `ok'.
confirm() ->
    Cluster = setup(),

    Normal = <<"normal_type">>,
    Consistent = <<"consistent_type">>,
    ok = create_bucket_type(Cluster, Normal, [{n_val, 3}]),
    ok = create_bucket_type(Cluster, Consistent, [{consistent, true}, {n_val, 5}]),
    rt:wait_until(ring_manager_check_fun(hd(Cluster))),

    Value = <<"test">>,
    %% {BucketKeyValue, IsConsistent} pairs handed to every scenario.
    Subjects = [{{{Normal, ?BUCKET}, ?KEY, Value}, false},
                {{{Consistent, ?BUCKET}, ?KEY, Value}, true}],

    Scenarios = [test_no_overload_protection,
                 test_vnode_protection,
                 test_fsm_protection,
                 test_cover_queries_overload],

    lists:foreach(
      fun(Scenario) ->
              lists:foreach(
                fun({BKV, IsConsistent}) ->
                        ok = ?MODULE:Scenario(Cluster, BKV, IsConsistent)
                end, Subjects)
      end, Scenarios),
    pass.
|
|
|
|
|
|
|
|
|
|
|
|
%% @doc Builds the test cluster by calling
%% `ensemble_util:build_cluster(5, Config, 5)' and returns its result.
%% The configuration starts with both protections switched off
%% (`vnode_overload_threshold' and `fsm_limit' are `undefined'); the
%% individual tests turn them on later.
setup() ->
    CoreOpts = [{ring_creation_size, 8},
                {default_bucket_props, [{n_val, 5}]},
                {vnode_management_timer, 1000},
                {enable_health_checks, false},
                {enable_consensus, true},
                {vnode_overload_threshold, undefined}],
    KvOpts = [{fsm_limit, undefined},
              {storage_backend, riak_kv_memory_backend},
              {anti_entropy_build_limit, {100, 1000}},
              {anti_entropy_concurrency, 100},
              {anti_entropy_tick, 100},
              {anti_entropy, {on, []}},
              {anti_entropy_timeout, 5000}],
    Config = [{riak_core, CoreOpts},
              {riak_kv, KvOpts}],
    ensemble_util:build_cluster(5, Config, 5).
|
|
|
|
|
|
|
|
%% @doc Baseline scenario with no overload protection configured.
%% Skipped (returns `ok' immediately) when the third argument is `true'.
%% Otherwise it expects the process-count delta to reach at least 90% of
%% twice ?NUM_REQUESTS and the victim vnode queue to reach at least 90% of
%% ?NUM_REQUESTS.
test_no_overload_protection(_Nodes, _BKV, true) ->
    ok;
test_no_overload_protection(Nodes, BKV, ConsistentType) ->
    lager:info("Testing with no overload protection"),
    MinProcs = 2*?NUM_REQUESTS * 0.9,
    MinQueue = ?NUM_REQUESTS * 0.9,
    ProcFun = fun(Procs) -> Procs >= MinProcs end,
    QueueFun = fun(Len) -> Len >= MinQueue end,
    Outcome = run_test(Nodes, BKV),
    verify_test_results(Outcome, ConsistentType, ProcFun, QueueFun).
|
|
|
|
|
|
|
|
%% @doc Asserts on a `{NumProcs, QueueLen}' pair as returned by run_test/2.
%% When the second argument is `false' both predicates must hold; when it is
%% `true' only the queue-length predicate is checked and the process-count
%% predicate is ignored.
verify_test_results({NumProcs, QueueLen}, false, ProcFun, QueueFun) ->
    ?assert(ProcFun(NumProcs)),
    ?assert(QueueFun(QueueLen));
verify_test_results({_NumProcs, QueueLen}, true, _ProcFun, QueueFun) ->
    ?assert(QueueFun(QueueLen)).
|
|
|
|
|
|
|
|
%% @doc Scenario with vnode queue protection enabled.
%%
%% Phase 1: set `vnode_overload_threshold' to ?THRESHOLD and
%% `vnode_check_interval' to 1 on every node, run the load and expect both
%% the process-count delta and the victim queue length to stay bounded.
%% Phase 2: read until a request stops being rejected and assert that no
%% more than ?THRESHOLD div 2 reads were dropped.
%% Phase 3: suspend the victim's vnode proxy, rerun the load and expect the
%% process-count delta to be high while the queue is still bounded.
%%
%% Always returns `ok' (assertions raise on failure).
test_vnode_protection(Nodes, BKV, ConsistentType) ->
    %% With a check interval of one, process_info is called to check the
    %% queue length on every vnode send. That lets the coverage-query
    %% overload test raise vnode queue lengths artificially with dummy
    %% messages instead of having to go through the vnode path.
    lager:info("Testing with vnode queue protection enabled"),
    lager:info("Setting vnode overload threshold to ~b", [?THRESHOLD]),
    lager:info("Setting vnode check interval to 1"),
    Overrides = [{riak_core, [{vnode_overload_threshold, ?THRESHOLD},
                              {vnode_check_interval, 1}]}],
    ApplyOverrides = fun(Node) -> rt:update_app_config(Node, Overrides) end,
    rt:pmap(ApplyOverrides, Nodes),

    MaxProcs = 2*?THRESHOLD * 1.5,
    MaxQueue = ?THRESHOLD * 1.1,
    ProtectedOutcome = run_test(Nodes, BKV),
    verify_test_results(ProtectedOutcome, ConsistentType,
                        fun(Procs) -> Procs =< MaxProcs end,
                        fun(Len) -> Len =< MaxQueue end),

    %% The dropped-requests stat check below is disabled because it often
    %% fails. Manual testing shows the stat always incrementing properly,
    %% and logging added to Riak shows it being bumped the correct number
    %% of times. This looks like a stats bug outside the scope of this
    %% test, so it is punted for now.
    %%
    %% ShouldDrop = ?NUM_REQUESTS - ?THRESHOLD,
    %% ok = rt:wait_until(Node2, fun(Node) ->
    %%                                   dropped_stat(Node) =:= ShouldDrop
    %%                           end),

    [Node1 | _] = Nodes,
    CheckInterval = ?THRESHOLD div 2,
    Dropped = read_until_success(Node1),
    lager:info("Unnecessary dropped requests: ~b", [Dropped]),
    ?assert(Dropped =< CheckInterval),

    Victim = get_victim(Node1, BKV),

    lager:info("Suspending vnode proxy for ~p", [Victim]),
    Suspender = suspend_vnode_proxy(Victim),
    MinProcs = 2*?NUM_REQUESTS * 0.9,
    SuspendedOutcome = run_test(Nodes, BKV),
    verify_test_results(SuspendedOutcome, ConsistentType,
                        fun(Procs) -> Procs >= MinProcs end,
                        fun(Len) -> Len =< MaxQueue end),
    %% Wake the helper process so it resumes the proxy.
    Suspender ! resume,
    ok.
|
|
|
|
|
2014-07-01 16:46:42 +00:00
|
|
|
%% @doc Scenario with coordinator (FSM) protection enabled. Sets the
%% riak_kv `fsm_limit' to ?THRESHOLD on every node, runs the load and
%% expects both the process-count delta and the victim queue length to stay
%% at or below 110% of ?THRESHOLD. Always returns `ok'.
test_fsm_protection(Nodes, BKV, ConsistentType) ->
    lager:info("Testing with coordinator protection enabled"),
    lager:info("Setting FSM limit to ~b", [?THRESHOLD]),
    Overrides = [{riak_kv, [{fsm_limit, ?THRESHOLD}]}],
    ApplyOverrides = fun(Node) -> rt:update_app_config(Node, Overrides) end,
    rt:pmap(ApplyOverrides, Nodes),

    %% The same upper bound applies to both measurements.
    Ceiling = ?THRESHOLD * 1.1,
    ProcFun = fun(Procs) -> Procs =< Ceiling end,
    QueueFun = fun(Len) -> Len =< Ceiling end,
    Outcome = run_test(Nodes, BKV),
    verify_test_results(Outcome, ConsistentType, ProcFun, QueueFun),
    ok.
|
|
|
|
|
2014-07-01 16:46:42 +00:00
|
|
|
%% @doc Scenario checking that coverage queries report overload.
%% Skipped (returns `ok') when the third argument is `true'. Otherwise it
%% enables vnode queue protection, suspends and floods every kv vnode on
%% the second through fifth nodes, and asserts that list_keys and
%% list_buckets issued against the first node both fail with a mailbox
%% overload error. Finally it resumes the vnodes and waits for the queues
%% on the second node to drain. Requires exactly five nodes.
test_cover_queries_overload(_Nodes, _, true) ->
    ok;
test_cover_queries_overload(Nodes, _, false) ->
    lager:info("Testing cover queries with vnode queue protection enabled"),
    lager:info("Setting vnode overload threshold to ~b", [?THRESHOLD]),
    lager:info("Setting vnode check interval to 1"),

    Overrides = [{riak_core, [{vnode_overload_threshold, ?THRESHOLD},
                              {vnode_check_interval, 1}]}],
    ApplyOverrides = fun(Node) -> rt:update_app_config(Node, Overrides) end,
    rt:pmap(ApplyOverrides, Nodes),

    %% The remote_* helpers of this module must exist on the cluster nodes.
    rt:load_modules_on_nodes([?MODULE], Nodes),

    [Node1, Node2, Node3, Node4, Node5] = Nodes,
    SuspendNode = fun(N) ->
                          lager:info("Suspending all kv vnodes on ~p", [N]),
                          suspend_and_overload_all_kv_vnodes(N)
                  end,
    Pids = lists:map(SuspendNode, [Node2, Node3, Node4, Node5]),

    Res = list_keys(Node1),
    ?assertEqual({error, <<"mailbox_overload">>}, Res),
    lager:info("list_keys correctly handled overload"),

    Res2 = list_buckets(Node1),
    ?assertEqual({error, mailbox_overload}, Res2),
    lager:info("list_buckets correctly handled overload"),

    lager:info("Resuming all kv vnodes"),
    lists:foreach(fun(Pid) -> resume_all_vnodes(Pid) end, Pids),

    lager:info("Waiting for vnode queues to empty"),
    wait_for_all_vnode_queues_empty(Node2).
|
|
|
|
|
|
|
|
%% @doc Picks the vnode to target for `{Bucket, Key}'. Takes the first five
%% entries of the preference list computed from the ring fetched from
%% ExcludeNode and returns the first `{Index, Node}' entry whose node is not
%% ExcludeNode. Crashes if no such entry exists.
get_victim(ExcludeNode, {Bucket, Key, _}) ->
    Hash = riak_core_util:chash_std_keyfun({Bucket, Key}),
    Ring = rt:get_ring(ExcludeNode),
    Preflist = lists:sublist(riak_core_ring:preflist(Hash, Ring), 5),
    IsCandidate = fun({_, Node}) -> Node /= ExcludeNode;
                     (_) -> false
                  end,
    hd(lists:filter(IsCandidate, Preflist)).
|
|
|
|
|
|
|
|
%% @doc Drives one load run and returns `{ProcessDelta, QueueLen}'.
%%
%% Steps: wait for riak_kv, sleep 10s, load this module on the nodes,
%% suspend the victim vnode, record the process count on the first node,
%% write the object once, spawn ?NUM_REQUESTS readers, sleep 5s, then record
%% the process count again and the victim's message queue length. The vnode
%% is then resumed, its queue is awaited to drain, and the readers are
%% killed. ProcessDelta is the second process count minus the first.
run_test(Nodes, BKV) ->
    [Node1 | _RestNodes] = Nodes,
    rt:wait_for_cluster_service(Nodes, riak_kv),
    lager:info("Sleeping for 10s to let process count stablize"),
    timer:sleep(10000),
    rt:load_modules_on_nodes([?MODULE], Nodes),

    Victim = get_victim(Node1, BKV),
    lager:info("Suspending vnode ~p/~p",
               [element(1, Victim), element(2, Victim)]),
    Suspender = suspend_vnode(Victim),

    ProcsBefore = process_count(Node1),
    lager:info("Initial process count on ~p: ~b", [Node1, ProcsBefore]),
    lager:info("Sending ~b read requests", [?NUM_REQUESTS]),
    write_once(Node1, BKV),
    Readers = spawn_reads(Node1, BKV, ?NUM_REQUESTS),
    %% Give the requests time to pile up behind the suspended vnode.
    timer:sleep(5000),

    ProcsAfter = process_count(Node1),
    lager:info("Final process count on ~p: ~b", [Node1, ProcsAfter]),

    QueueLen = vnode_queue_len(Victim),
    lager:info("Final vnode queue length for ~p: ~b",
               [Victim, QueueLen]),

    resume_vnode(Suspender),
    QueueDrained = fun() -> vnode_queue_len(Victim) =:= 0 end,
    rt:wait_until(QueueDrained),
    kill_pids(Readers),
    {ProcsAfter - ProcsBefore, QueueLen}.
|
|
|
|
|
2014-07-01 16:46:42 +00:00
|
|
|
%% @doc Returns a zero-arity predicate for use with rt:wait_until/1. The
%% predicate calls `riak_core_ring_manager:get_chash_bin()' on Node over rpc
%% and yields `true' only when the reply is an `{ok, _}' tuple.
ring_manager_check_fun(Node) ->
    fun() ->
            Reply = rpc:call(Node, riak_core_ring_manager, get_chash_bin, []),
            chash_bin_reply_ok(Reply)
    end.

%% Classifies a get_chash_bin reply: only `{ok, _}' counts as success.
chash_bin_reply_ok({ok, _Bin}) -> true;
chash_bin_reply_ok(_Other) -> false.
|
2014-01-25 02:44:22 +00:00
|
|
|
|
2014-07-01 16:46:42 +00:00
|
|
|
%% @doc Creates and activates bucket type Type with Props through the first
%% node, then waits until the type is reported `active' on all Nodes and
%% until `{Type, <<"bucket">>}' shows Props on all Nodes. Returns `ok'.
create_bucket_type(Nodes, Type, Props) ->
    lager:info("Create bucket type ~p, wait for propagation", [Type]),
    Coordinator = hd(Nodes),
    rt:create_and_activate_bucket_type(Coordinator, Type, Props),
    rt:wait_until_bucket_type_status(Type, active, Nodes),
    ProbeBucket = {Type, <<"bucket">>},
    rt:wait_until_bucket_props(Nodes, ProbeBucket, Props),
    ok.
|
2014-01-17 02:06:26 +00:00
|
|
|
|
2014-01-25 02:44:22 +00:00
|
|
|
%% @doc Lists the keys of ?BUCKET through a protocol buffers client
%% connected to Node, with a 30 second timeout, and returns whatever
%% `riakc_pb_socket:list_keys/3' returns.
%%
%% The client connection is always stopped before returning, including when
%% the call raises, so repeated invocations do not accumulate open sockets.
list_keys(Node) ->
    Pid = rt:pbc(Node),
    try
        riakc_pb_socket:list_keys(Pid, ?BUCKET, 30000)
    after
        riakc_pb_socket:stop(Pid)
    end.
|
2014-01-25 02:44:22 +00:00
|
|
|
|
|
|
|
%% @doc Lists buckets through a local riak client connected to Node, using a
%% 30 second timeout, and returns the result of
%% `riak_client:list_buckets/2'.
list_buckets(Node) ->
    Timeout = 30000,
    {ok, Client} = riak:client_connect(Node),
    riak_client:list_buckets(Timeout, Client).
|
2014-01-24 20:13:24 +00:00
|
|
|
|
2014-01-17 02:06:26 +00:00
|
|
|
%% @doc Blocks, via rt:wait_until/2, until vnode_queues_empty/1 holds for
%% Node. Returns the result of rt:wait_until/2.
wait_for_all_vnode_queues_empty(Node) ->
    rt:wait_until(Node, fun vnode_queues_empty/1).
|
|
|
|
|
|
|
|
%% @doc Evaluates remote_vnode_queues_empty/0 on Node over rpc and returns
%% its result (or the rpc error term). This module must already be loaded
%% on Node.
vnode_queues_empty(Node) ->
    NoArgs = [],
    rpc:call(Node, ?MODULE, remote_vnode_queues_empty, NoArgs).
|
|
|
|
|
|
|
|
%% @doc Runs on a cluster node. Returns `true' when every vnode reported by
%% `riak_core_vnode_manager:all_vnodes()' has an empty message queue.
%%
%% process_info/2 returns `undefined' for a process that is no longer alive.
%% A vnode that exits between the all_vnodes() snapshot and the probe has no
%% queue left to drain, so it counts as empty instead of crashing the check
%% with a badmatch.
remote_vnode_queues_empty() ->
    QueueEmpty =
        fun({_, _, Pid}) ->
                case process_info(Pid, message_queue_len) of
                    {message_queue_len, Len} ->
                        Len =:= 0;
                    undefined ->
                        true
                end
        end,
    lists:all(QueueEmpty, riak_core_vnode_manager:all_vnodes()).
|
|
|
|
|
2014-07-01 16:46:42 +00:00
|
|
|
%% @doc Writes Value under Bucket/Key through a fresh protocol buffers
%% client connected to Node, then stops that client. Returns the result of
%% `riakc_pb_socket:stop/1'.
write_once(Node, {Bucket, Key, Value}) ->
    lager:info("Writing to node ~p", [Node]),
    Conn = rt:pbc(Node),
    rt:pbc_write(Conn, Bucket, Key, Value),
    riakc_pb_socket:stop(Conn).
|
2013-05-10 20:54:50 +00:00
|
|
|
|
|
|
|
%% @doc Connects a local riak client to Node and reads ?BUCKET/?KEY
%% repeatedly until a read is no longer rejected with
%% `{error, mailbox_overload}'. Returns the number of rejected reads.
read_until_success(Node) ->
    {ok, C} = riak:client_connect(Node),
    read_until_success(C, 0).

%% @doc Worker loop for read_until_success/1. C is the client term returned
%% by `riak:client_connect/1'; Count is the number of overload rejections
%% seen so far. Any reply other than `{error, mailbox_overload}' ends the
%% loop, including other errors.
%%
%% The client is passed explicitly as the last argument, matching the
%% `riak_client:list_buckets(30000, C)' call used elsewhere in this module,
%% rather than relying on a tuple call (`C:get(...)'), which needs the
%% `tuple_calls' compile option on OTP 21 and later.
read_until_success(C, Count) ->
    case riak_client:get(?BUCKET, ?KEY, C) of
        {error, mailbox_overload} ->
            read_until_success(C, Count + 1);
        _ ->
            Count
    end.
|
|
|
|
|
2014-07-01 16:46:42 +00:00
|
|
|
%% @doc Spawns Num unlinked processes. Each one opens its own protocol
%% buffers client to Node, issues a single get for Bucket/Key (the reply is
%% ignored) and stops the client. Returns the list of spawned pids.
spawn_reads(Node, {Bucket, Key, _}, Num) ->
    Reader = fun() ->
                     Conn = rt:pbc(Node),
                     _ = riakc_pb_socket:get(Conn, Bucket, Key),
                     riakc_pb_socket:stop(Conn)
             end,
    lists:map(fun(_) -> spawn(Reader) end, lists:seq(1, Num)).
|
2013-05-10 20:54:50 +00:00
|
|
|
|
2014-01-17 02:06:26 +00:00
|
|
|
%% @doc Sends an untrappable `kill' exit signal to every pid in Pids.
%% Returns the list of results of the individual exit/2 calls.
kill_pids(Pids) ->
    Kill = fun(Pid) -> exit(Pid, kill) end,
    lists:map(Kill, Pids).
|
|
|
|
|
2014-01-28 23:49:57 +00:00
|
|
|
%% @doc Starts remote_suspend_and_overload/0 on Node over rpc, tells the
%% resulting helper process to flood the suspended vnodes, and blocks until
%% the helper answers `overloaded'. Returns the helper pid, which later
%% accepts a `resume' message.
suspend_and_overload_all_kv_vnodes(Node) ->
    Helper = rpc:call(Node, ?MODULE, remote_suspend_and_overload, []),
    Helper ! {overload, self()},
    receive
        overloaded ->
            Helper
    end.
|
2014-01-17 02:06:26 +00:00
|
|
|
|
2014-01-28 23:49:57 +00:00
|
|
|
%% @doc Runs on a cluster node. Spawns and returns a helper process that:
%%   1. suspends every riak_kv_vnode process listed by
%%      `riak_core_vnode_manager:all_vnodes()';
%%   2. on `{overload, From}', calls overload/1 on each of those vnodes and
%%      replies `overloaded' to From;
%%   3. on `resume', resumes each of those vnodes and exits.
%% The vnode list is captured once, when the helper starts.
remote_suspend_and_overload() ->
    spawn(fun() ->
                  KvPids = [Pid || {riak_kv_vnode, _, Pid}
                                       <- riak_core_vnode_manager:all_vnodes()],
                  lists:foreach(
                    fun(Pid) ->
                            lager:info("Suspending vnode pid: ~p", [Pid]),
                            erlang:suspend_process(Pid, [])
                    end, KvPids),
                  receive
                      {overload, From} ->
                          io:format("Overloading vnodes ~n"),
                          lists:foreach(fun(Pid) -> ?MODULE:overload(Pid) end,
                                        KvPids),
                          From ! overloaded
                  end,
                  receive
                      resume ->
                          io:format("Resuming vnodes~n"),
                          lists:foreach(fun(Pid) -> erlang:resume_process(Pid) end,
                                        KvPids)
                  end
          end).
|
|
|
|
|
2014-01-28 23:49:57 +00:00
|
|
|
%% @doc Sends ?NUM_REQUESTS copies of a dummy message to Pid to inflate its
%% message queue. Returns the list of sent messages.
overload(Pid) ->
    %% The content of the message is irrelevant; this one was picked because
    %% it has the least side effects.
    Filler = {set_concurrency_limit, some_lock, 1},
    lists:map(fun(_) -> Pid ! Filler end, lists:seq(1, ?NUM_REQUESTS)).
|
|
|
|
|
|
|
|
%% @doc Suspends the kv vnode identified by an `{Idx, Node}' pair.
%% Delegates to suspend_vnode/2.
suspend_vnode({Idx, Node}) ->
    suspend_vnode(Node, Idx).

%% @doc Runs remote_suspend_vnode/1 for Idx on Node over rpc, with no rpc
%% timeout, and returns its result (the pid of the helper that accepts
%% `resume').
suspend_vnode(Node, Idx) ->
    Args = [Idx],
    rpc:call(Node, ?MODULE, remote_suspend_vnode, Args, infinity).
|
2013-05-10 20:54:50 +00:00
|
|
|
|
|
|
|
%% @doc Runs on a cluster node. Spawns and returns a helper process that
%% looks up the riak_kv_vnode pid for partition Idx, suspends it, and
%% resumes it again once the helper receives `resume'.
remote_suspend_vnode(Idx) ->
    Helper =
        fun() ->
                {ok, VnodePid} =
                    riak_core_vnode_manager:get_vnode_pid(Idx, riak_kv_vnode),
                lager:info("Suspending vnode pid: ~p", [VnodePid]),
                erlang:suspend_process(VnodePid, []),
                receive
                    resume ->
                        erlang:resume_process(VnodePid)
                end
        end,
    spawn(Helper).
|
|
|
|
|
2014-07-01 16:46:42 +00:00
|
|
|
%% @doc Suspends the vnode proxy identified by an `{Idx, Node}' pair.
%% Delegates to suspend_vnode_proxy/2.
suspend_vnode_proxy({Idx, Node}) ->
    suspend_vnode_proxy(Node, Idx).

%% @doc Runs remote_suspend_vnode_proxy/1 for Idx on Node over rpc, with no
%% rpc timeout, and returns its result (the pid of the helper that accepts
%% `resume').
suspend_vnode_proxy(Node, Idx) ->
    Args = [Idx],
    rpc:call(Node, ?MODULE, remote_suspend_vnode_proxy, Args, infinity).
|
2013-05-10 20:54:50 +00:00
|
|
|
|
|
|
|
%% @doc Runs on a cluster node. Spawns and returns a helper process that
%% resolves the registered riak_kv_vnode proxy for partition Idx, suspends
%% it, and resumes it again once the helper receives `resume'.
remote_suspend_vnode_proxy(Idx) ->
    Helper =
        fun() ->
                RegName = riak_core_vnode_proxy:reg_name(riak_kv_vnode, Idx),
                ProxyPid = whereis(RegName),
                erlang:suspend_process(ProxyPid, []),
                receive
                    resume ->
                        erlang:resume_process(ProxyPid)
                end
        end,
    spawn(Helper).
|
|
|
|
|
2014-01-17 02:06:26 +00:00
|
|
|
%% @doc Sends `resume' to the helper process started by
%% remote_suspend_and_overload/0. Returns the atom `resume'.
resume_all_vnodes(Pid) ->
    erlang:send(Pid, resume).
|
|
|
|
|
2013-05-10 20:54:50 +00:00
|
|
|
%% @doc Sends `resume' to a helper process returned by suspend_vnode/1,2.
%% Returns the atom `resume'.
resume_vnode(Pid) ->
    erlang:send(Pid, resume).
|
|
|
|
|
|
|
|
%% @doc Returns the result of `erlang:system_info(process_count)' evaluated
%% on Node over rpc.
process_count(Node) ->
    Item = process_count,
    rpc:call(Node, erlang, system_info, [Item]).
|
|
|
|
|
2014-07-01 16:46:42 +00:00
|
|
|
%% @doc Message queue length of the kv vnode identified by an `{Idx, Node}'
%% pair. Delegates to vnode_queue_len/2.
vnode_queue_len({Idx, Node}) ->
    vnode_queue_len(Node, Idx).

%% @doc Evaluates remote_vnode_queue/1 for Idx on Node over rpc and returns
%% its result (or the rpc error term).
vnode_queue_len(Node, Idx) ->
    Args = [Idx],
    rpc:call(Node, ?MODULE, remote_vnode_queue, Args).
|
|
|
|
|
|
|
|
%% @doc Fetches `riak_core_stat:get_stats()' from Node over rpc and returns
%% the value stored under `dropped_vnode_requests_total', or `undefined'
%% when that key is absent.
dropped_stat(Node) ->
    AllStats = rpc:call(Node, riak_core_stat, get_stats, []),
    StatKey = dropped_vnode_requests_total,
    proplists:get_value(StatKey, AllStats).
|
|
|
|
|
|
|
|
%% @doc Runs on a cluster node. Returns the current message queue length of
%% the riak_kv_vnode process that owns partition Idx. Crashes with a
%% badmatch if the vnode pid cannot be resolved or the process is gone.
remote_vnode_queue(Idx) ->
    {ok, VnodePid} = riak_core_vnode_manager:get_vnode_pid(Idx, riak_kv_vnode),
    {message_queue_len, Depth} =
        erlang:process_info(VnodePid, message_queue_len),
    Depth.
|