From d777d65c92e0c31c1338bdaaf89a9077105b5fad Mon Sep 17 00:00:00 2001 From: ttt161 Date: Mon, 16 Sep 2024 09:19:57 +0300 Subject: [PATCH] TD-927: fix race condition --- src/epg_pool_mgr.erl | 57 ++++++++++++++++++++++++++++---------------- src/epg_pool_wrk.erl | 4 ++-- 2 files changed, 39 insertions(+), 22 deletions(-) diff --git a/src/epg_pool_mgr.erl b/src/epg_pool_mgr.erl index 582ab42..c839353 100644 --- a/src/epg_pool_mgr.erl +++ b/src/epg_pool_mgr.erl @@ -11,7 +11,7 @@ -export([add/3]). -export([remove/3]). --record(epg_pool_mgr_state, {pool, params, size, connections, workers, owners}). +-record(epg_pool_mgr_state, {pool, params, size, connections, workers, owners, monitors}). %% API @@ -45,7 +45,8 @@ init([PoolName, DbParams, Size]) -> size = Size, connections = queue:new(), workers = #{}, - owners = #{} + owners = #{}, + monitors = #{} }}. %% @@ -72,25 +73,28 @@ handle_call( handle_cast( {add, Worker, Connection}, - State = #epg_pool_mgr_state{connections = Conns, workers = Workers} + State = #epg_pool_mgr_state{connections = Conns, workers = Workers, monitors = Monitors} ) -> _ = erlang:monitor(process, Worker), + MRef = erlang:monitor(process, Connection), { noreply, State#epg_pool_mgr_state{ connections = unique_queue_push(Connection, Conns), - workers = Workers#{Worker => Connection}} + workers = Workers#{Worker => Connection}, + monitors = Monitors#{Connection => MRef} + } }; handle_cast( {remove, Worker, Connection}, - State = #epg_pool_mgr_state{connections = Conns, workers = Workers} + State = #epg_pool_mgr_state{connections = Conns, workers = Workers, monitors = Monitors} ) -> NewConns = queue:delete(Connection, Conns), NewWorkers = maps:without([Worker], Workers), - _ = close(Connection), + NewMonitors = demonitor_and_close(Connection, Monitors), { noreply, - State#epg_pool_mgr_state{connections = NewConns, workers = NewWorkers} + State#epg_pool_mgr_state{connections = NewConns, workers = NewWorkers, monitors = NewMonitors} }; handle_cast( {checkin, Owner, Connection}, @@ -114,28 +118,40 @@ handle_cast(_Request, State = #epg_pool_mgr_state{}) -> handle_info( {'DOWN', _MonitorRef, process, Pid, _Info}, - State = #epg_pool_mgr_state{connections = Conns, workers = Workers} + State = #epg_pool_mgr_state{connections = Conns, workers = Workers, monitors = Monitors} ) when erlang:is_map_key(Pid, Workers) -> Connection = maps:get(Pid, Workers), NewConns = queue:delete(Connection, Conns), NewWorkers = maps:without([Pid], Workers), - _ = close(Connection), + NewMonitors = demonitor_and_close(Connection, Monitors), { noreply, - State#epg_pool_mgr_state{connections = NewConns, workers = NewWorkers} + State#epg_pool_mgr_state{connections = NewConns, workers = NewWorkers, monitors = NewMonitors} }; handle_info( {'DOWN', _MonitorRef, process, Pid, _Info}, - State = #epg_pool_mgr_state{connections = Conns, owners = Owners} + State = #epg_pool_mgr_state{connections = Conns, owners = Owners, monitors = Monitors} ) when erlang:is_map_key(Pid, Owners) -> {_Ref, Connection} = maps:get(Pid, Owners), NewConns = queue:delete(Connection, Conns), NewOwners = maps:without([Pid], Owners), - _ = close(Connection), + NewMonitors = demonitor_and_close(Connection, Monitors), { noreply, - State#epg_pool_mgr_state{connections = NewConns, owners = NewOwners} + State#epg_pool_mgr_state{connections = NewConns, owners = NewOwners, monitors = NewMonitors} }; +%% +handle_info( + {'DOWN', _MonitorRef, process, Pid, _Info}, + State = #epg_pool_mgr_state{connections = Conns, monitors = Monitors} +) when erlang:is_map_key(Pid, Monitors) -> + NewConns = queue:delete(Pid, Conns), + NewMonitors = demonitor_and_close(Pid, Monitors), + { + noreply, + State#epg_pool_mgr_state{connections = NewConns, monitors = NewMonitors} + }; + handle_info(_Info, State = #epg_pool_mgr_state{}) -> {noreply, State}. @@ -156,13 +172,14 @@ start_workers(Pool, Size) -> reg_name(Name, Postfix) -> list_to_atom(atom_to_list(Name) ++ Postfix). -close(Connection) -> - try epgsql:close(Connection) - catch - _:_ -> - skip - end, - ok. +demonitor_and_close(Connection, Monitors) when erlang:is_map_key(Connection, Monitors) -> + {MRef, NewMonitors} = maps:take(Connection, Monitors), + _ = catch erlang:demonitor(MRef), + _ = catch epgsql:close(Connection), + NewMonitors; +demonitor_and_close(Connection, Monitors) -> + _ = catch epgsql:close(Connection), + Monitors. unique_queue_push(Item, Q) -> case queue:member(Item, Q) of diff --git a/src/epg_pool_wrk.erl b/src/epg_pool_wrk.erl index 8efc8ae..c85a51f 100644 --- a/src/epg_pool_wrk.erl +++ b/src/epg_pool_wrk.erl @@ -29,9 +29,9 @@ handle_cast(_Request, State = #epg_pool_wrk_state{}) -> handle_info( {'EXIT', Pid, _Info}, - State = #epg_pool_wrk_state{pool = Pool, connection = Pid} + State = #epg_pool_wrk_state{pool = _Pool, connection = Pid} ) -> - epg_pool_mgr:remove(Pool, self(), Pid), + %epg_pool_mgr:remove(Pool, self(), Pid), reconnect_timer(), {noreply, State#epg_pool_wrk_state{connection = undefined, monitor = undefined}};