TD-927: progressor prototype

This commit is contained in:
ttt161 2024-09-12 08:04:08 +03:00
parent d435cb934c
commit 16d9505aad
8 changed files with 394 additions and 20 deletions

View File

@ -6,6 +6,7 @@
{applications,
[kernel,
stdlib,
herd,
epgsql,
epgsql_pool
]},

View File

@ -16,9 +16,7 @@
start(_StartType, _StartArgs) ->
_ = maybe_start_canal(application:get_all_env(canal)),
Databases0 = application:get_env(epg_connector, databases, #{}),
Databases = maybe_set_secrets(Databases0),
Pools = application:get_env(epg_connector, pools, #{}),
ok = start_pools(Pools, Databases),
_Databases = maybe_set_secrets(Databases0),
epg_connector_sup:start_link().
stop(_State) ->
@ -31,21 +29,6 @@ maybe_start_canal([]) ->
maybe_start_canal(_Env) ->
_ = application:ensure_all_started(canal).
start_pools(Pools, Databases) ->
maps:fold(
fun(PoolName, Opts, _Acc) ->
#{
database := DB,
size := Size
} = Opts,
DbParams = maps:get(DB, Databases),
{ok, _} = epgsql_pool:start(PoolName, Size, Size, DbParams),
ok
end,
ok,
Pools
).
maybe_set_secrets(Databases) ->
TokenPath = application:get_env(epg_connector, vault_token_path, ?VAULT_TOKEN_PATH),
try vault_client_auth(TokenPath) of

View File

@ -20,7 +20,29 @@ init([]) ->
SupFlags = #{strategy => one_for_all,
intensity => 0,
period => 1},
ChildSpecs = [],
{ok, {SupFlags, ChildSpecs}}.
Databases = application:get_env(epg_connector, databases, #{}),
Pools = application:get_env(epg_connector, pools, #{}),
PoolSpecs = pool_specs(Pools, Databases),
{ok, {SupFlags, PoolSpecs}}.
%% internal functions
pool_specs(Pools, Databases) ->
maps:fold(
fun(PoolName, Opts, Acc) ->
#{
database := DB,
size := Size
} = Opts,
DbParams = maps:get(DB, Databases),
[
#{
id => PoolName,
start => {epg_pool_sup, start_link, [PoolName, DbParams, Size]},
type => supervisor
} | Acc
]
end,
[],
Pools
).

54
src/epg_pool.erl Normal file
View File

@ -0,0 +1,54 @@
-module(epg_pool).
-export([query/2]).
-export([query/3]).
-export([query/4]).
-export([transaction/2]).
-export([transaction/3]).
-export([with/2]).
-export([with/3]).
query(Pool, Stmt) when is_atom(Pool)->
query(Pool, Stmt, []);
query(Conn, Stmt) when is_pid(Conn) ->
epgsql:equery(Conn, Stmt).
query(Pool, Stmt, Params) when is_atom(Pool) ->
query(epg_pool_mgr:checkout(Pool), Pool, Stmt, Params);
query(Conn, Stmt, Params) when is_pid(Conn) ->
epgsql:equery(Conn, Stmt, Params).
query(empty, _Pool, _Stmt, _Params) ->
{error, overload};
query({error, _} = Err, _Pool, _Stmt, _Params) ->
Err;
query(Conn, Pool, Stmt, Params) when is_pid(Conn) ->
Result = epgsql:equery(Conn, Stmt, Params),
_ = epg_pool_mgr:checkin(Pool, self(), Conn),
Result.
transaction(Pool, Fun) when is_atom(Pool) ->
transaction(epg_pool_mgr:checkout(Pool), Pool, Fun);
transaction(Conn, Fun) when is_pid(Conn) ->
epgsql:with_transaction(Conn, Fun).
transaction(empty, _Pool, _Fun) ->
{error, overload};
transaction({error, _} = Err, _Pool, _Fun) ->
Err;
transaction(Conn, Pool, Fun) when is_pid(Conn) ->
Result = epgsql:with_transaction(Conn, Fun),
_ = epg_pool_mgr:checkin(Pool, self(), Conn),
Result.
with(Pool, Fun) when is_atom(Pool) ->
with(epg_pool_mgr:checkout(Pool), Pool, Fun);
with(Conn, Fun) when is_pid(Conn) ->
Fun(Conn).
with(empty, _Pool, _F) ->
{error, overload};
with(Conn, Pool, Fun) when is_pid(Conn) ->
Result = Fun(Conn),
_ = epg_pool_mgr:checkin(Pool, self(), Conn),
Result.

171
src/epg_pool_mgr.erl Normal file
View File

@ -0,0 +1,171 @@
-module(epg_pool_mgr).
-behaviour(gen_server).
-export([start_link/3]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-export([checkout/1]).
-export([checkin/3]).
-export([add/3]).
-export([remove/3]).
-record(epg_pool_mgr_state, {pool, params, size, connections, workers, owners}).
%% API
checkout(Pool) ->
RegName = reg_name(Pool, "_pool_mgr"),
gen_server:call(RegName, checkout).
checkin(Pool, Owner, Connection) ->
RegName = reg_name(Pool, "_pool_mgr"),
gen_server:cast(RegName, {checkin, Owner, Connection}).
add(Pool, Worker, Connection) ->
RegName = reg_name(Pool, "_pool_mgr"),
gen_server:cast(RegName, {add, Worker, Connection}).
remove(Pool, Worker, Connection) ->
RegName = reg_name(Pool, "_pool_mgr"),
gen_server:cast(RegName, {remove, Worker, Connection}).
%%
start_link(PoolName, DbParams, Size) ->
RegName = reg_name(PoolName, "_pool_mgr"),
gen_server:start_link({local, RegName}, ?MODULE, [PoolName, DbParams, Size], []).
init([PoolName, DbParams, Size]) ->
_ = start_workers(PoolName, Size),
{ok, #epg_pool_mgr_state{
pool = PoolName,
params = DbParams,
size = Size,
connections = queue:new(),
workers = #{},
owners = #{}
}}.
%%
handle_call(
checkout, {Pid, _Ref},
State = #epg_pool_mgr_state{owners = Owners}
) when erlang:is_map_key(Pid, Owners)->
{reply, {error, nested_checkout}, State};
handle_call(
checkout, {Pid, _Ref},
State = #epg_pool_mgr_state{connections = Conns, owners = Owners}
) ->
{Result, NewConns} = queue:out(Conns),
{State1, Response} =
case Result of
{value, Connection} ->
Ref = erlang:monitor(process, Pid),
{State#epg_pool_mgr_state{owners = Owners#{Pid => {Ref, Connection}}}, Connection};
empty ->
{State, empty}
end,
NewState = State1#epg_pool_mgr_state{connections = NewConns},
{reply, Response, NewState}.
handle_cast(
{add, Worker, Connection},
State = #epg_pool_mgr_state{connections = Conns, workers = Workers}
) ->
_ = erlang:monitor(process, Worker),
{
noreply,
State#epg_pool_mgr_state{
connections = unique_queue_push(Connection, Conns),
workers = Workers#{Worker => Connection}}
};
handle_cast(
{remove, Worker, Connection},
State = #epg_pool_mgr_state{connections = Conns, workers = Workers}
) ->
NewConns = queue:delete(Connection, Conns),
NewWorkers = maps:without([Worker], Workers),
_ = close(Connection),
{
noreply,
State#epg_pool_mgr_state{connections = NewConns, workers = NewWorkers}
};
handle_cast(
{checkin, Owner, Connection},
State = #epg_pool_mgr_state{connections = Conns, owners = Owners}
) ->
case maps:get(Owner, Owners, undefined) of
undefined ->
skip;
{Ref, _Conn} ->
_ = erlang:demonitor(Ref, [flush])
end,
{
noreply,
State#epg_pool_mgr_state{
connections = unique_queue_push(Connection, Conns),
owners = maps:without([Owner], Owners)
}
};
handle_cast(_Request, State = #epg_pool_mgr_state{}) ->
{noreply, State}.
handle_info(
{'DOWN', _MonitorRef, process, Pid, _Info},
State = #epg_pool_mgr_state{connections = Conns, workers = Workers}
) when erlang:is_map_key(Pid, Workers) ->
Connection = maps:get(Pid, Workers),
NewConns = queue:delete(Connection, Conns),
NewWorkers = maps:without([Pid], Workers),
_ = close(Connection),
{
noreply,
State#epg_pool_mgr_state{connections = NewConns, workers = NewWorkers}
};
handle_info(
{'DOWN', _MonitorRef, process, Pid, _Info},
State = #epg_pool_mgr_state{connections = Conns, owners = Owners}
) when erlang:is_map_key(Pid, Owners) ->
{_Ref, Connection} = maps:get(Pid, Owners),
NewConns = queue:delete(Connection, Conns),
NewOwners = maps:without([Pid], Owners),
_ = close(Connection),
{
noreply,
State#epg_pool_mgr_state{connections = NewConns, owners = NewOwners}
};
handle_info(_Info, State = #epg_pool_mgr_state{}) ->
{noreply, State}.
terminate(_Reason, _State = #epg_pool_mgr_state{}) ->
ok.
code_change(_OldVsn, State = #epg_pool_mgr_state{}, _Extra) ->
{ok, State}.
%%
start_workers(Pool, Size) ->
WorkerSup = reg_name(Pool, "_pool_wrk_sup"),
lists:foreach(fun(N) ->
supervisor:start_child(WorkerSup, [N])
end, lists:seq(1, Size)).
reg_name(Name, Postfix) ->
list_to_atom(atom_to_list(Name) ++ Postfix).
close(Connection) ->
try epgsql:close(Connection)
catch
_:_ ->
skip
end,
ok.
unique_queue_push(Item, Q) ->
case queue:member(Item, Q) of
true -> Q;
false -> queue:in(Item, Q)
end.

37
src/epg_pool_sup.erl Normal file
View File

@ -0,0 +1,37 @@
-module(epg_pool_sup).
-behaviour(supervisor).
-export([start_link/3]).
-export([init/1]).
start_link(PoolName, DbParams, Size) ->
RegName = reg_name(PoolName, "_pool_sup"),
supervisor:start_link({local, RegName}, ?MODULE, [PoolName, DbParams, Size]).
init([PoolName, DbParams, Size]) ->
SupFlags = #{
strategy => one_for_all,
intensity => 1000,
period => 3600
},
ConnectionsSupSpec = #{
id => reg_name(PoolName, "_pool_wrk_sup"),
start => {epg_pool_wrk_sup, start_link, [PoolName, DbParams, Size]},
type => supervisor
},
ManagerSpec = #{
id => reg_name(PoolName, "_pool_mgr"),
start => {epg_pool_mgr, start_link, [PoolName, DbParams, Size]}
},
Specs = [
ConnectionsSupSpec,
ManagerSpec
],
{ok, {SupFlags, Specs}}.
%% internal functions
reg_name(Name, Postfix) ->
list_to_atom(atom_to_list(Name) ++ Postfix).

64
src/epg_pool_wrk.erl Normal file
View File

@ -0,0 +1,64 @@
-module(epg_pool_wrk).
-behaviour(gen_server).
-export([start_link/4]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
handle_continue/2, code_change/3]).
%% API
-record(epg_pool_wrk_state, {pool, params, size, connection, monitor}).
start_link(PoolName, DbParams, Size, _N) ->
gen_server:start_link(?MODULE, [PoolName, DbParams, Size], []).
init([PoolName, DbParams, Size]) ->
erlang:process_flag(trap_exit, true),
{ok, #epg_pool_wrk_state{pool = PoolName, params = DbParams, size = Size}, {continue, init}}.
handle_continue(init, State) ->
{noreply, connect(State)}.
handle_call(_Request, _From, State = #epg_pool_wrk_state{}) ->
{reply, ok, State}.
handle_cast(_Request, State = #epg_pool_wrk_state{}) ->
{noreply, State}.
handle_info(
{'EXIT', Pid, _Info},
State = #epg_pool_wrk_state{pool = Pool, connection = Pid}
) ->
epg_pool_mgr:remove(Pool, self(), Pid),
reconnect_timer(),
{noreply, State#epg_pool_wrk_state{connection = undefined, monitor = undefined}};
handle_info({timeout, _Ref, reconnect}, State = #epg_pool_wrk_state{}) ->
{noreply, connect(State)};
handle_info(_Info, State = #epg_pool_wrk_state{}) ->
{noreply, State}.
terminate(_Reason, _State = #epg_pool_wrk_state{}) ->
ok.
code_change(_OldVsn, State = #epg_pool_wrk_state{}, _Extra) ->
{ok, State}.
%%
reconnect_timer() ->
erlang:start_timer(5000, self(), reconnect).
connect(#epg_pool_wrk_state{pool = Pool, params = Params} = State) ->
try epgsql:connect(Params) of
{ok, Connection} ->
epg_pool_mgr:add(Pool, self(), Connection),
State#epg_pool_wrk_state{connection = Connection};
{error, _Reason} ->
reconnect_timer(),
State
catch
_:_ ->
reconnect_timer(),
State
end.

42
src/epg_pool_wrk_sup.erl Normal file
View File

@ -0,0 +1,42 @@
-module(epg_pool_wrk_sup).
-behaviour(supervisor).
%% API
-export([start_link/3]).
%% Supervisor callbacks
-export([init/1]).
%%%===================================================================
%%% API functions
%%%===================================================================
start_link(PoolName, DbParams, Size) ->
RegName = reg_name(PoolName, "_pool_wrk_sup"),
supervisor:start_link({local, RegName}, ?MODULE, [PoolName, DbParams, Size]).
%%%===================================================================
%%% Supervisor callbacks
%%%===================================================================
init([PoolName, DbParams, Size]) ->
MaxRestarts = 1000,
MaxSecondsBetweenRestarts = 3600,
SupFlags = #{
strategy => simple_one_for_one,
intensity => MaxRestarts,
period => MaxSecondsBetweenRestarts
},
ChildSpecs = [
#{
id => reg_name(PoolName, "_pool_wrk"),
start => {epg_pool_wrk, start_link, [PoolName, DbParams, Size]}
}
],
{ok, {SupFlags, ChildSpecs}}.
%%
reg_name(Name, Postfix) ->
list_to_atom(atom_to_list(Name) ++ Postfix).