diff --git a/tests/overload.erl b/tests/overload.erl index 8d475f09..8f5aab14 100644 --- a/tests/overload.erl +++ b/tests/overload.erl @@ -30,13 +30,13 @@ -define(LIST_KEYS_RETRIES, 1000). -define(GET_RETRIES, 1000). -define(BUCKET, <<"test">>). --define(KEY, <<"hotkey">>). +-define(VALUE, <<"overload_test_value">>). -define(NORMAL_TYPE, <<"normal_type">>). -define(CONSISTENT_TYPE, <<"consistent_type">>). -define(WRITE_ONCE_TYPE, <<"write_once_type">>). --define(NORMAL_BKV, {{?NORMAL_TYPE, ?BUCKET}, ?KEY, <<"test">>}). --define(CONSISTENT_BKV, {{?CONSISTENT_TYPE, ?BUCKET}, ?KEY, <<"test">>}). --define(WRITE_ONCE_BKV, {{?WRITE_ONCE_TYPE, ?BUCKET}, ?KEY, <<"test">>}). +-define(NORMAL_BUCKET, {?NORMAL_TYPE, ?BUCKET}). +-define(CONSISTENT_BUCKET, {?CONSISTENT_TYPE, ?BUCKET}). +-define(WRITE_ONCE_BUCKET, {?WRITE_ONCE_TYPE, ?BUCKET}). %% This record contains the default values for config settings if they were not set %% in the advanced.config file - because setting something to `undefined` is not the same @@ -81,18 +81,22 @@ default_config(#config{ {riak_api, [{pb_backlog, 1024}]}]. 
confirm() -> - Nodes = setup(), + [Node1 | _] = Nodes = setup(), ok = create_bucket_type(Nodes, ?NORMAL_TYPE, [{n_val, 3}]), ok = create_bucket_type(Nodes, ?CONSISTENT_TYPE, [{consistent, true}, {n_val, 5}]), ok = create_bucket_type(Nodes, ?WRITE_ONCE_TYPE, [{write_once, true}, {n_val, 1}]), - rt:wait_until(ring_manager_check_fun(hd(Nodes))), + Key = generate_key(), + lager:info("Generated overload test key ~p", [Key]), - Node1 = hd(Nodes), - write_once(Node1, ?NORMAL_BKV), - write_once(Node1, ?CONSISTENT_BKV), - write_once(Node1, ?WRITE_ONCE_BKV), + NormalBKV = {?NORMAL_BUCKET, Key, ?VALUE}, + ConsistentBKV = {?CONSISTENT_BUCKET, Key, ?VALUE}, + WriteOnceBKV = {?WRITE_ONCE_BUCKET, Key, ?VALUE}, + + write_once(Node1, NormalBKV), + write_once(Node1, ConsistentBKV), + write_once(Node1, WriteOnceBKV), Tests = [test_no_overload_protection, test_vnode_protection, @@ -102,33 +106,41 @@ confirm() -> lager:info("Starting Test ~p for ~p~n", [Test, BKV]), ok = erlang:apply(?MODULE, Test, [Nodes, BKV]) end || Test <- Tests, - BKV <- [?NORMAL_BKV, - ?CONSISTENT_BKV, - ?WRITE_ONCE_BKV]], + BKV <- [NormalBKV, + ConsistentBKV, + WriteOnceBKV]], %% Test cover queries doesn't depend on bucket/keyvalue, just run it once test_cover_queries_overload(Nodes), pass. +generate_key() -> + random:seed(erlang:now()), + N = random:uniform(500), + + Part1 = <<"overload_test_key_">>, + Part2 = integer_to_binary(N), + + <<Part1/binary, Part2/binary>>. setup() -> ensemble_util:build_cluster(5, default_config(), 5).
-test_no_overload_protection(_Nodes, ?CONSISTENT_BKV) -> +test_no_overload_protection(_Nodes, {?CONSISTENT_BUCKET, _, _}) -> ok; test_no_overload_protection(Nodes, BKV) -> - lager:info("Setting default configuration for no overload protestion test."), + lager:info("Setting default configuration for no overload protection test."), rt:pmap(fun(Node) -> rt:update_app_config(Node, default_config()) end, Nodes), lager:info("Testing with no overload protection"), ProcFun = build_predicate_eq(test_no_overload_protection, ?NUM_REQUESTS, "ProcFun", "Procs"), - QueueFun = build_predicate_gte(test_no_overload_protection, ?NUM_REQUESTS, - "QueueFun", "Queue Size"), + QueueFun = build_predicate_eq(test_no_overload_protection, ?NUM_REQUESTS, + "QueueFun", "Queue Size"), verify_test_results(run_test(Nodes, BKV), BKV, ProcFun, QueueFun). -verify_test_results({_NumProcs, QueueLen}, ?CONSISTENT_BKV, _ProcFun, QueueFun) -> +verify_test_results({_NumProcs, QueueLen}, {?CONSISTENT_BUCKET, _, _}, _ProcFun, QueueFun) -> ?assert(QueueFun(QueueLen)); verify_test_results({NumProcs, QueueLen}, _BKV, ProcFun, QueueFun) -> ?assert(ProcFun(NumProcs)), @@ -171,11 +183,11 @@ test_vnode_protection(Nodes, BKV) -> %% Don't check consistent gets, as they don't use the FSM -test_fsm_protection(_, ?CONSISTENT_BKV) -> +test_fsm_protection(_, {?CONSISTENT_BUCKET, _, _}) -> ok; %% Don't check on fast path either. 
-test_fsm_protection(_, ?WRITE_ONCE_BKV) -> +test_fsm_protection(_, {?WRITE_ONCE_BUCKET, _, _}) -> ok; test_fsm_protection(Nodes, BKV) -> @@ -282,22 +294,22 @@ run_test(Nodes, BKV) -> NumProcs2 = overload_proxy:get_count(), lager:info("Final process count on ~p: ~b", [Node1, NumProcs2]), - QueueLen = vnode_queue_len(Victim), + QueueLen = vnode_gets_in_queue(Victim), lager:info("Final vnode queue length for ~p: ~b", [Victim, QueueLen]), resume_vnode(Suspended), rt:wait_until(fun() -> - vnode_queue_len(Victim) =:= 0 + vnode_gets_in_queue(Victim) =:= 0 end), kill_pids(Reads), overload_proxy:stop(), {NumProcs2 - NumProcs1, QueueLen}. -get_victim(ExcludeNode, {Bucket, Key, _}) -> +get_victim(Node, {Bucket, Key, _}) -> Hash = riak_core_util:chash_std_keyfun({Bucket, Key}), - PL = lists:sublist(riak_core_ring:preflist(Hash, rt:get_ring(ExcludeNode)), 5), - hd([IdxNode || {_, Node}=IdxNode <- PL, Node /= ExcludeNode]). + PL = riak_core_ring:preflist(Hash, rt:get_ring(Node)), + hd(PL). ring_manager_check_fun(Node) -> fun() -> @@ -366,7 +378,7 @@ read_until_success(Node) -> read_until_success(C, 0). read_until_success(C, Count) -> - case C:get(?BUCKET, ?KEY) of + case C:get(<<"dummy">>, <<"dummy">>) of {error, mailbox_overload} -> read_until_success(C, Count+1); _ -> @@ -532,11 +544,11 @@ resume_vnode(Pid) -> process_count(Node) -> rpc:call(Node, erlang, system_info, [process_count]). -vnode_queue_len({Idx, Node}) -> - vnode_queue_len(Node, Idx). +vnode_gets_in_queue({Idx, Node}) -> + vnode_gets_in_queue(Node, Idx). -vnode_queue_len(Node, Idx) -> - rpc:call(Node, ?MODULE, remote_vnode_queue, [Idx]). +vnode_gets_in_queue(Node, Idx) -> + rpc:call(Node, ?MODULE, remote_vnode_gets_in_queue, [Idx]). dropped_stat(Node) -> Stats = rpc:call(Node, riak_core_stat, get_stats, []), @@ -551,12 +563,6 @@ run_count(Node) -> lager:info("fsm count:~p", [get_num_running_gen_fsm(Node)]), run_count(Node). 
-run_queue_len({Idx, Node}) -> - timer:sleep(500), - Len = vnode_queue_len(Node, Idx), - lager:info("queue len on ~p is:~p", [Node, Len]), - run_queue_len({Idx, Node}). - get_num_running_gen_fsm(Node) -> Procs = rpc:call(Node, erlang, processes, []), ProcInfo = [ rpc:call(Node, erlang, process_info, [P]) || P <- Procs, P /= undefined ], @@ -565,13 +571,27 @@ get_num_running_gen_fsm(Node) -> FsmList = [ proplists:lookup(riak_kv_get_fsm, Call) || Call <- InitCalls ], length(proplists:lookup_all(riak_kv_get_fsm, FsmList)). -remote_vnode_queue(Idx) -> +remote_vnode_gets_in_queue(Idx) -> {ok, Pid} = riak_core_vnode_manager:get_vnode_pid(Idx, riak_kv_vnode), {messages, AllMessages} = process_info(Pid, messages), - NonPings = lists:filter(fun({'$vnode_proxy_ping', _, _, _}) -> false; (_) -> true end, AllMessages), - Len = length(NonPings), - lager:info("Non-Ping Messages (~p): ~n~p~n", [Len, NonPings]), - Len. + + GetMessages = lists:foldl(fun(E, A) -> + case is_get_req(E) of + true -> A + 1; + false -> A + end + end, 0, AllMessages), + + lager:info("Get requests (~p): ~p", [Idx, GetMessages]), + GetMessages. + +%% This is not the greatest thing ever, since we're coupling this test pretty +%% tightly to the internal representation of get requests in riak_kv...can't +%% really figure out a better way to do this, though. +is_get_req({'$gen_event', {riak_vnode_req_v1, _, _, Req}}) -> + element(1, Req) =:= riak_kv_get_req_v1; +is_get_req(_) -> + false. %% In tests that do not expect work to be shed, we want to confirm that %% at least ?NUM_REQUESTS (queue entries) are handled.