#!/usr/bin/env escript
%% -*- erlang -*-
%%! -smp enable +K true +P 10000 -env ERL_MAX_PORTS 10000
-export([main/1]).

%% @doc Print the top-level usage summary (script name plus the list of
%% supported commands) to standard output.
usage() ->
    io:format("Usage: ~s <command> [<args>]~n~n"
              "Commands:~n"
              "  build        Build Riak cluster(s)~n"
              "  teardown     Teardown running clusters~n",
              [escript:script_name()]).

%% @doc getopt option specifications accepted by the `build' command.
build_options() ->
    %% Option Name, Short Code, Long Code, Argument Spec, Help Message
    [
     {help,      $h,        "help",      undefined, "Print this usage page"},
     {config,    $c,        "conf",      string,    "project configuration"},
     {version,   $v,        "version",   atom,      "Riak version (eg. current, previous legacy)"},
     {nodes,     $n,        "num",       integer,   "number of nodes/cluster (required)"},
     {clusters,  undefined, "clusters",  integer,   "number of clusters"},
     {backend,   $b,        "backend",   atom,      "backend [memory | bitcask | leveldb | yessir]"},
     {ring_size, $r,        "ring-size", integer,   "cluster ring size"},
     {repl,      undefined, "repl",      atom,      "connect clusters [primary | bidirect]"},
     {file,      $F,        "file",      string,    "use the specified file instead of ~/.riak_test.config"}
    ].

%% @doc Default values for the `build' command. An option whose value is the
%% atom `required' must be supplied on the command line; see
%% process_options/4.
build_defaults() ->
    [{nodes, required},
     {version, current},
     {clusters, 1},
     {backend, undefined},
     {ring_size, undefined}].

%% @doc getopt option specifications accepted by the `teardown' command.
teardown_options() ->
    %% Option Name, Short Code, Long Code, Argument Spec, Help Message
    [
     {help,   $h, "help", undefined, "Print this usage page"},
     {config, $c, "conf", string,    "project configuration"},
     {file,   $F, "file", string,    "use the specified file instead of ~/.riak_test.config"}
    ].

%% @doc Print the getopt usage text for command `Cmd' and terminate the
%% script with exit status 0. Never returns.
print_help(Cmd, CmdOpts) ->
    getopt:usage(CmdOpts, escript:script_name() ++ " " ++ Cmd),
    halt(0).

%% @doc Return `true' when the bare `help' flag is present in the parsed
%% option list.
run_help(ParsedArgs) ->
    lists:member(help, ParsedArgs).

%% @doc Merge the parsed options with `Defaults' and return the merged list.
%%
%% ParsedArgs is placed first so that lists:ukeysort/2, which keeps the first
%% tuple among those with equal keys, lets command-line values override the
%% defaults. If any option is still set to `required' after the merge, the
%% missing option names and the command usage are printed and the script
%% halts with exit status 1 (an error, unlike an explicit help request).
process_options(ParsedArgs, Defaults, Cmd, CmdOpts) ->
    Opts = lists:ukeysort(1, ParsedArgs ++ Defaults),
    case [Name || {Name, required} <- Opts] of
        [] ->
            Opts;
        Missing ->
            io:format("Missing required option(s): ~p~n", [Missing]),
            getopt:usage(CmdOpts, escript:script_name() ++ " " ++ Cmd),
            halt(1)
    end.
%% @doc Parse the command-line `Args' for command `Cmd' using the getopt
%% specs in `CmdOpts', then merge in `Defaults'.
%%
%% A parse error is reported together with the command usage and the script
%% halts with exit status 1. An explicit help flag prints the usage via
%% print_help/2. Otherwise the merged option list from process_options/4 is
%% returned.
parse_args(Args, Cmd, CmdOpts, Defaults) ->
    ParsedArgs =
        case getopt:parse(CmdOpts, Args) of
            {ok, {Parsed, _NonOptArgs}} ->
                Parsed;
            {error, Reason} ->
                %% Tell the user what was wrong instead of silently
                %% printing the usage and exiting successfully.
                io:format("Invalid arguments: ~p~n", [Reason]),
                getopt:usage(CmdOpts, escript:script_name() ++ " " ++ Cmd),
                halt(1)
        end,
    case run_help(ParsedArgs) of
        true -> print_help(Cmd, CmdOpts);
        false -> ok
    end,
    process_options(ParsedArgs, Defaults, Cmd, CmdOpts).

%% @doc Verify that the script runs from a directory containing `./ebin' and
%% `./deps/getopt/ebin', then put `./ebin' and the deps on the code path.
%% Any failure prints a hint and halts with exit status 1.
setup() ->
    try
        true = filelib:is_dir("./ebin"),
        true = filelib:is_dir("./deps/getopt/ebin"),
        code:add_patha("./ebin"),
        riak_test_escript:add_deps("./deps")
    catch
        _:_ ->
            io:format("rt-cluster must be run from top-level of "
                      "compiled riak_test tree~n"),
            halt(1)
    end.

%% @doc escript entry point: prepare the code path and dispatch the command.
main(Args) ->
    setup(),
    command(Args).

%% @doc Dispatch on the first command-line word.
%%
%% `build' builds the requested number of clusters, prints each cluster's
%% member status and optionally connects the clusters for replication.
%% `teardown' tears down via rt:teardown/0. Anything else prints the usage.
command(["build" | Args]) ->
    Opts = parse_args(Args, "build", build_options(), build_defaults()),
    NumNodes = proplists:get_value(nodes, Opts),
    NumClusters = proplists:get_value(clusters, Opts),
    KVConfig = [{storage_backend, get_backend(Opts)}],
    CoreConfig = [{ring_creation_size, proplists:get_value(ring_size, Opts)}],
    %% maybe_config/2 returns either [] or a one-element list, so plain
    %% concatenation yields the final application config.
    Config = maybe_config(riak_kv, KVConfig) ++ maybe_config(riak_core, CoreConfig),
    setup_rt(Opts),
    io:format("Config: ~p~n", [Config]),
    Settings = [{NumNodes, Config} || _ <- lists:seq(1, NumClusters)],
    Clusters = rt:build_clusters(Settings),
    Numbered = lists:zip(lists:seq(1, length(Clusters)), Clusters),
    lists:foreach(fun({N, Nodes}) ->
                          io:format("---~nCluster ~b: ~p~n", [N, Nodes]),
                          rpc:call(hd(Nodes), riak_core_console, member_status, [[]])
                  end, Numbered),
    Repl = proplists:get_value(repl, Opts),
    %% Replication only makes sense with at least two clusters.
    case Clusters of
        [_, _ | _] -> maybe_connect_repl(Repl, Clusters);
        _ -> ok
    end,
    info("Finished building clusters"),
    info(""),
    ok;
command(["teardown" | Args]) ->
    Opts = parse_args(Args, "teardown", teardown_options(), []),
    setup_rt(Opts),
    rt:teardown(),
    ok;
command(_) ->
    usage().

%% @doc Map the `backend' option to the corresponding backend module name.
%% Returns `undefined' when no backend was requested. An unrecognised value
%% (user input) is reported and the script halts with exit status 1 instead
%% of crashing with a `case_clause' error.
get_backend(Opts) ->
    case proplists:get_value(backend, Opts) of
        undefined -> undefined;
        bitcask -> riak_kv_bitcask_backend;
        leveldb -> riak_kv_eleveldb_backend;
        memory -> riak_kv_memory_backend;
        yessir -> riak_kv_yessir_backend;
        Unknown ->
            io:format("Unknown backend: ~p "
                      "(expected memory, bitcask, leveldb or yessir)~n",
                      [Unknown]),
            halt(1)
    end.
%% @doc Build the configuration entry for application `App'.
%% Settings whose value is `undefined' are dropped; if nothing remains the
%% result is `[]', otherwise it is the one-element list `[{App, Settings}]'.
maybe_config(App, Config) ->
    IsDefined = fun({_, Value}) -> Value =/= undefined;
                   (_) -> false
                end,
    case lists:filter(IsDefined, Config) of
        [] -> [];
        Defined -> [{App, Defined}]
    end.

%% @doc Prepare the riak_test runtime for this script: register the process
%% as `riak_test', start ibrowse and lager, load the riak_test configuration,
%% extend the code path with the configured deps, start distribution and
%% initialise the harness. Returns `ok'.
setup_rt(Opts) ->
    register(riak_test, self()),

    %% HTTP client
    application:load(ibrowse),
    application:start(ibrowse),

    %% Lager is loaded here but started only after its handlers are set
    application:load(lager),

    ProjectConfig = proplists:get_value(config, Opts),
    ConfigPath = proplists:get_value(file, Opts),

    %% Application defaults first, then the user's configuration
    application:load(riak_test),
    rt_config:load(ProjectConfig, ConfigPath),

    LagerLevel = rt_config:get(lager_level, info),
    application:set_env(lager, handlers, [{lager_console_backend, LagerLevel}]),
    lager:start(),

    %% Two fixed dependency locations, then any configured extras
    riak_test_escript:add_deps(rt:get_deps()),
    riak_test_escript:add_deps("deps"),
    lists:foreach(fun riak_test_escript:add_deps/1, rt_config:get(rt_deps, [])),

    NodeName = rt_config:get(rt_nodename, 'riak_test@127.0.0.1'),
    Cookie = rt_config:get(rt_cookie, riak),
    %% Asserts that starting epmd produced no output
    [] = os:cmd("epmd -daemon"),
    net_kernel:start([NodeName]),
    erlang:set_cookie(node(), Cookie),

    rt:setup_harness(undefined, []),
    ok.

%% @doc Connect the clusters according to the `--repl' option.
%% `undefined' does nothing; `primary' connects the first cluster to every
%% other cluster; `bidirect' connects every ordered pair of distinct
%% clusters; any other value is only logged. Always returns `ok'.
maybe_connect_repl(undefined, _Clusters) ->
    ok;
maybe_connect_repl(primary, Clusters) ->
    info("Connecting cluster1 (source) to other clusters (sink)"),
    [Source | Sinks] = name_clusters(Clusters),
    lists:foreach(fun(Sink) -> connect_clusters(Source, Sink) end, Sinks);
maybe_connect_repl(bidirect, Clusters) ->
    info("Connecting all clusters bidirectionally"),
    Named = name_clusters(Clusters),
    Pairs = [{From, To} || From <- Named, To <- Named, From =/= To],
    lists:foreach(fun({From, To}) -> connect_clusters(From, To) end, Pairs);
maybe_connect_repl(Unknown, _Clusters) ->
    info("Unknown --repl option: ~p~n", [Unknown]),
    ok.
%% @doc Connect the named source cluster `A' to the named sink cluster `B'.
%% The leader is looked up via the first source node, the sink's
%% `cluster_mgr' address is read from the first sink node, and the call
%% returns `ok' once repl_util reports the connection.
connect_clusters({A, Source}, {B, Sink}) ->
    SourceNode = hd(Source),
    SinkNode = hd(Sink),
    Leader = rpc:call(SourceNode, riak_core_cluster_mgr, get_leader, []),
    {ok, {IP, Port}} =
        rpc:call(SinkNode, application, get_env, [riak_core, cluster_mgr]),
    info("connecting ~p to ~p at ~p:~p", [A, B, IP, Port]),
    repl_util:connect_cluster(Leader, IP, Port),
    ok = repl_util:wait_for_connection(Leader, B),
    info("....connected"),
    ok.

%% @doc Name the clusters "cluster1", "cluster2", ... in list order (the name
%% is set through each cluster's first node), then wait for every cluster's
%% ring and leader to converge. Returns the list of `{Name, Nodes}' pairs.
name_clusters(Clusters) ->
    info("Setting cluster names~n"),
    Numbered = lists:zip(lists:seq(1, length(Clusters)), Clusters),
    %% A comprehension evaluates left to right, so clusters are named in order.
    NamedClusters =
        [begin
             Name = "cluster" ++ integer_to_list(N),
             repl_util:name_cluster(hd(Nodes), Name),
             {Name, Nodes}
         end || {N, Nodes} <- Numbered],
    lists:foreach(fun(Nodes) ->
                          rt:wait_until_ring_converged(Nodes),
                          ok = repl_util:wait_until_leader_converge(Nodes)
                  end, Clusters),
    NamedClusters.

%% @doc Log `Msg' at info level through lager, tagged with this process.
info(Msg) ->
    lager:log(info, self(), Msg).

%% @doc Log `Format' with `Args' at info level through lager, tagged with
%% this process.
info(Format, Args) ->
    lager:log(info, self(), Format, Args).