Merge pull request #586 from basho/perf-harness-merge

At last, merge the perf harness.
This commit is contained in:
Christopher Meiklejohn 2014-07-12 14:20:33 -04:00
commit 0359c309f6
33 changed files with 2222 additions and 54 deletions

2
.gitignore vendored
View File

@ -16,3 +16,5 @@ doc/
coverage coverage
tags tags
riak-* riak-*
*.png
.rebar/

44
INSTALL Normal file
View File

@ -0,0 +1,44 @@
- to get the reporting stuff working, you'll need to have gnuplot
installed on your local machine, at least version 4.6.3. If you
need to install from source, you'll need at least these two deps:
- sudo apt-get install libgd2-xpm-dev libreadline6-dev
- symlink the stuff in perf/reporting to ~/bin/, assuming that is on
your path.
- drop_caches.sh needs to be visible on the remote machine. the
initial version of these scripts assumes that everyone is on the
same filer. This isn't going to be true everywhere. drop is meant
to contain your username, and be in /etc/sudoers.d/.
- ssh needs to be setup so that the user doesn't need to provide
credentials. The current way is that I just generated a bos-local
key and use that. This isn't a terrible idea, but should at least
be automated on setup.
- ~/.riak_test.config should have a harness entry like this:
{rtperf,
[
{rt_deps, ["/usr/local/basho/evan/riak2.0/deps"]},
%% should be really long to allow full bitcasks to
%% come up
{rt_max_wait_time, 600000000000000},
{basho_bench, "/usr/local/basho/evan/basho_bench/"},
{basho_bench_statedir, "/tmp/bb_seqstate/"},
{rt_retry_delay, 500},
{rt_harness, rtperf},
{load_intercepts, false},
{perf_builds, "/usr/local/basho/evan/"},
{perf_loadgens, ["localhost", "r2s24"]},
{rtdev_path, []}
]}.
at least right now. this will probably change as the configuration
stuff gets sorted out. some of these may not be necessary.
rt_max_wait_time is critical (it could maybe be set to infinity,
perhaps by the harness?); perf_* and basho_bench* are also critical.
rt_deps should maybe be dynamic?

43
bcrunner.sh Executable file
View File

@ -0,0 +1,43 @@
#!/bin/bash
# Run a perf test three times against one build, targeting a dataset
# sized at 120%, 70%, and 20% of RAM.
#
# Usage: bcrunner.sh <test_name> <bin_size> <version> [perf_test] [backend] [cuttle]

test_name=$1
bin_size=$2
version=$3
perf_test=$4
backend=$5
# BUG FIX: cuttle was previously read from $5, which is the backend
# argument; it is the sixth positional parameter.
cuttle=$6

if [ -z "$version" -o -z "$test_name" -o -z "$bin_size" ]; then
    echo "usage: $0 <test_name> <bin_size> <version> [perf_test] [backend] [cuttle]"
    exit 1
fi

if [ -z "$perf_test" ]; then
    perf_test="get_put"
fi

if [ -z "$backend" ]; then
    backend="riak_kv_bitcask_backend"
fi

# Any non-empty sixth argument disables cuttle.
if [ -n "$cuttle" ]; then
    cuttle="--cuttle false"
else
    cuttle=""
fi

# Same invocation at three RAM-percentage targets; $cuttle is
# intentionally unquoted so an empty value vanishes entirely.
for pct in 120 70 20; do
    ./riak_test -c rtperf2 -t "$perf_test" -b "$backend" -- --restart true --prepop true \
        --run-time 120 --target-pct "$pct" --ram-size 48 \
        --bin-size "$bin_size" --name "$test_name" --bin-type exponential \
        --version "$version" $cuttle
done

1
compare.sh Symbolic link
View File

@ -0,0 +1 @@
priv/reporting/compare.sh

View File

@ -0,0 +1,18 @@
%% Example ~/.riak_test.config entry for the rtperf harness (see INSTALL).
{rtperf, [
    {rt_deps, ["/mnt/riak_ee/deps"]},
    %% should be really long to allow full bitcasks to
    %% come up
    {rt_max_wait_time, 600000000000000},
    %% Location of the basho_bench checkout and the escript used to run it.
    {basho_bench, "/mnt/basho_bench"},
    {basho_bench_escript, "/usr/local/erlang-r16b02/bin/escript"},
    {basho_bench_statedir, "/tmp/bb_seqstate/"},
    {rt_retry_delay, 500},
    {rt_harness, rtperf},
    {load_intercepts, false},
    %% Where pre-built riak releases live, and which hosts run load.
    {perf_builds, "/mnt/perf_builds"},
    {perf_loadgens, ["bench101.aws"]},
    {rtdev_path, [{root, "/mnt/rt/riak_ee"},
                  {current, "/mnt/rt/riak_ee/riak-ee-2.0.0beta1"},
                  {previous, "/mnt/rt/riak_ee/riak-ee-1.4.8"},
                  {legacy, "/mnt/rt/riak_ee/riak-ee-1.3.4"}]}
]}.

47
perf/2iperf.erl Normal file
View File

@ -0,0 +1,47 @@
%% @doc Secondary-index (2i) performance test: builds a cluster sized to
%% the configured host list, optionally pre-populates it, then drives a
%% mixed 2i-query/put/get basho_bench workload against it.
-module('2iperf').

%% Only confirm/0 is invoked by the riak_test runner; avoid export_all.
-export([confirm/0]).

-include_lib("eunit/include/eunit.hrl").

-define(HARNESS, (rt_config:get(rt_harness))).

confirm() ->
    lager:info("entering get_put:confirm()"),
    HostList = rt_config:get(rt_hostnames),
    Count = length(HostList),
    BinSize = rt_config:get(perf_bin_size),

    Config = rtperf:standard_config(Count),
    ok = rtperf:build_cluster(Config),

    %% Number of keys needed to hit perf_target_pct of RAM at BinSize.
    SetSize = rtperf:target_size(rt_config:get(perf_target_pct),
                                 BinSize,
                                 rt_config:get(perf_ram_size),
                                 Count),
    %% No background load alongside the 2i workload.
    LoadConfig = [],

    TwoIConfig =
        rt_bench:config(
          max,
          rt_config:get(perf_duration),
          HostList,
          {truncated_pareto_int, SetSize},
          rt_bench:valgen(rt_config:get(perf_bin_type), BinSize),
          %% weighted mix: small and large 2i range queries, puts, gets
          [{{query_pb, 100}, 5}, {{query_pb, 1000}, 1},
           {{put_pb, 2}, 1}, {get_pb, 5}],
          <<"testbucket">>, '2i'
         ),

    ok = rtperf:maybe_prepop(HostList, BinSize, SetSize),
    ok = rtperf:run_test(HostList, TwoIConfig, LoadConfig),
    pass.

View File

@ -0,0 +1,40 @@
%% @doc Performance test for strongly-consistent get/put: creates a
%% consistent bucket type and drives a 4:1 get/update workload at it.
-module(consistent_get_put).

%% Only confirm/0 is invoked by the riak_test runner; avoid export_all.
-export([confirm/0]).

-include_lib("eunit/include/eunit.hrl").

-define(HARNESS, (rt_config:get(rt_harness))).

%% note that this test does not currently work
confirm() ->
    HostList = rt_config:get(rt_hostnames),
    Count = length(HostList),
    %% NOTE(review): other perf tests read 'perf_bin_size'; this one
    %% reads 'perf_binsize' -- confirm which key the harness sets.
    BinSize = rt_config:get(perf_binsize),

    Config = rtperf:standard_config(Count),
    Nodes = rtperf:build_cluster(Config),

    %% Number of keys needed to hit perf_target_pct of RAM at BinSize.
    SetSize = rtperf:target_size(rt_config:get(perf_target_pct),
                                 BinSize,
                                 rt_config:get(perf_ram_size),
                                 Count),
    [Node | _] = Nodes,
    rt:create_and_activate_bucket_type(Node, <<"sc">>, [{consistent, true}]),

    TestConfig =
        rt_bench:config(
          max,
          rt_config:get(perf_duration),
          HostList,
          {int_to_bin_bigendian, {uniform_int, SetSize}},
          rt_bench:valgen(rt_config:get(perf_bin_type), BinSize),
          %% 4:1 get/put
          [{get, 3}, {update, 1}],
          {<<"sc">>, <<"testbucket">>}
         ),

    ok = rtperf:maybe_prepop(HostList, BinSize, SetSize),
    ok = rtperf:run_test(HostList, TestConfig, []),
    pass.

38
perf/get_put.erl Normal file
View File

@ -0,0 +1,38 @@
%% @doc Baseline get/put performance test: builds a cluster, sizes the
%% keyspace to a percentage of RAM, and drives a 4:1 get/update
%% basho_bench workload over protocol buffers.
-module(get_put).
-export([confirm/0]).

-include_lib("eunit/include/eunit.hrl").

-define(HARNESS, (rt_config:get(rt_harness))).

confirm() ->
    lager:info("entering get_put:confirm()"),
    HostList = rt_config:get(rt_hostnames),
    Count = length(HostList),

    Config = rtperf:standard_config(Count),
    lager:info("Generated configuration is: ~p", [Config]),

    [Nodes] = rt:build_clusters([Count]),
    lager:info("Built cluster: ~p", [Nodes]),

    BinSize = rt_config:get(perf_bin_size),
    %% Number of keys needed to hit perf_target_pct of RAM at BinSize.
    SetSize = rtperf:target_size(rt_config:get(perf_target_pct),
                                 BinSize,
                                 rt_config:get(perf_ram_size),
                                 Count),
    TestConfig =
        rt_bench:config(
          max,
          rt_config:get(perf_duration),
          %% 10017 is the devrel PB port -- presumably matches the
          %% generated node config; verify against rtperf.
          [{Host, 10017} || Host <- HostList],
          {int_to_bin_bigendian, {truncated_pareto_int, SetSize}},
          rt_bench:valgen(rt_config:get(perf_bin_type), BinSize),
          %% 4:1 get/put
          [{get, 3}, {update, 1}]
         ),

    ok = rtperf:maybe_prepop(Nodes, BinSize, SetSize),
    ok = rtperf:run_test(Nodes, TestConfig, []),
    pass.

38
priv/dir_sum.sh Executable file
View File

@ -0,0 +1,38 @@
#!/bin/bash
# Produce a single md5 digest summarizing a riak build's lib/, bin/ and
# erts*/ trees, so two builds can be compared cheaply by one hash.
#
# Usage: dir_sum.sh <riak-build-dir>

if [ -n "$1" ]; then
    RIAK_DIR=$1
else
    echo "No build supplied"
    exit 1
fi

TMP=/tmp/dir_sum.tmp
SORTED=/tmp/dir_sum.tmp.sorted

rm -f "$TMP"
rm -f "$SORTED"

# Append "md5 *path" lines for every regular file under
# $RIAK_DIR/<subdir> to $TMP; exit 2 if the subdir is missing.
# $RIAK_DIR/$SUBDIR stays unquoted on purpose: the erts* caller relies
# on glob expansion happening here.
sum_subdir () {
    SUBDIR=$1
    if [ ! -d $RIAK_DIR/$SUBDIR ]; then
        exit 2
    fi
    find $RIAK_DIR/$SUBDIR -type f -print | xargs md5sum -b >> "$TMP"
}

if [ ! -d "$RIAK_DIR" ]; then
    exit 2
fi

sum_subdir lib
sum_subdir bin
# There is normally exactly one erts-<vsn> directory per build.
sum_subdir erts*

# Sort so the digest does not depend on find's traversal order.
sort "$TMP" > "$SORTED"
md5sum "$SORTED" | awk '{print $1}'

rm -f "$TMP"
rm -f "$SORTED"

2
priv/install/drop Normal file
View File

@ -0,0 +1,2 @@
evan ALL=NOPASSWD: /usr/local/bos1/home/evan/bin/drop_caches.sh

6
priv/install/drop_caches.sh Executable file
View File

@ -0,0 +1,6 @@
#!/bin/bash
# Flush dirty pages and drop the kernel page/dentry/inode caches so each
# perf run starts cold.  Must run as root (see priv/install/drop for the
# sudoers entry).
sync
sync
sync
# give the flushes a moment to complete before dropping
sleep 5
echo 3 >/proc/sys/vm/drop_caches

20
priv/reporting/compare.sh Executable file
View File

@ -0,0 +1,20 @@
#!/usr/bin/env bash
# Compare two result directories and render a side-by-side summary
# graph via gnuplot.
#
# Usage: compare.sh <dir1> <dir2> [regen-digests(true|false)]
set -e

if [ ! -d "$1/" -o ! -d "$2" ]; then
    echo "two directories must be specified"
    exit 1
fi

#digesting the files is expensive, don't do it by default
# BUG FIX: the original unquoted test ([ ! -z $3 -a $3 = "true" ])
# misparses when $3 is empty; quote and split the test.
if [ -n "$3" ] && [ "$3" = "true" ]; then
    #make our digest files
    (cd "$1"; rm -f *-digest; escript ~/bin/riak-digest.escript)
    (cd "$2"; rm -f *-digest; escript ~/bin/riak-digest.escript)
fi

D1=`basename "$1"`
D2=`basename "$2"`

#generate our comparison graph
gnuplot -e "dir1=\"$1\"; dir2=\"$2\"; outfilename=\"${D1}vs${D2}-summary.png\";" ~/bin/summarize.gpl

View File

@ -0,0 +1,36 @@
#!/usr/bin/env escript -f
%% Run compare.sh for each pair of matching subdirectories of two result
%% directories.  The optional third argument ("true") forces digest
%% regeneration and defaults to "false".
-mode(compile).

main([Dir1, Dir2]) ->
    main([Dir1, Dir2, "false"]);
main([Dir1, Dir2, Regen]) ->
    {ok, L10} = file:list_dir(Dir1),
    {ok, L20} = file:list_dir(Dir2),
    %% Only compare as many entries as both directories have.
    Len = erlang:min(length(L10), length(L20)),
    {L1, _} = lists:split(Len, L10),
    {L2, _} = lists:split(Len, L20),
    CL = lists:zip(lists:usort(L1), lists:usort(L2)),
    %% Side effects only; foreach makes the intent explicit (the map
    %% result was previously ignored).
    lists:foreach(fun(Pair) ->
                          run_compare(Pair, Dir1, Dir2, Regen)
                  end,
                  CL);
main(_) ->
    io:format("incorrect number of arguments\n").

%% Shell out to compare.sh for one directory pair and echo its output.
run_compare({A, B}, DirA, DirB, Regen) ->
    Out = os:cmd("./compare.sh "++DirA++"/"++A++" "++
                     DirB++"/"++B++" "++Regen),
    io:format("~p ~n", [Out]).

15
priv/reporting/report-2i.sh Executable file
View File

@ -0,0 +1,15 @@
#!/usr/bin/env bash
# Render the 2i report graph for one result directory.
#
# Usage: report-2i.sh <dir> [regen-digests(true|false)]
set -e

if [ ! -d "$1/" ]; then
    echo "a directory must be specified"
    exit 1
fi

# Digesting is expensive; regenerate only when explicitly requested.
if [ "x$2" == "xtrue" ]; then
    (cd "$1"; rm -f *-digest; escript ~/bin/riak-digest.escript)
fi

D1=`basename "$1"`

#generate our comparison graph
gnuplot -e "dir1=\"$1\"; outfilename=\"${D1}-report.png\";" priv/reporting/summarize2i.gpl

15
priv/reporting/report.sh Executable file
View File

@ -0,0 +1,15 @@
#!/usr/bin/env bash
# Render the standard report graph for one result directory.
#
# Usage: report.sh <dir> [regen-digests(true|false)]
set -e

if [ ! -d "$1/" ]; then
    echo "a directory must be specified"
    exit 1
fi

# Digesting is expensive; regenerate only when explicitly requested.
if [ "x$2" == "xtrue" ]; then
    (cd "$1"; rm -f *-digest; escript ../../priv/reporting/riak-digest.escript)
fi

D1=`basename "$1"`

#generate our comparison graph
gnuplot -e "dir1=\"$1\"; outfilename=\"${D1}-report.png\";" priv/reporting/summarize1.gpl

View File

@ -0,0 +1,187 @@
#!/usr/bin/env escript -f
-mode(compile).

%% Consult one stats file and send the parsed term list back to Owner;
%% on any consult failure, log it and send [] so the caller can drop
%% the file.  Spawned once per file so files are read in parallel.
read_file(File, Owner) ->
    %%io:format("File: ~p~n", [File]),
    Out =
        case file:consult(File) of
            {ok, F} -> F;
            Else ->
                io:format("Failure in ~p: ~p~n",
                          [File,Else]),
                []
        end,
    Owner ! Out.
%% Consult every "cstats-1*" file in the current directory in parallel,
%% normalize the per-node series to a common length, strip each glob to
%% the interesting stats, average column-wise across nodes, and write
%% the result as a whitespace-separated table to "rstats-digest"
%% (header row of stat names, then one row per sample).
main(_) ->
    Start = now(),
    %% Coarse phase timer (ms since Start) for progress logging.
    Timer = fun(Slogan) ->
                    T = trunc(timer:now_diff(now(), Start)/1000),
                    io:format("~p ~p ~n", [Slogan, T])
            end,
    {ok, Files0} = file:list_dir("."),
    Files = [File || File <- Files0,
                     lists:prefix("cstats-1", File)],
    if Files =:= [] ->
            error(no_files_found);
       true -> ok
    end,
    %% io:format("Files: ~p ~p~n", [file:get_cwd(), Files]),
    Main = self(),
    %% One reader process per file; each sends a bare list back.
    [spawn(fun() -> read_file(Fn, Main) end)|| Fn <- Files],
    Data00 = [receive L -> L end || _ <- Files],
    %% Drop files that failed to consult (read_file sends []).
    Data0 = lists:filter(fun(L) -> length(L) /= 0 end, Data00),
    Timer(consult_files),
    Data1 = normalize_len(Data0),
    Timer(normalize_lens),
    %% lens(Data1),
    Data = lists:map(fun winnow/1, Data1),
    Timer(winnow),
    %% lens(Data),
    {Names, Avg} = avg_lists(Data),
    Timer(average),
    %%io:format("~p ~p~n", [Names, length(Avg)]),
    {ok, Fd} = file:open("rstats-digest", [write]),
    %% Header row: stat names.
    [file:write(Fd, io_lib:format("~s ", [atom_to_list(Name)]))
     || Name <- Names],
    file:write(Fd, io_lib:format("~n", [])),
    %% One row per averaged sample.
    [begin
         %%io:format("~p~n", [Item]),
         [file:write(Fd, io_lib:format("~p ", [Val]))
          || Val <- Item],
         file:write(Fd, io_lib:format("~n", []))
     end
     || Item <- Avg],
    file:close(Fd).
%% lens(L) ->
%%     [io:format("~p ", [length(Ln)])
%%      || Ln <- L],
%%     io:format("~n").

%% Bring all per-node series to a common length: when the spread
%% between longest and shortest is small (< 4 samples), truncate
%% everything down to the shortest; otherwise keep only the series
%% that already have the maximal length.
normalize_len(L) ->
    Lengths = lists:map(fun erlang:length/1, L),
    Longest = lists:max(Lengths),
    Shortest = lists:min(Lengths),
    case (Longest - Shortest) < 4 of
        true ->
            trim_long(Shortest, lists:zip(Lengths, L), []);
        false ->
            discard_short(L)
    end.
%% Truncate every series longer than Len down to exactly Len samples;
%% shorter-or-equal series pass through unchanged.  Logs each length as
%% it goes.  Note: the accumulator reverses the input order.
trim_long(_Len, [], Acc) ->
    Acc;
trim_long(Len, [{ThisLen, Series} | Rest], Acc) ->
    io:format("~p ~p ", [Len, ThisLen]),
    Kept =
        if ThisLen > Len ->
                io:format("trunc ~n"),
                {Prefix, _Dropped} = lists:split(Len, Series),
                Prefix;
           true ->
                io:format("~n"),
                Series
        end,
    trim_long(Len, Rest, [Kept | Acc]).
%% Keep only the series whose length equals the longest one present;
%% everything shorter is dropped.  Logs the lengths it examines.
discard_short(Series) ->
    Longest = lists:max([length(S) || S <- Series]),
    io:format("longest ~p ~n", [Longest]),
    [S || S <- Series,
          begin
              Len = length(S),
              io:format("len ~p ~n", [Len]),
              Len =:= Longest
          end].
%% Column-wise average over a list of stat series: walk all series in
%% lockstep, averaging the head glob of each at every step, and return
%% {StatNames, [AveragedRow]}.  Stops as soon as any series runs out.
avg_lists(LoL) ->
    avg_lists(LoL, []).

avg_lists(LoL, Acc) ->
    {Heads, Tails} =
        lists:unzip(lists:map(fun([Head | Tail]) -> {Head, Tail} end, LoL)),
    [First | _] = Heads,
    Names = [N || {N, _V} <- First],
    Acc1 = [avg_items(Heads, Names) | Acc],
    case lists:any(fun(Tail) -> Tail =:= [] end, Tails) of
        true -> {Names, lists:reverse(Acc1)};
        false -> avg_lists(Tails, Acc1)
    end.
%% Average each named stat across one row of globs (one glob per node).
%% Stats missing from a glob count as 0.  Counters that riak reports as
%% 60-second rolling windows are rescaled to per-second rates; the six
%% duplicated case branches of the original are collapsed into one
%% membership check.
avg_items(Globs, Names) ->
    Dicts = lists:map(fun orddict:from_list/1, Globs),
    %% vnode/node gets+puts and index_fsm counters are per-minute
    %% rolling windows, hence the /60.
    PerMinute = [index_fsm_create, index_fsm_create_error,
                 vnode_gets, vnode_puts,
                 node_gets, node_puts],
    [begin
         Vals = [try orddict:fetch(Name, D)
                 catch _:_ -> 0
                 end || D <- Dicts],
         Avg = lists:sum(Vals) / length(Vals),
         case lists:member(Name, PerMinute) of
             true -> Avg / 60;
             false -> Avg
         end
     end
     || Name <- Names].
%% get rid of timestamps and slim down the stats glob: anything that is
%% not a list (e.g. a bare timestamp) is dropped, and each surviving
%% glob is filtered down to the whitelisted stats.
winnow(Entries) ->
    Globs = lists:filter(fun erlang:is_list/1, Entries),
    lists:map(fun strip_stats/1, Globs).
%% Keep only the whitelisted stats from one glob, preserving their
%% original order.  (The original wrapped the tuple in a pointless
%% begin/end inside the comprehension.)
strip_stats(Glob) ->
    Filter = [
              index_fsm_create, index_fsm_create_error,
              node_gets, node_puts,
              vnode_gets, vnode_puts,
              node_get_fsm_time_median,
              node_get_fsm_time_95,
              node_get_fsm_time_99,
              node_put_fsm_time_median,
              node_put_fsm_time_95,
              node_put_fsm_time_99,
              message_queue_max,
              cpu_utilization,
              cpu_iowait,
              memory_utilization,
              memory_page_dirty,
              memory_page_writeback,
              dropped_vnode_requests_total,
              node_get_fsm_objsize_median,
              node_get_fsm_objsize_95,
              node_get_fsm_objsize_99,
              disk_utilization
             ],
    [{Name, Val} || {Name, Val} <- Glob,
                    lists:member(Name, Filter)].

View File

@ -0,0 +1,141 @@
# Two-column comparison report: dir1 on the left, dir2 on the right.
# Rows (bottom to top): system stats, latencies, vnode op rates.
# Expects gnuplot variables dir1, dir2 and outfilename, injected via
# "gnuplot -e" by compare.sh.
#clean up the environment for interactive use
unset multiplot
reset
set terminal png font "/usr/share/fonts/truetype/liberation/LiberationSans-Regular.ttf,9" size 850,1100
set output outfilename
#set term x11 size 850, 1100
set multiplot
set grid
#set key below
set tics out
set lmargin 12
set rmargin 10
rd = dir1."/".'rstats-digest'
## graph system stats
set size .5, .315
set xtics 60
set yrange [0:100]
set y2range [0:100000]
set y2tics nomirror
set xlabel "seconds"
set ylabel "percentage"
set y2label "fault rate"
plot rd using "cpu_utilization" with lines, \
rd using "cpu_iowait" with lines, \
rd using "disk_utilization" with lines, \
rd using "memory_utilization" with lines, \
rd using "memory_page_dirty" with lines axes x1y2 smooth bezier
unset y2tics
unset y2range
## graph latencies
set origin 0, .315
set xtics 10
set yrange [500:150000]
set y2range [1000:1100000]
set y2tics nomirror
set xlabel "minutes"
set ylabel "usec"
unset y2label
# NOTE(review): "node_put_fsm_time_95" is plotted twice below; one of
# the two was probably meant to be a different series -- confirm.
plot rd using "node_get_fsm_time_median" with lines, \
rd using "node_put_fsm_time_median" with lines, \
rd using "node_put_fsm_time_95" with lines axes x1y2, \
rd using "node_put_fsm_time_95" with lines axes x1y2, \
rd using "node_put_fsm_time_99" with lines axes x1y2, \
rd using "node_get_fsm_time_95" with lines axes x1y2, \
rd using "node_get_fsm_time_99" with lines axes x1y2
unset y2tics
unset y2range
## graph ops performance
set origin 0, .63
#ymax = median(rd) + 1000
set yrange [0:50000]
set ylabel "operations per node"
plot rd using "vnode_gets" with lines, \
rd using "vnode_puts" with lines
# Right column: same three panels for dir2.
rd = dir2."/".'rstats-digest'
## graph system stats
set origin .48, 0
set xtics 60
set yrange [0:100]
set y2range [0:100000]
set y2tics nomirror
set xlabel "seconds"
set ylabel "percentage"
set y2label "fault rate"
plot rd using "cpu_utilization" with lines, \
rd using "cpu_iowait" with lines, \
rd using "disk_utilization" with lines, \
rd using "memory_utilization" with lines, \
rd using "memory_page_dirty" with lines axes x1y2 smooth bezier
unset y2tics
unset y2range
## graph latencies
set origin .48, .315
set yrange [500:150000]
set y2range [1000:1100000]
set y2tics nomirror
set xtics 10
set xlabel "minutes"
set ylabel "usec"
unset y2label
# NOTE(review): same duplicated "node_put_fsm_time_95" as above.
plot rd using "node_get_fsm_time_median" with lines, \
rd using "node_put_fsm_time_median" with lines, \
rd using "node_put_fsm_time_95" with lines axes x1y2, \
rd using "node_put_fsm_time_95" with lines axes x1y2, \
rd using "node_put_fsm_time_99" with lines axes x1y2, \
rd using "node_get_fsm_time_95" with lines axes x1y2, \
rd using "node_get_fsm_time_99" with lines axes x1y2
unset y2tics
unset y2range
## graph ops performance
set origin .48, .63
set yrange [0:50000]
set ylabel "operations per node"
#hack to set the title for the whole graph
set label dir1 at screen 0.5,0.98 center front
set label "vs." at screen 0.5,0.97 center front
set label dir2 at screen 0.5,0.96 center front
plot rd using "vnode_gets" with lines, \
rd using "vnode_puts" with lines
unset multiplot
reset

View File

@ -0,0 +1,116 @@
# Single-run report grid for one result directory (dir1): tail
# latencies, vnode op rates, page-cache dirty/writeback, median
# latencies, message-queue/dropped-request stats, and system
# utilization.  Expects gnuplot variables dir1 and outfilename.
# NOTE(review): this variant uses a Mac font path, unlike the Liberation
# font used by the other .gpl files -- confirm which hosts render this.
#clean up the environment for interactive use
unset multiplot
reset
set terminal png font "/Library/Fonts/Arial.ttf" 8
set output outfilename
#set term x11 size 850, 1100
set multiplot
set grid
#set key below
set tics out
set lmargin 12
set rmargin 10
rd = dir1."/".'rstats-digest'
## graph system stats
set size .5, .315
## graph latencies
set origin 0, .315
#set xtics 10
#set yrange [500:150000]
#set y2range [1000:1100000]
#set y2tics nomirror
set xlabel "5 second intervals"
set ylabel "usec"
unset y2label
plot rd using "node_put_fsm_time_95" with lines, \
rd using "node_put_fsm_time_99" with lines, \
rd using "node_get_fsm_time_95" with lines, \
rd using "node_get_fsm_time_99" with lines
unset y2tics
unset y2range
## graph ops performance
set origin 0, .63
#ymax = median(rd) + 1000
#set yrange [0:50000]
set ylabel "operations per node"
plot rd using "vnode_gets" with lines, \
rd using "vnode_puts" with lines
## graph system stats
set origin .48, 0
set xlabel "5 second intervals"
set y2tics nomirror
plot rd using "memory_page_dirty" with lines, \
rd using "memory_page_writeback" with lines axis x1y2
unset y2tics
unset y2range
## graph latencies
set origin .48, .315
#set yrange [500:150000]
#set y2range [1000:1100000]
#set y2tics nomirror
#set xtics 10
set xlabel "5 second intervals"
set ylabel "usec"
unset y2label
plot rd using "node_get_fsm_time_median" with lines, \
rd using "node_put_fsm_time_median" with lines
unset y2tics
unset y2range
## graph ops performance
set origin .48, .63
#set yrange [0:50000]
set ylabel "operations per node"
#hack to set the title for the whole graph
set label dir1 at screen 0.5,0.97 center front
plot rd using "message_queue_max" with lines, \
rd using "dropped_vnode_requests_total" with lines
set origin 0, 0
#set xtics 60
set yrange [0:100]
set xlabel "5 second intervals"
set ylabel "percentage"
plot rd using "cpu_utilization" with lines, \
rd using "cpu_iowait" with lines, \
rd using "disk_utilization" with lines, \
rd using "memory_utilization" with lines
unset yrange
unset y2tics
unset multiplot
reset

View File

@ -0,0 +1,119 @@
# Single-run 2i report grid for one result directory (dir1): like the
# standard report but the ops panel also plots index_fsm_create, the
# secondary-index query rate.  Expects gnuplot variables dir1 and
# outfilename.
#clean up the environment for interactive use
unset multiplot
reset
set terminal png font "/usr/share/fonts/truetype/liberation/LiberationSans-Regular.ttf,9" size 850,1100
set output outfilename
#set term x11 size 850, 1100
set multiplot
set grid
#set key below
set tics out
set lmargin 12
set rmargin 10
rd = dir1."/".'rstats-digest'
## graph system stats
set size .5, .315
## graph latencies
set origin 0, .315
#set xtics 10
#set yrange [500:150000]
#set y2range [1000:1100000]
#set y2tics nomirror
set xlabel "5 second intervals"
set ylabel "usec"
unset y2label
plot rd using "node_put_fsm_time_95" with lines, \
rd using "node_put_fsm_time_99" with lines, \
rd using "node_get_fsm_time_95" with lines, \
rd using "node_get_fsm_time_99" with lines
unset y2tics
unset y2range
## graph ops performance
set origin 0, .63
#ymax = median(rd) + 1000
#set yrange [0:50000]
set ylabel "operations per node"
set y2label "errors"
set y2tics nomirror
plot rd using "index_fsm_create" with lines, \
rd using "vnode_gets" with lines axis x1y2, \
rd using "vnode_puts" with lines axis x1y2
## graph system stats
set origin .48, 0
set xlabel "5 second intervals"
set y2tics nomirror
plot rd using "memory_page_dirty" with lines, \
rd using "memory_page_writeback" with lines axis x1y2
unset y2tics
unset y2range
## graph latencies
set origin .48, .315
#set yrange [500:150000]
#set y2range [1000:1100000]
#set y2tics nomirror
#set xtics 10
set xlabel "5 second intervals"
set ylabel "usec"
unset y2label
plot rd using "node_get_fsm_time_median" with lines, \
rd using "node_put_fsm_time_median" with lines
unset y2tics
unset y2range
## graph ops performance
set origin .48, .63
#set yrange [0:50000]
set ylabel "operations per node"
#hack to set the title for the whole graph
set label dir1 at screen 0.5,0.97 center front
plot rd using "message_queue_max" with lines, \
rd using "dropped_vnode_requests_total" with lines
set origin 0, 0
#set xtics 60
set yrange [0:100]
set xlabel "5 second intervals"
set ylabel "percentage"
plot rd using "cpu_utilization" with lines, \
rd using "cpu_iowait" with lines, \
rd using "disk_utilization" with lines, \
rd using "memory_utilization" with lines
unset yrange
unset y2tics
unset multiplot
reset

View File

@ -4,7 +4,7 @@
%%{edoc_opts, [{doclet, edown_doclet}, {pretty_printer, erl_pp}]}. %%{edoc_opts, [{doclet, edown_doclet}, {pretty_printer, erl_pp}]}.
%%{edoc_opts, [{doclet, my_layout}, {pretty_printer, erl_pp}]}. %%{edoc_opts, [{doclet, my_layout}, {pretty_printer, erl_pp}]}.
%%{edoc_opts, [{layout, my_layout}, {file_suffix, ".xml"}, {pretty_printer, erl_pp}]}. %%{edoc_opts, [{layout, my_layout}, {file_suffix, ".xml"}, {pretty_printer, erl_pp}]}.
{erl_opts, [{src_dirs, [src, intercepts]}, {erl_opts, [{src_dirs, [src, intercepts, perf]},
warnings_as_errors, {parse_transform, lager_transform}]}. warnings_as_errors, {parse_transform, lager_transform}]}.
{erl_first_files, ["src/rt_intercept_pt.erl"]}. {erl_first_files, ["src/rt_intercept_pt.erl"]}.
@ -24,6 +24,6 @@
{plugin_dir, "src"}. {plugin_dir, "src"}.
{plugins, [rebar_riak_test_plugin]}. {plugins, [rebar_riak_test_plugin]}.
{riak_test, [ {riak_test, [
{test_paths, ["tests"]}, {test_paths, ["tests", "perf"]},
{test_output, "ebin"} {test_output, "ebin"}
]}. ]}.

1
report-2i.sh Symbolic link
View File

@ -0,0 +1 @@
priv/reporting/report-2i.sh

1
report.sh Symbolic link
View File

@ -0,0 +1 @@
priv/reporting/report.sh

5
results/.gitignore vendored Normal file
View File

@ -0,0 +1,5 @@
# Ignore everything in this directory
*
# Except this file
!.gitignore

3
run-perf-test.sh Normal file
View File

@ -0,0 +1,3 @@
#!/bin/sh
# Install the example perf harness config, rebuild riak_test from
# scratch, and run the get_put perf test verbosely.
cp examples/riak_test.config.perf ~/.riak_test.config && make clean && make && ./riak_test -c rtperf -t get_put -v

564
src/observer.erl Normal file
View File

@ -0,0 +1,564 @@
%% Ad-hoc cluster statistics collector for the perf harness: a watcher
%% process on the test master accepts TCP connections from probe
%% processes installed on each riak node; every probe periodically
%% pushes a stats glob which the watcher appends to a per-node file.
-module(observer).
%% export_all retained: probe functions are started remotely via
%% rpc:multicall and re-entered through ?MODULE:Fun dynamic calls.
-compile(export_all).

%% Per-probe state (lives on the observed riak node).
-record(history, {network,          % previous {RxBytes, TxBytes}, or undefined
                  disk,             % orddict: device -> previous #disk{}
                  rate,             % sample interval scaled to 5s units
                  nodes,
                  lvlref,           % eleveldb handle, or undefined
                  collector_sock}). % TCP socket back to the watcher

%% Watcher state (lives on the test master).
-record(watcher, {nodes,
                  acceptor,   % {AcceptorPid, ListenSocket}
                  collector,  % {Host, Port, OutputDir}
                  probes}).   % [{Node, MonitorRef | undefined}]

%% See: https://www.kernel.org/doc/Documentation/iostats.txt
-record(disk, {read,
               read_merged,
               read_sectors,
               read_wait_ms,
               write,
               write_merged,
               write_sectors,
               write_wait_ms,
               io_pending,
               io_wait_ms,
               io_wait_weighted_ms}).
%% Spawn a watcher for the given nodes, reporting to Collector
%% ({Host, Port, Dir}); returns the watcher pid.
watch(Nodes, Collector) ->
    Pid = spawn(?MODULE, watcher, [self(), Nodes, Collector]),
    %%start(self(), 1000, Collector, Nodes, ping),
    Pid.
%% Open the collector listen port, start an acceptor that hands
%% accepted sockets to this process, monitor the master (so the loop
%% can exit when it dies), and enter the main loop.  Retries forever
%% while the port is still bound (e.g. a previous watcher winding down).
watcher(Master, Nodes, {_Host, Port, _Dir} = Collector) ->
    case gen_tcp:listen(Port, [{active, false}, binary,
                               {packet, 2}]) of
        {ok, LSock} ->
            Acceptor = spawn(?MODULE, lloop, [self(), LSock]),
            monitor(process, Master),
            Probes = [{Node, undefined} || Node <- Nodes],
            W = #watcher{nodes=Nodes,
                         acceptor={Acceptor, LSock},
                         collector=Collector,
                         probes=Probes},
            watcher_loop(W);
        {error, eaddrinuse} ->
            timer:sleep(100),
            watcher(Master, Nodes, Collector)
        %% case_clause other errors
    end.
%% Accept loop: hand every accepted probe socket over to Master (which
%% receives its data as {tcp, Sock, Msg} messages via {active, once});
%% stops quietly when the listen socket closes.
lloop(Master, LSock) ->
    case gen_tcp:accept(LSock) of
        {ok, Sock} ->
            ok = gen_tcp:controlling_process(Sock, Master),
            inet:setopts(Sock, [{active, once}]),
            lloop(Master, LSock);
        _ ->
            ok
    end.
%% Main watcher loop: (re)install probes on any nodes without one, then
%% wait for probe exits, incoming stats payloads, or a stop request.
%% Stats arriving on a socket are appended to one file per peer
%% address; open FDs are cached in the process dictionary keyed by
%% socket.
watcher_loop(W=#watcher{probes=Probes,
                        acceptor={Acceptor,LSock},
                        collector={_,_,Dir}}) ->
    Missing = [Node || {Node, undefined} <- Probes],
    %% io:format("Missing: ~p~n", [Missing]),
    W2 = install_probes(Missing, W),
    Probes2 = W2#watcher.probes,
    receive
        {'DOWN', MRef, process, _, Reason} ->
            case lists:keyfind(MRef, 2, Probes2) of
                false ->
                    %% master died, exit
                    io:format("watcher exiting~n"),
                    ok;
                {Node, MRef} ->
                    %% A probe died; mark it undefined so the next pass
                    %% reinstalls it.
                    io:format("Probe exit. ~p: ~p~n", [Node, Reason]),
                    Probes3 = lists:keyreplace(Node, 1, Probes2, {Node, undefined}),
                    W3 = W2#watcher{probes=Probes3},
                    ?MODULE:watcher_loop(W3)
            end;
        {tcp, Sock, Msg} ->
            inet:setopts(Sock, [{active, once}]),
            case catch binary_to_term(Msg) of
                Stats when is_list(Stats) ->
                    case get(Sock) of
                        undefined ->
                            {ok, {Addr, _Port}} = inet:peername(Sock),
                            SAddr = inet:ntoa(Addr),
                            %% not sure that we want to just blindly append...
                            {ok, FD} = file:open(Dir++"/cstats-"++SAddr, [append]),
                            dump_stats(Stats, FD),
                            put(Sock, FD);
                        FD ->
                            dump_stats(Stats, FD)
                    end
            end,
            ?MODULE:watcher_loop(W2);
        stop ->
            %% BUG FIX: exit(Acceptor) called exit/1, which raised an
            %% exit in the watcher itself (with the acceptor pid as the
            %% reason) and skipped all of the cleanup below.  exit/2
            %% kills the acceptor process instead.
            exit(Acceptor, kill),
            gen_tcp:close(LSock),
            [begin
                 file:close(FD),
                 gen_tcp:close(Sock)
             end
             || {Sock, FD} <- get()]
    end.
install_probes([], W) ->
    W;
%% Push this module to the given nodes and start a probe on each.
%% Successful starts are monitored; nodes that were down or failed to
%% start stay marked undefined so the next watcher_loop pass retries.
install_probes(Nodes, W=#watcher{collector=Collector, nodes=AllNodes, probes=Probes}) ->
    load_modules_on_nodes([?MODULE], Nodes),
    R = rpc:multicall(Nodes, ?MODULE, start, [self(), 5000, Collector, AllNodes, collect]),
    {Pids, Down} = R,
    Probes2 = lists:foldl(fun({Node, Pid}, Acc) ->
                                  if is_pid(Pid) ->
                                          lists:keystore(Node, 1, Acc, {Node, monitor(process, Pid)});
                                     true ->
                                          Acc
                                  end
                          end, Probes, Pids),
    Probes3 = lists:foldl(fun(Node, Acc) ->
                                  lists:keystore(Node, 1, Acc, {Node, undefined})
                          end, Probes2, Down),
    W#watcher{probes=Probes3}.
%% Entry point invoked over rpc on each riak node: spawn the probe and
%% return {Node, Pid} so the watcher can monitor it.
start(Master, Rate, Collector, Nodes, Fun) ->
    lager:info("In start: ~p~n", [node()]),
    Pid = spawn(?MODULE, init, [Master, Rate, Collector, Nodes, Fun]),
    {node(), Pid}.
%% Probe setup: connect back to the watcher, grab a leveldb handle when
%% that backend is in use, build the initial #history{}, monitor the
%% watcher so the probe dies with it, and enter the sampling loop.
init(Master, Rate, {Host, Port, _Dir}, Nodes, Fun) ->
    lager:info("In init: ~p ~p~n", [node(), Host]),
    {ok, Sock} = gen_tcp:connect(Host, Port,
                                 [binary, {packet, 2},
                                  {send_timeout, 500}]),
    %% LRef is exported from the case (unidiomatic but valid Erlang).
    case application:get_env(riak_kv, storage_backend) of
        {ok, riak_kv_eleveldb_backend} ->
            LRef = get_leveldb_ref();
        _ ->
            LRef = undefined
    end,
    H = #history{network=undefined,
                 %% disk=undefined,
                 disk=[],
                 rate=Rate div 5000,
                 lvlref=LRef,
                 nodes=Nodes,
                 collector_sock=Sock},
    monitor(process, Master),
    loop(Fun, Rate, H).
%% Probe sampling loop: invoke Fun (e.g. collect/1) every Rate ms via a
%% fully-qualified call (so hot code upgrades take effect), exiting
%% when the monitored watcher goes down.
loop(Fun, Rate, H) ->
    %% io:format("loop: ~p~n", [node()]),
    NewH = ?MODULE:Fun(H),
    receive
        {'DOWN', _, process, _, _} ->
            %%io:format("shutting: ~p~n", [node()]),
            ok
    after Rate ->
            ?MODULE:loop(Fun, Rate, NewH)
    end.
%% fix this later
%% ping(H=#history{nodes=Nodes}) ->
%% TS = timestamp(),
%% XNodes = lists:zip(lists:seq(1, length(Nodes)), Nodes),
%% pmap(fun({X,Node}) ->
%% case net_adm:ping(Node) of
%% pang ->
%% notify_down(TS, X, Node, H),
%% ok;
%% pong ->
%% case rpc:call(Node, riak_core_node_watcher, services, [Node]) of
%% L when is_list(L) ->
%% case lists:member(riak_kv, L) of
%% true ->
%% ok;
%% false ->
%% notify_down(TS, X, Node, H)
%% end;
%% _ ->
%% notify_down(TS, X, Node, H)
%% end;
%% _ ->
%% ok
%% end
%% end, XNodes),
%% H.
%% notify_down(TS, X, Node, H) ->
%% %% emit_stat(Stat, TS, Value, H) ->
%% NodeBin = atom_to_binary(Node, utf8),
%% Metric = <<"offline_nodes/", NodeBin/binary>>,
%% emit_stat2(Metric, TS, X, H).
%% One probe sample: gather leveldb, queue, process, network, disk,
%% vmstat, memory, and riak core/kv stats, then ship the whole batch to
%% the watcher as one term_to_binary payload.  Crashes (error(splode))
%% on any send failure so the watcher reinstalls the probe.
collect(H0) ->
    {H, L} = report_leveldb(H0),
    {_, Q} = report_queues(H),
    {_, P} = report_processes(H),
    {H2, N} = report_network(H),
    %% The device list is computed once and cached in the process
    %% dictionary under 'disks'.
    DiskList =
        case get(disks) of
            undefined ->
                Disks = determine_disks(),
                put(disks, Disks),
                Disks;
            Disks ->
                Disks
        end,
    {H3, D} = report_disk2(DiskList, H2),
    {_, V} = report_vmstat(H3),
    {_, M} = report_memory(H3),
    C = report_stats(riak_core_stat, all),
    R = report_stats(riak_kv_stat, all),
    Stats0 = L ++ Q ++ P ++ N ++ D ++ V ++ M ++ C ++ R,
    Stats = term_to_binary(Stats0),
    %% catch TCP errors here
    case gen_tcp:send(H3#history.collector_sock, Stats) of
        ok -> ok;
        %% die on any error, we'll get restarted soon.
        {error, _} ->
            gen_tcp:close(H3#history.collector_sock),
            error(splode)
    end,
    H3.
%% this portion is meant to be run inside a VM instance running riak

%% Work out which block device holds the backend's data directory by
%% shelling out to df/basename; returns [{Name, Name}] (the pair format
%% is kept in case name and device ever need to differ).  Errors out on
%% backends other than bitcask/eleveldb.
determine_disks() ->
    DataDir =
        case application:get_env(riak_kv, storage_backend) of
            {ok, riak_kv_bitcask_backend} ->
                {ok, Dir} = application:get_env(bitcask, data_root),
                Dir;
            {ok, riak_kv_eleveldb_backend} ->
                {ok, Dir} = application:get_env(eleveldb, data_root),
                Dir;
            _ ->
                error(unhandled_backend)
        end,
    %% NOTE(review): DataDir is interpolated into a shell command
    %% unquoted -- fine for trusted configs, but worth confirming.
    Name0 = os:cmd("basename `df "++DataDir++
                       " | tail -1 | awk '{print $1}'`"),
    %% strip the trailing newline from os:cmd output
    {Name, _} = lists:split(length(Name0)-1, Name0),
    %% keep the old format just in case we need to extend this later.
    [{Name, Name}].
%% Report the largest message-queue length across all local processes.
report_queues(H) ->
    Lens = [Len || Pid <- processes(),
                   {message_queue_len, Len}
                       <- [process_info(Pid, message_queue_len)]],
    {H, [{message_queue_max, lists:max(Lens)}]}.
%% Report the current number of Erlang processes on this node.
report_processes(H) ->
    {H, [{erlang_processes, erlang:system_info(process_count)}]}.
%% Report RX/TX rates (KB per interval) against the counters sampled on
%% the previous tick; the very first tick has no baseline and reports
%% nothing.  The fresh counters are stored back into the history.
report_network(H=#history{network=Prev, rate=Rate}) ->
    Current = {RX, TX} = get_network(),
    Report =
        case Prev of
            undefined ->
                [];
            {PrevRX, PrevTX} ->
                [{net_rx, net_rate(PrevRX, RX) div Rate},
                 {net_tx, net_rate(PrevTX, TX) div Rate}]
        end,
    {H#history{network=Current}, Report}.
%% Report stats for every configured disk device: fold over the device
%% list, diffing each against its previous #disk{} sample (if any) and
%% storing the fresh sample back into the history's orddict.
report_disk2(Disks, H=#history{disk=DiskStats}) ->
    {NewStats, NewReport} =
        lists:foldl(fun({Name, Dev}, {OrdAcc, LstAcc}) ->
                            LastStats = case orddict:find(Dev, DiskStats) of
                                            error ->
                                                undefined;
                                            {ok, LS} ->
                                                LS
                                        end,
                            {Stats, Report} = report_disk2(Name, Dev, LastStats, H),
                            {orddict:store(Dev, Stats, OrdAcc),
                             LstAcc ++ Report}
                    end, {DiskStats, []}, Disks),
    {H#history{disk=NewStats}, NewReport}.
%% Sample one device and, when a previous sample exists, derive the
%% rate/latency/utilization metrics from the two samples; the first
%% sample for a device reports nothing.
report_disk2(_Name, Dev, LastStats, #history{rate=Rate}) ->
    Stats = get_disk2(Dev),
    Report =
        case LastStats of
            undefined ->
                [];
            _ ->
                ReadRate = disk_rate(#disk.read_sectors, LastStats, Stats) div Rate,
                WriteRate = disk_rate(#disk.write_sectors, LastStats, Stats) div Rate,
                {AwaitR, AwaitW} = disk_await(LastStats, Stats),
                Svctime = disk_svctime(LastStats, Stats),
                QueueLen = disk_qlength(LastStats, Stats),
                Util = disk_util(LastStats, Stats),
                [{disk_read, ReadRate},
                 {disk_write, WriteRate},
                 {disk_await_r, AwaitR},
                 {disk_await_w, AwaitW},
                 {disk_svctime, Svctime},
                 {disk_queue_size, QueueLen},
                 {disk_utilization, Util}]
        end,
    {Stats, Report}.
%% Build the atom '<List>_<Atom>' from a string prefix and an atom
%% suffix.  Creates atoms at runtime: callers must pass only a bounded
%% set of names (the atom table is never garbage collected).
append_atoms(Atom, List) ->
    list_to_atom(lists:concat([List, "_", Atom])).
%% Report memory utilization plus dirty/writeback page-cache figures,
%% all derived from a fresh /proc/meminfo sample.
report_memory(H) ->
    Stats = get_memory(),
    Util = memory_util(Stats),
    Dirty = memory_dirty(Stats),
    Writeback = memory_writeback(Stats),
    {H, [{memory_utilization, Util},
         {memory_page_dirty, Dirty},
         {memory_page_writeback, Writeback}]}.
report_leveldb(H = #history{ lvlref = undefined }) ->
    {H, []};
%% Report the leveldb write-throttle gauge.  The broad catch is
%% deliberate: a stale handle (e.g. after a vnode restart) is replaced
%% with a fresh one and the sample is simply skipped this tick.
report_leveldb(H = #history{ lvlref = LRef }) ->
    try case eleveldb:status(LRef, <<"leveldb.ThrottleGauge">>) of
            {ok, Result} ->
                Value = list_to_integer(Result),
                {H, [{leveldb_write_throttle, Value}]};
            _ ->
                {H, []}
        end
    catch
        _:_ ->
            LRef2 = get_leveldb_ref(),
            {H#history{lvlref=LRef2}, []}
    end.
%% Byte-counter delta expressed in kilobytes.
net_rate(PrevBytes, CurBytes) ->
    (CurBytes - PrevBytes) div 1024.
%% Sector-counter delta converted to KB: pull the I-th field out of two
%% #disk{} samples, then divide the difference by 2 (512-byte sectors).
disk_rate(FieldIndex, PrevStats, CurStats) ->
    disk_rate(element(FieldIndex, PrevStats),
              element(FieldIndex, CurStats)).

disk_rate(PrevSectors, CurSectors) ->
    %% 512-byte sectors
    (CurSectors - PrevSectors) div 2.
%% Average wait time (ms) per read and per write between two samples;
%% op counts are clamped to >= 1 to avoid division by zero on an idle
%% interval.
disk_await(S1, S2) ->
    NumR = erlang:max(S2#disk.read - S1#disk.read, 1),
    NumW = erlang:max(S2#disk.write - S1#disk.write, 1),
    AwaitR = (S2#disk.read_wait_ms - S1#disk.read_wait_ms) div NumR,
    AwaitW = (S2#disk.write_wait_ms - S1#disk.write_wait_ms) div NumW,
    {AwaitR, AwaitW}.
%% Average service time (ms) per I/O operation between two samples;
%% the op count is clamped to >= 1 to avoid division by zero.
disk_svctime(S1, S2) ->
    NumR = S2#disk.read - S1#disk.read,
    NumW = S2#disk.write - S1#disk.write,
    NumIO = erlang:max(NumR + NumW, 1),
    Wait = S2#disk.io_wait_ms - S1#disk.io_wait_ms,
    Wait div NumIO.
%% Device utilization percentage over the sample window.
disk_util(S1, S2) ->
    Wait = S2#disk.io_wait_ms - S1#disk.io_wait_ms,
    Wait * 100 div 5000. %% Really should be div Rate
%% Average I/O queue depth over the (hard-coded 5000 ms) sample window,
%% from the weighted-wait counter delta.
disk_qlength(S1, S2) ->
    (S2#disk.io_wait_weighted_ms - S1#disk.io_wait_weighted_ms) div 5000.
%% Drop every tuple whose Pos-th element equals Val.
filter(Tuples, Pos, Val) ->
    lists:filter(fun(Tuple) -> element(Pos, Tuple) /= Val end, Tuples).
%% Collect {QueueLen, Name} for every pid whose message queue exceeds
%% Threshold, resolving names via pid_name/3; the result is sorted by
%% length descending.
message_queues([], _Threshold, _VNodeMap, Queues) ->
    lists:reverse(lists:keysort(1, Queues));
message_queues([Pid|Pids], Threshold, VNodeMap, Queues) ->
    %% process_info returns undefined for dead pids; those (and
    %% under-threshold queues) fall through to the catch-all clause.
    case process_info(Pid, [message_queue_len, registered_name]) of
        [{message_queue_len, Len},
         {registered_name, RegName}] when Len > Threshold ->
            Entry = {Len, pid_name(Pid, RegName, VNodeMap)},
            message_queues(Pids, Threshold, VNodeMap, [Entry|Queues]);
        _ ->
            message_queues(Pids, Threshold, VNodeMap, Queues)
    end.
%% @doc Read the cumulative rx/tx byte counters for eth0 from sysfs.
%% NOTE(review): the interface name is hard-coded; hosts whose primary
%% NIC is not eth0 will crash here — consider making it configurable.
get_network() ->
    {ok, RX} = file:read_file("/sys/class/net/eth0/statistics/rx_bytes"),
    {ok, TX} = file:read_file("/sys/class/net/eth0/statistics/tx_bytes"),
    {to_integer(RX), to_integer(TX)}.

%% @doc Read and parse /sys/block/md127/stat into a #disk{}-shaped tuple.
%% NOTE(review): the device name is hard-coded (md127, with a dm-0
%% variant left commented out); prefer get_disk2/1 for other hosts.
get_disk2() ->
    {ok, Bin} = file:read_file("/sys/block/md127/stat"),
    %% {ok, Bin} = file:read_file("/sys/block/dm-0/stat"),
    Stats = parse_disk_stats(Bin),
    Stats.

%% @doc Same as get_disk2/0 but for an explicit block device name.
get_disk2(Dev) ->
    {ok, Bin} = file:read_file("/sys/block/" ++ Dev ++ "/stat"),
    Stats = parse_disk_stats(Bin),
    Stats.
%% get_disk() ->
%% {ok, Bin} = file:read_file("/sys/block/md127/stat"),
%% Stats = parse_disk_stats(Bin),
%% {Stats#disk.read_sectors, Stats#disk.write_sectors}.
%% @doc Percentage of physical RAM in active use — total minus free,
%% buffers and page cache — computed from a parsed /proc/meminfo
%% proplist whose values are still strings (see get_memory/0).
memory_util(MemInfo) ->
    Field = fun(Key) ->
                    list_to_integer(element(2, lists:keyfind(Key, 1, MemInfo)))
            end,
    Total = Field("MemTotal:"),
    Used = Total - Field("MemFree:") - Field("Buffers:") - Field("Cached:"),
    Used * 100 div Total.
%% @doc "Dirty:" value from a parsed /proc/meminfo proplist, as an
%% integer (pages waiting to be written back).
memory_dirty(MemInfo) ->
    {"Dirty:", Value} = lists:keyfind("Dirty:", 1, MemInfo),
    list_to_integer(Value).

%% @doc "Writeback:" value from a parsed /proc/meminfo proplist, as an
%% integer (pages actively being written back).
memory_writeback(MemInfo) ->
    {"Writeback:", Value} = lists:keyfind("Writeback:", 1, MemInfo),
    list_to_integer(Value).
%% @doc Slurp /proc/meminfo into a list of {Key, ValueString} pairs,
%% e.g. {"MemTotal:", "16336908"}.  Values are NOT converted to
%% integers here (see memory_util/1 etc.).  Lines that don't split into
%% two or three tokens are kept as the bare atom `ignore' — downstream
%% keyfind calls simply skip those entries.
get_memory() ->
    S = os:cmd("cat /proc/meminfo"),
    [case string:tokens(L," ") of
         [Key, Value, _] ->
             {Key, Value};
         [Key, Value] ->
             {Key, Value};
         _ ->
             ignore
     end || L <- string:tokens(S, "\n")].
%% @doc Parse the first line of a sysfs block-device stat file into a
%% tuple tagged `disk' followed by the integer columns, i.e. the shape
%% the #disk{} record accessors expect.
parse_disk_stats(Bin) ->
    [FirstLine | _] = binary:split(Bin, <<"\n">>),
    Columns = string:tokens(binary_to_list(FirstLine), " "),
    list_to_tuple([disk | [list_to_integer(Col) || Col <- Columns]]).

%% @doc Integer value of the first line of `Bin' (sysfs counter files).
to_integer(Bin) ->
    [FirstLine | _] = binary:split(Bin, <<"\n">>),
    list_to_integer(binary_to_list(FirstLine)).
%% @doc Best human-readable name for a process: its registered name if
%% it has one, otherwise the vnode name recorded for the pid in
%% `VNodeMap' (a dict), otherwise the raw pid.
pid_name(Pid, [], VNodeMap) ->
    case dict:find(Pid, VNodeMap) of
        {ok, VNodeName} ->
            VNodeName;
        error ->
            Pid
    end;
pid_name(_Pid, RegName, _VNodeMap) ->
    RegName.
%% @doc Fetch `Mod:get_stats/0' and optionally narrow the result to the
%% stat keys listed in `Keys'; the atom `all' returns everything.
report_stats(Mod, all) ->
    Mod:get_stats();
report_stats(Mod, Keys) ->
    Wanted = fun({Key, _Value}) -> lists:member(Key, Keys) end,
    lists:filter(Wanted, Mod:get_stats()).
%%%===================================================================
%%% Utility functions
%%%===================================================================
%% @doc Parallel map: apply `F' to each element of `L' in its own
%% process and return the results in input order.  Blocks until every
%% worker has replied.
pmap(F, L) ->
    Self = self(),
    Indexed = lists:zip(lists:seq(0, length(L) - 1), L),
    _Workers = [spawn(fun() -> Self ! {pmap, Idx, F(Elem)} end)
                || {Idx, Elem} <- Indexed],
    Gathered = [receive {pmap, Idx, Res} -> {Idx, Res} end
                || {Idx, _} <- Indexed],
    [Res || {_, Res} <- lists:keysort(1, Gathered)].
%% @doc Push the local object code of each module in `Modules' onto
%% every node in `Nodes'.  Errors with {no_object_code, Module} when a
%% module's beam cannot be found locally, and {load_failed, Module, _}
%% when any node fails to load it.  Returns a list of `ok', one per
%% module.
load_modules_on_nodes(Modules, Nodes) ->
    [case code:get_object_code(Module) of
         {Module, Bin, File} ->
             %% rpc:multicall(Nodes, code, purge, [Module]),
             {Ret, []} = rpc:multicall(Nodes, code,
                                       load_binary, [Module, File, Bin]),
             %% Every node must report a successful load of *this*
             %% module.  (The original asserted the literal
             %% [{module, observer}], which only worked when loading
             %% the observer module itself.)
             case lists:usort(Ret) of
                 [] -> ok;                 %% no nodes: nothing to verify
                 [{module, Module}] -> ok;
                 Other -> error({load_failed, Module, Other})
             end;
         error ->
             error({no_object_code, Module})
     end || Module <- Modules].
%% @doc Dig the eleveldb handle out of the first local riak_kv vnode.
%% Returns `undefined' when that vnode is not running the eleveldb
%% backend.  NOTE(review): this reaches into the vnode's state tuple by
%% element position (4 = module state, 3 = backend module), so it is
%% tightly coupled to the riak_kv_vnode state layout of the version
%% under test — verify positions when bumping Riak versions.
get_leveldb_ref() ->
    VNodes = riak_core_vnode_manager:all_vnodes(riak_kv_vnode),
    {_, _, Pid} = hd(VNodes),
    State = get_state(Pid),
    ModState = element(4, State),
    case element(3,ModState) of
        riak_kv_eleveldb_backend ->
            LvlState = element(4, ModState),
            element(2, LvlState);
        _ ->
            undefined
    end.
%% @doc Pull the "StateData" entry out of an OTP process's sys status
%% (the gen_fsm-style state term).  Returns `undefined' when the
%% process's status carries no such entry.
get_state(Pid) ->
    {status, Pid, _Mod, StatusItems} = sys:get_status(Pid),
    DataLists = [Data || {data, Data} <- lists:flatten(StatusItems)],
    proplists:get_value("StateData", lists:flatten(DataLists)).
%% @doc Append a sample to the open stats file `FD' as two Erlang terms
%% (timestamp, then the stats proplist) and flush it to disk so a crash
%% cannot lose the sample.
dump_stats(Stats, FD) ->
    Sample = io_lib:format("~p.~n ~p.~n", [os:timestamp(), Stats]),
    ok = file:write(FD, Sample),
    %% not sure this is required.
    file:sync(FD).
%% One parsed line of `vmstat' output; field names follow the vmstat
%% column headers (procs, memory, swap, io, system, cpu).
-record(vmstat, {procs_r,     %% runnable processes
                 procs_b,     %% processes in uninterruptible sleep
                 mem_swpd,    %% swap space used
                 mem_free,    %% idle memory
                 mem_buff,    %% memory used as buffers
                 mem_cache,   %% memory used as page cache
                 swap_si,     %% swapped in from disk per second
                 swap_so,     %% swapped out to disk per second
                 io_bi,       %% blocks received from devices per second
                 io_bo,       %% blocks sent to devices per second
                 system_in,   %% interrupts per second
                 system_cs,   %% context switches per second
                 cpu_us,      %% user CPU %
                 cpu_sy,      %% system CPU %
                 cpu_id,      %% idle CPU %
                 cpu_wa}).    %% IO-wait CPU %
%% @doc Run `vmstat 1 2' — the second sample reflects the last second
%% of activity — and report CPU utilization/iowait plus swap traffic.
%% Output that cannot be parsed yields an empty report instead of
%% crashing the collector.
report_vmstat(H) ->
    Output = os:cmd("vmstat 1 2"),
    Lines = string:tokens(Output, "\n"),
    Report =
        case parse_vmstat(lists:last(Lines)) of
            undefined ->
                [];
            VM = #vmstat{} ->
                [{cpu_utilization, 100 - VM#vmstat.cpu_id},
                 {cpu_iowait, VM#vmstat.cpu_wa},
                 {memory_swap_in, VM#vmstat.swap_si},
                 {memory_swap_out, VM#vmstat.swap_so}]
        end,
    {H, Report}.
%% @doc Parse one data line of `vmstat' output into a #vmstat{}-shaped
%% tuple, or `undefined' for header/garbage lines.  Newer procps vmstat
%% appends extra CPU columns ("st", and later "gu") beyond the 16
%% fields the record holds; we keep only the first 16 so the
%% `#vmstat{}' record match in report_vmstat/1 still succeeds (the
%% original built an oversized tuple there, crashing with case_clause).
parse_vmstat(Line) ->
    Values = string:tokens(Line, " "),
    try [list_to_integer(Field) || Field <- Values] of
        Fields when length(Fields) >= 16 ->
            list_to_tuple([vmstat | lists:sublist(Fields, 16)]);
        _TooFew ->
            undefined
    catch
        _:_ ->
            undefined
    end.

View File

@ -80,6 +80,7 @@ main(Args) ->
application:start(ibrowse), application:start(ibrowse),
%% Start Lager %% Start Lager
application:load(lager), application:load(lager),
Config = proplists:get_value(config, ParsedArgs), Config = proplists:get_value(config, ParsedArgs),
ConfigFile = proplists:get_value(file, ParsedArgs), ConfigFile = proplists:get_value(file, ParsedArgs),
@ -96,7 +97,7 @@ main(Args) ->
%% Ensure existance of scratch_dir %% Ensure existance of scratch_dir
case file:make_dir(rt_config:get(rt_scratch_dir)) of case file:make_dir(rt_config:get(rt_scratch_dir)) of
ok -> great; ok -> great;
{eexist, _} -> great; {error, eexist} -> great;
{ErrorType, ErrorReason} -> lager:error("Could not create scratch dir, {~p, ~p}", [ErrorType, ErrorReason]) {ErrorType, ErrorReason} -> lager:error("Could not create scratch dir, {~p, ~p}", [ErrorType, ErrorReason])
end, end,
@ -110,8 +111,8 @@ main(Args) ->
end, end,
application:set_env(lager, handlers, [{lager_console_backend, ConsoleLagerLevel}, application:set_env(lager, handlers, [{lager_console_backend, ConsoleLagerLevel},
{lager_file_backend, [{file, "log/test.log"}, {lager_file_backend, [{file, "log/test.log"},
{level, ConsoleLagerLevel}]}]), {level, ConsoleLagerLevel}]}]),
lager:start(), lager:start(),
%% Report %% Report
@ -285,9 +286,10 @@ is_runnable_test({TestModule, _}) ->
code:ensure_loaded(Mod), code:ensure_loaded(Mod),
erlang:function_exported(Mod, Fun, 0). erlang:function_exported(Mod, Fun, 0).
run_test(Test, Outdir, TestMetaData, Report, _HarnessArgs, NumTests) -> run_test(Test, Outdir, TestMetaData, Report, HarnessArgs, NumTests) ->
rt_cover:maybe_start(Test), rt_cover:maybe_start(Test),
SingleTestResult = riak_test_runner:confirm(Test, Outdir, TestMetaData), SingleTestResult = riak_test_runner:confirm(Test, Outdir, TestMetaData,
HarnessArgs),
CoverDir = rt_config:get(cover_output, "coverage"), CoverDir = rt_config:get(cover_output, "coverage"),
case NumTests of case NumTests of
1 -> keep_them_up; 1 -> keep_them_up;

View File

@ -20,7 +20,7 @@
%% @doc riak_test_runner runs a riak_test module's run/0 function. %% @doc riak_test_runner runs a riak_test module's run/0 function.
-module(riak_test_runner). -module(riak_test_runner).
-export([confirm/3, metadata/0, metadata/1, function_name/1]). -export([confirm/4, metadata/0, metadata/1, function_name/1]).
%% Need to export to use with `spawn_link'. %% Need to export to use with `spawn_link'.
-export([return_to_exit/3]). -export([return_to_exit/3]).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
@ -39,12 +39,12 @@ metadata(Pid) ->
{metadata, TestMeta} -> TestMeta {metadata, TestMeta} -> TestMeta
end. end.
-spec(confirm(integer(), atom(), [{atom(), term()}]) -> [tuple()]). -spec(confirm(integer(), atom(), [{atom(), term()}], list()) -> [tuple()]).
%% @doc Runs a module's run/0 function after setting up a log capturing backend for lager. %% @doc Runs a module's run/0 function after setting up a log capturing backend for lager.
%% It then cleans up that backend and returns the logs as part of the return proplist. %% It then cleans up that backend and returns the logs as part of the return proplist.
confirm(TestModule, Outdir, TestMetaData) -> confirm(TestModule, Outdir, TestMetaData, HarnessArgs) ->
start_lager_backend(TestModule, Outdir), start_lager_backend(TestModule, Outdir),
rt:setup_harness(TestModule, []), rt:setup_harness(TestModule, HarnessArgs),
BackendExtras = case proplists:get_value(multi_config, TestMetaData) of BackendExtras = case proplists:get_value(multi_config, TestMetaData) of
undefined -> []; undefined -> [];
Value -> [{multi_config, Value}] Value -> [{multi_config, Value}]

View File

@ -79,6 +79,7 @@
log_to_nodes/2, log_to_nodes/2,
log_to_nodes/3, log_to_nodes/3,
members_according_to/1, members_according_to/1,
nearest_ringsize/1,
owners_according_to/1, owners_according_to/1,
partition/2, partition/2,
partitions_for_node/1, partitions_for_node/1,
@ -106,6 +107,7 @@
setup_harness/2, setup_harness/2,
setup_log_capture/1, setup_log_capture/1,
slow_upgrade/3, slow_upgrade/3,
stream_cmd/1, stream_cmd/2,
spawn_cmd/1, spawn_cmd/1,
spawn_cmd/2, spawn_cmd/2,
search_cmd/2, search_cmd/2,
@ -307,11 +309,12 @@ deploy_nodes(Versions, Services) ->
NodeConfig = [ version_to_config(Version) || Version <- Versions ], NodeConfig = [ version_to_config(Version) || Version <- Versions ],
Nodes = ?HARNESS:deploy_nodes(NodeConfig), Nodes = ?HARNESS:deploy_nodes(NodeConfig),
lager:info("Waiting for services ~p to start on ~p.", [Services, Nodes]), lager:info("Waiting for services ~p to start on ~p.", [Services, Nodes]),
[ ok = wait_for_service(Node, Service) || Node <- Nodes, Service <- Services ], [ ok = wait_for_service(Node, Service) || Node <- Nodes,
Service <- Services ],
Nodes. Nodes.
version_to_config({_, _}=Config) -> Config; version_to_config(Config) when is_tuple(Config)-> Config;
version_to_config(Version) -> {Version, default}. version_to_config(Version) when is_list(Version) -> {Version, default}.
deploy_clusters(Settings) -> deploy_clusters(Settings) ->
ClusterConfigs = [case Setting of ClusterConfigs = [case Setting of
@ -500,6 +503,47 @@ cmd(Cmd) ->
cmd(Cmd, Opts) -> cmd(Cmd, Opts) ->
?HARNESS:cmd(Cmd, Opts). ?HARNESS:cmd(Cmd, Opts).
%% @doc pretty much the same as os:cmd/1 but it will stream the output to lager.
%% If you're running a long running command, it will dump the output
%% once per second, as to not create the impression that nothing is happening.
-spec stream_cmd(string()) -> {integer(), string()}.
stream_cmd(Cmd) ->
Port = open_port({spawn, binary_to_list(iolist_to_binary(Cmd))}, [stream, stderr_to_stdout, exit_status]),
stream_cmd_loop(Port, "", "", now()).
%% @doc same as rt:stream_cmd/1, but with options, like open_port/2
-spec stream_cmd(string(), string()) -> {integer(), string()}.
stream_cmd(Cmd, Opts) ->
Port = open_port({spawn, binary_to_list(iolist_to_binary(Cmd))}, [stream, stderr_to_stdout, exit_status] ++ Opts),
stream_cmd_loop(Port, "", "", now()).
stream_cmd_loop(Port, Buffer, NewLineBuffer, Time={_MegaSecs, Secs, _MicroSecs}) ->
receive
{Port, {data, Data}} ->
{_, Now, _} = now(),
NewNewLineBuffer = case Now > Secs of
true ->
lager:info(NewLineBuffer),
"";
_ ->
NewLineBuffer
end,
case rt:str(Data, "\n") of
true ->
lager:info(NewNewLineBuffer),
Tokens = string:tokens(Data, "\n"),
[ lager:info(Token) || Token <- Tokens ],
stream_cmd_loop(Port, Buffer ++ NewNewLineBuffer ++ Data, "", Time);
_ ->
stream_cmd_loop(Port, Buffer, NewNewLineBuffer ++ Data, now())
end;
{Port, {exit_status, Status}} ->
catch port_close(Port),
{Status, Buffer}
after rt:config(rt_max_wait_time) ->
{-1, Buffer}
end.
%%%=================================================================== %%%===================================================================
%%% Remote code management %%% Remote code management
%%%=================================================================== %%%===================================================================
@ -956,6 +1000,20 @@ members_according_to(Node) ->
BadRpc BadRpc
end. end.
%% @doc Return an appropriate ringsize for the node count passed
%% in. 24 is the number of cores on the bigger intel machines, but this
%% may be too large for the single-chip machines.
nearest_ringsize(Count) ->
nearest_ringsize(Count * 24, 2).
nearest_ringsize(Count, Power) ->
case Count < trunc(Power * 0.9) of
true ->
Power;
false ->
nearest_ringsize(Count, Power * 2)
end.
%% @doc Return the cluster status of `Member' according to the ring %% @doc Return the cluster status of `Member' according to the ring
%% retrieved from `Node'. %% retrieved from `Node'.
status_of_according_to(Member, Node) -> status_of_according_to(Member, Node) ->

157
src/rt_bench.erl Normal file
View File

@ -0,0 +1,157 @@
-module(rt_bench).
-compile(export_all).
-include_lib("eunit/include/eunit.hrl").
-define(ESCRIPT, rt_config:get(basho_bench_escript)).
%% @doc Scratch directory basho_bench uses for its sequential key
%% generator state; wiped on the loadgens before each run (see bench/5).
seq_state_dir() ->
    rt_config:get(basho_bench_statedir).

%% @doc Run `Config' against `NodeList' with a single runner and
%% without dropping the page cache first.
bench(Config, NodeList, TestName) ->
    bench(Config, NodeList, TestName, 1).

%% @doc As bench/5 with Drop = false.
bench(Config, NodeList, TestName, Runners) ->
    bench(Config, NodeList, TestName, Runners, false).
%% @doc Run a basho_bench configuration on the remote load generators.
%% `Runners' generators are taken from the perf_loadgens config (one
%% runner per generator, by index); each receives a copy of `Config'
%% over scp and runs it via the configured basho_bench escript.  The
%% call blocks until every runner reports back, and errors (badmatch on
%% [ok]) if any runner failed.  When `Drop' is true the OS page cache
%% is dropped on every node in `NodeList' first.
%% NOTE(review): lists:nth below assumes Runners =< length(LoadGens);
%% with more runners than generators this crashes — confirm callers.
bench(Config, NodeList, TestName, Runners, Drop) ->
    lager:info("Starting basho_bench run"),

    LoadGens = rt_config:get(perf_loadgens, ["localhost"]),

    case Drop of
        true ->
            Fun = fun(Node) ->
                          R = rtssh:ssh_cmd(Node, "sudo ~/bin/drop_caches.sh"),
                          lager:info("Dropped cache for node: ~p ret: ~p",
                                     [Node, R])
                  end,
            rt:pmap(Fun, NodeList);
        _ -> ok
    end,

    %% make a local config file, to be copied to a remote
    %% loadgen. They're named separately because for simplicity, we
    %% use network operations even for local load generation

    %% remote tempdir name
    BBTmp = "/tmp/basho_bench_tmp"++os:getpid(),
    %% local staging version
    BBTmpStage = BBTmp ++ "_stage/",
    ok = filelib:ensure_dir(BBTmpStage),

    Filename = TestName++"-rt.config",
    ConfigPath = BBTmpStage++"/"++Filename,
    RemotePath = BBTmp++"/"++Filename,
    %% write each config term on its own line, appending in order
    [file:write_file(ConfigPath,io_lib:fwrite("~p.\n",[C]),
                     [append])
     || C <- Config],

    BBDir = rt_config:get(basho_bench),

    %% pair each runner index with its load generator host
    GenList =
        [begin
             G = lists:nth(C, LoadGens),
             {G, C}
         end
         || C <- lists:seq(1, Runners)],

    F = fun({LG, N}, Owner) ->
                try
                    Num = integer_to_list(N),
                    {0, _} = rtssh:ssh_cmd(LG, "mkdir -p "++BBTmp),
                    %% don't care much if we fail here
                    rtssh:ssh_cmd(LG, "rm -r " ++ seq_state_dir()),
                    {0, _} = rtssh:scp_to(LG, ConfigPath, BBTmp),
                    %% run basho bench on the remote loadgen,
                    %% specifying the remote testdir and the newly
                    %% copied remote config location
                    Cmd = ?ESCRIPT++" "++
                        BBDir++"/"++"basho_bench -d "++
                        BBDir++"/"++TestName++"_"++Num++" "++RemotePath,
                    lager:info("Spawning remote basho_bench w/ ~p on ~p",
                               [Cmd, LG]),
                    {0, R} = rtssh:ssh_cmd(LG, Cmd, false),
                    lager:info("bench run finished, on ~p returned ~p",
                               [LG, R]),
                    {0, _} = rtssh:ssh_cmd(LG, "rm -r "++BBTmp++"/"),
                    Owner ! {done, ok}
                catch
                    Class:Error ->
                        lager:error("basho_bench died with error ~p:~p",
                                    [Class, Error]),
                        Owner ! {done, error}
                after
                    lager:info("finished bb run")
                end
        end,
    S = self(),
    %% one worker per runner; wait for all, demand unanimous success
    [spawn(fun() -> F(R, S) end)|| R <- GenList],
    [ok] = lists:usort([receive {done, R} -> R end
                        || _ <- GenList]),

    lager:debug("removing stage dir"),
    {ok, FL} = file:list_dir(BBTmpStage),
    [file:delete(BBTmpStage++File) || File <- FL],
    ok = file:del_dir(BBTmpStage).
%% @doc Copy each load generator's "current" results directory (the
%% symlink basho_bench points at the latest run) into Dir/<generator>.
%% Generators are numbered 1..N in perf_loadgens order, matching the
%% per-runner test directories created by bench/5.
collect_bench_data(TestName, Dir) ->
    %% grab all the benchmark stuff. need L to make real files because
    %% it's a soft link
    BBDir = rt_config:get(basho_bench),
    Gens = rt_config:get(perf_loadgens, ["localhost"]),
    Len = length(Gens),
    [begin
         N = integer_to_list(N0),
         rtssh:scp_from(Gen, BBDir++"/"++TestName++"_"++N++"/current", Dir ++ "/" ++ Gen)
     end
     || {Gen, N0} <- lists:zip(Gens, lists:seq(1, Len))],
    ok.
-define(CONCURRENCY_FACTOR, rt_config:get(basho_bench_concurrency, 30)).
%% @doc Build a basho_bench config proplist with the default bucket and
%% the riakc_pb driver.
config(Rate, Duration, NodeList, KeyGen,
       ValGen, Operations) ->
    config(Rate, Duration, NodeList, KeyGen,
           ValGen, Operations,
           <<"testbucket">>, riakc_pb).

%% @doc Build a basho_bench config proplist.  The '2i' pseudo-driver
%% benchmarks secondary indexes over PB: its ip-list key uses the `pb'
%% prefix while bucket/replies keys use the `riakc_pb' prefix.
config(Rate, Duration, NodeList, KeyGen,
       ValGen, Operations, Bucket, Driver0) ->
    {IpDriver, KeyDriver} =
        case Driver0 of
            '2i' -> {pb, riakc_pb};
            Other -> {Other, Other}
        end,
    [{mode, {rate, Rate}},
     {duration, Duration},
     {concurrent, ?CONCURRENCY_FACTOR * length(NodeList)},
     {rng_seed, now},
     {append_atoms(KeyDriver, '_bucket'), Bucket},
     {key_generator, KeyGen},
     {value_generator, ValGen},
     {operations, Operations},
     %% harmless when the sequential generator is not in use
     {sequential_int_state_dir, seq_state_dir()},
     {append_atoms(IpDriver, '_ips'), NodeList},
     {append_atoms(KeyDriver, '_replies'), default},
     {driver, append_atoms(basho_bench_driver_, Driver0)}].
%% @doc Concatenate two atoms into one, e.g. riakc_pb + '_ips' ->
%% riakc_pb_ips.  Only ever fed our own fixed driver/key names.
append_atoms(Left, Right) ->
    list_to_atom(lists:concat([Left, Right])).
%% @doc basho_bench value-generator spec: either fixed-size binaries of
%% `BinSize' bytes, or an exponential distribution parameterized as
%% {exponential_bin, BinSize/4, 3*BinSize/4} — presumably min size and
%% mean, per basho_bench's valgen — TODO confirm.
valgen(fixed, BinSize) ->
    {fixed_bin, BinSize};
valgen(exponential, BinSize) ->
    Quarter = BinSize div 4,
    {exponential_bin, Quarter, Quarter * 3}.

View File

@ -79,8 +79,11 @@ set(Key, Value) ->
get(Key) -> get(Key) ->
case kvc:path(Key, application:get_all_env(riak_test)) of case kvc:path(Key, application:get_all_env(riak_test)) of
[] -> erlang:error("Missing configuration key", [Key]); [] ->
Value -> Value lager:warning("Missing configuration key: ~p", [Key]),
erlang:error("Missing configuration key", [Key]);
Value ->
Value
end. end.
get(Key, Default) -> get(Key, Default) ->

394
src/rtperf.erl Normal file
View File

@ -0,0 +1,394 @@
-module(rtperf).
-compile(export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("kernel/include/file.hrl").
%% @doc Delegate app.config editing to the generic ssh harness.
update_app_config(Node, Config) ->
    rtssh:update_app_config(Node, Config).

%% @doc Version detection is not implemented for the perf harness.
get_version() ->
    unknown.

%% @doc Assert that rt_deps is configured, then return the conventional
%% "deps" directory name.  Note the configured value itself is only
%% checked for presence, not used.
get_deps() ->
    case rt_config:get(rt_deps, undefined) of
        undefined ->
            throw("Unable to determine Riak library path, rt_deps.");
        _ ->
            ok
    end,
    "deps".
%% @doc getopt option specs for the perf harness command line, as
%% {Name, ShortCode, LongCode, ArgSpec, Help} tuples.  Each parsed
%% option is later stored in rt_config under a perf_-prefixed key by
%% setup_harness/2 (e.g. bin_size -> perf_bin_size).
harness_opts() ->
    %% Option Name, Short Code, Long Code, Argument Spec, Help Message
    [
     {test_name, undefined, "name", {string, "ad-hoc"},
      "name for this test"},
     {bin_size, undefined, "bin-size", {integer, 4096},
      "size of fixed binaries (median for non-fixed)"},
     {bin_type, undefined, "bin-type", {atom, fixed},
      "fixed | exponential"},
     {version, undefined, "version", {string, "develop"},
      "version to test"},
     {prepop, undefined, "prepop", {boolean, false},
      "prepopulate cluster"},
     {restart, undefined, "restart", {boolean, false},
      "stop running riak cluster and start new"},
     {cuttle, undefined, "cuttle", {boolean, true},
      "use cuttlefish config system"},
     {duration, undefined, "run-time", {integer, 1},
      "how long to run the test for"},
     {target_pct, undefined, "target-pct", {integer, 75},
      "target block cache to dataset size ratio"},
     {ram_size, undefined, "ram-size", {integer, 1024},
      "ram size of individual test nodes"}
    ].
%% @doc Parse the perf options out of `Args', store each under a
%% perf_-prefixed rt_config key, then fall through to the generic rtssh
%% harness setup.  On a parse error (or any unconsumed arguments) this
%% prints usage and halts the whole VM.
setup_harness(Test, Args) ->
    lager:info("Harness setup with args: ~p", [Args]),

    case getopt:parse(harness_opts(), Args) of
        {ok, {Parsed, []}} ->
            _ = [rt_config:set(prefix(K), V)
                 || {K, V} <- Parsed];
        _Huh ->
            getopt:usage(harness_opts(), escript:script_name()), halt(0)
    end,

    rtssh:setup_harness(Test, Args),
    ok.
%% @doc Namespace a harness option key under the perf_ config prefix,
%% e.g. bin_size -> perf_bin_size.  Keys come from our own fixed option
%% specs, so atom-table growth is not a concern.
prefix(Key) ->
    list_to_atom(lists:concat([perf_, Key])).
%% @doc Override the backend under test in the runtime config.
set_backend(Backend) ->
    rt_config:set(rt_backend, Backend).

%% @doc The backends this harness knows how to configure; see
%% mk_std_conf/4 for the per-backend settings.
get_backends() ->
    [riak_kv_bitcask_backend,
     riak_kv_eleveldb_backend,
     riak_kv_memory_backend].
%% @doc Drive a full benchmark run: start stat collectors on `Nodes',
%% optionally start a background ("base") load, run the main
%% basho_bench load to completion, then stop the collectors and gather
%% all results under results/<TestName>.
run_test(Nodes, TestBenchConfig, BaseBenchConfig) ->
    Collectors = start_data_collectors(Nodes),

    TestName = test_name(),

    Base = maybe_start_base_load(BaseBenchConfig, Nodes, TestName),

    rt_bench:bench(TestBenchConfig, Nodes, TestName,
                   length(rt_config:get(perf_loadgens, [1]))),

    maybe_stop_base_load(Base),

    ok = stop_data_collectors(Collectors),

    ok = collect_test_data(Nodes, TestName).

%% @doc Nothing to tear down for the perf harness.
teardown() ->
    ok.

%% @doc Run a shell command via the generic ssh harness.
cmd(Cmd) ->
    rtssh:cmd(Cmd).

%% @doc Deliberately a no-op apart from the log line: perf clusters are
%% left running after a test (see the restart option) — TODO confirm.
stop_all(_Hosts) ->
    lager:info("called stop all, ignoring?").
%% @doc Start the stat collector for `Nodes'; samples land in
%% /tmp/perf-<ospid>, which collect_test_data/2 later moves into the
%% results directory.  Returns the collector handle from
%% observer:watch/2.  NOTE(review): `observer' here is riak_test's own
%% collector module (OTP's observer has no watch/2) — confirm; 65001 is
%% presumably the collector's listen port.
start_data_collectors(Nodes) ->
    OSPid = os:getpid(),
    PrepDir = "/tmp/perf-"++OSPid,
    file:make_dir(PrepDir),

    {ok, Hostname} = inet:gethostname(),
    P = observer:watch(Nodes, {Hostname, 65001, PrepDir}),
    lager:info("started data collector: ~p", [P]),
    P.
%% @doc Ask the collector process to shut down.  Fire-and-forget: we do
%% not wait for it to actually exit.
stop_data_collectors(Collector) ->
    _ = erlang:send(Collector, stop),
    ok.
%% @doc Start a background load to run alongside the main benchmark,
%% if a base config was supplied.  Returns `none' for an empty config,
%% otherwise the pid of the (unlinked) process running the base bench.
maybe_start_base_load([], _, _) ->
    none;
maybe_start_base_load(Config, Nodes, TestName) ->
    spawn(fun() ->
                  rt_bench:bench(Config, Nodes, TestName++"_base",
                                 length(rt_config:get(perf_loadgens, [1])))
          end).

%% @doc Placeholder: running benches cannot currently be stopped, so
%% the base load is simply left to finish on its own.
maybe_stop_base_load(_) -> %% should be none, but benches aren't stoppable rn.
    ok.
%% need more sensible test names.
%% @doc Build the results-directory name from the perf_* config values:
%% <name>-<version>-<target_pct>pct-<bin_type>-<bin_size>b-<epoch-secs>.
test_name() ->
    Vsn = rt_config:get(perf_version),
    BinSize = rt_config:get(perf_bin_size),
    rt_config:get(perf_test_name)++"-"++Vsn++"-"++
        integer_to_list(rt_config:get(perf_target_pct))++"pct-"++
        atom_to_list(rt_config:get(perf_bin_type))++"-"++
        integer_to_list(BinSize)++"b-"++date_string().

%% @doc Gather everything a run produced — loadgen output and each
%% node's logs — into /tmp/perf-<ospid> (where the stat collector has
%% already been writing), then move that directory to
%% results/<TestName>.
collect_test_data(Nodes, TestName) ->
    %% collect the files
    OSPid = os:getpid(),
    PrepDir = "/tmp/perf-"++OSPid,

    %% collect loadgen logs
    ok = rt_bench:collect_bench_data(TestName, PrepDir),

    %% collect node logs
    [begin
         rtssh:scp_from(rtssh:node_to_host(Node),
                        rtssh:node_path(Node) ++ "/log",
                        PrepDir++"/"++rtssh:node_to_host(Node))
     end
     || Node <- Nodes],

    %% no need to collect stats output, it's already in the prepdir
    rt:cmd("mv "++PrepDir++" results/"++TestName),

    %% really, really need to compress the results so they don't take
    %% up os damn much space
    ok.
%% @doc When perf_prepop is set, sequentially write `SetSize' keys of
%% `BinSize'-byte values into the cluster before the measured run
%% (max rate, partitioned sequential big-endian integer keys, puts
%% only), collecting stats for the prepop phase under its own test
%% name, then let the cluster settle for 90 seconds.  No-op otherwise.
maybe_prepop(Hosts, BinSize, SetSize) ->
    Vsn = rt_config:get(perf_version),
    case rt_config:get(perf_prepop) of
        true ->
            PPids = start_data_collectors(Hosts),
            PrepopName = rt_config:get(perf_test_name)++"-"++Vsn++
                "-prepop"++integer_to_list(BinSize)++"b-"++date_string(),

            lager:info("Target size = ~p", [SetSize]),

            PrepopConfig =
                rt_bench:config(
                  max,
                  infinity,
                  Hosts,
                  {int_to_bin_bigendian, {partitioned_sequential_int, SetSize}},
                  rt_bench:valgen(rt_config:get(perf_bin_type), BinSize),
                  [{put,1}]),

            %% drop the cache
            rt_bench:bench(PrepopConfig, Hosts, PrepopName, 1, true),

            stop_data_collectors(PPids),
            collect_test_data(Hosts, PrepopName),
            timer:sleep(timer:minutes(1)+timer:seconds(30));
        false ->
            ok
    end.
%% @doc Seconds since the Unix epoch as a string; used to make test
%% result names unique.
date_string() ->
    {MegaSecs, Secs, _MicroSecs} = os:timestamp(),
    integer_to_list(MegaSecs * 1000000 + Secs).
%% @doc Node configuration for the configured backend, AAE on.
standard_config(NodeCount) ->
    standard_config(NodeCount, on).

%% @doc Node configuration for the configured backend with explicit
%% AAE switch (on | off); ring size is derived from the node count.
standard_config(NodeCount, AAE) ->
    Backend = rt_config:get(rt_backend, undefined),
    Fish = rt_config:get(perf_cuttle, true),
    RingSize = rt:nearest_ringsize(NodeCount),
    mk_std_conf(Backend, Fish, RingSize, AAE).

%% @doc Build the per-backend node configuration.  The second argument
%% selects cuttlefish-style config ({cuttlefish, Props}) versus classic
%% app.config proplists.  The memory backend doubles the ring size; the
%% catch-all clauses configure the default backend (bitcask in the
%% cuttlefish case).
mk_std_conf(riak_kv_memory_backend, false, Ring, AAE) ->
    [{riak_core,
      [{ring_creation_size, Ring*2},
       {handoff_concurrency, 16}]},
     {riak_kv, [{storage_backend, riak_kv_memory_backend},
                {anti_entropy,{AAE, []}},
                {memory_backend, []},
                {fsm_limit, 50000}
               ]}
    ];
mk_std_conf(riak_kv_memory_backend, true, Ring, AAE0) ->
    AAE = aae_cuttle(AAE0),
    {cuttlefish,
     [{ring_size, Ring*2},
      {transfer_limit, 16},
      {"erlang.distribution_buffer_size", "128MB"},
      {storage_backend, memory},
      {anti_entropy, AAE}
     ]};
mk_std_conf(riak_kv_eleveldb_backend, false, Ring, AAE) ->
    [{riak_core,
      [{handoff_concurrency, 16},
       {ring_creation_size, Ring}]},
     {riak_kv,
      [{storage_backend, riak_kv_eleveldb_backend},
       {anti_entropy,{AAE,[]}},
       {fsm_limit, undefined}]},
     {eleveldb,
      [{max_open_files, 500}]}
    ];
mk_std_conf(riak_kv_eleveldb_backend, true, Ring, AAE0) ->
    AAE = aae_cuttle(AAE0),
    {cuttlefish,
     [{ring_size, Ring},
      {transfer_limit, 16},
      {"erlang.distribution_buffer_size", "128MB"},
      {storage_backend, leveldb},
      {anti_entropy, AAE}
     ]};
mk_std_conf(_, false, Ring, AAE) ->
    [{riak_core,
      [{handoff_concurrency, 16},
       {ring_creation_size, Ring}]},
     {riak_kv,
      [{anti_entropy,{AAE, []}}]}
    ];
mk_std_conf(_, true, Ring, AAE0) ->
    AAE = aae_cuttle(AAE0),
    {cuttlefish,
     [{ring_size, Ring},
      {transfer_limit, 16},
      {"storage_backend", "bitcask"},
      {"erlang.distribution_buffer_size", "128MB"},
      {"bitcask.io_mode", nif},
      {anti_entropy, AAE}]}.
%% @doc Translate the test-facing on/off AAE switch into cuttlefish's
%% active/passive anti_entropy vocabulary.
aae_cuttle(on) ->
    active;
aae_cuttle(off) ->
    passive.
%% @doc Number of keys needed so that the dataset occupies `Percentage'
%% percent of the cluster's total RAM (`RamSize' GB per node), assuming
%% each key costs BinSize + ~300 bytes of overhead, times 3 replicas.
%% The result is rounded up to the next multiple of 10,000 keys.
target_size(Percentage, BinSize, RamSize, NodeCount) ->
    TotalRam = RamSize * NodeCount * 1024 * 1024 * 1024,
    CacheTarget = trunc((Percentage / 100) * TotalRam),
    BytesPerKey = (BinSize + 300) * 3,
    %% hacky way of rounding up to the nearest 10k
    (trunc(CacheTarget / (BytesPerKey * 10000)) + 1) * 10000.
%% @doc Map the requested per-cluster node configs onto the host groups
%% declared in rtssh_clusters, erroring out when more clusters (or more
%% nodes within a cluster) are requested than hosts exist, and deploy
%% each cluster onto its (truncated) host list.
deploy_clusters(ClusterConfigs) ->
    Clusters = rt_config:get(rtssh_clusters, []),
    NumConfig = length(ClusterConfigs),
    case length(Clusters) < NumConfig of
        true ->
            erlang:error("Requested more clusters than available");
        false ->
            Both = lists:zip(lists:sublist(Clusters, NumConfig), ClusterConfigs),
            Deploy =
                [begin
                     NumNodes = length(NodeConfig),
                     NumHosts = length(Hosts),
                     case NumNodes > NumHosts of
                         true ->
                             erlang:error("Not enough hosts available to deploy nodes",
                                          [NumNodes, NumHosts]);
                         false ->
                             Hosts2 = lists:sublist(Hosts, NumNodes),
                             {Hosts2, NodeConfig}
                     end
                 end || {{_,Hosts}, NodeConfig} <- Both],
            [deploy_nodes(NodeConfig, Hosts) || {Hosts, NodeConfig} <- Deploy]
    end.

%% @doc Single-cluster deploy onto the flat rtssh_hosts list, using the
%% first length(NodeConfig) hosts.
deploy_nodes(NodeConfig) ->
    Hosts = rt_config:get(rtssh_hosts),
    NumNodes = length(NodeConfig),
    NumHosts = length(Hosts),
    case NumNodes > NumHosts of
        true ->
            erlang:error("Not enough hosts available to deploy nodes",
                         [NumNodes, NumHosts]);
        false ->
            Hosts2 = lists:sublist(Hosts, NumNodes),
            deploy_nodes(NodeConfig, Hosts2)
    end.
%% @doc Deploy one node per host: record host/version maps in
%% rt_config, point each node's PB/HTTP listeners at its host's
%% resolved IP (fixed ports 10017/10018), apply the per-node config
%% (cuttlefish, legacy app.config, or default), start riak everywhere,
%% and wait until every node is pingable, has its ring manager
%% registered, and is a singleton cluster.  Returns the node names.
%% NOTE(review): the interleaved timer:sleep(500) calls look like
%% settle-time guards between remote config steps — confirm they are
%% still needed.
deploy_nodes(NodeConfig, Hosts) ->
    Path = rtssh:relpath(root),
    lager:info("Riak path: ~p", [Path]),
    Nodes = [rtssh:host_to_node(Host) || Host <- Hosts],
    HostMap = lists:zip(Nodes, Hosts),

    {Versions, Configs} = lists:unzip(NodeConfig),
    VersionMap = lists:zip(Nodes, Versions),

    rt_config:set(rt_hosts,
        orddict:from_list(
            orddict:to_list(rt_config:get(rt_hosts, orddict:new())) ++ HostMap)),
    rt_config:set(rt_versions,
        orddict:from_list(
            orddict:to_list(rt_config:get(rt_versions, orddict:new())) ++ VersionMap)),

    rt:pmap(fun({Node, _}) ->
                    %% bind listeners to the host's first resolved address
                    {ok,
                     {_, _, _, _, _, [IP0|_]}} = inet:gethostbyname(
                                                   rtssh:node_to_host(Node)),
                    IP = inet:ntoa(IP0),
                    Config = [{"listener.protobuf.internal",
                               IP++":10017"},
                              {"listener.http.internal",
                               IP++":10018"}],
                    rtssh:set_conf(Node, Config)
            end, lists:zip(Nodes, Configs)),
    timer:sleep(500),

    rt:pmap(fun({_, default}) ->
                    lager:info("Default configuration detected!"),
                    ok;
               ({Node, {cuttlefish, Config}}) ->
                    lager:info("Cuttlefish configuration detected!"),
                    rtssh:set_conf(Node, Config);
               ({Node, Config}) ->
                    lager:info("Legacy configuration detected!"),
                    rtssh:update_app_config(Node, Config)
            end,
            lists:zip(Nodes, Configs)),
    timer:sleep(500),

    %% pre-cuttlefish nodes additionally need app.config listener
    %% rewrites and vm.args tuning done by hand
    case rt_config:get(cuttle, true) of
        false ->
            rt:pmap(fun(Node) ->
                            Host = rtssh:get_host(Node),
                            Config = [{riak_api,
                                       [{pb, fun([{_, Port}]) ->
                                                     [{Host, Port}]
                                             end},
                                        {pb_ip, fun(_) ->
                                                        Host
                                                end}]},
                                      {riak_core,
                                       [{http, fun([{_, Port}]) ->
                                                       [{Host, Port}]
                                               end}]}],
                            rtssh:update_app_config(Node, Config)
                    end, Nodes),

            timer:sleep(500),

            rt:pmap(fun(Node) ->
                            rtssh:update_vm_args(Node,
                                                 [{"-name", Node},
                                                  {"-zddbl", "65535"},
                                                  {"-P", "256000"}])
                    end, Nodes),

            timer:sleep(500);
        true -> ok
    end,

    rtssh:create_dirs(Nodes),

    rt:pmap(fun start/1, Nodes),

    %% Ensure nodes started
    [ok = rt:wait_until_pingable(N) || N <- Nodes],

    %% %% Enable debug logging
    %% [rpc:call(N, lager, set_loglevel, [lager_console_backend, debug]) || N <- Nodes],

    %% We have to make sure that riak_core_ring_manager is running before we can go on.
    [ok = rt:wait_until_registered(N, riak_core_ring_manager) || N <- Nodes],

    %% Ensure nodes are singleton clusters
    [ok = rt:check_singleton_node(N) || {N, Version} <- VersionMap,
                                        Version /= "0.14.2"],

    Nodes.
%% @doc Start riak on `Node' via the generic ssh harness.
start(Node) ->
    rtssh:start(Node).

View File

@ -150,31 +150,37 @@ deploy_nodes(NodeConfig, Hosts) ->
lists:zip(Nodes, Configs)), lists:zip(Nodes, Configs)),
timer:sleep(500), timer:sleep(500),
rt:pmap(fun(Node) -> case rt_config:get(cuttle, true) of
Host = get_host(Node), false ->
IP = get_ip(Host), rt:pmap(fun(Node) ->
Config = [{riak_api, [{pb, [{IP, 10017}]}, Host = get_host(Node),
{pb_ip, IP}, %%lager:info("ports ~p", [self()]),
{http,[{IP, 10018}]}]}, Config = [{riak_api,
{riak_core, [{http, [{IP, 10018}]}, [{pb, fun([{_, Port}]) ->
{cluster_mgr,{IP, 10016}}]}], [{Host, Port}]
%% Config = [{riak_api, [{pb, fun([{_, Port}]) -> end},
%% [{IP, Port}] {pb_ip, fun(_) ->
%% end}, Host
%% {pb_ip, fun(_) -> end}]},
%% IP {riak_core,
%% end}]}, [{http, fun([{_, Port}]) ->
%% {riak_core, [{http, fun([{_, Port}]) -> [{Host, Port}]
%% [{IP, Port}] end}]}],
%% end}]}], update_app_config(Node, Config)
update_app_config(Node, Config) end, Nodes),
end, Nodes),
timer:sleep(500),
rt:pmap(fun(Node) -> timer:sleep(500),
update_nodename(Node)
end, Nodes), rt:pmap(fun(Node) ->
timer:sleep(500), update_vm_args(Node,
[{"-name", Node},
{"-zddbl", "65535"},
{"-P", "256000"}])
end, Nodes),
timer:sleep(500);
true -> ok
end,
create_dirs(Nodes), create_dirs(Nodes),
@ -383,14 +389,33 @@ spawn_ssh_cmd(Node, Cmd) ->
spawn_ssh_cmd(Node, Cmd, []). spawn_ssh_cmd(Node, Cmd, []).
spawn_ssh_cmd(Node, Cmd, Opts) when is_atom(Node) -> spawn_ssh_cmd(Node, Cmd, Opts) when is_atom(Node) ->
Host = get_host(Node), Host = get_host(Node),
spawn_ssh_cmd(Host, Cmd, Opts); spawn_ssh_cmd(Host, Cmd, Opts, true);
spawn_ssh_cmd(Host, Cmd, Opts) -> spawn_ssh_cmd(Host, Cmd, Opts) ->
SSHCmd = format("ssh -o 'StrictHostKeyChecking no' ~s '~s'", [Host, Cmd]), spawn_ssh_cmd(Host, Cmd, Opts, true).
spawn_ssh_cmd(Node, Cmd, Opts, Return) when is_atom(Node) ->
Host = get_host(Node),
spawn_ssh_cmd(Host, Cmd, Opts, Return);
spawn_ssh_cmd(Host, Cmd, Opts, Return) ->
Quiet =
case Return of
true -> "";
false -> " > /dev/null 2>&1"
end,
SSHCmd = format("ssh -q -o 'StrictHostKeyChecking no' ~s '~s'"++Quiet,
[Host, Cmd]),
spawn_cmd(SSHCmd, Opts). spawn_cmd(SSHCmd, Opts).
ssh_cmd(Node, Cmd) -> ssh_cmd(Node, Cmd) ->
lager:info("Running: ~s :: ~s", [Node, Cmd]), ssh_cmd(Node, Cmd, true).
wait_for_cmd(spawn_ssh_cmd(Node, Cmd)).
ssh_cmd(Node, Cmd, Return) ->
case rt_config:get(rtssh_verbose, false) of
true ->
lager:info("Running: ~s :: ~s", [Node, Cmd]);
false -> ok
end,
wait_for_cmd(spawn_ssh_cmd(Node, Cmd, [stderr_to_stdout], Return)).
remote_read_file(Node, File) -> remote_read_file(Node, File) ->
timer:sleep(500), timer:sleep(500),
@ -456,9 +481,12 @@ do_update_vm_args(Node, Props) ->
lists:foldl(fun({Config, Value}, Acc) -> lists:foldl(fun({Config, Value}, Acc) ->
CBin = to_binary(Config), CBin = to_binary(Config),
VBin = to_binary(Value), VBin = to_binary(Value),
re:replace(Acc, case re:replace(Acc,
<<"((^|\\n)", CBin/binary, ").+\\n">>, <<"((^|\\n)", CBin/binary, ").+\\n">>,
<<"\\1 ", VBin/binary, $\n>>) <<"\\1 ", VBin/binary, $\n>>) of
CBin -> <<CBin/binary, VBin/binary, $\n>>;
Mod -> Mod
end
end, Bin, Props), end, Bin, Props),
%% io:format("~p~n", [iolist_to_binary(Output)]), %% io:format("~p~n", [iolist_to_binary(Output)]),
remote_write_file(Node, VMArgs, Output), remote_write_file(Node, VMArgs, Output),
@ -492,7 +520,7 @@ update_app_config_file(Node, ConfigFile, Config, Current) ->
lager:info("rtssh:update_app_config_file(~p, ~s, ~p)", lager:info("rtssh:update_app_config_file(~p, ~s, ~p)",
[Node, ConfigFile, Config]), [Node, ConfigFile, Config]),
BaseConfig = current_config(Node, ConfigFile, Current), BaseConfig = current_config(Node, ConfigFile, Current),
%% io:format("BaseConfig: ~p~n", [BaseConfig]),
MergeA = orddict:from_list(Config), MergeA = orddict:from_list(Config),
MergeB = orddict:from_list(BaseConfig), MergeB = orddict:from_list(BaseConfig),
NewConfig = NewConfig =
@ -583,6 +611,18 @@ all_the_files(Host, DevPath, File) ->
Files Files
end. end.
scp_to(Host, Path, RemotePath) ->
ssh_cmd(Host, "mkdir -p "++RemotePath),
SCP = format("scp -qr -o 'StrictHostKeyChecking no' ~s ~s:~s",
[Path, Host, RemotePath]),
wait_for_cmd(spawn_cmd(SCP)).
scp_from(Host, RemotePath, Path) ->
ssh_cmd(Host, "mkdir -p "++RemotePath),
SCP = format("scp -qr -o 'StrictHostKeyChecking no' ~s:~s ~s",
[Host, RemotePath, Path]),
wait_for_cmd(spawn_cmd(SCP)).
%%%=================================================================== %%%===================================================================
%%% Riak devrel path utilities %%% Riak devrel path utilities
%%%=================================================================== %%%===================================================================
@ -598,6 +638,9 @@ dev_bin_path(Path, N) ->
dev_etc_path(Path, N) -> dev_etc_path(Path, N) ->
dev_path(Path, N) ++ "/etc". dev_path(Path, N) ++ "/etc".
dev_data_path(Path, N) ->
dev_path(Path, N) ++ "/data".
relpath(Vsn) -> relpath(Vsn) ->
Path = ?PATH, Path = ?PATH,
relpath(Vsn, Path). relpath(Vsn, Path).
@ -611,7 +654,7 @@ relpath(root, Path) ->
relpath(_, _) -> relpath(_, _) ->
throw("Version requested but only one path provided"). throw("Version requested but only one path provided").
node_path(Node) -> node_path(Node) when is_atom(Node) ->
node_path(Node, node_version(Node)). node_path(Node, node_version(Node)).
node_path(Node, Version) -> node_path(Node, Version) ->
@ -635,6 +678,7 @@ spawn_cmd(Cmd) ->
spawn_cmd(Cmd, []). spawn_cmd(Cmd, []).
spawn_cmd(Cmd, Opts) -> spawn_cmd(Cmd, Opts) ->
Port = open_port({spawn, Cmd}, [stream, in, exit_status] ++ Opts), Port = open_port({spawn, Cmd}, [stream, in, exit_status] ++ Opts),
put(Port, Cmd),
Port. Port.
wait_for_cmd(Port) -> wait_for_cmd(Port) ->
@ -648,9 +692,9 @@ wait_for_cmd(Port) ->
catch port_close(Port), catch port_close(Port),
self() ! {Port, Msg}, self() ! {Port, Msg},
true true
after 0 -> after 0 ->
false false
end end
end), end),
get_cmd_result(Port, []). get_cmd_result(Port, []).
@ -659,10 +703,18 @@ get_cmd_result(Port, Acc) ->
{Port, {data, Bytes}} -> {Port, {data, Bytes}} ->
get_cmd_result(Port, [Bytes|Acc]); get_cmd_result(Port, [Bytes|Acc]);
{Port, {exit_status, Status}} -> {Port, {exit_status, Status}} ->
case Status of
0 ->
ok;
_ ->
Cmd = get(Port),
lager:info("~p returned exit status: ~p",
[Cmd, Status]),
ok
end,
erase(Port),
Output = lists:flatten(lists:reverse(Acc)), Output = lists:flatten(lists:reverse(Acc)),
{Status, Output} {Status, Output}
after 0 ->
timeout
end. end.
%%%=================================================================== %%%===================================================================