diff --git a/README.md b/README.md index 02e9c3d..ec247cb 100644 --- a/README.md +++ b/README.md @@ -123,7 +123,7 @@ If using Vault for secret management, configure the following: -export([get_user/1]). get_user(UserId) -> - epgsql_pool:with(default_pool, fun(C) -> + epg_pool:with(default_pool, fun(C) -> {ok, _, [{Name, Email}]} = epgsql:equery(C, "SELECT name, email FROM users WHERE id = $1", [UserId]), {Name, Email} end). diff --git a/config/sys.config b/config/sys.config index bd71e04..d6f8b58 100644 --- a/config/sys.config +++ b/config/sys.config @@ -12,7 +12,7 @@ {pools, #{ default_pool => #{ database => default_db, - size => 10 + size => {1, 10} }, front_pool => #{ database => default_db, diff --git a/src/epg_pool_mgr.erl b/src/epg_pool_mgr.erl index 5bc7f0d..6cb7736 100644 --- a/src/epg_pool_mgr.erl +++ b/src/epg_pool_mgr.erl @@ -11,7 +11,7 @@ -export([add/3]). -export([remove/3]). --record(epg_pool_mgr_state, {pool, params, size, connections, workers, owners, monitors}). +-record(epg_pool_mgr_state, {pool, params, size, connections, workers, owners, monitors, ephemerals}). %% API @@ -31,37 +31,44 @@ remove(Pool, Worker, Connection) -> RegName = reg_name(Pool, "_pool_mgr"), gen_server:cast(RegName, {remove, Worker, Connection}). -%% +%%% start_link(PoolName, DbParams, Size) -> RegName = reg_name(PoolName, "_pool_mgr"), gen_server:start_link({local, RegName}, ?MODULE, [PoolName, DbParams, Size], []). init([PoolName, DbParams, Size]) -> + erlang:process_flag(trap_exit, true), _ = start_workers(PoolName, Size), {ok, #epg_pool_mgr_state{ pool = PoolName, params = DbParams, - size = Size, + size = range(Size), connections = queue:new(), workers = #{}, owners = #{}, - monitors = #{} + monitors = #{}, + ephemerals = #{} }}. -%% +%% nested checkout handle_call( checkout, {Pid, _}, - State = #epg_pool_mgr_state{connections = Conns, monitors = Monitors, owners = Owners} + State = #epg_pool_mgr_state{connections = Conns, monitors = Monitors, owners = Owners, ephemerals = Ephemerals} ) when erlang:is_map_key(Pid, Owners) -> {{Ref, Conn}, NewOwners} = maps:take(Pid, Owners), _ = catch erlang:demonitor(Ref), NewConns = queue:delete(Conn, Conns), - NewMonitors = demonitor_and_close(Conn, Monitors), + {NewMonitors, NewEphemerals} = demonitor_and_close(Conn, Monitors, Ephemerals), { reply, {error, nested_checkout}, - State#epg_pool_mgr_state{connections = NewConns, monitors = NewMonitors, owners = NewOwners} + State#epg_pool_mgr_state{ + connections = NewConns, + monitors = NewMonitors, + owners = NewOwners, + ephemerals = NewEphemerals + } }; handle_call( checkout, {Pid, _Ref}, @@ -74,7 +81,8 @@ handle_call( Ref = erlang:monitor(process, Pid), {State#epg_pool_mgr_state{owners = Owners#{Pid => {Ref, Connection}}}, Connection}; empty -> - {State, empty} + %% try ephemeral connection + maybe_new_connection(Pid, State) end, NewState = State1#epg_pool_mgr_state{connections = NewConns}, {reply, Response, NewState}. @@ -95,25 +103,26 @@ handle_cast( }; handle_cast( {remove, Worker, Connection}, - State = #epg_pool_mgr_state{connections = Conns, workers = Workers, monitors = Monitors} + State = #epg_pool_mgr_state{connections = Conns, workers = Workers, monitors = Monitors, ephemerals = Ephemerals} ) -> NewConns = queue:delete(Connection, Conns), NewWorkers = maps:without([Worker], Workers), - NewMonitors = demonitor_and_close(Connection, Monitors), + {NewMonitors, NewEphemerals} = demonitor_and_close(Connection, Monitors, Ephemerals), { noreply, - State#epg_pool_mgr_state{connections = NewConns, workers = NewWorkers, monitors = NewMonitors} + State#epg_pool_mgr_state{ + connections = NewConns, + workers = NewWorkers, + monitors = NewMonitors, + ephemerals = NewEphemerals + } }; +%% stable connection checkin handle_cast( {checkin, Owner, Connection}, - State = #epg_pool_mgr_state{connections = Conns, owners = Owners} -) -> - case maps:get(Owner, Owners, undefined) of - undefined -> - skip; - {Ref, _Conn} -> - _ = erlang:demonitor(Ref) - end, + State = #epg_pool_mgr_state{connections = Conns, owners = Owners, monitors = Monitors} +) when erlang:is_map_key(Connection, Monitors) -> + _ = demonitor_owner(Owner, Owners), { noreply, State#epg_pool_mgr_state{ @@ -121,45 +130,110 @@ handle_cast( owners = maps:without([Owner], Owners) } }; +%% ephemeral connection checkin +handle_cast( + {checkin, Owner, Connection}, + State = #epg_pool_mgr_state{owners = Owners, ephemerals = Ephemerals, monitors = Monitors} +) when erlang:is_map_key(Connection, Ephemerals) -> + _ = demonitor_owner(Owner, Owners), + {NewMonitors, NewEphemerals} = demonitor_and_close(Connection, Monitors, Ephemerals), + { + noreply, + State#epg_pool_mgr_state{ + owners = maps:without([Owner], Owners), + monitors = NewMonitors, + ephemerals = NewEphemerals + } + }; handle_cast(_Request, State = #epg_pool_mgr_state{}) -> {noreply, State}. +%% worker down handle_info( {'DOWN', _MonitorRef, process, Pid, _Info}, - State = #epg_pool_mgr_state{connections = Conns, workers = Workers, monitors = Monitors} + State = #epg_pool_mgr_state{ + connections = Conns, + owners = Owners, + workers = Workers, + monitors = Monitors, + ephemerals = Ephemerals + } ) when erlang:is_map_key(Pid, Workers) -> Connection = maps:get(Pid, Workers), NewConns = queue:delete(Connection, Conns), NewWorkers = maps:without([Pid], Workers), - NewMonitors = demonitor_and_close(Connection, Monitors), + NewOwners = cleanup_owners(Connection, Owners), + {NewMonitors, NewEphemerals} = demonitor_and_close(Connection, Monitors, Ephemerals), { noreply, - State#epg_pool_mgr_state{connections = NewConns, workers = NewWorkers, monitors = NewMonitors} + State#epg_pool_mgr_state{ + connections = NewConns, + owners = NewOwners, + workers = NewWorkers, + monitors = NewMonitors, + ephemerals = NewEphemerals + } }; +%% owner down handle_info( {'DOWN', _MonitorRef, process, Pid, _Info}, - State = #epg_pool_mgr_state{connections = Conns, owners = Owners, monitors = Monitors} + State = #epg_pool_mgr_state{ + connections = Conns, + owners = Owners, + monitors = Monitors, + ephemerals = Ephemerals + } ) when erlang:is_map_key(Pid, Owners) -> {_Ref, Connection} = maps:get(Pid, Owners), NewConns = queue:delete(Connection, Conns), NewOwners = maps:without([Pid], Owners), - NewMonitors = demonitor_and_close(Connection, Monitors), + {NewMonitors, NewEphemerals} = demonitor_and_close(Connection, Monitors, Ephemerals), { noreply, - State#epg_pool_mgr_state{connections = NewConns, owners = NewOwners, monitors = NewMonitors} + State#epg_pool_mgr_state{ + connections = NewConns, + owners = NewOwners, + monitors = NewMonitors, + ephemerals = NewEphemerals + } }; -%% +%% stable connection down handle_info( {'DOWN', _MonitorRef, process, Pid, _Info}, - State = #epg_pool_mgr_state{connections = Conns, monitors = Monitors} + State = #epg_pool_mgr_state{ + connections = Conns, + owners = Owners, + monitors = Monitors, + ephemerals = Ephemerals + } ) when erlang:is_map_key(Pid, Monitors) -> NewConns = queue:delete(Pid, Conns), - NewMonitors = demonitor_and_close(Pid, Monitors), + NewOwners = cleanup_owners(Pid, Owners), + {NewMonitors, NewEphemerals} = demonitor_and_close(Pid, Monitors, Ephemerals), { noreply, - State#epg_pool_mgr_state{connections = NewConns, monitors = NewMonitors} + State#epg_pool_mgr_state{ + connections = NewConns, + owners = NewOwners, + monitors = NewMonitors, + ephemerals = NewEphemerals + } + }; +%% ephemeral connection down +handle_info( + {'DOWN', _MonitorRef, process, Pid, _Info}, + State = #epg_pool_mgr_state{ + owners = Owners, + monitors = Monitors, + ephemerals = Ephemerals + } +) when erlang:is_map_key(Pid, Ephemerals) -> + NewOwners = cleanup_owners(Pid, Owners), + {NewMonitors, NewEphemerals} = demonitor_and_close(Pid, Monitors, Ephemerals), + { + noreply, + State#epg_pool_mgr_state{owners = NewOwners, monitors = NewMonitors, ephemerals = NewEphemerals} }; - handle_info(_Info, State = #epg_pool_mgr_state{}) -> {noreply, State}. @@ -171,7 +245,9 @@ code_change(_OldVsn, State = #epg_pool_mgr_state{}, _Extra) -> %% -start_workers(Pool, Size) -> +start_workers(Pool, {Min, _Max}) -> + start_workers(Pool, Min); +start_workers(Pool, Size) when is_integer(Size) -> WorkerSup = reg_name(Pool, "_pool_wrk_sup"), lists:foreach(fun(N) -> supervisor:start_child(WorkerSup, [N]) @@ -180,17 +256,101 @@ start_workers(Pool, Size) -> reg_name(Name, Postfix) -> list_to_atom(atom_to_list(Name) ++ Postfix). -demonitor_and_close(Connection, Monitors) when erlang:is_map_key(Connection, Monitors) -> +demonitor_and_close(Connection, Monitors, Ephemerals) when erlang:is_map_key(Connection, Monitors) -> {MRef, NewMonitors} = maps:take(Connection, Monitors), _ = catch erlang:demonitor(MRef), _ = catch epgsql:close(Connection), - NewMonitors; -demonitor_and_close(Connection, Monitors) -> + {NewMonitors, Ephemerals}; +demonitor_and_close(Connection, Monitors, Ephemerals) when erlang:is_map_key(Connection, Ephemerals) -> + {MRef, NewEphemerals} = maps:take(Connection, Ephemerals), + _ = catch erlang:demonitor(MRef), _ = catch epgsql:close(Connection), - Monitors. + {Monitors, NewEphemerals}; +demonitor_and_close(Connection, Monitors, Ephemerals) -> + _ = catch epgsql:close(Connection), + {Monitors, Ephemerals}. + +demonitor_owner(Owner, Owners) -> + case maps:get(Owner, Owners, undefined) of + undefined -> + skip; + {Ref, _Conn} -> + _ = erlang:demonitor(Ref) + end. + +cleanup_owners(Connection, Owners) -> + SearchOwner = maps:fold( + fun + (Own, {Ref, Conn}, _Acc) when Conn =:= Connection -> + {Own, Ref, Conn}; + (_, _, Acc) -> + Acc + end, + not_found, + Owners + ), + case SearchOwner of + {Owner, OwnRef, Connection} -> + _ = catch erlang:demonitor(OwnRef), + maps:without([Owner], Owners); + not_found -> + Owners + end. unique_queue_push(Item, Q) -> case queue:member(Item, Q) of true -> Q; false -> queue:in(Item, Q) end. + +range({_Min, _Max} = Range) -> + Range; +range(Size) when is_integer(Size) -> + {Size, Size}. + +maybe_new_connection( + Owner, + #epg_pool_mgr_state{ + size = {Min, Max}, + ephemerals = Ephemerals + } = State +) -> + AllConnectionsCount = Min + maps:size(Ephemerals), + case AllConnectionsCount < Max of + true -> + connect_ephemeral(Owner, State); + false -> + {State, empty} + end. + +connect_ephemeral( + Owner, + #epg_pool_mgr_state{ + pool = Pool, + params = #{database := DB} = Params, + owners = Owners, + ephemerals = Ephemerals + } = State +) -> + try epgsql:connect(Params) of + {ok, Connection} -> + logger:info("db dynamic connection established. pool: ~p. database: ~p", [Pool, DB]), + OwnerRef = erlang:monitor(process, Owner), + ConnRef = erlang:monitor(process, Connection), + NewOwners = Owners#{Owner => {OwnerRef, Connection}}, + NewEphemerals = Ephemerals#{Connection => ConnRef}, + {State#epg_pool_mgr_state{owners = NewOwners, ephemerals = NewEphemerals}, Connection}; + {error, Reason} -> + logger:warning( + "db can`t establish dynamic connection. pool: ~p. database: ~p. error: ~p", + [Pool, DB, Reason] + ), + {State, empty} + catch + _Class:Reason:Trace -> + logger:error( + "db can`t establish dynamic connection. pool: ~p. database: ~p. error: ~p. trace: ~p", + [Pool, DB, Reason, Trace] + ), + {State, empty} + end.