Merge pull request #947 from basho/dr/forward/proxy_overload_recovery_fixes

Forward port changes to proxy_overload_recovery to make it pass in the face of indeterminate behavior.
This commit is contained in:
Doug Rohrer 2015-12-01 12:29:36 -05:00
commit 010c35a473

View File

@ -208,16 +208,13 @@ prepare(ThresholdSeed) ->
{ok, VPid0} = riak_core_vnode_manager:get_vnode_pid(Id, riak_kv_vnode),
sys:resume(VPid0),
ok = supervisor:terminate_child(riak_core_vnode_sup, VPid0),
false = is_process_alive(VPid0),
%% Reset the proxy pid to make sure it resets state and picks up the new
%% environment variables
ok = supervisor:terminate_child(riak_core_vnode_proxy_sup, {riak_kv_vnode, Id}),
RegName = riak_core_vnode_proxy:reg_name(riak_kv_vnode, Index),
undefined = whereis(RegName),
%% Fail if we get back the dead vnode
{ok, VPid1} = riak_core_vnode_manager:get_vnode_pid(Index, riak_kv_vnode),
?assertNotEqual(VPid1, VPid0),
VPid1 = wait_for_vnode_change(VPid0, Index),
{ok, PPid} = supervisor:restart_child(riak_core_vnode_proxy_sup, {riak_kv_vnode, Id}),
@ -269,14 +266,14 @@ resume_args(#tstate{rt = RT}) ->
resume(#rt{ppid = PPid, vpid = VPid}) ->
sys:resume(VPid),
%% Use the sys:get_status call to force a synchronous call
%% against the vnode proxy to ensure all messages sent by
%% against the vnode & the proxy to ensure all messages sent by
%% this process have been serviced and there are no pending
%% 'ping's in the vnode before we continue.
%% Then drain the vnode to make sure any pending pongs have
%% been sent.
ok = drain(VPid),
%% been sent, and ensure the proxy has drained its mailbox as well.
_ = sys:get_status(PPid),
_ = sys:get_status(VPid),
_ = sys:get_status(PPid).
ok = drain([VPid, PPid]).
resume_next(S, _V, _A) ->
S#tstate{vnode_running = true, proxy_msgs = 0, direct_msgs = 0}.
@ -329,28 +326,28 @@ overloaded_args(#tstate{vnode_running = Running, rt = RT}) ->
overloaded(Running, #rt{ppid = PPid, vpid = VPid}) ->
case Running of
true ->
ok = drain(PPid), % make sure all proxy msgs processed/dropped
ok = drain(VPid); % make sure any pending ping/pongs are processed
ok = drain([PPid, VPid]);
_ ->
ok
end,
{riak_core_vnode_proxy:overloaded(PPid),
msgq_len(VPid), % just for debug so we can review in log output
sys:get_status(PPid)}. % ditto
{messages, PMsgs} = process_info(PPid, messages),
{messages, VMsgs} = process_info(VPid, messages),
Overloaded = riak_core_vnode_proxy:overloaded(PPid),
{Overloaded, {VMsgs, PMsgs}, sys:get_status(PPid)}.
overloaded_post(#tstate{threshold = undefined}, _A,
{R, _VnodeQ, _ProxyStatus}) ->
{R, _Messages, _ProxyStatus}) ->
%% If there are no thresholds there should never be an overload
eq(R, false);
overloaded_post(#tstate{vnode_running = true}, _A,
{R, _VnodeQ = 0, _ProxyStatus}) ->
{R, _Messages, _ProxyStatus}) ->
%% If the vnode is running, we have cleared queues so
%% should not be in overload.
eq(R, false);
overloaded_post(#tstate{vnode_running = false,
proxy_msgs = ProxyMsgs,
threshold = Threshold}, _A,
{ResultOverload, _VnodeQ, _ProxyStatus}) ->
{ResultOverload, _Messages, _ProxyStatus}) ->
%% Either
%% mailbox is completely an estimate based on proxy msgs
%% or mailbox is a check + estimate since
@ -397,16 +394,33 @@ prep_env(Var, Val) ->
%% Wait until all messages are drained by the Pid. No guarantees
%% about future messages being sent, or that responses for the
%% last message consumed have been transmitted.
%% NOTE: The "drain 3 times in a row" rule was determined empirically
%% and may not be sufficient (2 was not). Given time constraints, we
%% are living with it for now. If this fails, we should really add some
%% tracing code around the sending of messages to the vnode and proxy to
%% determine where the extra messages are coming from, rather than just
%% making this "try 4 times".
%%
drain(Pid) ->
case erlang:process_info(Pid, message_queue_len) of
{message_queue_len, 0} ->
drain(Pid) when is_pid(Pid) ->
drain([Pid], {-1, -1});
drain(Pids) when is_list(Pids) ->
drain(Pids, {-1, -1}).
drain(Pids, {PrevPrev, Prev}) ->
_ = [sys:suspend(Pid) || Pid <- Pids],
Len = lists:foldl(fun(Pid, Acc0) ->
{message_queue_len, Len} = erlang:process_info(Pid, message_queue_len),
Acc0 + Len
end, 0, Pids),
_ = [sys:resume(Pid) || Pid <- Pids],
case {PrevPrev, Prev, Len} of
{0, 0, 0} ->
ok;
{message_queue_len, L} when L > 0 ->
timer:sleep(1), % give it a millisecond to drain
drain(Pid);
ER ->
ER
_ ->
%% Attempt to ensure something else is scheduled before we try to drain again
erlang:yield(),
timer:sleep(1),
drain(Pids, {Prev, Len})
end.
%% Return the length of the message queue (or crash if proc dead)
@ -462,3 +476,14 @@ confirm() ->
pass.
-endif.
%% Poll riak_core_vnode_manager until the pid it reports for Index
%% (riak_kv_vnode) differs from VPid0, then return that new pid.
%% Sleeps 1ms between lookups. There is no retry limit, and any lookup
%% result other than {ok, Pid} crashes with a badmatch.
wait_for_vnode_change(VPid0, Index) ->
    {ok, Current} = riak_core_vnode_manager:get_vnode_pid(Index, riak_kv_vnode),
    if
        Current =:= VPid0 ->
            %% Still seeing the old pid; back off briefly and look again.
            timer:sleep(1),
            wait_for_vnode_change(VPid0, Index);
        true ->
            Current
    end.