mirror of
https://github.com/valitydev/riak_test.git
synced 2024-11-06 00:25:22 +00:00
wippiest of wips.
compiles and mostly should be done, but needs sanity checking and some adjustment to how it's collecting stats, as dstat is junk and it'd be better I guess to get the stats directly?
This commit is contained in:
parent
9c5df565f5
commit
ecd9173ec1
46
INSTALL
46
INSTALL
@ -1,8 +1,44 @@
|
||||
to get the reporting stuff working, you'll need to have gnuplot
|
||||
installed on your local machine, at at least version 4.6.3. If
|
||||
you need to install from source, you'll need at least these two
|
||||
deps:
|
||||
sudo apt-get install libgd2-xpm-dev libreadline6-dev
|
||||
- to get the reporting stuff working, you'll need to have gnuplot
|
||||
installed on your local machine, at least version 4.6.3. If you
|
||||
need to install from source, you'll need at least these two deps:
|
||||
|
||||
- sudo apt-get install libgd2-xpm-dev libreadline6-dev
|
||||
|
||||
- symlink the stuff in perf/reporting to ~/bin/, assuming that is on
|
||||
your path.
|
||||
|
||||
- drop_caches.sh needs to be visible on the remote machine. the
|
||||
initial version of these scripts assumes that everyone is on the
|
||||
same filer. This isn't going to be true everywhere. drop is meant
|
||||
to contain your username, and be in /etc/sudoers.d/.
|
||||
|
||||
- ssh needs to be setup so that the user doesn't need to provide
|
||||
credentials. The current way is that I just generated a bos-local
|
||||
key and use that. This isn't a terrible idea, but should at least
|
||||
be automated on setup.
|
||||
|
||||
|
||||
- ~/.riak_test.config should have a harness entry like this:
|
||||
|
||||
{rtperf,
|
||||
[
|
||||
{rt_deps, ["/usr/local/basho/evan/riak2.0/deps"]},
|
||||
%% should be really long to allow full bitcasks to
|
||||
%% come up
|
||||
{rt_max_wait_time, 600000000000000},
|
||||
{basho_bench, "/usr/local/basho/evan/basho_bench/"},
|
||||
{basho_bench_statedir, "/tmp/bb_seqstate/"},
|
||||
{rt_retry_delay, 500},
|
||||
{rt_harness, rtperf},
|
||||
{load_intercepts, false},
|
||||
{perf_builds, "/usr/local/basho/evan/"},
|
||||
{perf_loadgens, ["localhost", "r2s24"]},
|
||||
{rtdev_path, []}
|
||||
|
||||
]}.
|
||||
|
||||
at least right now. this will probably change as the configuration
|
||||
stuff gets sorted out. some of these may not be necessary.
|
||||
rt_max_wait_time is critical (could maybe be set to infinity? maybe by the
|
||||
harness?), perf_* and basho_bench* are also critical. rt_deps
|
||||
should maybe be dynamic?
|
||||
|
217
perf/consistent_get_put.erl
Normal file
217
perf/consistent_get_put.erl
Normal file
@ -0,0 +1,217 @@
|
||||
-module(consistent_get_put).
|
||||
-compile(export_all).
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
-define(HARNESS, (rt_config:get(rt_harness))).
|
||||
|
||||
%% @doc riak_test entry point: build the cluster, optionally prepopulate
%% it, then run the configured load (pareto or uniform) while the rtperf
%% data collectors are running, and finally gather the results.
confirm() ->
    HostList = rt_config:get(rt_hostnames),
    Count = length(HostList),
    BinSize = rt_config:get(perf_binsize),

    %% the original dropped this result and then used an unbound Config
    Config = rtperf:standard_config(Count),

    %% build_cluster/1 lives in rtperf, not in this module
    ok = rtperf:build_cluster(Config),

    %%[Node | _] = Nodes,
    %%rt:create_and_activate_bucket_type(Node, <<"sc">>, [{consistent, true}]),

    %% NOTE(review): rtperf:maybe_prepop/3 needs a key-set size; this
    %% module sizes its key space from perf_prepop_size elsewhere, so that
    %% value is used here -- confirm this is the intended setting.
    ok = rtperf:maybe_prepop(HostList, BinSize,
                             rt_config:get(perf_prepop_size)),

    %% this should also get moved into the perf harness.

    %% this needs to be broken apart and abstracted so it's easy to
    %% specify what needs to happen.  also need to add baseline load
    %% vs. test load.
    Test = select_test(),

    Collectors = rtperf:start_data_collectors(HostList),

    %% need to come up with a better naming scheme, for sure
    TestName = rtperf:test_name(),

    %% do_pareto/3 and do_uniform/3 are defined in this module
    Test(HostList, BinSize, TestName),

    %% result deliberately not matched: stop_data_collectors/1 is not
    %% guaranteed to return ok
    rtperf:stop_data_collectors(Collectors),
    ok = rtperf:collect_test_data(HostList, TestName).

%% Pick the load function from the perf_test_type setting.  Fails when
%% neither a run config nor a run type is configured.
select_test() ->
    case rt_config:get(perf_config, undefined) of
        undefined ->
            case rt_config:get(perf_test_type, undefined) of
                undefined ->
                    error("no run config or runtype defined");
                pareto ->
                    fun do_pareto/3;
                uniform ->
                    fun do_uniform/3
            end
    end.
|
||||
|
||||
|
||||
%% @doc basho_bench `code_paths' config entry listing the client
%% dependencies the riakc_pb driver needs.
code_paths() ->
    DepsDir = "evan/basho_bench/deps/",
    Deps = ["riakc", "riak_pb", "protobuffs"],
    {code_paths, [DepsDir ++ Dep || Dep <- Deps]}.
|
||||
|
||||
%% @doc Run a prepopulation pass: build the prepop basho_bench config
%% from the configured perf_prepop_size and run it with the default
%% runner count.
do_prepop(NodeList, BinSize, TestName) ->
    BenchConfig = prepop_config(BinSize, NodeList,
                                rt_config:get(perf_prepop_size)),
    lager:info("Config ~p", [BenchConfig]),
    rt_bench:bench(BenchConfig, NodeList, TestName),
    lager:info("Prepop complete").
|
||||
|
||||
%% @doc Build the basho_bench config for a prepopulation run.  The
%% requested key count is rounded up to a multiple of the worker count
%% (30 per node) before being handed to the partitioned sequential key
%% generator.
prepop_config(BinSize, NodeList, BinCount0) ->
    Workers = 30 * length(NodeList),
    %% only the mode is used; the duration from get_md/1 is ignored
    {Mode, _Duration} = get_md(BinSize),
    BinCount = adjust_count(BinCount0, Workers),

    lager:info("Starting prepop run for ~p binaries of size ~p bytes",
               [BinCount, BinSize]),

    KeyGen = {int_to_bin, {partitioned_sequential_int, 0, BinCount}},

    [Mode,
     %% run to completion
     {duration, infinity},
     {concurrent, Workers},
     {rng_seed, now},

     {riakc_pb_bucket, <<"b1">>},
     %%{riakc_pb_bucket, {<<"sc">>, <<"b1">>}},
     {key_generator, KeyGen},
     {value_generator, valgen(BinSize)},
     {operations, operations(prepop)},
     {sequential_int_state_dir, rt_bench:seq_state_dir()},

     %% should add asis when it's supported by the driver.
     {riakc_pb_ips, NodeList},
     {riakc_pb_replies, default},
     {driver, basho_bench_driver_riakc_pb},
     code_paths()].
|
||||
|
||||
%% @doc Round Count up to the next multiple of Concurrency; a Count that
%% is already a multiple is returned unchanged.
adjust_count(Count, Concurrency) ->
    pad_count(Count rem Concurrency, Count, Concurrency).

%% Add whatever is missing to reach the next multiple.
pad_count(0, Count, _Concurrency) ->
    Count;
pad_count(Remainder, Count, Concurrency) ->
    Count + (Concurrency - Remainder).
|
||||
|
||||
%% @doc Run the pareto-distributed load with two basho_bench runners.
do_pareto(NodeList, BinSize, TestName) ->
    BenchConfig = pareto_config(BinSize, NodeList),
    lager:info("Config ~p", [BenchConfig]),
    rt_bench:bench(BenchConfig, NodeList, TestName, 2).
|
||||
|
||||
%% @doc Build the basho_bench config for the pareto load: keys drawn
%% from a truncated pareto distribution, 30 workers per node.
pareto_config(BinSize, NodeList) ->
    Workers = 30 * length(NodeList),
    {Mode, Duration} = get_md(BinSize),
    KeyGen = {int_to_bin, {truncated_pareto_int, 10000000}},

    [Mode,
     {duration, Duration},
     {concurrent, Workers},
     {rng_seed, now},

     {riakc_pb_bucket, <<"b1">>},
     {key_generator, KeyGen},
     {value_generator, valgen(BinSize)},
     %% get/update mix; see operations/1 for the exact weights
     {operations, operations(pareto)},

     %% should add asis when it's supported by the driver.
     {riakc_pb_ips, NodeList},
     {riakc_pb_replies, default},
     {driver, basho_bench_driver_riakc_pb},
     code_paths()].
|
||||
|
||||
%% @doc Run the uniformly-distributed load with two basho_bench runners.
do_uniform(NodeList, BinSize, TestName) ->
    BenchConfig = uniform_config(BinSize, NodeList),
    lager:info("Config ~p", [BenchConfig]),
    rt_bench:bench(BenchConfig, NodeList, TestName, 2).
|
||||
|
||||
%% @doc Build the basho_bench config for the uniform load: keys drawn
%% uniformly from the prepopulated key space, 40 workers per node.
%%
%% The key-space size is perf_prepop_size when that is an integer and
%% 10000000 otherwise.
uniform_config(BinSize, NodeList) ->
    NodeCount = length(NodeList),
    {Mode, Duration} = get_md(BinSize),

    %% The pattern variable must be fresh.  The original reused the
    %% already-bound node count, so the configured size was only honoured
    %% when it happened to equal the number of nodes.
    NumKeys =
        case rt_config:get(perf_prepop_size) of
            PrepopSize when is_integer(PrepopSize) ->
                PrepopSize;
            _ ->
                10000000
        end,

    [Mode,
     {duration, Duration},
     {concurrent, 40*NodeCount},
     {rng_seed, now},

     {riakc_pb_bucket, <<"b1">>},
     %%{riakc_pb_bucket, {<<"sc">>, <<"b1">>}},
     {key_generator, {int_to_bin, {uniform_int, NumKeys}}},
     {value_generator, valgen(BinSize)},
     {operations, operations(uniform)},

     %% should add asis when it's supported by the driver.
     {riakc_pb_ips, NodeList},
     {riakc_pb_replies, default},
     {driver, basho_bench_driver_riakc_pb},
     code_paths()].
|
||||
|
||||
|
||||
%% @doc Return `{ModeEntry, Duration}' for a run, chosen by the
%% configured backend.  leveldb gets a fixed request rate that depends on
%% the value size; anything else runs at max rate.
%%
%% NOTE(review): maybe_override/1 reads perf_runtime, so setting that key
%% replaces the leveldb *rate* as well as the duration -- confirm that is
%% intended.
get_md(BinSize) ->
    case rt_config:get(rt_backend, undefined) of
        %% heuristically determined nonsense, need a real model
        riak_kv_eleveldb_backend ->
            lager:info("leveldb"),
            DefaultRate =
                if
                    BinSize >= 10000 -> 50;
                    true -> 75
                end,
            Rate = maybe_override(DefaultRate),
            Duration = maybe_override(150),
            {{mode, {rate, Rate}}, Duration};
        _ ->
            %%fixme yo
            lager:info("unset or bitcask"),
            {{mode, max}, maybe_override(90)}
    end.
|
||||
|
||||
%% @doc Return the configured perf_runtime when it is an integer,
%% otherwise Default.
maybe_override(Default) ->
    Configured = rt_config:get(perf_runtime, undefined),
    if
        is_integer(Configured) -> Configured;
        true -> Default
    end.
|
||||
|
||||
%% @doc Local time as "Year-Month-Day-Hour-Minute-Second", with no zero
%% padding.
date_string() ->
    {{Year, Month, Day}, {Hour, Minute, Second}} = calendar:local_time(),
    Fields = [Year, Month, Day, Hour, Minute, Second],
    string:join([integer_to_list(Field) || Field <- Fields], "-").
|
||||
|
||||
%% @doc basho_bench value generator spec for the configured
%% perf_bin_type: `fixed' yields values of exactly BinSize, `exponential'
%% is parameterised with a quarter and three quarters of BinSize.
valgen(BinSize) ->
    case rt_config:get(perf_bin_type) of
        fixed ->
            {fixed_bin, BinSize};
        exponential ->
            Q = BinSize div 4,
            {exponential_bin, Q, Q * 3}
    end.
|
||||
|
||||
%% @doc basho_bench operation mix for a run type.
%%
%% `prepop' is pure puts and does not depend on any configuration.  The
%% original read perf_load_type before looking at the run type, so a
%% prepop run failed when that key was unset even though its value was
%% never used.  `pareto' and `uniform' weight gets by the configured
%% perf_load_type (read_heavy or write_heavy).
operations(prepop) ->
    [{put, 1}];
operations(pareto) ->
    [{get, get_weight()}, {update, 4}];
operations(uniform) ->
    [{get, get_weight()}, {update, 1}].

%% Weight given to gets: 12 for read_heavy, 3 for write_heavy.
get_weight() ->
    N =
        case rt_config:get(perf_load_type) of
            read_heavy -> 4;
            write_heavy -> 1
        end,
    N * 3.
|
34
perf/get_put.erl
Normal file
34
perf/get_put.erl
Normal file
@ -0,0 +1,34 @@
|
||||
-module(get_put).
|
||||
-compile(export_all).
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
-define(HARNESS, (rt_config:get(rt_harness))).
|
||||
|
||||
%% @doc riak_test entry point: build the cluster, size the key set from
%% the RAM target, optionally prepopulate, then run a max-rate get/update
%% load through the rtperf harness.
confirm() ->
    HostList = rt_config:get(rt_hostnames),
    Count = length(HostList),
    BinSize = rt_config:get(perf_binsize),

    Config = rtperf:standard_config(Count),

    %% build_cluster/1 is defined in rtperf; this module has no local one
    ok = rtperf:build_cluster(Config),

    SetSize = rtperf:target_size(rt_config:get(perf_target_pct),
                                 BinSize,
                                 rt_config:get(perf_ram_size),
                                 Count),
    TestConfig =
        rt_bench:config(
          %% pass a complete config entry: rt_bench:config places this
          %% term directly in the basho_bench config list
          {mode, max},
          rt_config:get(perf_duration),
          HostList,
          {int_to_bin_bigendian, {uniform_int, SetSize}},
          rt_bench:valgen(rt_config:get(perf_bin_type), BinSize),
          %% 3:1 get/update
          [{get, 3}, {update, 1}]
         ),

    ok = rtperf:maybe_prepop(HostList, BinSize, SetSize),

    ok = rtperf:run_test(HostList, TestConfig, []),
    pass.
|
@ -939,12 +939,14 @@ members_according_to(Node) ->
|
||||
Members = riak_core_ring:all_members(Ring),
|
||||
Members.
|
||||
|
||||
%% @doc Return an appropriate ringsize for the node count passed in
|
||||
%% @doc Return an appropriate ringsize for the node count passed
|
||||
%% in. 24 is the number of cores on the bigger intel machines, but this
|
||||
%% may be too large for the single-chip machines.
|
||||
nearest_ringsize(Count) ->
|
||||
nearest_ringsize(Count * 10, 2).
|
||||
nearest_ringsize(Count * 24, 2).
|
||||
|
||||
nearest_ringsize(Count, Power) ->
|
||||
case Count < trunc(Power * 1.5) of
|
||||
case Count < trunc(Power * 0.9) of
|
||||
true ->
|
||||
Power;
|
||||
false ->
|
||||
|
129
src/rt_bench.erl
129
src/rt_bench.erl
@ -2,16 +2,7 @@
|
||||
-compile(export_all).
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
|
||||
clear_seq_state_dir() ->
|
||||
StateDir = rt_config:get(basho_bench_statedir),
|
||||
case file:list_dir(StateDir) of
|
||||
{ok, FL} ->
|
||||
[file:delete(StateDir++F) || F <- FL],
|
||||
file:del_dir(StateDir);
|
||||
{error, enoent} -> ok;
|
||||
{error, Reason} -> error(Reason)
|
||||
end.
|
||||
-define(ESCRIPT, rt_config:get(basho_bench_escript)).
|
||||
|
||||
seq_state_dir() ->
|
||||
rt_config:get(basho_bench_statedir).
|
||||
@ -20,6 +11,9 @@ bench(Config, NodeList, TestName) ->
|
||||
bench(Config, NodeList, TestName, 1).
|
||||
|
||||
bench(Config, NodeList, TestName, Runners) ->
|
||||
bench(Config, NodeList, TestName, Runners, false).
|
||||
|
||||
bench(Config, NodeList, TestName, Runners, Drop) ->
|
||||
lager:info("Starting basho_bench run"),
|
||||
|
||||
LoadGens =
|
||||
@ -28,24 +22,31 @@ bench(Config, NodeList, TestName, Runners) ->
|
||||
LG -> LG
|
||||
end,
|
||||
|
||||
case rt_config:get(perf_drop_cache, true) of
|
||||
case Drop of
|
||||
true ->
|
||||
[begin
|
||||
Fun = fun(Node) ->
|
||||
R = rtssh:ssh_cmd(Node, "sudo ~/bin/drop_caches.sh"),
|
||||
lager:info("Dropped cache for node: ~p ~p",
|
||||
lager:info("Dropped cache for node: ~p ret: ~p",
|
||||
[Node, R])
|
||||
end
|
||||
|| Node <- NodeList];
|
||||
_ ->
|
||||
ok
|
||||
end,
|
||||
rt:pmap(Fun, NodeList);
|
||||
_ -> ok
|
||||
end,
|
||||
|
||||
%% make a local config file, to be copied to a remote
|
||||
%% loadgen. They're named separately because for simplicity, we
|
||||
%% use network operations even for local load generation
|
||||
|
||||
%% remote tempdir name
|
||||
BBTmp = "/tmp/basho_bench_tmp"++os:getpid(),
|
||||
BBTmpStage = BBTmp ++ "_stage",
|
||||
file:make_dir(BBTmpStage), %% removed error checking here
|
||||
Path = BBTmpStage++"/"++TestName++"-rt.config",
|
||||
RemPath = BBTmp++"/"++TestName++"-rt.config",
|
||||
[file:write_file(Path,io_lib:fwrite("~p.\n",[C]),
|
||||
%% local staging version
|
||||
BBTmpStage = BBTmp ++ "_stage/",
|
||||
ok = file:ensure_dir(BBTmpStage),
|
||||
|
||||
Filename = TestName++"-rt.config",
|
||||
ConfigPath = BBTmpStage++"/"++Filename,
|
||||
RemotePath = BBTmp++"/"++Filename,
|
||||
[file:write_file(ConfigPath,io_lib:fwrite("~p.\n",[C]),
|
||||
[append])
|
||||
|| C <- Config],
|
||||
BBDir = rt_config:get(basho_bench),
|
||||
@ -56,50 +57,84 @@ bench(Config, NodeList, TestName, Runners) ->
|
||||
end
|
||||
|| C <- lists:seq(1, Runners)],
|
||||
|
||||
F =
|
||||
fun({LG, N}, Owner) ->
|
||||
F = fun({LG, N}, Owner) ->
|
||||
try
|
||||
Num = integer_to_list(N),
|
||||
|
||||
{0, _} = rtssh:ssh_cmd(LG, "mkdir -p "++BBTmp),
|
||||
%% don't care if we fail here
|
||||
rtssh:ssh_cmd(LG, "rm -r /tmp/bb_seqstate/"),
|
||||
rtssh:scp(LG, Path, BBTmp),
|
||||
Cmd = "/usr/local/basho/erlang/R16B02/bin/escript "++
|
||||
%% don't care much if we fail here
|
||||
rtssh:ssh_cmd(LG, "rm -r " ++ seq_state_dir()),
|
||||
{0, _} = rtssh:scp(LG, ConfigPath, BBTmp),
|
||||
%% run basho bench on the remote loadgen,
|
||||
%% specifying the remote testdir and the newly
|
||||
%% copied remote config location
|
||||
Cmd = ?ESCRIPT++
|
||||
BBDir++"basho_bench -d "++
|
||||
BBDir++"/"++TestName++"_"++Num++" "++RemPath,
|
||||
BBDir++"/"++TestName++"_"++Num++" "++RemotePath,
|
||||
lager:info("Spawning remote basho_bench w/ ~p on ~p",
|
||||
[Cmd, LG]),
|
||||
{0, R} = rtssh:ssh_cmd(LG, Cmd, false),
|
||||
lager:info("ssh done returned ~p", [R]),
|
||||
lager:info("bench run finished, returned ~p", [R]),
|
||||
{0, _} = rtssh:ssh_cmd(LG, "rm -r "++BBTmp++"/")
|
||||
catch
|
||||
Class:Error ->
|
||||
lager:error("basho_bench died with error ~p:~p",
|
||||
[Class, Error])
|
||||
after
|
||||
lager:info("finishing bb run")
|
||||
lager:info("finished bb run")
|
||||
end,
|
||||
Owner ! done
|
||||
end,
|
||||
S = self(),
|
||||
[begin
|
||||
spawn(fun() -> F(R, S) end)
|
||||
end
|
||||
|| R <- GenList],
|
||||
[receive done -> ok end || _ <- lists:seq(1,Runners)],
|
||||
lager:info("removing stage dir"),
|
||||
[spawn(fun() -> F(R, S) end)|| R <- GenList],
|
||||
[receive done -> ok end || _ <- GenList],
|
||||
lager:debug("removing stage dir"),
|
||||
{ok, FL} = file:list_dir(BBTmpStage),
|
||||
[file:delete(BBTmpStage++File) || File <- FL],
|
||||
file:del_dir(BBTmpStage).
|
||||
ok = file:del_dir(BBTmpStage).
|
||||
|
||||
discard_bb_output(Port) ->
|
||||
receive
|
||||
{Port, {data, _Bytes}} ->
|
||||
%%lager:info("bb ~p", [Bytes]),
|
||||
discard_bb_output(Port);
|
||||
{Port, {exit_status, 0}} ->
|
||||
ok;
|
||||
{Port, {exit_status, Status}} ->
|
||||
throw({bb_error, Status})
|
||||
-define(CONCURRENCY_FACTOR, rt_config:get(basho_bench_concurrency, 30)).
|
||||
|
||||
%% @doc Build a basho_bench config using the default bucket
%% (`<<"testbucket">>') and the riakc_pb driver.
config(Rate, Duration, NodeList, KeyGen, ValGen, Operations) ->
    DefaultBucket = <<"testbucket">>,
    DefaultDriver = riakc_pb,
    config(Rate, Duration, NodeList, KeyGen, ValGen, Operations,
           DefaultBucket, DefaultDriver).
|
||||
|
||||
%% @doc Build a basho_bench config (a list of `{Key, Value}' terms) for
%% the given driver.
%%
%% Rate may be either a bare mode value (`max', `{rate, N}') or a
%% complete `{mode, Value}' entry; both become a `{mode, Value}' entry.
%% The original placed Rate into the list unchanged, so callers passing
%% `max' produced a bare atom rather than a config entry.
%%
%% Driver is the driver suffix (e.g. `riakc_pb'); the driver-specific
%% keys `<Driver>_bucket', `<Driver>_ips' and `<Driver>_replies' and the
%% module name `basho_bench_driver_<Driver>' are derived from it.
%% Concurrency is ?CONCURRENCY_FACTOR workers per node.
config(Rate, Duration, NodeList, KeyGen,
       ValGen, Operations, Bucket, Driver) ->
    DriverBucket = append_atoms(Driver, '_bucket'),
    DriverIps = append_atoms(Driver, '_ips'),
    DriverReplies = append_atoms(Driver, '_replies'),
    DriverName = append_atoms(basho_bench_driver_, Driver),
    [
     mode_entry(Rate),
     {duration, Duration},
     {concurrent, ?CONCURRENCY_FACTOR * length(NodeList)},
     {rng_seed, now},

     {DriverBucket, Bucket},
     {key_generator, KeyGen},
     {value_generator, ValGen},
     {operations, Operations},
     %% just leave this in in case we need it, it's harmless when not
     %% using the sequential generator
     {sequential_int_state_dir, seq_state_dir()},

     {DriverIps, NodeList},
     {DriverReplies, default},
     {driver, DriverName},
     rt_config:get(basho_bench_code_paths)
    ].

%% Normalise a rate argument into a `{mode, _}' config entry.
mode_entry({mode, _} = Mode) ->
    Mode;
mode_entry(Rate) ->
    {mode, Rate}.
|
||||
|
||||
%% @doc Concatenate the names of two atoms into a single atom.
append_atoms(L, R) ->
    Prefix = atom_to_list(L),
    Suffix = atom_to_list(R),
    list_to_atom(Prefix ++ Suffix).
|
||||
|
||||
%% @doc basho_bench value generator spec for a value type: `fixed'
%% yields values of exactly BinSize, `exponential' is parameterised with
%% a quarter and three quarters of BinSize.
valgen(Type, BinSize) ->
    case Type of
        fixed ->
            {fixed_bin, BinSize};
        exponential ->
            Q = BinSize div 4,
            {exponential_bin, Q, Q * 3}
    end.
|
||||
|
270
src/rtperf.erl
Normal file
270
src/rtperf.erl
Normal file
@ -0,0 +1,270 @@
|
||||
-module(rtperf).
|
||||
-compile(export_all).
|
||||
|
||||
%% @doc Run one benchmark: start the data collectors, start any base
%% load, run basho_bench with one runner per configured load generator,
%% then stop everything and collect the results.
run_test(HostList, TestBenchConfig, BaseBenchConfig) ->
    Collectors = start_data_collectors(HostList),

    TestName = test_name(),

    Base = maybe_start_base_load(BaseBenchConfig),

    %% rt_bench:bench/4 takes a runner *count*; the original passed the
    %% perf_loadgens host list itself
    Runners = length(rt_config:get(perf_loadgens)),
    rt_bench:bench(TestBenchConfig, HostList, TestName, Runners),

    maybe_stop_base_load(Base),

    %% result deliberately not matched against ok: the collectors are
    %% best-effort and stopping them must not abort result collection
    stop_data_collectors(Collectors),

    ok = collect_test_data(HostList, TestName).
|
||||
|
||||
%% @doc Bring up the cluster for a perf run and wait (at most ten
%% minutes) until it is ready, converged and done with transfers.
%%
%% When perf_restart is true the remote builds are verified and a fresh
%% cluster is built with Config; when it is false the node names are
%% derived from rt_hostnames and the running cluster is reused.
%%
%% NOTE(review): the first perf_restart lookup has a default (`meh') and
%% the second has none and only handles true/false -- confirm what should
%% happen when the key is unset.
build_cluster(Config) ->
    Version = rt_config:get(perf_version),
    Hosts = rt_config:get(rt_hostnames),
    NodeCount = length(Hosts),

    %% make sure that all of the remote nodes have a clean build at
    %% the remote location
    ForceBuild = rt_config:get(perf_force_build, false),
    case rt_config:get(perf_restart, meh) of
        true ->
            case rtssh:ensure_remote_build(Hosts, Version, ForceBuild) of
                ok ->
                    ok;
                Unexpected ->
                    lager:error("Got unexpected return ~p from deploy, stopping",
                                [Unexpected]),
                    error(deploy_error)
            end;
        _ ->
            ok
    end,

    Nodes =
        case rt_config:get(perf_restart) of
            true ->
                Versions = lists:duplicate(NodeCount, {Version, Config}),
                rt:build_cluster(NodeCount, Versions, whatever);
            false ->
                [list_to_atom("riak@" ++ Host) || Host <- Hosts]
        end,

    %% wait in a helper process so the whole sequence can be bounded by
    %% a single timeout
    Parent = self(),
    WaitForCluster =
        fun() ->
                ok = rt:wait_until_nodes_ready(Nodes),
                ok = rt:wait_until_ring_converged(Nodes),
                ok = rt:wait_until_transfers_complete(Nodes),
                Parent ! done
        end,
    spawn(WaitForCluster),
    receive
        done ->
            ok
    after timer:minutes(10) ->
            lager:error("Cluster setup is taking too long, stopping"),
            error(cluster_setup_timeout)
    end.
|
||||
|
||||
%% @doc Start the per-host data collectors and return their pids: one
%% process per host running dstat over ssh, and one per host polling the
%% node's HTTP stats endpoint (see poll_stats/1).  Also creates the local
%% staging directory `/tmp/perf-<ospid>' and records the start time in
%% its START file.
start_data_collectors(Hosts) ->
    %% should probably only start this once?
    inets:start(),

    OSPid = os:getpid(),
    PrepDir = "/tmp/perf-"++OSPid,
    file:make_dir(PrepDir),

    Cmd = "python ./bin/dstat -cdngyimrs --vm --fs --socket --tcp --disk-util "++
        "--output "++"/tmp/dstat-"++OSPid,

    %% file:write/2 expects an open IoDevice, so the original call with a
    %% filename never wrote the marker; write_file/2 takes a path
    file:write_file(PrepDir++"/START",
                    io_lib:format("~w.~n", [calendar:local_time()])),

    [spawn(rtssh, ssh_cmd, [Host, Cmd]) || Host <- Hosts] ++
        [spawn(?MODULE, poll_stats, [Host]) || Host <- Hosts].
|
||||
|
||||
%% @doc Collector loop: fetch `/stats/' from Host on port 8098 and append
%% a timestamp plus the decoded stats to
%% `/tmp/perf-<ospid>/rstats-<Host>'.  Sleeps 60s after a successful
%% poll and 100ms after a failed one.  Never returns.
poll_stats(Host) ->
    Url = "http://"++Host++":8098/stats/",
    case httpc:request(Url) of
        {ok, {{_Version, 200, _ReasonPhrase}, _Headers, Body}} ->
            Stats = mochijson2:decode(Body),
            StatsFile = "/tmp/perf-"++os:getpid()++"/rstats-"++Host,
            {ok, Fd} = file:open(StatsFile, [append]),
            file:write(Fd, io_lib:format("~w.~n", [calendar:local_time()])),
            file:write(Fd, io_lib:format("~p.~n", [Stats])),
            file:close(Fd),
            timer:sleep(60000);
        _Failure ->
            %% failures are deliberately not logged: they turned out to
            %% be too noisy to be useful
            timer:sleep(100)
    end,
    poll_stats(Host).
|
||||
|
||||
%% @doc Stop the collector processes returned by start_data_collectors/1
%% and return `ok'.
%%
%% The collectors never receive messages, so the `stop' message the
%% original sent had no effect; they are killed instead, the same way
%% maybe_prepop/3 shuts its collectors down.  Callers match the result
%% against `ok', which the original list result could never satisfy.
stop_data_collectors(Collectors) ->
    lists:foreach(fun(Collector) -> exit(Collector, kill) end, Collectors),
    ok.
|
||||
|
||||
%% @doc Start the background ("base") load for a test.  Only the empty
%% config is handled so far, meaning no base load; the returned handle is
%% `none'.
maybe_start_base_load([]) -> none.
|
||||
|
||||
%% @doc Stop the base load identified by the given handle.  Only the
%% `none' handle is handled so far, for which there is nothing to stop.
maybe_stop_base_load(none) -> ok.
|
||||
|
||||
%% need more sensible test names.
|
||||
%% @doc Build the result name for the current run from configuration:
%% `<name>-<version>-<test type>-<bin type>-<bin size>b-<timestamp>'.
test_name() ->
    Version = rt_config:get(perf_version),
    BinSize = rt_config:get(perf_binsize),
    Name = rt_config:get(perf_test_name),
    TestType = atom_to_list(rt_config:get(perf_test_type)),
    BinType = atom_to_list(rt_config:get(perf_bin_type)),
    lists:append([Name, "-", Version, "-",
                  TestType, "-",
                  BinType, "-",
                  integer_to_list(BinSize), "b-", date_string()]).
|
||||
|
||||
%% @doc Gather everything a run produced into `results/<TestName>':
%% stop the remote dstat processes, write the END marker, copy and remove
%% each host's dstat output, collect the basho_bench data, and move the
%% staging directory into place.  Returns `ok'.
collect_test_data(Hosts, TestName) ->
    %% stop the dstat watching processes
    %% (potentially unsafe: kills every python process on the host)
    lists:foreach(fun(Host) -> rtssh:ssh_cmd(Host, "killall python") end,
                  Hosts),

    %% collect the files
    OSPid = os:getpid(),
    PrepDir = "/tmp/perf-"++OSPid,

    EndStamp = io_lib:format("~w~n", [calendar:local_time()]),
    file:write_file(PrepDir++"/END", EndStamp),

    %% get rid of this hateful crap
    RemoteDstat = "/tmp/dstat-"++OSPid,
    FetchDstat =
        fun(Host) ->
                rtssh:cmd("scp -q "++Host++":"++RemoteDstat++" "
                          ++PrepDir++"/dstat-"++Host),
                rtssh:ssh_cmd(Host, "rm "++RemoteDstat)
        end,
    lists:foreach(FetchDstat, Hosts),

    ok = rt_bench:collect_bench_data(PrepDir),

    %% grab all the benchmark stuff. need L to make real files because
    %% it's a soft link
    %% NOTE(review): "current_." looks like it may be a typo for
    %% "current/." -- confirm against the basho_bench results layout
    BBDir = rt_config:get(basho_bench),
    rtssh:cmd("cp -aL "++BBDir++"/"++TestName++"/current_. "++PrepDir),

    rt:cmd("mv "++PrepDir++" results/"++TestName),

    %% really, really need to compress the results so they don't take
    %% up so much space
    ok.
|
||||
|
||||
%% @doc When perf_prepop is true, load the cluster with puts over a key
%% space of SetSize keys (values sized by BinSize), let it settle for 90
%% seconds, and collect the prepop results.  When perf_prepop is false,
%% do nothing.  Returns `ok'.
maybe_prepop(Hosts, BinSize, SetSize) ->
    Vsn = rt_config:get(perf_version),
    case rt_config:get(perf_prepop) of
        true ->
            PPids = start_data_collectors(Hosts),
            PrepopName = rt_config:get(perf_test_name)++"-"++Vsn++
                "-prepop"++integer_to_list(BinSize)++"b-"++date_string(),

            PrepopConfig =
                rt_bench:config(
                  %% pass a complete config entry: rt_bench:config places
                  %% this term directly in the basho_bench config list
                  {mode, max},
                  rt_config:get(perf_runtime),
                  Hosts,
                  {int_to_bin_bigendian, {uniform_int, SetSize}},
                  rt_bench:valgen(rt_config:get(perf_bin_type), BinSize),
                  [{put,1}]),

            %% rt_bench:bench/4 takes a runner *count*; the original
            %% passed the perf_loadgens host list itself
            Runners = length(rt_config:get(perf_loadgens)),
            rt_bench:bench(PrepopConfig, Hosts, PrepopName, Runners),

            %% give the cluster time to settle before collecting
            timer:sleep(timer:minutes(1)+timer:seconds(30)),
            [exit(P, kill) || P <- PPids],
            collect_test_data(Hosts, PrepopName);
        false ->
            ok
    end.
|
||||
|
||||
%% @doc Current OS time as a decimal string of whole seconds since the
%% epoch.
date_string() ->
    {MegaSecs, Secs, _MicroSecs} = os:timestamp(),
    EpochSeconds = MegaSecs * 1000000 + Secs,
    integer_to_list(EpochSeconds).
|
||||
|
||||
|
||||
%% in the end, it'd be nice to automatically generate some of the
|
||||
%% other config stuff as well, i.e. give a node count, some
|
||||
%% information (RAM, fast or slow disks, etc.) and generate a config
|
||||
%% that should more or less hit the same performance contours
|
||||
%% regardless of what machines are being used. I suspect that
|
||||
%% data-set sizing here is what's important, the ratio of disk cache
|
||||
%% to data set size.
|
||||
|
||||
%% this actually suggests an entirely different line of testing than
|
||||
%% what's been pursued so far, running a test at say, 50% ram usage,
|
||||
%% 150%, 200% etc. Then we could skip the time-consuming and not
|
||||
%% terribly enlightening up-from-cold period.
|
||||
|
||||
%% @doc Standard node configuration for NodeCount nodes with AAE `off'.
standard_config(NodeCount) ->
    AAE = off,
    standard_config(NodeCount, AAE).
|
||||
|
||||
%% @doc Standard node configuration for NodeCount nodes.  The backend
%% (rt_backend) and config format (cuttle, default true) come from the
%% test configuration; the ring size is derived from the node count.
%% AAE is `on' or `off'.
standard_config(NodeCount, AAE) ->
    Backend = rt_config:get(rt_backend, undefined),
    UseCuttlefish = rt_config:get(cuttle, true),
    mk_std_conf(Backend, UseCuttlefish, rt:nearest_ringsize(NodeCount), AAE).
|
||||
|
||||
%% @doc Build the node configuration for a backend.
%%
%% The second argument selects the format: `false' yields an app.config
%% style proplist, `true' yields a `{cuttlefish, Settings}' tuple (with
%% the AAE setting translated by aae_cuttle/1).  The memory backend uses
%% twice the given ring size; any backend other than memory or leveldb
%% gets the default clauses (bitcask in the cuttlefish form).
mk_std_conf(riak_kv_memory_backend, false, Ring, AAE) ->
    CoreSettings = [{ring_creation_size, Ring*2},
                    {handoff_concurrency, 16}],
    KvSettings = [{storage_backend, riak_kv_memory_backend},
                  {anti_entropy,{AAE, []}},
                  {memory_backend, []},
                  {fsm_limit, 50000}],
    [{riak_core, CoreSettings},
     {riak_kv, KvSettings}];
mk_std_conf(riak_kv_memory_backend, true, Ring, AAE0) ->
    AAE = aae_cuttle(AAE0),
    Settings = [{ring_size, Ring*2},
                {handoff_concurrency, 16},
                {"erlang.distribution_buffer_size", "128MB"},
                {storage_backend, memory},
                {anti_entropy, AAE}],
    {cuttlefish, Settings};
mk_std_conf(riak_kv_eleveldb_backend, false, Ring, AAE) ->
    CoreSettings = [{ring_creation_size, Ring}],
    KvSettings = [{storage_backend, riak_kv_eleveldb_backend},
                  {anti_entropy,{AAE,[]}},
                  {fsm_limit, undefined}],
    LevelSettings = [{max_open_files, 500}],
    [{riak_core, CoreSettings},
     {riak_kv, KvSettings},
     {eleveldb, LevelSettings}];
mk_std_conf(riak_kv_eleveldb_backend, true, Ring, AAE0) ->
    AAE = aae_cuttle(AAE0),
    Settings = [{ring_size, Ring},
                {"erlang.distribution_buffer_size", "128MB"},
                {storage_backend, leveldb},
                {anti_entropy, AAE}],
    {cuttlefish, Settings};
mk_std_conf(_, false, Ring, AAE) ->
    [{riak_core, [{ring_creation_size, Ring}]},
     {riak_kv, [{anti_entropy,{AAE, []}}]}];
mk_std_conf(_, true, Ring, AAE0) ->
    AAE = aae_cuttle(AAE0),
    Settings = [{ring_size, Ring},
                {"storage_backend", "bitcask"},
                {"erlang.distribution_buffer_size", "128MB"},
                {"bitcask.io_mode", nif},
                {anti_entropy, AAE}],
    {cuttlefish, Settings}.
|
||||
|
||||
%% @doc Translate an `on'/`off' AAE switch into the value used in the
%% cuttlefish settings: `off' becomes `passive', `on' becomes `active'.
aae_cuttle(off) -> passive;
aae_cuttle(on)  -> active.
|
||||
|
||||
%% @doc Number of keys needed for the data set to occupy roughly
%% Percentage percent of the cluster's total RAM (RamSize per node times
%% NodeCount).  Each object is costed at three times (BinSize + 300).
%% The result is a multiple of 10000 and always at least 10000.
target_size(Percentage, BinSize, RamSize, NodeCount) ->
    CacheTarget = trunc((Percentage / 100) * (RamSize * NodeCount)),
    CostPerKey = (BinSize + 300) * 3,
    %% hacky way of rounding up to the nearest 10k
    Blocks = trunc((CacheTarget / (CostPerKey * 10000)) + 1),
    Blocks * 10000.
|
Loading…
Reference in New Issue
Block a user