From 7dac1c80f364fc07bee30c36d014b76f5f80ab28 Mon Sep 17 00:00:00 2001 From: Anton Belyaev Date: Wed, 30 May 2018 17:45:42 +0300 Subject: [PATCH] Introduce machinegun client/processor handler lib (#1) --- .gitignore | 13 + Jenkinsfile | 53 ++ Makefile | 48 ++ README.md | 4 +- docker-compose.sh | 28 + elvis.config | 77 +++ rebar.config | 71 +++ rebar.lock | 44 ++ src/machinery.app.src | 23 + src/machinery.erl | 150 ++++++ src/machinery_backend.erl | 45 ++ src/machinery_machine_unique_tag.erl | 120 +++++ ...achinery_machine_unique_tag_mg_example.erl | 96 ++++ src/machinery_mg_backend.erl | 497 ++++++++++++++++++ src/machinery_mg_client.erl | 69 +++ src/machinery_mg_schema.erl | 46 ++ src/machinery_mg_schema_generic.erl | 97 ++++ src/machinery_msgpack.erl | 81 +++ src/machinery_utils.erl | 75 +++ test/ct_helper.erl | 160 ++++++ test/ct_sup.erl | 32 ++ test/machinegun/config.yaml | 6 + ...chine_unique_tag_mg_example_test_SUITE.erl | 151 ++++++ 23 files changed, 1984 insertions(+), 2 deletions(-) create mode 100644 .gitignore create mode 100644 Jenkinsfile create mode 100644 Makefile create mode 100755 docker-compose.sh create mode 100644 elvis.config create mode 100644 rebar.config create mode 100644 rebar.lock create mode 100644 src/machinery.app.src create mode 100644 src/machinery.erl create mode 100644 src/machinery_backend.erl create mode 100644 src/machinery_machine_unique_tag.erl create mode 100644 src/machinery_machine_unique_tag_mg_example.erl create mode 100644 src/machinery_mg_backend.erl create mode 100644 src/machinery_mg_client.erl create mode 100644 src/machinery_mg_schema.erl create mode 100644 src/machinery_mg_schema_generic.erl create mode 100644 src/machinery_msgpack.erl create mode 100644 src/machinery_utils.erl create mode 100644 test/ct_helper.erl create mode 100644 test/ct_sup.erl create mode 100644 test/machinegun/config.yaml create mode 100644 test/machinery_machine_unique_tag_mg_example_test_SUITE.erl diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..30e2e85 --- /dev/null +++ b/.gitignore @@ -0,0 +1,13 @@ +# general +log +/_build/ +/_checkouts/ +*~ +erl_crash.dump +.tags* +*.sublime-workspace +.DS_Store +docker-compose.yml +/.idea/ +*.beam +/.edts \ No newline at end of file diff --git a/Jenkinsfile b/Jenkinsfile new file mode 100644 index 0000000..1f6c172 --- /dev/null +++ b/Jenkinsfile @@ -0,0 +1,53 @@ +#!groovy +// -*- mode: groovy -*- + +def finalHook = { + runStage('store CT logs') { + archive '_build/test/logs/' + } +} + +build('machinery', 'docker-host', finalHook) { + checkoutRepo() + loadBuildUtils() + + def pipeDefault + def withWsCache + runStage('load pipeline') { + env.JENKINS_LIB = "build_utils/jenkins_lib" + pipeDefault = load("${env.JENKINS_LIB}/pipeDefault.groovy") + withWsCache = load("${env.JENKINS_LIB}/withWsCache.groovy") + } + + pipeDefault() { + + if (!masterlikeBranch()) { + + runStage('compile') { + withGithubPrivkey { + sh 'make wc_compile' + } + } + + runStage('lint') { + sh 'make wc_lint' + } + + runStage('xref') { + sh 'make wc_xref' + } + + runStage('dialyze') { + withWsCache("_build/default/rebar3_19.1_plt") { + sh 'make wc_dialyze' + } + } + + runStage('test') { + sh "make wdeps_test" + } + + } + + } +} diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..508f092 --- /dev/null +++ b/Makefile @@ -0,0 +1,48 @@ +REBAR := $(shell which rebar3 2>/dev/null || which ./rebar3) + +UTILS_PATH := build_utils +TEMPLATES_PATH := . + +SUBMODULES = $(UTILS_PATH) +SUBTARGETS = $(patsubst %,%/.git,$(SUBMODULES)) + +SERVICE_NAME := machinery +BUILD_IMAGE_TAG := eee42f2ca018c313190bc350fe47d4dea70b6d27 + +CALL_ANYWHERE := all submodules compile xref lint dialyze clean distclean + +CALL_W_CONTAINER := $(CALL_ANYWHERE) test + +all: compile + +-include $(UTILS_PATH)/make_lib/utils_container.mk + +.PHONY: $(CALL_W_CONTAINER) + +$(SUBTARGETS): %/.git: % + git submodule update --init $< + touch $@ + +submodules: $(SUBTARGETS) + +compile: submodules + $(REBAR) compile + +xref: submodules + $(REBAR) xref + +lint: + elvis rock + +dialyze: submodules + $(REBAR) dialyzer + +clean: + $(REBAR) clean + +distclean: + $(REBAR) clean -a + rm -rf _build + +test: submodules + $(REBAR) ct diff --git a/README.md b/README.md index a8f0ee8..289c32c 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,3 @@ -# Machinegun client +# Machinery -Client for [machinegun](https://github.com/rbkmoney/machinegun). +Client/hander lib for [machinegun](https://github.com/rbkmoney/machinegun). diff --git a/docker-compose.sh b/docker-compose.sh new file mode 100755 index 0000000..205db81 --- /dev/null +++ b/docker-compose.sh @@ -0,0 +1,28 @@ +#!/bin/bash +cat < ["src"], + filter => "*.erl", + ignore => ["_thrift.erl$"], + rules => [ + {elvis_style, line_length, #{limit => 120, skip_comments => false}}, + {elvis_style, no_tabs}, + {elvis_style, no_trailing_whitespace}, + {elvis_style, macro_module_names}, + {elvis_style, operator_spaces, #{rules => [{right, ","}, {right, "++"}, {left, "++"}]}}, + {elvis_style, nesting_level, #{level => 3}}, + {elvis_style, god_modules, #{limit => 30, ignore => [hg_client_party]}}, + {elvis_style, no_if_expression}, + {elvis_style, invalid_dynamic_call, #{ignore => [elvis]}}, + {elvis_style, used_ignored_variable}, + {elvis_style, no_behavior_info}, + {elvis_style, module_naming_convention, #{regex => "^([a-z][a-z0-9]*_?)*(_SUITE)?$"}}, + {elvis_style, function_naming_convention, #{regex => "^([a-z][a-z0-9]*_?)*$"}}, + {elvis_style, state_record_and_type}, + {elvis_style, no_spec_with_records}, + {elvis_style, dont_repeat_yourself, #{min_complexity => 15}}, + {elvis_style, no_debug_call, #{ignore => [elvis, elvis_utils]}} + ] + }, + #{ + dirs => ["test"], + filter => "*.erl", + rules => [ + {elvis_style, line_length, #{limit => 120, skip_comments => false}}, + {elvis_style, no_tabs}, + {elvis_style, no_trailing_whitespace}, + {elvis_style, macro_module_names}, + {elvis_style, operator_spaces, #{rules => [{right, ","}, {right, "++"}, {left, "++"}]}}, + {elvis_style, nesting_level, #{level => 3}}, + {elvis_style, no_if_expression}, + {elvis_style, used_ignored_variable}, + {elvis_style, no_behavior_info}, + {elvis_style, module_naming_convention, #{regex => "^([a-z][a-z0-9]*_?)*(_SUITE)?$"}}, + {elvis_style, function_naming_convention, #{regex => "^([a-z][a-z0-9]*_?)*$"}}, + {elvis_style, no_spec_with_records}, + {elvis_style, dont_repeat_yourself, #{min_complexity => 30}} + ] + }, + #{ + dirs => ["."], + filter => "Makefile", + ruleset => makefiles + }, + #{ + dirs => ["."], + filter => "elvis.config", + ruleset => elvis_config + }, + #{ + dirs => ["."], + filter => "rebar.config", + rules => [ + {elvis_style, line_length, #{limit => 120, skip_comments => false}}, + {elvis_style, no_tabs}, + {elvis_style, no_trailing_whitespace} + ] + }, + #{ + dirs => ["src"], + filter => "*.app.src", + rules => [ + {elvis_style, line_length, #{limit => 120, skip_comments => false}}, + {elvis_style, no_tabs}, + {elvis_style, no_trailing_whitespace} + ] + } + ]} + ]} +]. diff --git a/rebar.config b/rebar.config new file mode 100644 index 0000000..e534dff --- /dev/null +++ b/rebar.config @@ -0,0 +1,71 @@ +% Common project erlang options. +{erl_opts, [ + + % mandatory + debug_info, + warnings_as_errors, + warn_export_all, + warn_missing_spec, + warn_untyped_record, + warn_export_vars, + + % by default + warn_unused_record, + warn_bif_clash, + warn_obsolete_guard, + warn_unused_vars, + warn_shadow_vars, + warn_unused_import, + warn_unused_function, + warn_deprecated_function + + % at will + % bin_opt_info + % no_auto_import + % warn_missing_spec_all + +]}. + +% Common project dependencies. +{deps, [ + {genlib, + {git, "https://github.com/rbkmoney/genlib.git", {branch, "master"}} + }, + {rfc3339, + "0.2.2" + }, + {woody, + {git, "git@github.com:rbkmoney/woody_erlang.git", {branch, "master"}} + }, + {mg_proto, + {git, "git@github.com:rbkmoney/machinegun_proto.git", {branch, "master"}} + } +]}. + +{xref_checks, [ + undefined_function_calls, + undefined_functions, + deprecated_functions_calls, + deprecated_functions +]}. + +{dialyzer, [ + {warnings, [ + % mandatory + unmatched_returns, + error_handling, + race_conditions, + unknown + ]}, + {plt_apps, all_deps} +]}. + +{profiles, [ + {test, [ + {deps, [ + {lager, + "3.6.1" + } + ]} + ]} +]}. diff --git a/rebar.lock b/rebar.lock new file mode 100644 index 0000000..d529bbc --- /dev/null +++ b/rebar.lock @@ -0,0 +1,44 @@ +{"1.1.0", +[{<<"certifi">>,{pkg,<<"certifi">>,<<"0.7.0">>},2}, + {<<"cowboy">>,{pkg,<<"cowboy">>,<<"1.0.4">>},1}, + {<<"cowlib">>,{pkg,<<"cowlib">>,<<"1.0.2">>},2}, + {<<"genlib">>, + {git,"https://github.com/rbkmoney/genlib.git", + {ref,"e9e5aed04a870a064312590e798f89d46ce5585c"}}, + 0}, + {<<"hackney">>,{pkg,<<"hackney">>,<<"1.6.2">>},1}, + {<<"idna">>,{pkg,<<"idna">>,<<"1.2.0">>},2}, + {<<"metrics">>,{pkg,<<"metrics">>,<<"1.0.1">>},2}, + {<<"mg_proto">>, + {git,"git@github.com:rbkmoney/machinegun_proto.git", + {ref,"5c07c579014f9900357f7a72f9d10a03008b9da1"}}, + 0}, + {<<"mimerl">>,{pkg,<<"mimerl">>,<<"1.0.2">>},2}, + {<<"ranch">>,{pkg,<<"ranch">>,<<"1.5.0">>},2}, + {<<"rfc3339">>,{pkg,<<"rfc3339">>,<<"0.2.2">>},0}, + {<<"snowflake">>, + {git,"https://github.com/rbkmoney/snowflake.git", + {ref,"0a598108f6582affe3b4ae550fc5b9f2062e318a"}}, + 1}, + {<<"ssl_verify_fun">>,{pkg,<<"ssl_verify_fun">>,<<"1.1.1">>},2}, + {<<"thrift">>, + {git,"https://github.com/rbkmoney/thrift_erlang.git", + {ref,"240bbc842f6e9b90d01bd07838778cf48752b510"}}, + 1}, + {<<"woody">>, + {git,"git@github.com:rbkmoney/woody_erlang.git", + {ref,"06ef3d63c0b6777e7cfa4b4f949eb34008291c0e"}}, + 0}]}. +[ +{pkg_hash,[ + {<<"certifi">>, <<"861A57F3808F7EB0C2D1802AFEAAE0FA5DE813B0DF0979153CBAFCD853ABABAF">>}, + {<<"cowboy">>, <<"A324A8DF9F2316C833A470D918AAF73AE894278B8AA6226CE7A9BF699388F878">>}, + {<<"cowlib">>, <<"9D769A1D062C9C3AC753096F868CA121E2730B9A377DE23DEC0F7E08B1DF84EE">>}, + {<<"hackney">>, <<"96A0A5E7E65B7ACAD8031D231965718CC70A9B4131A8B033B7543BBD673B8210">>}, + {<<"idna">>, <<"AC62EE99DA068F43C50DC69ACF700E03A62A348360126260E87F2B54ECED86B2">>}, + {<<"metrics">>, <<"25F094DEA2CDA98213CECC3AEFF09E940299D950904393B2A29D191C346A8486">>}, + {<<"mimerl">>, <<"993F9B0E084083405ED8252B99460C4F0563E41729AB42D9074FD5E52439BE88">>}, + {<<"ranch">>, <<"F04166F456790FEE2AC1AA05A02745CC75783C2BFB26D39FAF6AEFC9A3D3A58A">>}, + {<<"rfc3339">>, <<"1552DF616ACA368D982E9F085A0E933B6688A3F4938A671798978EC2C0C58730">>}, + {<<"ssl_verify_fun">>, <<"28A4D65B7F59893BC2C7DE786DEC1E1555BD742D336043FE644AE956C3497FBE">>}]} +]. diff --git a/src/machinery.app.src b/src/machinery.app.src new file mode 100644 index 0000000..8930719 --- /dev/null +++ b/src/machinery.app.src @@ -0,0 +1,23 @@ +{application, machinery, [ + {description, + "Machinegun client/handler lib." + }, + {vsn, "1"}, + {registered, []}, + {applications, [ + kernel, + stdlib, + genlib, + cowboy, + woody, + mg_proto + ]}, + {env, []}, + {modules, []}, + {maintainers, [ + "Anton Belyaev ", + "Andrey Mayorov " + ]}, + {licenses, []}, + {links, ["https://github.com/rbkmoney/machinery"]} +]}. diff --git a/src/machinery.erl b/src/machinery.erl new file mode 100644 index 0000000..4e85dd8 --- /dev/null +++ b/src/machinery.erl @@ -0,0 +1,150 @@ +%%% +%%% Machine API abstraction. +%%% Behaviour and API. +%%% + +%% TODO +%% +%% - What if we make `start` idempotent and argsless in the `machinery`. +%% - Storage schema behaviour + +-module(machinery). + +-type namespace() :: atom(). +-type id() :: binary(). +-type args(T) :: T. +-type response(T) :: T. + +-type usec_part() :: 0..999999. +-type timestamp() :: {calendar:datetime(), usec_part()}. + +-type event_id() :: integer(). +-type event_body(T) :: T. +-type event(T) :: {event_id(), timestamp(), event_body(T)}. +-type history(T) :: [event(T)]. +-type aux_state(T) :: T. + +-type event_cursor() :: undefined | event_id(). +-type limit() :: undefined | pos_integer(). +-type direction() :: forward | backward. +-type scope() :: {event_cursor(), limit(), direction()}. +-type signal(T) :: {init, args(T)} | timeout. +-type machine(T) :: #{ + namespace := namespace(), + id := id(), + history := history(T), + aux_state := aux_state(_) + %% TODO + %% history_range ? + %% timer ? +}. + +-export_type([namespace/0]). +-export_type([id/0]). +-export_type([scope/0]). +-export_type([args/1]). +-export_type([response/1]). +-export_type([machine/1]). + +-type modopts(O) :: module() | {module(), O}. + +%% handler +-type handler_opts(T) :: T. %% provided to logic handler from machinery backend +-type handler_args(T) :: args(T). %% provided to logic handler from handler server spec +-type logic_handler(A) :: modopts(handler_args(A)). + +%% client +-type backend_opts(T) :: T. %% opts for client backend +-type backend(O) :: modopts(backend_opts(O)). %% client backend + +-export_type([modopts/1]). +-export_type([handler_opts/1]). +-export_type([handler_args/1]). +-export_type([logic_handler/1]). +-export_type([backend_opts/1]). +-export_type([backend/1]). + +%% API +-export([start/4]). +-export([call/4]). +-export([call/5]). +-export([get/3]). +-export([get/4]). + +%% Internal API +-export([dispatch_signal/4]). +-export([dispatch_call/4]). + +%% Behaviour definition +-type seconds() :: pos_integer(). +-type timer() :: + {timeout, seconds()} | + {deadline, timestamp()}. + +-type result(T) :: #{ + events => [event_body(T)], + action => action() | [action()], + aux_state => aux_state(_) +}. + +-type action() :: + {set_timer, timer()} | + unset_timer | + continue | + remove. + +-export_type([result/1]). +-export_type([action/0]). + +-callback init(args(_), machine(T), handler_args(_), handler_opts(_)) -> + result(T). + +-callback process_timeout(machine(T), handler_args(_), handler_opts(_)) -> + result(T). + +-callback process_call(args(_), machine(T), handler_args(_), handler_opts(_)) -> + {response(_), result(T)}. + +%% API + +-spec start(namespace(), id(), args(_), backend(_)) -> + ok | {error, exists}. +start(NS, ID, Args, Backend) -> + {Module, Opts} = machinery_utils:get_backend(Backend), + machinery_backend:start(Module, NS, ID, Args, Opts). + +-spec call(namespace(), id(), args(_), backend(_)) -> + {ok, response(_)} | {error, notfound}. +call(NS, ID, Args, Backend) -> + call(NS, ID, {undefined, undefined, forward}, Args, Backend). + +-spec call(namespace(), id(), scope(), args(_), backend(_)) -> + {ok, response(_)} | {error, notfound}. +call(NS, ID, Scope, Args, Backend) -> + {Module, Opts} = machinery_utils:get_backend(Backend), + machinery_backend:call(Module, NS, ID, Scope, Args, Opts). + +-spec get(namespace(), id(), backend(_)) -> + {ok, machine(_)} | {error, notfound}. +get(NS, ID, Backend) -> + get(NS, ID, {undefined, undefined, forward}, Backend). + +-spec get(namespace(), id(), scope(), backend(_)) -> + {ok, machine(_)} | {error, notfound}. +get(NS, ID, Scope, Backend) -> + {Module, Opts} = machinery_utils:get_backend(Backend), + machinery_backend:get(Module, NS, ID, Scope, Opts). + +%% Internal API + +-spec dispatch_signal(signal(_), machine(T), logic_handler(_), handler_opts(_)) -> + result(T). +dispatch_signal({init, Args}, Machine, {Handler, HandlerArgs}, Opts) -> + Handler:init(Args, Machine, HandlerArgs, Opts); +dispatch_signal(timeout, Machine, {Handler, HandlerArgs}, Opts) -> + Handler:process_timeout(Machine, HandlerArgs, Opts). + +-spec dispatch_call(args(_), machine(T), logic_handler(_), handler_opts(_)) -> + {response(_), result(T)}. +dispatch_call(Args, Machine, {Handler, HandlerArgs}, Opts) -> + Handler:process_call(Args, Machine, HandlerArgs, Opts). diff --git a/src/machinery_backend.erl b/src/machinery_backend.erl new file mode 100644 index 0000000..b7e5ff9 --- /dev/null +++ b/src/machinery_backend.erl @@ -0,0 +1,45 @@ +%%% +%%% Machinery backend behaviour. + +-module(machinery_backend). + +%% API +-export([start/5]). +-export([call/6]). +-export([get/5]). + +%% Behaviour definition + +-type namespace() :: machinery:namespace(). +-type id() :: machinery:id(). +-type scope() :: machinery:scope(). +-type args() :: machinery:args(_). +-type backend_opts() :: machinery:backend_opts(_). + +-callback start(namespace(), id(), args(), backend_opts()) -> + ok | {error, exists}. + +-callback call(namespace(), id(), scope(), args(), backend_opts()) -> + {ok, machinery:response(_)} | {error, notfound}. + +-callback get(namespace(), id(), scope(), backend_opts()) -> + {ok, machinery:machine(_)} | {error, notfound}. + +%% API + +-type backend() :: module(). + +-spec start(backend(), namespace(), id(), args(), backend_opts()) -> + ok | {error, exists}. +start(Backend, Namespace, Id, Args, Opts) -> + Backend:start(Namespace, Id, Args, Opts). + +-spec call(backend(), namespace(), id(), scope(), args(), backend_opts()) -> + {ok, machinery:response(_)} | {error, notfound}. +call(Backend, Namespace, Id, Scope, Args, Opts) -> + Backend:call(Namespace, Id, Scope, Args, Opts). + +-spec get(backend(), namespace(), id(), scope(), backend_opts()) -> + {ok, machinery:machine(_)} | {error, notfound}. +get(Backend, Namespace, Id, Scope, Opts) -> + Backend:get(Namespace, Id, Scope, Opts). diff --git a/src/machinery_machine_unique_tag.erl b/src/machinery_machine_unique_tag.erl new file mode 100644 index 0000000..2108b5e --- /dev/null +++ b/src/machinery_machine_unique_tag.erl @@ -0,0 +1,120 @@ +%%% +%%% Machine behaving as a unique tag. + +-module(machinery_machine_unique_tag). + +%% API + +-type id() :: machinery:id(). +-type namespace() :: machinery:namespace(). +-type tag() :: binary(). + +-export_type([tag/0]). + +-export([tag/4]). +-export([untag/4]). +-export([get/3]). + +%% Machine behaviour + +-behaviour(machinery). + +-export([init/4]). +-export([process_timeout/3]). +-export([process_call/4]). + +%% + +-spec tag(namespace(), tag(), id(), machinery:backend(_)) -> + ok | {error, {set, id()}}. +tag(NS, Tag, ID, Backend) -> + case machinery:start(construct_namespace(NS), Tag, {tag, ID}, Backend) of + ok -> + ok; + {error, exists} -> + case get(NS, Tag, Backend) of + {ok, ID} -> + ok; + {ok, IDWas} -> + {error, {set, IDWas}}; + {error, unset} -> + tag(NS, Tag, ID, Backend) + end + end. + +-spec untag(namespace(), tag(), id(), machinery:backend(_)) -> + ok | {error, {set, id()}}. +untag(NS, Tag, ID, Backend) -> + case machinery:call(construct_namespace(NS), Tag, {untag, ID}, Backend) of + {ok, ok} -> + ok; + {ok, {error, IDWas}} -> + {error, {set, IDWas}}; + {error, notfound} -> + ok + end. + +-spec get(namespace(), tag(), machinery:backend(_)) -> + {ok, id()} | {error, unset}. +get(NS, Tag, Backend) -> + case machinery:get(construct_namespace(NS), Tag, Backend) of + {ok, Machine} -> + ID = get_machine_st(Machine), + {ok, ID}; + {error, notfound} -> + {error, unset} + end. + +construct_namespace(NS) -> + list_to_atom(atom_to_list(NS) ++ "/tags"). + +%% + +-type machine() :: machinery:machine(ev()). +-type handler_opts() :: machinery:handler_opts(_). +-type result() :: machinery:result(ev()). +-type response() :: machinery:response( + ok | {error, id()} +). + +-type ev() :: + {tag_set, id()} | + tag_unset. + +-spec init({tag, id()}, machine(), undefined, handler_opts()) -> + result(). +init({tag, ID}, _Machine, _, _Opts) -> + #{ + events => [ID] + }. + +-spec process_timeout(machine(), undefined, handler_opts()) -> + result(). +process_timeout(#{}, _, _Opts) -> + #{}. + +-spec process_call({untag, id()}, machine(), undefined, handler_opts()) -> + {response(), result()}. +process_call({untag, ID}, Machine, _, _Opts) -> + case get_machine_st(Machine) of + ID -> + {ok, #{ + action => [remove] + }}; + IDWas -> + {{error, IDWas}, #{}} + end. + +%% + +get_machine_st(#{history := History}) -> + collapse_history(History). + +collapse_history(History) -> + lists:foldl(fun apply_event/2, undefined, History). + +apply_event({_ID, _CreatedAt, Body}, St) -> + apply_event_body(Body, St). + +apply_event_body(ID, undefined) -> + ID. diff --git a/src/machinery_machine_unique_tag_mg_example.erl b/src/machinery_machine_unique_tag_mg_example.erl new file mode 100644 index 0000000..58b3571 --- /dev/null +++ b/src/machinery_machine_unique_tag_mg_example.erl @@ -0,0 +1,96 @@ +-module(machinery_machine_unique_tag_mg_example). + +%% API + +-export([child_spec/1]). +-export([tag/4]). +-export([untag/4]). +-export([get/3]). + +-type id() :: machinery:id(). +-type namespace() :: machinery:namespace(). +-type tag() :: machinery_machine_unique_tag:tag(). + +-type opts() :: #{ + woody_ctx := woody_context:ctx() +}. +-export_type([opts/0]). + +-define(LOGIC_HANDLER, machinery_machine_unique_tag). + +%% API +-spec child_spec(_Id) -> + supervisor:child_spec(). +child_spec(Id) -> + machinery_utils:woody_child_spec(Id, get_routes(), config(server_opts)). + +-spec tag(namespace(), tag(), id(), opts()) -> + ok | {error, {set, id()}}. +tag(NS, Tag, ID, Opts) -> + machinery_machine_unique_tag:tag(NS, Tag, ID, get_backend(Opts)). + +-spec untag(namespace(), tag(), id(), opts()) -> + ok | {error, {set, id()}}. +untag(NS, Tag, ID, Opts) -> + machinery_machine_unique_tag:untag(NS, Tag, ID, get_backend(Opts)). + +-spec get(namespace(), tag(), opts()) -> + {ok, id()} | {error, unset}. +get(NS, Tag, Opts) -> + machinery_machine_unique_tag:get(NS, Tag, get_backend(Opts)). + +%%% Internal functions + +%% handler +-spec get_routes() -> + machinery_utils:woody_routes(). +get_routes() -> + machinery_mg_backend:get_routes( + [get_handler(config(handler_path))], + maps:with([event_handler, handler_limits], config()) + ). + +get_handler(Path) -> + {?LOGIC_HANDLER, get_handler_config(Path)}. + +get_handler_config(Path) -> + #{path => Path, backend_config => get_backend_config()}. + +get_backend_config() -> + maps:with([schema], config()). + +%% client +-spec get_backend(opts()) -> + machinery_mg_backend:backend(). +get_backend(Opts) -> + machinery_mg_backend:new(get_backend_opts(Opts)). + +-spec get_backend_opts(opts()) -> + machinery_mg_backend:backend_opts(). +get_backend_opts(#{woody_ctx := WoodyCtx}) -> + #{ + woody_ctx => WoodyCtx, + client => get_woody_client(), + schema => config(schema) + }. + +-spec get_woody_client() -> + machinery_mg_client:woody_client(). +get_woody_client() -> + maps:with([url, event_handler], config()). + +%% config +config(Key) -> + maps:get(Key, config()). + +config() -> + #{ + schema => machinery_mg_schema_generic, + url => <<"http://machinegun:8022/v1/automaton">>, + handler_path => <<"/v1/stateproc">>, + event_handler => woody_event_handler_default, + server_opts => #{ + ip => {0, 0, 0, 0}, + port => 8022 + } + }. diff --git a/src/machinery_mg_backend.erl b/src/machinery_mg_backend.erl new file mode 100644 index 0000000..9faa95c --- /dev/null +++ b/src/machinery_mg_backend.erl @@ -0,0 +1,497 @@ +%%% +%%% Machinery machinegun backend + +%% TODO +%% +%% - There's marshalling scattered around which is common enough for _any_ thrift interface. + +-module(machinery_mg_backend). +-include_lib("mg_proto/include/mg_proto_state_processing_thrift.hrl"). + +-type namespace() :: machinery:namespace(). +-type id() :: machinery:id(). +-type scope() :: machinery:scope(). +-type args(T) :: machinery:args(T). +-type response(T) :: machinery:response(T). +-type machine(T) :: machinery:machine(T). +-type logic_handler(T) :: machinery:logic_handler(T). + +-define(BACKEND_CORE_OPTS, + schema := machinery_mg_schema:schema() +). + +%% Server types +-type backend_config() :: #{ + ?BACKEND_CORE_OPTS +}. + +-type handler_config() :: #{ + path := woody:path(), + backend_config := backend_config() +}. + + +-type handler(A) :: {logic_handler(A), handler_config()}. %% handler server spec + +-type handler_opts() :: machinery:handler_opts(#{ + woody_ctx := woody_context:ctx() +}). + +-type backend_handler_opts() :: #{ + handler := logic_handler(_), + ?BACKEND_CORE_OPTS +}. + +%% Client types +-type backend_opts() :: machinery:backend_opts(#{ + woody_ctx := woody_context:ctx(), + client := machinery_mg_client:woody_client(), + ?BACKEND_CORE_OPTS +}). + +-type backend() :: {?MODULE, backend_opts()}. + +-export_type([backend_config/0]). +-export_type([handler_config/0]). +-export_type([logic_handler/1]). +-export_type([handler/1]). +-export_type([handler_opts/0]). +-export_type([backend_opts/0]). +-export_type([backend/0]). + +%% API +-export([get_routes/2]). +-export([get_handler/2]). +-export([new/1]). + +%% Machinery backend +-behaviour(machinery_backend). + +-export([start/4]). +-export([call/5]). +-export([get/4]). + +%% Woody handler +-behaviour(woody_server_thrift_handler). + +-export([handle_function/4]). + +%% API + +-spec get_routes([handler(_)], machinery_utils:route_opts()) -> + machinery_utils:woody_routes(). +get_routes(Handlers, Opts) -> + machinery_utils:get_woody_routes(Handlers, fun get_handler/2, Opts). + +-spec get_handler(handler(_), machinery_utils:route_opts()) -> + machinery_utils:woody_handler(). +get_handler({LogicHandler, #{path := Path, backend_config := Config}}, _) -> + {Path, { + {mg_proto_state_processing_thrift, 'Processor'}, + {?MODULE, get_backend_handler_opts(LogicHandler, Config)} + }}. + +-spec new(backend_opts()) -> + backend(). +new(Opts = #{woody_ctx := _, client := _, schema := _}) -> + {?MODULE, Opts}. + +%% Machinery backend + +-spec start(namespace(), id(), args(_), backend_opts()) -> + ok | {error, exists}. +start(NS, ID, Args, Opts) -> + Client = get_client(Opts), + Schema = get_schema(Opts), + InitArgs = marshal({schema, Schema, {args, init}}, Args), + case machinery_mg_client:start(marshal(namespace, NS), marshal(id, ID), InitArgs, Client) of + {ok, ok} -> + ok; + {exception, #mg_stateproc_MachineAlreadyExists{}} -> + {error, exists}; + {exception, #mg_stateproc_NamespaceNotFound{}} -> + error({namespace_not_found, NS}); + {exception, #mg_stateproc_MachineFailed{}} -> + error({failed, NS, ID}) + end. + +-spec call(namespace(), id(), scope(), args(_), backend_opts()) -> + {ok, response(_)} | {error, notfound}. +call(NS, ID, Scope, Args, Opts) -> + Client = get_client(Opts), + Schema = get_schema(Opts), + Descriptor = {NS, ID, Scope}, + CallArgs = marshal({schema, Schema, {args, call}}, Args), + case machinery_mg_client:call(marshal(descriptor, Descriptor), CallArgs, Client) of + {ok, Response} -> + {ok, unmarshal({schema, Schema, response}, Response)}; + {exception, #mg_stateproc_MachineNotFound{}} -> + {error, notfound}; + {exception, #mg_stateproc_NamespaceNotFound{}} -> + error({namespace_not_found, NS}); + {exception, #mg_stateproc_MachineFailed{}} -> + error({failed, NS, ID}) + end. + +-spec get(namespace(), id(), scope(), backend_opts()) -> + {ok, machine(_)} | {error, notfound}. +get(NS, ID, Scope, Opts) -> + Client = get_client(Opts), + Schema = get_schema(Opts), + Descriptor = {NS, ID, Scope}, + case machinery_mg_client:get_machine(marshal(descriptor, Descriptor), Client) of + {ok, Machine} -> + {ok, unmarshal({machine, Schema}, Machine)}; + {exception, #mg_stateproc_MachineNotFound{}} -> + {error, notfound}; + {exception, #mg_stateproc_NamespaceNotFound{}} -> + error({namespace_not_found, NS}) + end. + +%% Woody handler + +-spec handle_function + ('ProcessSignal', woody:args(), woody_context:ctx(), backend_handler_opts()) -> + {ok, mg_proto_state_processing_thrift:'SignalResult'()}; + ('ProcessCall', woody:args(), woody_context:ctx(), backend_handler_opts()) -> + {ok, mg_proto_state_processing_thrift:'CallResult'()}. +handle_function( + 'ProcessSignal', + [#mg_stateproc_SignalArgs{signal = Signal, machine = Machine}], + WoodyCtx, + #{handler := Handler, schema := Schema} +) -> + Machine1 = unmarshal({machine, Schema}, Machine), + Result = dispatch_signal( + unmarshal({signal, Schema}, Signal), + Machine1, + machinery_utils:get_handler(Handler), + get_handler_opts(WoodyCtx) + ), + {ok, marshal({signal_result, Schema}, handle_result(Result, Machine1))}; +handle_function( + 'ProcessCall', + [#mg_stateproc_CallArgs{arg = Args, machine = Machine}], + WoodyCtx, + #{handler := Handler, schema := Schema} +) -> + Machine1 = unmarshal({machine, Schema}, Machine), + {Response, Result} = dispatch_call( + unmarshal({schema, Schema, {args, call}}, Args), + Machine1, + machinery_utils:get_handler(Handler), + get_handler_opts(WoodyCtx) + ), + {ok, marshal({call_result, Schema}, {Response, handle_result(Result, Machine1)})}. + +%% Utils + +-spec get_backend_handler_opts(logic_handler(_), backend_config()) -> + backend_handler_opts(). +get_backend_handler_opts(Handler, Config) -> + Config#{handler => Handler}. + +get_schema(#{schema := Schema}) -> + Schema. + +get_client(#{client := Client, woody_ctx := WoodyCtx}) -> + machinery_mg_client:new(Client, WoodyCtx). + +get_handler_opts(WoodyCtx) -> + #{woody_ctx => WoodyCtx}. + +dispatch_signal(Signal, Machine, Handler, Opts) -> + machinery:dispatch_signal(Signal, Machine, Handler, Opts). + +dispatch_call(Args, Machine, Handler, Opts) -> + machinery:dispatch_call(Args, Machine, Handler, Opts). + +handle_result(Result, OrigMachine) -> + Result#{aux_state => set_aux_state( + maps:get(aux_state, Result, undefined), + maps:get(aux_state, OrigMachine, machinery_msgpack:nil()) + )}. + +set_aux_state(undefined, ReceivedState) -> + ReceivedState; +set_aux_state(NewState, _) -> + NewState. + +%% Marshalling + +%% No marshalling for the machine required by the protocol so far. +%% +%% marshal( +%% {machine, Schema}, +%% #{ +%% ns := NS, +%% id := ID, +%% history := History +%% } +%% ) -> +%% #mg_stateproc_Machine{ +%% 'ns' = marshal(namespace, NS), +%% 'id' = marshal(id, ID), +%% 'history' = marshal({history, Schema}, History), +%% % TODO +%% % There are required fields left +%% 'history_range' = marshal(range, {undefined, undefined, forward}) +%% }; + +marshal(descriptor, {NS, ID, Scope}) -> + #mg_stateproc_MachineDescriptor{ + 'ns' = marshal(namespace, NS), + 'ref' = {'id', marshal(id, ID)}, + 'range' = marshal(range, Scope) + }; + +marshal(range, {Cursor, Limit, Direction}) -> + #mg_stateproc_HistoryRange{ + 'after' = marshal({maybe, event_id}, Cursor), + 'limit' = marshal(limit, Limit), + 'direction' = marshal(direction, Direction) + }; + +marshal({history, Schema}, V) -> + marshal({list, {event, Schema}}, V); +marshal({event, Schema}, {EventID, CreatedAt, Body}) -> + #mg_stateproc_Event{ + 'id' = marshal(event_id, EventID), + 'created_at' = marshal(timestamp, CreatedAt), + 'event_payload' = marshal({schema, Schema, event}, Body) + }; + +marshal({signal, Schema}, {init, Args}) -> + {init, #mg_stateproc_InitSignal{arg = marshal({schema, Schema, {args, init}}, Args)}}; + +marshal({signal, _Schema}, timeout) -> + {timeout, #mg_stateproc_TimeoutSignal{}}; + +marshal({signal, Schema}, {repair, Args}) -> + {repair, #mg_stateproc_RepairSignal{arg = marshal({maybe, {schema, Schema, {args, repair}}}, Args)}}; + +marshal({signal_result, Schema}, #{} = V) -> + #mg_stateproc_SignalResult{ + change = marshal({state_change, Schema}, V), + action = marshal(action, maps:get(action, V, [])) + }; + +marshal({call_result, Schema}, {Response, #{} = V}) -> + #mg_stateproc_CallResult{ + response = marshal({schema, Schema, response}, Response), + change = marshal({state_change, Schema}, V), + action = marshal(action, maps:get(action, V, [])) + }; + +marshal({state_change, Schema}, #{} = V) -> + #mg_stateproc_MachineStateChange{ + events = marshal({list, {schema, Schema, event}}, maps:get(events, V, [])), + % TODO + % Provide this to logic handlers as well + aux_state = marshal({schema, Schema, aux_state}, maps:get(aux_state, V, undefined)) + }; + +marshal(action, V) when is_list(V) -> + lists:foldl(fun apply_action/2, #mg_stateproc_ComplexAction{}, V); + +marshal(action, V) -> + marshal(action, [V]); + +marshal(timer, {timeout, V}) when V > 0 -> + {timeout, marshal(integer, V)}; + +marshal(deadline, {deadline, V}) -> + {deadline, marshal(timestamp, V)}; + +marshal(namespace, V) -> + marshal(atom, V); + +marshal(id, V) -> + marshal(string, V); + +marshal(event_id, V) -> + marshal(integer, V); + +marshal(limit, V) -> + marshal({maybe, integer}, V); + +marshal(direction, V) -> + marshal({enum, [forward, backward]}, V); + +marshal({schema, Schema, T}, V) -> + % TODO + % Marshal properly + machinery_mg_schema:marshal(Schema, T, V); + +marshal(timestamp, {{Date, Time}, USec} = V) -> + case rfc3339:format({Date, Time, USec, 0}) of + {ok, R} when is_binary(R) -> + R; + Error -> + error(badarg, {timestamp, V, Error}) + end; + +marshal({list, T}, V) when is_list(V) -> + [marshal(T, E) || E <- V]; + +marshal({maybe, _}, undefined) -> + undefined; + +marshal({maybe, T}, V) -> + marshal(T, V); + +marshal({enum, Choices = [_ | _]} = T, V) when is_atom(V) -> + _ = lists:member(V, Choices) orelse error(badarg, {T, V}), + V; + +marshal(atom, V) when is_atom(V) -> + atom_to_binary(V, utf8); + +marshal(string, V) when is_binary(V) -> + V; + +marshal(integer, V) when is_integer(V) -> + V; + +marshal(T, V) -> + error(badarg, {T, V}). + +apply_action({set_timer, V}, CA) -> + CA#mg_stateproc_ComplexAction{ + timer = {set_timer, #mg_stateproc_SetTimerAction{timer = marshal(timer, V)}} + }; + +apply_action(unset_timer, CA) -> + CA#mg_stateproc_ComplexAction{ + timer = {unset_timer, #mg_stateproc_UnsetTimerAction{}} + }; + +apply_action(continue, CA) -> + CA#mg_stateproc_ComplexAction{ + timer = {set_timer, #mg_stateproc_SetTimerAction{timer = {timeout, 0}}} + }; + +apply_action(remove, CA) -> + CA#mg_stateproc_ComplexAction{ + remove = #mg_stateproc_RemoveAction{} + }. + +%% +%% No unmarshalling for the decriptor required by the protocol so far. +%% +%% unmarshal( +%% descriptor, +%% #mg_stateproc_MachineDescriptor{ +%% ns = NS, +%% ref = {'id', ID}, +%% range = Range +%% } +%% ) -> +%% {unmarshal(namespace, NS), unmarshal(id, ID), unmarshal(range, Range)}; + +unmarshal( + range, + #mg_stateproc_HistoryRange{ + 'after' = Cursor, + 'limit' = Limit, + 'direction' = Direction + } +) -> + {unmarshal({maybe, event_id}, Cursor), unmarshal(limit, Limit), unmarshal(direction, Direction)}; + +unmarshal( + {machine, Schema}, + #mg_stateproc_Machine{ + 'ns' = NS, + 'id' = ID, + 'history' = History, + 'history_range' = Range, + 'aux_state' = AuxState + } +) -> + #{ + ns => unmarshal(namespace, NS), + id => unmarshal(id, ID), + history => unmarshal({history, Schema}, History), + range => unmarshal(range, Range), + aux_state => unmarshal({maybe, {schema, Schema, aux_state}}, AuxState) + }; + +unmarshal({history, Schema}, V) -> + unmarshal({list, {event, Schema}}, V); + +unmarshal( + {event, Schema}, + #mg_stateproc_Event{ + 'id' = EventID, + 'created_at' = CreatedAt, + 'event_payload' = Payload + } +) -> + {unmarshal(event_id, EventID), unmarshal(timestamp, CreatedAt), unmarshal({schema, Schema, event}, Payload)}; + +unmarshal({signal, Schema}, {init, #mg_stateproc_InitSignal{arg = Args}}) -> + {init, unmarshal({schema, Schema, {args, init}}, Args)}; + +unmarshal({signal, _Schema}, {timeout, #mg_stateproc_TimeoutSignal{}}) -> + timeout; + +unmarshal({signal, Schema}, {repair, #mg_stateproc_RepairSignal{arg = Args}}) -> + {repair, unmarshal({maybe, {schema, Schema, {args, repair}}}, Args)}; + +unmarshal(namespace, V) -> + unmarshal(atom, V); + +unmarshal(id, V) -> + unmarshal(string, V); + +unmarshal(event_id, V) -> + unmarshal(integer, V); + +unmarshal(limit, V) -> + unmarshal({maybe, integer}, V); + +unmarshal(direction, V) -> + unmarshal({enum, [forward, backward]}, V); + +unmarshal({schema, Schema, T}, V) -> + machinery_mg_schema:unmarshal(Schema, T, V); + +unmarshal(timestamp, V) when is_binary(V) -> + case rfc3339:parse(V) of + {ok, {Date, Time, USec, TZOffset}} when TZOffset == undefined orelse TZOffset == 0 -> + {{Date, Time}, USec}; + {ok, _} -> + error(badarg, {timestamp, V, badoffset}); + {error, Reason} -> + error(badarg, {timestamp, V, Reason}) + end; + +unmarshal({list, T}, V) when is_list(V) -> + [unmarshal(T, E) || E <- V]; + +unmarshal({maybe, _}, undefined) -> + undefined; + +unmarshal({maybe, T}, V) -> + unmarshal(T, V); + +unmarshal({enum, Choices = [_ | _]} = T, V) when is_atom(V) -> + case lists:member(V, Choices) of + true -> + V; + false -> + error(badarg, {T, V}) + end; + +unmarshal(atom, V) when is_binary(V) -> + binary_to_existing_atom(V, utf8); + +unmarshal(string, V) when is_binary(V) -> + V; + +unmarshal(integer, V) when is_integer(V) -> + V; + +unmarshal(T, V) -> + error(badarg, {T, V}). diff --git a/src/machinery_mg_client.erl b/src/machinery_mg_client.erl new file mode 100644 index 0000000..a94148f --- /dev/null +++ b/src/machinery_mg_client.erl @@ -0,0 +1,69 @@ +%%% +%%% Simplistic machinegun client. + +-module(machinery_mg_client). +-include_lib("mg_proto/include/mg_proto_state_processing_thrift.hrl"). + +%% API + +-export([new/2]). + +-export([start/4]). +-export([call/3]). +-export([get_machine/2]). + +-type woody_client() :: #{ + url := woody:url(), + event_handler := woody:ev_handler(), + transport_opts => woody_client_thrift_http_transport:options() +}. + +-opaque client() :: {woody_client(), woody_context:ctx()}. + +-export_type([woody_client/0]). +-export_type([client/0]). + +%% + +-spec new(woody_client(), woody_context:ctx()) -> + client(). +new(WoodyClient = #{url := _, event_handler := _}, WoodyCtx) -> + {WoodyClient, WoodyCtx}. + +%% + +-type namespace() :: mg_proto_base_thrift:'Namespace'(). +-type id() :: mg_proto_base_thrift:'ID'(). +-type args() :: mg_proto_state_processing_thrift:'Args'(). +-type descriptor() :: mg_proto_state_processing_thrift:'MachineDescriptor'(). +-type call_response() :: mg_proto_state_processing_thrift:'CallResponse'(). +-type machine() :: mg_proto_state_processing_thrift:'Machine'(). +-type namespace_not_found() :: mg_proto_state_processing_thrift:'NamespaceNotFound'(). +-type machine_not_found() :: mg_proto_state_processing_thrift:'MachineNotFound'(). +-type machine_already_exists() :: mg_proto_state_processing_thrift:'MachineAlreadyExists'(). +-type machine_failed() :: mg_proto_state_processing_thrift:'MachineFailed'(). + +-spec start(namespace(), id(), args(), client()) -> + {ok, ok} | + {exception, namespace_not_found() | machine_already_exists() | machine_failed()}. +start(NS, ID, Args, Client) -> + issue_call('Start', [NS, ID, Args], Client). + +-spec call(descriptor(), args(), client()) -> + {ok, call_response()} | + {exception, namespace_not_found() | machine_not_found() | machine_failed()}. +call(Descriptor, Args, Client) -> + issue_call('Call', [Descriptor, Args], Client). + +-spec get_machine(descriptor(), client()) -> + {ok, machine()} | + {exception, namespace_not_found() | machine_not_found()}. +get_machine(Descriptor, Client) -> + issue_call('GetMachine', [Descriptor], Client). + +%% Internal functions + +issue_call(Function, Args, {WoodyClient, WoodyCtx}) -> + Service = {mg_proto_state_processing_thrift, 'Automaton'}, + Request = {Service, Function, Args}, + woody_client:call(Request, WoodyClient, WoodyCtx). diff --git a/src/machinery_mg_schema.erl b/src/machinery_mg_schema.erl new file mode 100644 index 0000000..c6d2fae --- /dev/null +++ b/src/machinery_mg_schema.erl @@ -0,0 +1,46 @@ +%%% +%%% Storage schema behaviour. + +-module(machinery_mg_schema). + + +%% Behaviour definition + +-type schema() :: module(). + +-type t() :: + {args, + init | + repair | + call + } | + response | + event | + aux_state. +-type v(T) :: + T. + +-callback marshal(t(), v(_)) -> + machinery_msgpack:t(). + +-callback unmarshal(t(), machinery_msgpack:t()) -> + v(_). + +-export_type([schema/0]). +-export_type([t/0]). +-export_type([v/1]). + +%% API + +-export([marshal/3]). +-export([unmarshal/3]). + +-spec marshal(schema(), t(), v(_)) -> + machinery_msgpack:t(). +marshal(Schema, T, V) -> + Schema:marshal(T, V). + +-spec unmarshal(schema(), t(), machinery_msgpack:t()) -> + v(_). +unmarshal(Schema, T, V) -> + Schema:unmarshal(T, V). diff --git a/src/machinery_mg_schema_generic.erl b/src/machinery_mg_schema_generic.erl new file mode 100644 index 0000000..0d16174 --- /dev/null +++ b/src/machinery_mg_schema_generic.erl @@ -0,0 +1,97 @@ +%%% +%%% Storage schema adapted to store arbitrary persistent Erlang terms. +%%% +%%% Excluding, as expected: pids, refs, ports. + +-module(machinery_mg_schema_generic). + +%% API +-export([marshal/1]). +-export([unmarshal/1]). + +%% Storage schema behaviour +-behaviour(machinery_mg_schema). + +-export([marshal/2]). +-export([unmarshal/2]). + +-import(machinery_msgpack, [ + nil/0, + wrap/1, + unwrap/1 +]). + +%% + +-type t() :: machinery_mg_schema:t(). +-type v(T) :: machinery_mg_schema:v(T). + +-spec marshal(t(), v(eterm())) -> + machinery_msgpack:t(). +marshal(_T, V) -> + marshal(V). + +-spec unmarshal(t(), machinery_msgpack:t()) -> + v(eterm()). +unmarshal(_T, V) -> + unmarshal(V). + +%% API + +-type eterm() :: + atom() | + number() | + tuple() | + binary() | + list() | + map(). + +-spec marshal(eterm()) -> + machinery_msgpack:t(). +marshal(undefined) -> + nil(); +marshal(V) when is_boolean(V) -> + wrap(V); +marshal(V) when is_atom(V) -> + wrap(atom_to_binary(V, utf8)); +marshal(V) when is_number(V) -> + wrap(V); +marshal(V) when is_binary(V) -> + wrap({binary, V}); +marshal([]) -> + wrap([]); +marshal(V) when is_list(V) -> + wrap([marshal(lst) | lists:map(fun marshal/1, V)]); +marshal(V) when is_tuple(V) -> + wrap([marshal(tup) | lists:map(fun marshal/1, tuple_to_list(V))]); +marshal(V) when is_map(V) -> + wrap([marshal(map), wrap(genlib_map:truemap(fun (Ke, Ve) -> {marshal(Ke), marshal(Ve)} end, V))]); +marshal(V) -> + error({badarg, V}). + +-spec unmarshal(machinery_msgpack:t()) -> + eterm(). +unmarshal(M) -> + unmarshal_v(unwrap(M)). + +unmarshal_v(nil) -> + undefined; +unmarshal_v(V) when is_boolean(V) -> + V; +unmarshal_v(V) when is_binary(V) -> + binary_to_existing_atom(V, utf8); +unmarshal_v(V) when is_number(V) -> + V; +unmarshal_v({binary, V}) -> + V; +unmarshal_v([]) -> + []; +unmarshal_v([Ty | Vs]) -> + unmarshal_v(unmarshal(Ty), Vs). + +unmarshal_v(lst, Vs) -> + lists:map(fun unmarshal/1, Vs); +unmarshal_v(tup, Es) -> + list_to_tuple(unmarshal_v(lst, Es)); +unmarshal_v(map, [V]) -> + genlib_map:truemap(fun (Ke, Ve) -> {unmarshal(Ke), unmarshal(Ve)} end, unwrap(V)). diff --git a/src/machinery_msgpack.erl b/src/machinery_msgpack.erl new file mode 100644 index 0000000..5025949 --- /dev/null +++ b/src/machinery_msgpack.erl @@ -0,0 +1,81 @@ +%%% +%%% Msgpack manipulation employed by machinegun interfaces. + +-module(machinery_msgpack). +-include_lib("mg_proto/include/mg_proto_msgpack_thrift.hrl"). + +%% API + +-export([wrap/1]). +-export([unwrap/1]). + +-export([nil/0]). + +-type t() :: mg_proto_msgpack_thrift:'Value'(). + +-export_type([t/0]). + +%% + +-spec wrap + (nil ) -> t(); + (boolean() ) -> t(); + (integer() ) -> t(); + (float() ) -> t(); + (binary() ) -> t(); %% string + ({binary, binary()}) -> t(); %% binary + ([t()] ) -> t(); + (#{t() => t()} ) -> t(). + +wrap(nil) -> + {nl, #mg_msgpack_Nil{}}; +wrap(V) when is_boolean(V) -> + {b, V}; +wrap(V) when is_integer(V) -> + {i, V}; +wrap(V) when is_float(V) -> + V; +wrap(V) when is_binary(V) -> + % Assuming well-formed UTF-8 bytestring. + {str, V}; +wrap({binary, V}) when is_binary(V) -> + {bin, V}; +wrap(V) when is_list(V) -> + {arr, V}; +wrap(V) when is_map(V) -> + {obj, V}. + +-spec unwrap(t()) -> + nil | + boolean() | + integer() | + float() | + binary() | %% string + {binary, binary()} | %% binary + [t()] | + #{t() => t()} . + +unwrap({nl, #mg_msgpack_Nil{}}) -> + nil; +unwrap({b, V}) when is_boolean(V) -> + V; +unwrap({i, V}) when is_integer(V) -> + V; +unwrap({flt, V}) when is_float(V) -> + V; +unwrap({str, V}) when is_binary(V) -> + % Assuming well-formed UTF-8 bytestring. + V; +unwrap({bin, V}) when is_binary(V) -> + {binary, V}; +unwrap({arr, V}) when is_list(V) -> + V; +unwrap({obj, V}) when is_map(V) -> + V. + +%% + +-spec nil() -> t(). + +nil() -> + wrap(nil). diff --git a/src/machinery_utils.erl b/src/machinery_utils.erl new file mode 100644 index 0000000..78bf0c3 --- /dev/null +++ b/src/machinery_utils.erl @@ -0,0 +1,75 @@ +-module(machinery_utils). + +%% Types + +-type woody_routes() :: [woody_server_thrift_http_handler:route(_)]. +-type woody_handler() :: woody:http_handler(woody:th_handler()). +-type handler(T) :: T. +-type get_woody_handler() :: fun((handler(_), route_opts()) -> woody_handler()). + +-type woody_server_config() :: #{ + ip := inet:ip_address(), + port := inet:port_number(), + net_opts => cowboy_protocol:opts() +}. + +-type route_opts() :: #{ + event_handler := woody:ev_handler(), + handler_limits => woody_server_thrift_http_handler:handler_limits() +}. + +-export_type([woody_server_config/0]). +-export_type([woody_routes/0]). +-export_type([woody_handler/0]). +-export_type([route_opts/0]). +-export_type([handler/1]). +-export_type([get_woody_handler/0]). + +%% API + +-export([get_handler/1]). +-export([get_backend/1]). +-export([expand_modopts/2]). +-export([woody_child_spec/3]). +-export([get_woody_routes/3]). + +%% API + +-spec get_handler(machinery:modopts(Opts)) -> + {module(), Opts}. + +get_handler(Handler) -> + expand_modopts(Handler, undefined). + +-spec get_backend(machinery:backend(Opts)) -> + {module(), Opts}. + +get_backend(Backend) -> + expand_modopts(Backend, #{}). + +-spec expand_modopts(machinery:modopts(Opts), Opts) -> + {module(), Opts}. + +expand_modopts({Mod, Opts}, _) -> + {Mod, Opts}; +expand_modopts(Mod, Opts) -> + {Mod, Opts}. + +-spec woody_child_spec(_Id, woody_routes(), woody_server_config()) -> + supervisor:child_spec(). + +woody_child_spec(Id, Routes, Config) -> + woody_server:child_spec(Id, Config#{ + %% ev handler for `handlers`, which is `[]`, so this is just to satisfy the spec. + event_handler => woody_event_handler_default, + handlers => [], + additional_routes => Routes + }). + +-spec get_woody_routes([handler(_)], get_woody_handler(), route_opts()) -> + woody_routes(). + +get_woody_routes(Handlers, GetHandler, Opts = #{event_handler := _}) -> + woody_server_thrift_http_handler:get_routes(Opts#{ + handlers => [GetHandler(H, Opts) || H <- Handlers] + }). diff --git a/test/ct_helper.erl b/test/ct_helper.erl new file mode 100644 index 0000000..9c26147 --- /dev/null +++ b/test/ct_helper.erl @@ -0,0 +1,160 @@ +-module(ct_helper). + +-export([cfg/2]). + +-export([start_apps/1]). +-export([start_app/1]). +-export([stop_apps/1]). +-export([stop_app/1]). + +-export([makeup_cfg/2]). + +-export([woody_ctx/0]). +-export([get_woody_ctx/1]). + +-export([test_case_name/1]). +-export([get_test_case_name/1]). + +-type test_case_name() :: atom(). +-type group_name() :: atom(). +-type config() :: [{atom(), term()}]. + +-export_type([test_case_name/0]). +-export_type([group_name/0]). +-export_type([config/0]). + +%% + +-spec cfg(atom(), config()) -> term(). + +cfg(Key, Config) -> + case lists:keyfind(Key, 1, Config) of + {Key, V} -> V; + _ -> error({undefined, Key, Config}) + end. + +%% + +-type app_name() :: atom(). +-type app_env() :: [{atom(), term()}]. +-type startup_ctx() :: #{atom() => _}. + +-spec start_apps([app_name()]) -> {[Started :: app_name()], startup_ctx()}. + +start_apps(AppNames) -> + lists:foldl( + fun (AppName, {SAcc, CtxAcc}) -> + {Started, Ctx} = start_app(AppName), + {SAcc ++ Started, maps:merge(CtxAcc, Ctx)} + end, + {[], #{}}, + AppNames + ). + +-spec start_app(app_name()) -> {[Started :: app_name()], startup_ctx()}. + +start_app(lager = AppName) -> + {start_app_with(AppName, [ + {async_threshold, 1}, + {async_threshold_window, 0}, + {error_logger_hwm, 600}, + {suppress_application_start_stop, true}, + {suppress_supervisor_start_stop, true}, + {handlers, [ + {lager_common_test_backend, debug} + ]} + ]), #{}}; + +start_app(scoper = AppName) -> + {start_app_with(AppName, [ + {storage, scoper_storage_lager} + ]), #{}}; + +start_app(woody = AppName) -> + {start_app_with(AppName, [ + {acceptors_pool_size, 4} + ]), #{}}; + +start_app(AppName) -> + {start_app_with(AppName, []), #{}}. + +-spec start_app_with(app_name(), app_env()) -> {[app_name()], #{atom() => _}}. + +start_app_with(AppName, Env) -> + _ = application:load(AppName), + _ = set_app_env(AppName, Env), + case application:ensure_all_started(AppName) of + {ok, Apps} -> + Apps; + {error, Reason} -> + exit({start_app_failed, AppName, Reason}) + end. + +set_app_env(AppName, Env) -> + lists:foreach( + fun ({K, V}) -> + ok = application:set_env(AppName, K, V) + end, + Env + ). + +-spec stop_apps([app_name()]) -> ok. + +stop_apps(AppNames) -> + lists:foreach(fun stop_app/1, lists:reverse(AppNames)). + +-spec stop_app(app_name()) -> ok. + +stop_app(AppName) -> + case application:stop(AppName) of + ok -> + case application:unload(AppName) of + ok -> + ok; + {error, Reason} -> + exit({unload_app_failed, AppName, Reason}) + end; + {error, Reason} -> + exit({unload_app_failed, AppName, Reason}) + end. + +%% + +-type config_mut_fun() :: fun((config()) -> config()). + +-spec makeup_cfg([config_mut_fun()], config()) -> config(). + +makeup_cfg(CMFs, C0) -> + lists:foldl(fun (CMF, C) -> CMF(C) end, C0, CMFs). + +-spec woody_ctx() -> config_mut_fun(). + +woody_ctx() -> + fun (C) -> [{'$woody_ctx', construct_woody_ctx(C)} | C] end. + +construct_woody_ctx(C) -> + woody_context:new(construct_rpc_id(get_test_case_name(C))). + +construct_rpc_id(TestCaseName) -> + woody_context:new_rpc_id( + <<"undefined">>, + list_to_binary(lists:sublist(atom_to_list(TestCaseName), 32)), + woody_context:new_req_id() + ). + +-spec get_woody_ctx(config()) -> woody_context:ctx(). + +get_woody_ctx(C) -> + cfg('$woody_ctx', C). + +%% + +-spec test_case_name(test_case_name()) -> config_mut_fun(). + +test_case_name(TestCaseName) -> + fun (C) -> [{'$test_case_name', TestCaseName} | C] end. + +-spec get_test_case_name(config()) -> test_case_name(). + +get_test_case_name(C) -> + cfg('$test_case_name', C). diff --git a/test/ct_sup.erl b/test/ct_sup.erl new file mode 100644 index 0000000..706776a --- /dev/null +++ b/test/ct_sup.erl @@ -0,0 +1,32 @@ +-module(ct_sup). + +-export([start/0]). +-export([stop/1]). + +%% + +-behaviour(supervisor). +-export([init/1]). + +%% + +-spec start() -> pid(). + +start() -> + {ok, PID} = supervisor:start_link(?MODULE, []), + true = unlink(PID), + PID. + +-spec stop(pid()) -> ok. + +stop(PID) -> + true = exit(PID, shutdown), + ok. + +%% + +-spec init([]) -> + {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}. + +init([]) -> + {ok, {#{strategy => one_for_all, intensity => 1, period => 1}, []}}. diff --git a/test/machinegun/config.yaml b/test/machinegun/config.yaml new file mode 100644 index 0000000..7cf223f --- /dev/null +++ b/test/machinegun/config.yaml @@ -0,0 +1,6 @@ +namespaces: + payproc/tags: + processor: + url: http://machinery:8022/v1/stateproc +storage: + type: memory diff --git a/test/machinery_machine_unique_tag_mg_example_test_SUITE.erl b/test/machinery_machine_unique_tag_mg_example_test_SUITE.erl new file mode 100644 index 0000000..4e4f9ec --- /dev/null +++ b/test/machinery_machine_unique_tag_mg_example_test_SUITE.erl @@ -0,0 +1,151 @@ +%%% TODO +%%% - Model things like `ct_sup` with something hook-like? + +-module(machinery_machine_unique_tag_mg_example_test_SUITE). + +%% Test suite + +-export([all/0]). +-export([init_per_suite/1]). +-export([end_per_suite/1]). +-export([init_per_testcase/2]). + +-export([tag_success/1]). +-export([tag_twice_success/1]). +-export([single_tag_set_only/1]). +-export([untag_success/1]). +-export([conflict_untag_failure/1]). +-export([reset_tag_success/1]). + +-import(ct_helper, [ + cfg/2, + start_apps/1, + get_woody_ctx/1 +]). + +%% + +-type config() :: ct_helper:config(). +-type test_case_name() :: ct_helper:test_case_name(). +-type group_name() :: ct_helper:group_name(). +-type test_return() :: _ | no_return(). + +-spec all() -> [test_case_name() | {group, group_name()}]. + +all() -> + [ + tag_success , + tag_twice_success , + single_tag_set_only , + untag_success , + conflict_untag_failure , + reset_tag_success + ]. + +-spec init_per_suite(config()) -> config(). + +init_per_suite(C) -> + % _ = dbg:tracer(), + % _ = dbg:p(all, c), + % _ = dbg:tpl({'woody_client', '_', '_'}, x), + {StartedApps, _StartupCtx} = start_apps([lager, machinery]), + SuiteSup = ct_sup:start(), + start_woody_server([ + {started_apps , StartedApps}, + {suite_sup , SuiteSup} + | C]). + +start_woody_server(C) -> + {ok, PID} = supervisor:start_child( + cfg(suite_sup, C), + machinery_machine_unique_tag_mg_example:child_spec(?MODULE) + ), + [{payproc_mg_machine_sup, PID} | C]. + +-spec end_per_suite(config()) -> _. + +end_per_suite(C) -> + ok = ct_sup:stop(cfg(suite_sup, C)), + ok = ct_helper:stop_apps(cfg(started_apps, C)), + ok. + +-spec init_per_testcase(test_case_name(), config()) -> config(). + +init_per_testcase(TestCaseName, C) -> + ct_helper:makeup_cfg([ct_helper:test_case_name(TestCaseName), ct_helper:woody_ctx()], C). + +%% + +-spec tag_success(config()) -> test_return(). + +tag_success(C) -> + Tag = genlib:unique(), + ID = pid_to_binary(self()), + Opts = #{woody_ctx => get_woody_ctx(C)}, + ok = machinery_machine_unique_tag_mg_example:tag(payproc, Tag, ID, Opts), + {ok, ID} = machinery_machine_unique_tag_mg_example:get(payproc, Tag, Opts). + +-spec tag_twice_success(config()) -> test_return(). + +tag_twice_success(C) -> + Tag = genlib:unique(), + ID = pid_to_binary(self()), + Opts = #{woody_ctx => get_woody_ctx(C)}, + ok = machinery_machine_unique_tag_mg_example:tag(payproc, Tag, ID, Opts), + ok = machinery_machine_unique_tag_mg_example:tag(payproc, Tag, ID, Opts), + {ok, ID} = machinery_machine_unique_tag_mg_example:get(payproc, Tag, Opts). + +-spec single_tag_set_only(config()) -> test_return(). + +single_tag_set_only(C) -> + Tag = genlib:unique(), + Opts = #{woody_ctx => get_woody_ctx(C)}, + IDs = [integer_to_binary(E) || E <- lists:seq(1, 42)], + Rs = genlib_pmap:map( + fun (ID) -> + {ID, machinery_machine_unique_tag_mg_example:tag(payproc, Tag, ID, Opts)} + end, + IDs + ), + [IDSet] = [ID0 || {ID0, ok} <- Rs], + IDsLeft = IDs -- [IDSet], + IDsLeft = [ID0 || {ID0, {error, {set, ID}}} <- Rs, ID == IDSet]. + +-spec untag_success(config()) -> test_return(). + +untag_success(C) -> + Tag = genlib:unique(), + ID = pid_to_binary(self()), + Opts = #{woody_ctx => get_woody_ctx(C)}, + ok = machinery_machine_unique_tag_mg_example:tag(payproc, Tag, ID, Opts), + ok = machinery_machine_unique_tag_mg_example:untag(payproc, Tag, ID, Opts), + {error, unset} = machinery_machine_unique_tag_mg_example:get(payproc, Tag, Opts). + +-spec conflict_untag_failure(config()) -> test_return(). + +conflict_untag_failure(C) -> + Tag = genlib:unique(), + ID1 = pid_to_binary(self()), + ID2 = pid_to_binary(cfg(suite_sup, C)), + Opts = #{woody_ctx => get_woody_ctx(C)}, + ok = machinery_machine_unique_tag_mg_example:tag(payproc, Tag, ID1, Opts), + {error, {set, ID1}} = machinery_machine_unique_tag_mg_example:untag(payproc, Tag, ID2, Opts), + ok = machinery_machine_unique_tag_mg_example:untag(payproc, Tag, ID1, Opts), + ok = machinery_machine_unique_tag_mg_example:untag(payproc, Tag, ID2, Opts). + +-spec reset_tag_success(config()) -> test_return(). + +reset_tag_success(C) -> + Tag = genlib:unique(), + ID = pid_to_binary(self()), + Opts = #{woody_ctx => get_woody_ctx(C)}, + ok = machinery_machine_unique_tag_mg_example:tag(payproc, Tag, ID, Opts), + ok = machinery_machine_unique_tag_mg_example:untag(payproc, Tag, ID, Opts), + ok = machinery_machine_unique_tag_mg_example:untag(payproc, Tag, ID, Opts), + ok = machinery_machine_unique_tag_mg_example:tag(payproc, Tag, ID, Opts), + {ok, ID} = machinery_machine_unique_tag_mg_example:get(payproc, Tag, Opts). + +%% + +pid_to_binary(PID) -> + list_to_binary(pid_to_list(PID)).