add ephemeral connections to pools

This commit is contained in:
ttt161 2024-11-01 17:09:59 +03:00
parent 19c1a4bd2c
commit b05649fc9a
3 changed files with 198 additions and 38 deletions

View File

@ -123,7 +123,7 @@ If using Vault for secret management, configure the following:
-export([get_user/1]). -export([get_user/1]).
get_user(UserId) -> get_user(UserId) ->
epgsql_pool:with(default_pool, fun(C) -> epg_pool:with(default_pool, fun(C) ->
{ok, _, [{Name, Email}]} = epgsql:equery(C, "SELECT name, email FROM users WHERE id = $1", [UserId]), {ok, _, [{Name, Email}]} = epgsql:equery(C, "SELECT name, email FROM users WHERE id = $1", [UserId]),
{Name, Email} {Name, Email}
end). end).

View File

@ -12,7 +12,7 @@
{pools, #{ {pools, #{
default_pool => #{ default_pool => #{
database => default_db, database => default_db,
size => 10 size => {1, 10}
}, },
front_pool => #{ front_pool => #{
database => default_db, database => default_db,

View File

@ -11,7 +11,7 @@
-export([add/3]). -export([add/3]).
-export([remove/3]). -export([remove/3]).
-record(epg_pool_mgr_state, {pool, params, size, connections, workers, owners, monitors}). -record(epg_pool_mgr_state, {pool, params, size, connections, workers, owners, monitors, ephemerals}).
%% API %% API
@ -31,37 +31,44 @@ remove(Pool, Worker, Connection) ->
RegName = reg_name(Pool, "_pool_mgr"), RegName = reg_name(Pool, "_pool_mgr"),
gen_server:cast(RegName, {remove, Worker, Connection}). gen_server:cast(RegName, {remove, Worker, Connection}).
%% %%%
start_link(PoolName, DbParams, Size) -> start_link(PoolName, DbParams, Size) ->
RegName = reg_name(PoolName, "_pool_mgr"), RegName = reg_name(PoolName, "_pool_mgr"),
gen_server:start_link({local, RegName}, ?MODULE, [PoolName, DbParams, Size], []). gen_server:start_link({local, RegName}, ?MODULE, [PoolName, DbParams, Size], []).
init([PoolName, DbParams, Size]) -> init([PoolName, DbParams, Size]) ->
erlang:process_flag(trap_exit, true),
_ = start_workers(PoolName, Size), _ = start_workers(PoolName, Size),
{ok, #epg_pool_mgr_state{ {ok, #epg_pool_mgr_state{
pool = PoolName, pool = PoolName,
params = DbParams, params = DbParams,
size = Size, size = range(Size),
connections = queue:new(), connections = queue:new(),
workers = #{}, workers = #{},
owners = #{}, owners = #{},
monitors = #{} monitors = #{},
ephemerals = #{}
}}. }}.
%%
%% nested checkout
handle_call( handle_call(
checkout, {Pid, _}, checkout, {Pid, _},
State = #epg_pool_mgr_state{connections = Conns, monitors = Monitors, owners = Owners} State = #epg_pool_mgr_state{connections = Conns, monitors = Monitors, owners = Owners, ephemerals = Ephemerals}
) when erlang:is_map_key(Pid, Owners) -> ) when erlang:is_map_key(Pid, Owners) ->
{{Ref, Conn}, NewOwners} = maps:take(Pid, Owners), {{Ref, Conn}, NewOwners} = maps:take(Pid, Owners),
_ = catch erlang:demonitor(Ref), _ = catch erlang:demonitor(Ref),
NewConns = queue:delete(Conn, Conns), NewConns = queue:delete(Conn, Conns),
NewMonitors = demonitor_and_close(Conn, Monitors), {NewMonitors, NewEphemerals} = demonitor_and_close(Conn, Monitors, Ephemerals),
{ {
reply, reply,
{error, nested_checkout}, {error, nested_checkout},
State#epg_pool_mgr_state{connections = NewConns, monitors = NewMonitors, owners = NewOwners} State#epg_pool_mgr_state{
connections = NewConns,
monitors = NewMonitors,
owners = NewOwners,
ephemerals = NewEphemerals
}
}; };
handle_call( handle_call(
checkout, {Pid, _Ref}, checkout, {Pid, _Ref},
@ -74,7 +81,8 @@ handle_call(
Ref = erlang:monitor(process, Pid), Ref = erlang:monitor(process, Pid),
{State#epg_pool_mgr_state{owners = Owners#{Pid => {Ref, Connection}}}, Connection}; {State#epg_pool_mgr_state{owners = Owners#{Pid => {Ref, Connection}}}, Connection};
empty -> empty ->
{State, empty} %% try ephemeral connection
maybe_new_connection(Pid, State)
end, end,
NewState = State1#epg_pool_mgr_state{connections = NewConns}, NewState = State1#epg_pool_mgr_state{connections = NewConns},
{reply, Response, NewState}. {reply, Response, NewState}.
@ -95,25 +103,26 @@ handle_cast(
}; };
handle_cast( handle_cast(
{remove, Worker, Connection}, {remove, Worker, Connection},
State = #epg_pool_mgr_state{connections = Conns, workers = Workers, monitors = Monitors} State = #epg_pool_mgr_state{connections = Conns, workers = Workers, monitors = Monitors, ephemerals = Ephemerals}
) -> ) ->
NewConns = queue:delete(Connection, Conns), NewConns = queue:delete(Connection, Conns),
NewWorkers = maps:without([Worker], Workers), NewWorkers = maps:without([Worker], Workers),
NewMonitors = demonitor_and_close(Connection, Monitors), {NewMonitors, NewEphemerals} = demonitor_and_close(Connection, Monitors, Ephemerals),
{ {
noreply, noreply,
State#epg_pool_mgr_state{connections = NewConns, workers = NewWorkers, monitors = NewMonitors} State#epg_pool_mgr_state{
connections = NewConns,
workers = NewWorkers,
monitors = NewMonitors,
ephemerals = NewEphemerals
}
}; };
%% stable connection checkin
handle_cast( handle_cast(
{checkin, Owner, Connection}, {checkin, Owner, Connection},
State = #epg_pool_mgr_state{connections = Conns, owners = Owners} State = #epg_pool_mgr_state{connections = Conns, owners = Owners, monitors = Monitors}
) -> ) when erlang:is_map_key(Connection, Monitors) ->
case maps:get(Owner, Owners, undefined) of _ = demonitor_owner(Owner, Owners),
undefined ->
skip;
{Ref, _Conn} ->
_ = erlang:demonitor(Ref)
end,
{ {
noreply, noreply,
State#epg_pool_mgr_state{ State#epg_pool_mgr_state{
@ -121,45 +130,110 @@ handle_cast(
owners = maps:without([Owner], Owners) owners = maps:without([Owner], Owners)
} }
}; };
%% ephemeral connection checkin
handle_cast(
{checkin, Owner, Connection},
State = #epg_pool_mgr_state{owners = Owners, ephemerals = Ephemerals, monitors = Monitors}
) when erlang:is_map_key(Connection, Ephemerals) ->
_ = demonitor_owner(Owner, Owners),
{NewMonitors, NewEphemerals} = demonitor_and_close(Connection, Monitors, Ephemerals),
{
noreply,
State#epg_pool_mgr_state{
owners = maps:without([Owner], Owners),
monitors = NewMonitors,
ephemerals = NewEphemerals
}
};
handle_cast(_Request, State = #epg_pool_mgr_state{}) -> handle_cast(_Request, State = #epg_pool_mgr_state{}) ->
{noreply, State}. {noreply, State}.
%% worker down
handle_info( handle_info(
{'DOWN', _MonitorRef, process, Pid, _Info}, {'DOWN', _MonitorRef, process, Pid, _Info},
State = #epg_pool_mgr_state{connections = Conns, workers = Workers, monitors = Monitors} State = #epg_pool_mgr_state{
connections = Conns,
owners = Owners,
workers = Workers,
monitors = Monitors,
ephemerals = Ephemerals
}
) when erlang:is_map_key(Pid, Workers) -> ) when erlang:is_map_key(Pid, Workers) ->
Connection = maps:get(Pid, Workers), Connection = maps:get(Pid, Workers),
NewConns = queue:delete(Connection, Conns), NewConns = queue:delete(Connection, Conns),
NewWorkers = maps:without([Pid], Workers), NewWorkers = maps:without([Pid], Workers),
NewMonitors = demonitor_and_close(Connection, Monitors), NewOwners = cleanup_owners(Connection, Owners),
{NewMonitors, NewEphemerals} = demonitor_and_close(Connection, Monitors, Ephemerals),
{ {
noreply, noreply,
State#epg_pool_mgr_state{connections = NewConns, workers = NewWorkers, monitors = NewMonitors} State#epg_pool_mgr_state{
connections = NewConns,
owners = NewOwners,
workers = NewWorkers,
monitors = NewMonitors,
ephemerals = NewEphemerals
}
}; };
%% owner down
handle_info( handle_info(
{'DOWN', _MonitorRef, process, Pid, _Info}, {'DOWN', _MonitorRef, process, Pid, _Info},
State = #epg_pool_mgr_state{connections = Conns, owners = Owners, monitors = Monitors} State = #epg_pool_mgr_state{
connections = Conns,
owners = Owners,
monitors = Monitors,
ephemerals = Ephemerals
}
) when erlang:is_map_key(Pid, Owners) -> ) when erlang:is_map_key(Pid, Owners) ->
{_Ref, Connection} = maps:get(Pid, Owners), {_Ref, Connection} = maps:get(Pid, Owners),
NewConns = queue:delete(Connection, Conns), NewConns = queue:delete(Connection, Conns),
NewOwners = maps:without([Pid], Owners), NewOwners = maps:without([Pid], Owners),
NewMonitors = demonitor_and_close(Connection, Monitors), {NewMonitors, NewEphemerals} = demonitor_and_close(Connection, Monitors, Ephemerals),
{ {
noreply, noreply,
State#epg_pool_mgr_state{connections = NewConns, owners = NewOwners, monitors = NewMonitors} State#epg_pool_mgr_state{
connections = NewConns,
owners = NewOwners,
monitors = NewMonitors,
ephemerals = NewEphemerals
}
}; };
%% %% stable connection down
handle_info( handle_info(
{'DOWN', _MonitorRef, process, Pid, _Info}, {'DOWN', _MonitorRef, process, Pid, _Info},
State = #epg_pool_mgr_state{connections = Conns, monitors = Monitors} State = #epg_pool_mgr_state{
connections = Conns,
owners = Owners,
monitors = Monitors,
ephemerals = Ephemerals
}
) when erlang:is_map_key(Pid, Monitors) -> ) when erlang:is_map_key(Pid, Monitors) ->
NewConns = queue:delete(Pid, Conns), NewConns = queue:delete(Pid, Conns),
NewMonitors = demonitor_and_close(Pid, Monitors), NewOwners = cleanup_owners(Pid, Owners),
{NewMonitors, NewEphemerals} = demonitor_and_close(Pid, Monitors, Ephemerals),
{ {
noreply, noreply,
State#epg_pool_mgr_state{connections = NewConns, monitors = NewMonitors} State#epg_pool_mgr_state{
connections = NewConns,
owners = NewOwners,
monitors = NewMonitors,
ephemerals = NewEphemerals
}
};
%% ephemeral connection down
handle_info(
{'DOWN', _MonitorRef, process, Pid, _Info},
State = #epg_pool_mgr_state{
owners = Owners,
monitors = Monitors,
ephemerals = Ephemerals
}
) when erlang:is_map_key(Pid, Ephemerals) ->
NewOwners = cleanup_owners(Pid, Owners),
{NewMonitors, NewEphemerals} = demonitor_and_close(Pid, Monitors, Ephemerals),
{
noreply,
State#epg_pool_mgr_state{owners = NewOwners, monitors = NewMonitors, ephemerals = NewEphemerals}
}; };
handle_info(_Info, State = #epg_pool_mgr_state{}) -> handle_info(_Info, State = #epg_pool_mgr_state{}) ->
{noreply, State}. {noreply, State}.
@ -171,7 +245,9 @@ code_change(_OldVsn, State = #epg_pool_mgr_state{}, _Extra) ->
%% %%
start_workers(Pool, Size) -> start_workers(Pool, {Min, _Max}) ->
start_workers(Pool, Min);
start_workers(Pool, Size) when is_integer(Size) ->
WorkerSup = reg_name(Pool, "_pool_wrk_sup"), WorkerSup = reg_name(Pool, "_pool_wrk_sup"),
lists:foreach(fun(N) -> lists:foreach(fun(N) ->
supervisor:start_child(WorkerSup, [N]) supervisor:start_child(WorkerSup, [N])
@ -180,17 +256,101 @@ start_workers(Pool, Size) ->
reg_name(Name, Postfix) -> reg_name(Name, Postfix) ->
list_to_atom(atom_to_list(Name) ++ Postfix). list_to_atom(atom_to_list(Name) ++ Postfix).
demonitor_and_close(Connection, Monitors) when erlang:is_map_key(Connection, Monitors) -> demonitor_and_close(Connection, Monitors, Ephemerals) when erlang:is_map_key(Connection, Monitors) ->
{MRef, NewMonitors} = maps:take(Connection, Monitors), {MRef, NewMonitors} = maps:take(Connection, Monitors),
_ = catch erlang:demonitor(MRef), _ = catch erlang:demonitor(MRef),
_ = catch epgsql:close(Connection), _ = catch epgsql:close(Connection),
NewMonitors; {NewMonitors, Ephemerals};
demonitor_and_close(Connection, Monitors) -> demonitor_and_close(Connection, Monitors, Ephemerals) when erlang:is_map_key(Connection, Ephemerals) ->
{MRef, NewEphemerals} = maps:take(Connection, Ephemerals),
_ = catch erlang:demonitor(MRef),
_ = catch epgsql:close(Connection), _ = catch epgsql:close(Connection),
Monitors. {Monitors, NewEphemerals};
demonitor_and_close(Connection, Monitors, Ephemerals) ->
_ = catch epgsql:close(Connection),
{Monitors, Ephemerals}.
demonitor_owner(Owner, Owners) ->
case maps:get(Owner, Owners, undefined) of
undefined ->
skip;
{Ref, _Conn} ->
_ = erlang:demonitor(Ref)
end.
cleanup_owners(Connection, Owners) ->
SearchOwner = maps:fold(
fun
(Own, {Ref, Conn}, _Acc) when Conn =:= Connection ->
{Own, Ref, Conn};
(_, _, Acc) ->
Acc
end,
not_found,
Owners
),
case SearchOwner of
{Owner, OwnRef, Connection} ->
_ = catch erlang:demonitor(OwnRef),
maps:without([Owner], Owners);
not_found ->
Owners
end.
unique_queue_push(Item, Q) -> unique_queue_push(Item, Q) ->
case queue:member(Item, Q) of case queue:member(Item, Q) of
true -> Q; true -> Q;
false -> queue:in(Item, Q) false -> queue:in(Item, Q)
end. end.
range({_Min, _Max} = Range) ->
Range;
range(Size) when is_integer(Size) ->
{Size, Size}.
maybe_new_connection(
Owner,
#epg_pool_mgr_state{
size = {Min, Max},
ephemerals = Ephemerals
} = State
) ->
AllConnectionsCount = Min + maps:size(Ephemerals),
case AllConnectionsCount < Max of
true ->
connect_ephemeral(Owner, State);
false ->
{State, empty}
end.
connect_ephemeral(
Owner,
#epg_pool_mgr_state{
pool = Pool,
params = #{database := DB} = Params,
owners = Owners,
ephemerals = Ephemerals
} = State
) ->
try epgsql:connect(Params) of
{ok, Connection} ->
logger:info("db dynamic connection established. pool: ~p. database: ~p", [Pool, DB]),
OwnerRef = erlang:monitor(process, Owner),
ConnRef = erlang:monitor(process, Connection),
NewOwners = Owners#{Owner => {OwnerRef, Connection}},
NewEphemerals = Ephemerals#{Connection => ConnRef},
{State#epg_pool_mgr_state{owners = NewOwners, ephemerals = NewEphemerals}, Connection};
{error, Reason} ->
logger:warning(
"db can`t establish dynamic connection. pool: ~p. database: ~p. error: ~p",
[Pool, DB, Reason]
),
{State, empty}
catch
_Class:Reason:Trace ->
logger:error(
"db can`t establish dynamic connection. pool: ~p. database: ~p. error: ~p. trace: ~p",
[Pool, DB, Reason, Trace]
),
{State, empty}
end.