TD-927: fix race condition

This commit is contained in:
ttt161 2024-09-16 09:19:57 +03:00
parent bf3397eca2
commit d777d65c92
2 changed files with 39 additions and 22 deletions

View File

@ -11,7 +11,7 @@
-export([add/3]).
-export([remove/3]).
-record(epg_pool_mgr_state, {pool, params, size, connections, workers, owners}).
-record(epg_pool_mgr_state, {pool, params, size, connections, workers, owners, monitors}).
%% API
@ -45,7 +45,8 @@ init([PoolName, DbParams, Size]) ->
size = Size,
connections = queue:new(),
workers = #{},
owners = #{}
owners = #{},
monitors = #{}
}}.
%%
@ -72,25 +73,28 @@ handle_call(
handle_cast(
{add, Worker, Connection},
State = #epg_pool_mgr_state{connections = Conns, workers = Workers}
State = #epg_pool_mgr_state{connections = Conns, workers = Workers, monitors = Monitors}
) ->
_ = erlang:monitor(process, Worker),
MRef = erlang:monitor(process, Connection),
{
noreply,
State#epg_pool_mgr_state{
connections = unique_queue_push(Connection, Conns),
workers = Workers#{Worker => Connection}}
workers = Workers#{Worker => Connection},
monitors = Monitors#{Connection => MRef}
}
};
handle_cast(
{remove, Worker, Connection},
State = #epg_pool_mgr_state{connections = Conns, workers = Workers}
State = #epg_pool_mgr_state{connections = Conns, workers = Workers, monitors = Monitors}
) ->
NewConns = queue:delete(Connection, Conns),
NewWorkers = maps:without([Worker], Workers),
_ = close(Connection),
NewMonitors = demonitor_and_close(Connection, Monitors),
{
noreply,
State#epg_pool_mgr_state{connections = NewConns, workers = NewWorkers}
State#epg_pool_mgr_state{connections = NewConns, workers = NewWorkers, monitors = NewMonitors}
};
handle_cast(
{checkin, Owner, Connection},
@ -114,28 +118,40 @@ handle_cast(_Request, State = #epg_pool_mgr_state{}) ->
handle_info(
{'DOWN', _MonitorRef, process, Pid, _Info},
State = #epg_pool_mgr_state{connections = Conns, workers = Workers}
State = #epg_pool_mgr_state{connections = Conns, workers = Workers, monitors = Monitors}
) when erlang:is_map_key(Pid, Workers) ->
Connection = maps:get(Pid, Workers),
NewConns = queue:delete(Connection, Conns),
NewWorkers = maps:without([Pid], Workers),
_ = close(Connection),
NewMonitors = demonitor_and_close(Connection, Monitors),
{
noreply,
State#epg_pool_mgr_state{connections = NewConns, workers = NewWorkers}
State#epg_pool_mgr_state{connections = NewConns, workers = NewWorkers, monitors = NewMonitors}
};
handle_info(
{'DOWN', _MonitorRef, process, Pid, _Info},
State = #epg_pool_mgr_state{connections = Conns, owners = Owners}
State = #epg_pool_mgr_state{connections = Conns, owners = Owners, monitors = Monitors}
) when erlang:is_map_key(Pid, Owners) ->
{_Ref, Connection} = maps:get(Pid, Owners),
NewConns = queue:delete(Connection, Conns),
NewOwners = maps:without([Pid], Owners),
_ = close(Connection),
NewMonitors = demonitor_and_close(Connection, Monitors),
{
noreply,
State#epg_pool_mgr_state{connections = NewConns, owners = NewOwners}
State#epg_pool_mgr_state{connections = NewConns, owners = NewOwners, monitors = NewMonitors}
};
%%
handle_info(
{'DOWN', _MonitorRef, process, Pid, _Info},
State = #epg_pool_mgr_state{connections = Conns, monitors = Monitors}
) when erlang:is_map_key(Pid, Monitors) ->
NewConns = queue:delete(Pid, Conns),
NewMonitors = demonitor_and_close(Pid, Monitors),
{
noreply,
State#epg_pool_mgr_state{connections = NewConns, monitors = NewMonitors}
};
handle_info(_Info, State = #epg_pool_mgr_state{}) ->
{noreply, State}.
@ -156,13 +172,14 @@ start_workers(Pool, Size) ->
reg_name(Name, Postfix) ->
list_to_atom(atom_to_list(Name) ++ Postfix).
close(Connection) ->
try epgsql:close(Connection)
catch
_:_ ->
skip
end,
ok.
demonitor_and_close(Connection, Monitors) when erlang:is_map_key(Connection, Monitors) ->
{MRef, NewMonitors} = maps:take(Connection, Monitors),
_ = catch erlang:demonitor(MRef),
_ = catch epgsql:close(Connection),
NewMonitors;
demonitor_and_close(Connection, Monitors) ->
_ = catch epgsql:close(Connection),
Monitors.
unique_queue_push(Item, Q) ->
case queue:member(Item, Q) of

View File

@ -29,9 +29,9 @@ handle_cast(_Request, State = #epg_pool_wrk_state{}) ->
handle_info(
{'EXIT', Pid, _Info},
State = #epg_pool_wrk_state{pool = Pool, connection = Pid}
State = #epg_pool_wrk_state{pool = _Pool, connection = Pid}
) ->
epg_pool_mgr:remove(Pool, self(), Pid),
%epg_pool_mgr:remove(Pool, self(), Pid),
reconnect_timer(),
{noreply, State#epg_pool_wrk_state{connection = undefined, monitor = undefined}};