Add presence server and session (#26)

* Allow to start presence sup w/ app config
* Allow to set presence based checks on sessions
This commit is contained in:
Andrew Mayorov 2019-08-14 18:19:02 +03:00 committed by GitHub
parent 67bc1b804b
commit 8b4771c2d2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 672 additions and 152 deletions

View File

@ -24,6 +24,7 @@
jsx,
hackney,
cowlib,
{ranch, "1.7.1"},
{genlib, {git, "https://github.com/rbkmoney/genlib", {branch, "master"}}}
]}.
@ -57,12 +58,4 @@
{provider_hooks, [
{pre, [{compile, {neotoma, compile}}]}
]}.
{profiles, [
{test, [
{deps, [
{ranch, "1.7.1"}
]}
]}
]}.
]}.

View File

@ -11,6 +11,7 @@
{<<"metrics">>,{pkg,<<"metrics">>,<<"1.0.1">>},1},
{<<"mimerl">>,{pkg,<<"mimerl">>,<<"1.2.0">>},1},
{<<"parse_trans">>,{pkg,<<"parse_trans">>,<<"3.3.0">>},2},
{<<"ranch">>,{pkg,<<"ranch">>,<<"1.7.1">>},0},
{<<"ssl_verify_fun">>,{pkg,<<"ssl_verify_fun">>,<<"1.1.4">>},1},
{<<"unicode_util_compat">>,{pkg,<<"unicode_util_compat">>,<<"0.4.1">>},2}]}.
[
@ -23,6 +24,7 @@
{<<"metrics">>, <<"25F094DEA2CDA98213CECC3AEFF09E940299D950904393B2A29D191C346A8486">>},
{<<"mimerl">>, <<"67E2D3F571088D5CFD3E550C383094B47159F3EEE8FFA08E64106CDF5E981BE3">>},
{<<"parse_trans">>, <<"09765507A3C7590A784615CFD421D101AEC25098D50B89D7AA1D66646BC571C1">>},
{<<"ranch">>, <<"6B1FAB51B49196860B733A49C07604465A47BDB78AA10C1C16A3D199F7F8C881">>},
{<<"ssl_verify_fun">>, <<"F0EAFFF810D2041E93F915EF59899C923F4568F4585904D010387ED74988E77B">>},
{<<"unicode_util_compat">>, <<"D869E4C68901DD9531385BB0C8C40444EBF624E60B6962D95952775CAC5E90CD">>}]}
].

View File

@ -9,7 +9,8 @@
jsx,
hackney,
cowlib,
genlib
genlib,
ranch
]},
{env, []},
{modules, []},

View File

@ -10,7 +10,7 @@
%%
-type service() :: consuela_health:servicename().
-type service() :: consuela_health:service_name().
-type tag() :: consuela_health:tag().
-type client() :: consuela_client:t().
-type millisecs() :: pos_integer().
@ -71,7 +71,7 @@ start_link(Name, Tags, Client, Opts) ->
tags := [tag()],
interval := #{init := millisecs(), idle := millisecs()},
tref => reference(),
nodes => [consuela_health:nodename()],
nodes => [consuela_health:node_name()],
client := client(),
pulse := {module(), _PulseOpts}
}.

View File

@ -17,29 +17,26 @@
opts => consuela_client:opts() % #{} by default
}.
-export([start_link/2]).
-export([start_link/1]).
-export([stop/1]).
-export_type([opts/0]).
%%
-type ref() :: atom().
-spec start_link(ref(), opts()) ->
-spec start_link(opts()) ->
{ok, pid()} | {error, _Reason}.
start_link(Ref, Opts) ->
start_link(Opts) ->
Name = maps:get(name, Opts),
Tags = maps:get(tags, Opts, []),
Client = mk_consul_client(maps:get(consul, Opts)),
DiscoveryOpts = maps:get(opts, Opts, #{}),
genlib_adhoc_supervisor:start_link(
{local, Ref},
#{strategy => one_for_one, intensity => 20, period => 5},
[
#{
id => {Ref, discovery},
id => discovery,
start => {consuela_discovery_server, start_link, [Name, Tags, Client, DiscoveryOpts]}
}
]
@ -52,8 +49,8 @@ mk_consul_client(Opts) ->
Url = maps:get(url, Opts),
consuela_client:new(Url, maps:get(opts, Opts, #{})).
-spec stop(ref() | pid()) ->
-spec stop(pid()) ->
ok.
stop(Ref) ->
proc_lib:stop(Ref).
stop(Pid) ->
proc_lib:stop(Pid).

View File

@ -5,30 +5,32 @@
%%
-type nodename() :: inet:hostname().
-type servicename() :: binary().
-type checkname() :: binary().
-type tag() :: binary().
-type status() :: passing | warning | critical.
-type endpoint() :: {inet:ip_address() | undefined, inet:port_number()}.
-type metadata() :: #{binary() => binary()}.
-type indexes() :: #{create | modify => integer()}.
-type node_name() :: inet:hostname().
-type service_id() :: id().
-type service_name() :: binary().
-type check_id() :: id().
-type check_name() :: binary().
-type tag() :: binary().
-type status() :: passing | warning | critical.
-type endpoint() :: {inet:ip_address(), inet:port_number()}.
-type metadata() :: #{binary() => binary()}.
-type indexes() :: #{create | modify => integer()}.
-type id() :: binary().
-type uuid() :: binary().
-type seconds() :: non_neg_integer().
-type id() :: binary().
-type uuid() :: binary().
-type seconds() :: non_neg_integer().
-type node_() :: #{
id := uuid(),
name := nodename(),
name := node_name(),
address := inet:ip_address(),
metadata => metadata(),
indexes => indexes()
}.
-type service() :: #{
id := id(),
name := servicename(),
id := service_id(),
name := service_name(),
tags := [tag()],
endpoint := endpoint(),
metadata => metadata(),
@ -36,8 +38,8 @@
}.
-type check() :: #{
id := id(),
name := checkname(),
id := check_id(),
name := check_name(),
status := status(),
indexes => indexes()
}.
@ -48,18 +50,25 @@
checks := [check()]
}.
-export_type([servicename/0]).
-export_type([nodename/0]).
-export_type([service_id/0]).
-export_type([service_name/0]).
-export_type([check_id/0]).
-export_type([node_name/0]).
-export_type([tag/0]).
-export_type([endpoint/0]).
-export_type([status/0]).
-export_type([t/0]).
-export_type([service_params/0]).
-export_type([check_params/0]).
-export([get/4]).
-export([register/2]).
-export([deregister/2]).
%%
-spec get(servicename(), [tag()], boolean(), consuela_client:t()) ->
-spec get(service_name(), [tag()], boolean(), consuela_client:t()) ->
{ok, [t()]}.
get(ServiceName, Tags, Passing, Client) ->
@ -75,17 +84,17 @@ get(ServiceName, Tags, Passing, Client) ->
end.
-type service_params() :: #{
name := servicename(),
id => id(),
name := service_name(),
id => service_id(),
tags := [tag()],
endpoint := endpoint(),
endpoint => endpoint(),
checks => [check_params()]
}.
-type check_params() :: #{
name := checkname(),
id => id(),
type := {ttl, seconds()}, % TODO | {http, ...}
name := check_name(),
id => check_id(),
type := {ttl, seconds()} | {tcp, endpoint(), seconds()}, % TODO | {http, ...}
initial => status()
}.
@ -102,7 +111,7 @@ register(ServiceParams, Client) ->
erlang:error(Reason)
end.
-spec deregister(servicename() | id(), consuela_client:t()) ->
-spec deregister(service_name() | service_id(), consuela_client:t()) ->
ok | {error, notfound}.
deregister(ServiceNameOrID, Client) ->
@ -131,18 +140,19 @@ attach_passing(false, Q) ->
encode_service_params(#{
name := Name,
tags := Tags,
endpoint := {Address, Port}
tags := Tags
} = V) ->
maps:merge(
lists:foldl(
fun maps:merge/2,
#{
<<"Name">> => encode_servicename(Name),
<<"ID">> => encode_binary(maps:get(id, V, genlib:to_binary(Name))),
<<"Tags">> => encode_tags(Tags),
<<"Address">> => encode_address(Address),
<<"Port">> => encode_port(Port)
<<"Tags">> => encode_tags(Tags)
},
encode_checks(maps:get(checks, V, undefined))
[
encode_endpoint(maps:get(endpoint, V, undefined)),
encode_checks(maps:get(checks, V, undefined))
]
).
encode_tags(V) ->
@ -151,6 +161,14 @@ encode_tags(V) ->
encode_tag(V) ->
encode_binary(V).
encode_endpoint(undefined) ->
#{};
encode_endpoint({IP, Port}) ->
#{
<<"Address">> => encode_address(IP),
<<"Port">> => encode_port(Port)
}.
encode_address(V) ->
case inet:ntoa(V) of
R when is_list(R) ->
@ -170,9 +188,9 @@ encode_checks(V) when is_list(V) ->
encode_check_params(#{name := Name, type := Type} = V) ->
maps:merge(
#{
<<"Name">> => encode_servicename(Name),
<<"ID">> => encode_binary(maps:get(id, V, Name)),
<<"Status">> => encode_status(maps:get(initial, V, critical))
<<"CheckID">> => encode_binary(maps:get(id, V, Name)),
<<"Name">> => encode_servicename(Name),
<<"Status">> => encode_status(maps:get(initial, V, critical))
},
encode_check_type(Type)
).
@ -180,6 +198,11 @@ encode_check_params(#{name := Name, type := Type} = V) ->
encode_check_type({ttl, V}) ->
#{
<<"TTL">> => encode_duration('s', V)
};
encode_check_type({tcp, {IP, Port}, Interval}) ->
#{
<<"TCP">> => iolist_to_binary([encode_address(IP), ":", integer_to_binary(encode_port(Port))]),
<<"Interval">> => encode_duration('s', Interval)
}.
encode_status(passing) ->

View File

@ -0,0 +1,113 @@
%%%
%%% Presence server
%%%
%%% A simple TCP server which only purpose is to service Consul TCP healthchecks.
-module(consuela_presence_server).
%%
-type ref() :: _.
-type transport_opts() :: ranch:opts().
-type opts() :: #{
transport_opts => transport_opts(),
pulse => {module(), _PulseOpts}
}.
-export([child_spec/2]).
-export([get_endpoint/1]).
-export_type([ref/0]).
-export_type([transport_opts/0]).
-export_type([opts/0]).
%%
-behaviour(ranch_protocol).
-export([start_link/4]).
-export([init/4]).
%%
-type beat() ::
{{socket, {module(), inet:socket()}}, accepted}.
-export_type([beat/0]).
-callback handle_beat(beat(), _PulseOpts) ->
_.
-export([handle_beat/2]).
%%
-spec child_spec(_Ref, opts()) ->
supervisor:child_spec().
child_spec(Ref, Opts) ->
ranch:child_spec(
{?MODULE, Ref},
ranch_tcp,
maps:get(transport_opts, Opts, #{}),
?MODULE,
mk_state(Opts)
).
-spec get_endpoint(_Ref) ->
{ok, consuela_health:endpoint()} | {error, undefined}.
get_endpoint(Ref) ->
case ranch:get_addr({?MODULE, Ref}) of
{IP, Port} when is_tuple(IP), is_integer(Port) ->
{ok, {IP, Port}};
{undefined, _} ->
{error, undefined}
end.
%%
-type st() :: #{
pulse := {module(), _PulseOpts}
}.
-spec mk_state(opts()) ->
st().
mk_state(Opts) ->
#{
pulse => maps:get(pulse, Opts, {?MODULE, []})
}.
-spec start_link(pid(), inet:socket(), module(), st()) ->
{ok, pid()}.
start_link(ListenerPid, Socket, Transport, St) ->
Pid = spawn_link(?MODULE, init, [ListenerPid, Socket, Transport, St]),
{ok, Pid}.
-spec init(pid(), inet:socket(), module(), st()) ->
_.
init(ListenerPid, Socket, Transport, St) ->
{ok, _} = ranch:handshake(ListenerPid),
_ = beat({{socket, {Transport, Socket}}, accepted}, St),
ok = Transport:close(Socket),
ok.
%%
-spec beat(beat(), st()) ->
_.
beat(Beat, #{pulse := {Module, PulseOpts}}) ->
% TODO handle errors?
Module:handle_beat(Beat, PulseOpts).
-spec handle_beat(beat(), [trace]) ->
ok.
handle_beat(Beat, [trace]) ->
logger:debug("[~p] ~p", [?MODULE, Beat]);
handle_beat(_Beat, []) ->
ok.

View File

@ -0,0 +1,191 @@
%%%
%%% Presence session
%%%
%%% A server which keeps _presence_ in the Consul catalog by registering a service on startup and
%%% deregistering on shutdown.
-module(consuela_presence_session).
%%
-type name() :: consuela_health:service_name().
-type client() :: consuela_client:t().
-type seconds() :: non_neg_integer().
-type opts() :: #{
interval => seconds(), % 5 by default
pulse => {module(), _PulseOpts}
}.
-export([start_link/5]).
-export([get_check_id/1]).
-export_type([name/0]).
-export_type([opts/0]).
%% gen server
-behaviour(gen_server).
-export([init/1]).
-export([handle_call/3]).
-export([handle_cast/2]).
-export([handle_info/2]).
-export([terminate/2]).
-export([code_change/3]).
%%
-type beat() ::
{{presence, name()}, started | {stopped, _Reason}} |
{unexpected, {{call, from()} | cast | info, _Msg}}.
-export_type([beat/0]).
-callback handle_beat(beat(), _PulseOpts) ->
_.
-export([handle_beat/2]).
%%
-spec start_link(name(), inet:ip_address(), consuela_presence_server:ref(), client(), opts()) ->
{ok, pid()} | {error, _}.
start_link(Name, Address, ServerRef, Client, Opts) ->
St = mk_state(Name, Address, ServerRef, Client, Opts),
gen_server:start_link({local, mk_local_ref(Name)}, ?MODULE, St, []).
-spec get_check_id(name()) ->
consuela_health:check_id().
get_check_id(Name) ->
gen_server:call(mk_local_ref(Name), get_check_id).
-spec mk_local_ref(name()) ->
atom().
mk_local_ref(Name) ->
erlang:binary_to_atom(<<"$consuela_presence_session/", Name/binary>>, latin1).
%%
-type st() :: #{
name := name(),
address := inet:ip_address(),
server := consuela_health:endpoint(),
check_id := consuela_health:check_id(),
interval := non_neg_integer(),
client := client(),
pulse := {module(), _PulseOpts}
}.
-type from() :: {pid(), reference()}.
-spec mk_state(name(), inet:ip_address(), consuela_presence_server:ref(), client(), opts()) ->
st().
mk_state(Name, Address, ServerRef, Client, Opts) ->
{ok, Endpoint} = consuela_presence_server:get_endpoint(ServerRef),
#{
name => Name,
address => Address,
server => Endpoint,
check_id => <<Name/binary, ":presence:tcp">>,
interval => maps:get(interval, Opts, 5),
client => Client,
pulse => maps:get(pulse, Opts, {?MODULE, []})
}.
-spec init(st()) ->
{ok, st(), hibernate}.
init(St = #{name := Name}) ->
_ = erlang:process_flag(trap_exit, true),
_ = register_service(St),
_ = beat({{presence, Name}, started}, St),
{ok, St, hibernate}.
-spec handle_call(_Call, from(), st()) ->
{reply, _, st(), hibernate} | {noreply, st(), hibernate}.
handle_call(get_check_id, _From, St = #{check_id := CheckID}) ->
{reply, CheckID, St, hibernate};
handle_call(Call, From, St) ->
_ = beat({unexpected, {{call, From}, Call}}, St),
{noreply, St, hibernate}.
-spec handle_cast(_Cast, st()) ->
{noreply, st(), hibernate}.
handle_cast(Cast, St) ->
_ = beat({unexpected, {cast, Cast}}, St),
{noreply, St, hibernate}.
-spec handle_info(_Info, st()) ->
{noreply, st(), hibernate}.
handle_info(Info, St) ->
_ = beat({unexpected, {info, Info}}, St),
{noreply, St, hibernate}.
-spec terminate(_Reason, st()) ->
ok.
terminate(Reason, St = #{name := Name}) ->
_ = deregister_service(St),
_ = beat({{presence, Name}, {stopped, Reason}}, St),
ok.
-spec code_change(_Vsn | {down, _Vsn}, st(), _Extra) ->
{ok, st()}.
code_change(_Vsn, St, _Extra) ->
{ok, St}.
register_service(#{
name := Name,
address := Address,
check_id := CheckID,
server := {_, Port},
interval := Interval,
client := Client
}) ->
ServiceParams = #{
name => Name,
endpoint => {Address, 0},
tags => [], % TODO
checks => [
#{
id => CheckID,
name => CheckID,
type => {tcp, {Address, Port}, Interval},
initial => passing % So we would be able to start discovery right away
% TODO
% https://www.consul.io/api/agent/check.html#deregistercriticalserviceafter ?
}
]
},
ok = consuela_health:register(ServiceParams, Client),
ok.
deregister_service(#{name := Name, client := Client}) ->
ok = consuela_health:deregister(Name, Client),
ok.
%%
-spec beat(beat(), st()) ->
_.
beat(Beat, #{pulse := {Module, PulseOpts}}) ->
% TODO handle errors?
Module:handle_beat(Beat, PulseOpts).
-spec handle_beat(beat(), [trace]) ->
ok.
handle_beat(Beat, [trace]) ->
logger:debug("[~p] ~p", [?MODULE, Beat]);
handle_beat(_Beat, []) ->
ok.

View File

@ -0,0 +1,113 @@
%%%
%%% Presence supervisor
-module(consuela_presence_sup).
%%
-type opts() :: #{
name := consuela_health:service_name(),
address => inet:ip_address(),
consul := consul_opts(),
server_opts => consuela_presence_server:opts(), % #{} by default
session_opts => consuela_presence_session:opts(), % #{} by default
shutdown => timeout() % if not specified uses supervisor default of 5000 ms
}.
-type consul_opts() :: #{
url := consuela_client:url(),
opts => consuela_client:opts() % #{} by default
}.
-export([start_link/1]).
-export([stop/1]).
-export_type([opts/0]).
%%
-spec start_link(opts()) ->
{ok, pid()} | {error, _}.
start_link(Opts) ->
Name = maps:get(name, Opts),
Address = maps:get(address, Opts, guess_host_address()),
Client = mk_consul_client(maps:get(consul, Opts)),
genlib_adhoc_supervisor:start_link(
#{strategy => one_for_one, intensity => 20, period => 5},
[
consuela_presence_server:child_spec(Name, maps:get(server_opts, Opts, #{})),
maps:merge(
#{
id => {Name, session},
start => {consuela_presence_session, start_link, [
Name,
Address,
Name,
Client,
maps:get(session_opts, Opts, #{})
]}
},
maps:with([shutdown], Opts)
)
]
).
-spec mk_consul_client(consul_opts()) ->
consuela_client:t().
mk_consul_client(Opts) ->
Url = maps:get(url, Opts),
consuela_client:new(Url, maps:get(opts, Opts, #{})).
-spec stop(pid()) ->
ok.
stop(Pid) ->
proc_lib:stop(Pid).
%%
-spec guess_host_address() ->
inet:ip_address().
guess_host_address() ->
guess_host_address([distribution, hostname]).
guess_host_address([Method | Rest]) ->
case do_guess_host_address(Method) of
Address when is_tuple(Address) ->
Address;
undefined ->
guess_host_address(Rest)
end.
do_guess_host_address(distribution) ->
case net_kernel:longnames() of
true ->
Nodename = erlang:atom_to_list(erlang:node()),
[_, Hostname] = string:split(Nodename, "@"),
get_host_address(Hostname);
_ ->
undefined
end;
do_guess_host_address(hostname) ->
{ok, Hostname} = inet:gethostname(),
get_host_address(Hostname).
%%
-include_lib("kernel/include/inet.hrl").
get_host_address(Hostname) ->
case inet:parse_address(Hostname) of
{ok, Address} ->
Address;
{error, einval} ->
case inet:gethostbyname(Hostname) of
{ok, #hostent{h_addr_list = [Address | _]}} ->
Address;
{error, _} ->
undefined
end
end.

View File

@ -13,9 +13,12 @@
%%
-type nodename() :: consuela_session:nodename().
-type namespace() :: consuela_registry:namespace().
-type opts() :: #{
nodename := consuela_session:nodename(),
namespace := consuela_registry:namespace(),
nodename := nodename(),
namespace := namespace(),
consul => consul_opts(),
session => session_opts(),
keeper => consuela_session_keeper:opts(), % #{} by default
@ -31,6 +34,7 @@
-type session_opts() :: #{
name => consuela_session:name(), % '{namespace}' by default
presence => consuela_presence_session:name(), % which checks to associate
ttl => consuela_session:ttl(), % 20 by default
lock_delay => consuela_session:delay() % 10 by default
}.
@ -54,11 +58,10 @@ start_link(Ref, Opts) ->
Nodename = maps:get(nodename, Opts),
Namespace = maps:get(namespace, Opts),
Client = mk_consul_client(Nodename, maps:get(consul, Opts, #{})),
SessionOpts = mk_session_params(Namespace, Nodename, maps:get(session, Opts, #{})),
KeeperOpts = maps:get(keeper, Opts, #{}),
ReaperOpts = maps:get(reaper, Opts, #{}),
RegistryOpts = maps:get(registry, Opts, #{}),
Session = mk_session(SessionOpts, Client),
Session = mk_session(Namespace, Nodename, Client, maps:get(session, Opts, #{})),
Registry = consuela_registry:new(Namespace, Session, Client),
{ok, Pid} = supervisor:start_link(?MODULE, []),
{ok, _KeeperPid} = supervisor:start_child(
@ -99,23 +102,23 @@ mk_consul_client_url(_Nodename, Url) when Url /= undefined ->
mk_consul_client_url(Nodename, undefined) ->
genlib:format("http://~s:8500", [Nodename]).
-type session_params() :: {
consuela_session:name(),
consuela_session:nodename(),
consuela_session:ttl(),
consuela_session:delay()
}.
-spec mk_session(namespace(), nodename(), consuela_client:t(), session_opts()) ->
consuela_session:t().
-spec mk_session_params(consuela_registry:namespace(), consuela_session:nodename(), session_opts()) ->
session_params().
mk_session_params(Namespace, Nodename, Opts) ->
mk_session(Namespace, Node, Client, Opts) ->
Name = maps:get(name, Opts, Namespace),
TTL = maps:get(ttl, Opts, 20),
Delay = maps:get(lock_delay, Opts, 10),
{maps:get(name, Opts, Namespace), Nodename, TTL, Delay}.
mk_session({Name, Node, TTL, LockDelay}, Client) ->
{ok, SessionID} = consuela_session:create(Name, Node, TTL, LockDelay, delete, Client),
Opts0 = #{
behavior => delete,
lock_delay => maps:get(lock_delay, Opts, 10)
},
Opts1 = case maps:find(presence, Opts) of
{ok, Presence} ->
Opts0#{checks => [consuela_presence_session:get_check_id(Presence)]};
error ->
Opts0
end,
{ok, SessionID} = consuela_session:create(Name, Node, TTL, Opts1, Client),
{ok, Session} = consuela_session:get(SessionID, Client),
Session.

View File

@ -9,6 +9,7 @@
-type name() :: binary().
-type nodename() :: inet:hostname().
-type ttl() :: seconds().
-type check() :: consuela_health:check_id().
-type delay() :: seconds().
-type behavior() :: release | delete.
-type indexes() :: #{create | modify => integer()}.
@ -20,6 +21,7 @@
id := id(),
name := name(),
node := nodename(),
checks := [check()],
ttl := ttl(),
lock_delay := delay(),
behavior := behavior(),
@ -36,20 +38,22 @@
-export([create/4]).
-export([create/5]).
-export([create/6]).
-export([destroy/2]).
-export([get/2]).
-export([renew/2]).
%%
-type opts() :: #{
checks => [check()], % [] by default
lock_delay => delay(),
behavior => behavior()
}.
-spec create(name(), nodename(), ttl(), consuela_client:t()) ->
{ok, id()}.
-spec create(name(), nodename(), ttl(), delay(), consuela_client:t()) ->
{ok, id()}.
-spec create(name(), nodename(), ttl(), delay(), behavior(), consuela_client:t()) ->
-spec create(name(), nodename(), ttl(), opts(), consuela_client:t()) ->
{ok, id()}.
-spec get(id(), consuela_client:t()) ->
@ -62,20 +66,11 @@
{ok, t()}.
create(Name, Node, TTL, Client) ->
create(#{name => Name, node => Node, ttl => TTL}, Client).
create(Name, Node, TTL, #{}, Client).
create(Name, Node, TTL, LockDelay, Client) ->
create(#{name => Name, node => Node, ttl => TTL, lock_delay => LockDelay}, Client).
create(Name, Node, TTL, LockDelay, Behavior, Client) ->
create(
#{name => Name, node => Node, ttl => TTL, lock_delay => LockDelay, behavior => Behavior},
Client
).
create(Params, Client) ->
create(Name, Node, TTL, Opts, Client) ->
Resource = <<"/v1/session/create">>,
Content = encode_params(Params),
Content = encode_params(mk_params(Name, Node, TTL, Opts)),
case consuela_client:request(put, Resource, Content, Client) of
{ok, SessionID} ->
{ok, decode_session_id(SessionID)};
@ -83,6 +78,20 @@ create(Params, Client) ->
erlang:error(Reason)
end.
mk_params(Name, Node, TTL, Opts) ->
% NOTE
% We deliberately override Consul defaults here despite strong recommendations against such measures.
% This is because letting serfHealth decide if the node is dead proved too unreliable during stress
% testing. We were able to trigger session invalidation when consuela app was alive and stable, session
% had 10 seconds more to live, even the node was on the majority side of a cluster.
Params = #{
name => Name,
node => Node,
ttl => TTL,
checks => []
},
maps:merge(Params, Opts).
get(ID, Client) ->
Resource = [<<"/v1/session/info/">>, encode_id(ID)],
case consuela_client:request(get, Resource, Client) of
@ -119,6 +128,7 @@ encode_params(Params = #{name := Name, node := Node, ttl := TTL}) ->
fun
(behavior, V, R) -> R#{<<"Behavior">> => encode_behavior(V)};
(lock_delay, V, R) -> R#{<<"LockDelay">> => encode_seconds(V)};
(checks, V, R) -> R#{<<"Checks">> => encode_checks(V)};
(_, _, R) -> R
end,
#{
@ -140,6 +150,9 @@ encode_nodename(V) when is_atom(V) ->
encode_nodename(V) when is_list(V) ->
encode_string(V).
encode_checks(V) when is_list(V) ->
[encode_id(E) || E <- V].
encode_seconds(V) ->
encode_duration('s', V).
@ -167,18 +180,18 @@ decode_session(#{
<<"Name">> := Name,
<<"Node">> := Node,
<<"Behavior">> := Behavior,
<<"Checks">> := Checks,
<<"TTL">> := TTL,
<<"LockDelay">> := LockDelay,
<<"CreateIndex">> := CreateIndex,
<<"ModifyIndex">> := ModifyIndex
% TODO
% <<"Checks">> := Checks,
}) ->
#{
id => decode_id(ID),
name => decode_name(Name),
node => decode_nodename(Node),
behavior => decode_behavior(Behavior),
checks => decode_checks(Checks),
ttl => decode_seconds(TTL),
lock_delay => decode_seconds(LockDelay),
indexes => #{
@ -196,6 +209,11 @@ decode_name(V) ->
decode_nodename(V) ->
decode_string(V).
decode_checks(V) when is_list(V) ->
[decode_id(E) || E <- V];
decode_checks(null) ->
[].
decode_seconds(V) ->
decode_duration('s', V).

View File

@ -16,18 +16,31 @@ start_link() ->
genlib_adhoc_supervisor:start_link(
#{strategy => one_for_one, intensity => 0, period => 1},
lists:append([
mk_presence_childspec(genlib_app:env(consuela, presence)),
mk_discovery_childspec(genlib_app:env(consuela, discovery)),
mk_registry_childspec(genlib_app:env(consuela, registry))
])
).
-spec mk_presence_childspec(consuela_presence_sup:opts() | undefined) ->
[supervisor:child_spec()].
mk_presence_childspec(Opts = #{}) ->
[#{
id => presence,
start => {consuela_presence_sup, start_link, [Opts]},
type => supervisor
}];
mk_presence_childspec(undefined) ->
[].
-spec mk_discovery_childspec(consuela_discovery_sup:opts() | undefined) ->
[supervisor:child_spec()].
mk_discovery_childspec(Opts = #{}) ->
[#{
id => discovery,
start => {consuela_discovery_sup, start_link, [discovery, Opts]},
start => {consuela_discovery_sup, start_link, [Opts]},
type => supervisor
}];
mk_discovery_childspec(undefined) ->

View File

@ -14,6 +14,8 @@
-export([groups/0]).
-export([init_per_suite/1]).
-export([end_per_suite/1]).
-export([init_per_group/2]).
-export([end_per_group/2]).
-export([init_per_testcase/2]).
-export([end_per_testcase/2]).
@ -42,7 +44,9 @@
all() ->
[
{group, regular_workflow}
{group, regular_workflow},
{group, proper_exceptions},
{group, presence_workflow}
].
-spec groups() ->
@ -50,8 +54,8 @@ all() ->
groups() ->
[
{regular_workflow, [parallel], [
{regular_workflow, [parallel], [
empty_lookup_notfound,
empty_unregistration_notfound,
registration_persists,
@ -60,13 +64,19 @@ groups() ->
registration_unregistration_succeeds,
conflicting_unregistration_fails,
dead_registration_cleaned,
registrations_select_ok,
registrations_select_ok
]},
{proper_exceptions, [], [
unavail_lookup_exits,
unavail_registration_exits,
no_registry_exits
]},
{presence_workflow, [], [
{group, regular_workflow}
]}
].
%% Startup / shutdown
@ -77,6 +87,12 @@ groups() ->
-spec end_per_suite(config()) ->
_.
-spec init_per_group(group_name(), config()) ->
config().
-spec end_per_group(group_name(), config()) ->
_.
-spec init_per_testcase(test_name(), config()) ->
config().
@ -88,37 +104,67 @@ init_per_suite(C) ->
genlib_app:start_application(ranch) ++
genlib_app:start_application(consuela),
ok = ct_consul:await_ready(),
[{suite_apps, Apps} | C].
Consul = #{
url => "http://consul0:8500",
opts => #{
pulse => {?MODULE, {client, debug}}
}
},
[{suite_apps, Apps}, {consul, Consul} | C].
end_per_suite(C) ->
genlib_app:test_application_stop(?config(suite_apps, C)).
init_per_testcase(Name, C) ->
init_per_group(proper_exceptions, C) ->
{ok, Proxy = #{endpoint := {Host, Port}}} = ct_proxy:start_link({"consul0", 8500}),
Consul = #{
url => ["http://", Host, ":", integer_to_list(Port)],
opts => #{
transport_opts => #{
pool => false,
connect_timeout => 100,
recv_timeout => 1000
},
pulse => {?MODULE, {client, debug}}
}
},
[{proxy, ct_proxy:unlink(Proxy)}, {consul, Consul} | C];
init_per_group(presence_workflow, C) ->
Name = <<?MODULE_STRING>>,
Opts = #{
name => Name,
consul => ?config(consul, C),
server_opts => #{pulse => {?MODULE, {presence_server, info}}},
session_opts => #{pulse => {?MODULE, {presence_session, info}}}
},
{ok, Pid} = consuela_presence_sup:start_link(Opts),
[{session, #{presence => Name}}, {presence_sup, ct_helper:unlink(Pid)} | C];
init_per_group(_, C) ->
C.
end_per_group(proper_exceptions, C) ->
ct_proxy:stop(?config(proxy, C));
end_per_group(presence_workflow, C) ->
consuela_presence_sup:stop(?config(presence_sup, C));
end_per_group(_, _C) ->
ok.
init_per_testcase(Name, C) ->
Opts = #{
nodename => "consul0",
namespace => genlib:to_binary(Name),
consul => #{
url => ["http://", Host, ":", integer_to_list(Port)],
opts => #{
transport_opts => #{
pool => false,
connect_timeout => 100,
recv_timeout => 1000
},
pulse => {?MODULE, {client, debug}}
}
},
consul => ?config(consul, C),
session => proplists:get_value(session, C, #{}),
keeper => #{pulse => {?MODULE, {keeper, info}}},
reaper => #{pulse => {?MODULE, {reaper, info}}},
registry => #{pulse => {?MODULE, {registry, info}}}
},
{ok, Pid} = consuela_registry_sup:start_link(Name, Opts),
[{registry, Name}, {registry_sup, Pid}, {proxy, Proxy}, {testcase, Name} | C].
[{registry, Name}, {registry_sup, Pid}, {testcase, Name} | C].
end_per_testcase(_Name, C) ->
_ = (catch consuela_registry_sup:stop(?config(registry_sup, C))),
_ = (catch ct_proxy:stop(?config(proxy, C))).
catch change_proxy_mode(pass, C),
catch consuela_registry_sup:stop(?config(registry_sup, C)).
%% Definitions
@ -203,7 +249,7 @@ registrations_select_ok(C) ->
unavail_lookup_exits(C) ->
Ref = ?config(registry, C),
ok = change_proxy_mode(pass, ignore, C),
ok = change_proxy_mode(ignore, C),
?assertExit(
{consuela, {unknown, {transport_error, timeout}}},
lookup(Ref, my_boy)
@ -211,7 +257,7 @@ unavail_lookup_exits(C) ->
unavail_registration_exits(C) ->
Ref = ?config(registry, C),
ok = change_proxy_mode(pass, ignore, C),
ok = change_proxy_mode(ignore, C),
?assertExit(
{consuela, {unknown, timeout}},
register(Ref, my_boy, self())
@ -238,10 +284,10 @@ unregister(Ref, Name, Pid) ->
lookup(Ref, Name) ->
consuela_registry_server:lookup(Ref, Name).
change_proxy_mode(ModeWas, Mode, C) ->
change_proxy_mode(Mode, C) ->
Proxy = ?config(proxy, C),
_ = ct:pal(debug, "[~p] setting proxy from '~p' to '~p'", [?config(testcase, C), ModeWas, Mode]),
_ = ?assertEqual({ok, ModeWas}, ct_proxy:mode(Proxy, Mode)),
{ok, ModeWas} = ct_proxy:mode(Proxy, Mode),
_ = ct:pal(debug, "[~p] set proxy from '~p' to '~p'", [?config(testcase, C), ModeWas, Mode]),
ok.
%%

View File

@ -6,6 +6,7 @@
-include_lib("stdlib/include/assert.hrl").
-export([stop_linked/2]).
-export([unlink/1]).
-export([await/2]).
-export([await/3]).
@ -26,6 +27,13 @@ stop_linked(Pid, Reason) ->
ok
end.
-spec unlink(pid()) ->
pid().
unlink(Pid) ->
true = erlang:unlink(Pid),
Pid.
%%
-spec await(Expect, fun(() -> Expect | _)) ->

View File

@ -2,6 +2,7 @@
-export([start_link/1]).
-export([start_link/2]).
-export([unlink/1]).
-export([mode/2]).
-export([stop/1]).
@ -38,6 +39,9 @@
-spec start_link(endpoint(), ranch_tcp:opts()) ->
{ok, proxy()}.
-spec unlink(proxy()) ->
proxy().
start_link(Upstream) ->
start_link(Upstream, [{ip, {127, 0, 0, 1}}]).
@ -70,6 +74,10 @@ resolve_endpoint({Host, Port}) ->
{ok, #hostent{h_addr_list = [Address | _Rest]}} = inet:gethostbyname(Host),
{Address, Port}.
unlink(Proxy = #{supervisor := SupPid}) ->
true = erlang:unlink(SupPid),
Proxy.
-spec mode(proxy(), mode()) ->
{ok, mode()}.

View File

@ -1,48 +1,39 @@
-module(discovery_node_runner).
-include_lib("kernel/include/inet.hrl").
-export([run/4]).
-export([run/3]).
-export([handle_beat/2]).
%%
-spec run(binary(), inet:hostname(), consuela_client:url(), pos_integer()) ->
-spec run(binary(), consuela_client:url(), pos_integer()) ->
ok.
run(Nodename, Hostname, ConsulUrl, Lifetime) ->
run(Nodename, ConsulUrl, Lifetime) ->
ok = logger:set_primary_config(level, info),
_Apps = genlib_app:start_application(consuela),
ok = ct_consul:await_ready(),
Client = consuela_client:new(ConsulUrl, opts(client)),
ok = register(Nodename, Hostname, Lifetime, Client),
try
{ok, _Pid} = consuela_discovery_server:start_link(Nodename, [], Client, opts(discovery)),
ok = await(Lifetime)
after
ok = deregister(Nodename, Client)
end,
Consul = #{
url => ConsulUrl,
opts => opts(consul)
},
PresenceOpts = #{
name => Nodename,
consul => Consul,
server_opts => opts(presence_server),
session_opts => opts(presence_session)
},
DiscoveryOpts = #{
name => Nodename,
consul => Consul,
opts => opts(discovery)
},
{ok, _} = consuela_presence_sup:start_link(PresenceOpts),
{ok, _} = consuela_discovery_sup:start_link(DiscoveryOpts),
ok = await(Lifetime),
init:stop().
register(Nodename, Hostname, Lifetime, Client) ->
{ok, #hostent{h_addr_list = [IP | _]}} = inet:gethostbyname(Hostname),
consuela_health:register(
#{
name => Nodename,
tags => [],
endpoint => {IP, 31337},
checks => [#{
name => <<Nodename/binary, ":erlang-node">>,
type => {ttl, Lifetime},
initial => passing
}]
},
Client
).
deregister(Nodename, Client) ->
consuela_health:deregister(Nodename, Client).
await(Lifetime) ->
ok = os_signal_relay:replace([sigterm]),
receive

View File

@ -7,7 +7,7 @@ LIFETIME=${3:-90}
COOKIE=${4:-${NODENAME}}
SCRIPT=$(cat <<END
discovery_node_runner:run(<<"${NODENAME}">>, "${HOSTNAME}", "http://${CONSUL}:8500", ${LIFETIME}).
discovery_node_runner:run(<<"${NODENAME}">>, "http://${CONSUL}:8500", ${LIFETIME}).
END
)