update extractors test to check for possible issue w/ expiry and custom extractors

This commit is contained in:
Zeeshan Lakhani 2015-09-09 19:42:19 -04:00
parent 9bcd4a6c0c
commit 99e1babac1

View File

@ -27,10 +27,95 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("riakc/include/riakc.hrl").
-define(FMT(S, Args), lists:flatten(io_lib:format(S, Args))).
-define(INDEX1, <<"test_idx1">>).
-define(BUCKET1, <<"test_bkt1">>).
-define(INDEX2, <<"test_idx2">>).
-define(BUCKET2, <<"test_bkt2">>).
-define(SCHEMANAME, <<"test">>).
-define(TEST_SCHEMA,
<<"<schema name=\"test\" version=\"1.5\">
<fields>
<dynamicField name=\"*_foo_register\" type=\"_yz_str\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
<dynamicField name=\"*\" type=\"ignored\"/>
<field name=\"_yz_id\" type=\"_yz_str\" indexed=\"true\" stored=\"true\" required=\"true\" multiValued=\"false\"/>
<field name=\"_yz_ed\" type=\"_yz_str\" indexed=\"true\" multiValued=\"false\"/>
<field name=\"_yz_pn\" type=\"_yz_str\" indexed=\"true\" multiValued=\"false\"/>
<field name=\"_yz_fpn\" type=\"_yz_str\" indexed=\"true\" multiValued=\"false\"/>
<field name=\"_yz_vtag\" type=\"_yz_str\" indexed=\"true\" multiValued=\"false\"/>
<field name=\"_yz_rt\" type=\"_yz_str\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
<field name=\"_yz_rk\" type=\"_yz_str\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
<field name=\"_yz_rb\" type=\"_yz_str\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
<field name=\"_yz_err\" type=\"_yz_str\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
<field name=\"text\" type=\"text_general\" indexed=\"true\" stored=\"false\" multiValued=\"true\"/>
<field name=\"age\" type=\"int\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
<field name=\"host\" type=\"string\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
</fields>
<uniqueKey>_yz_id</uniqueKey>
<types>
<fieldType name=\"ignored\" indexed=\"false\" stored=\"false\" multiValued=\"false\" class=\"solr.StrField\" />
<fieldType name=\"_yz_str\" class=\"solr.StrField\" sortMissingLast=\"true\" />
<fieldType name=\"string\" class=\"solr.StrField\" sortMissingLast=\"true\" />
<fieldType name=\"int\" class=\"solr.TrieIntField\" precisionStep=\"0\" positionIncrementGap=\"0\" />
<fieldType name=\"text_general\" class=\"solr.TextField\" positionIncrementGap=\"100\">
<analyzer type=\"index\">
<tokenizer class=\"solr.StandardTokenizerFactory\"/>
<filter class=\"solr.StopFilterFactory\" ignoreCase=\"true\" words=\"stopwords.txt\" enablePositionIncrements=\"true\" />
<filter class=\"solr.LowerCaseFilterFactory\"/>
</analyzer>
<analyzer type=\"query\">
<tokenizer class=\"solr.StandardTokenizerFactory\"/>
<filter class=\"solr.StopFilterFactory\" ignoreCase=\"true\" words=\"stopwords.txt\" enablePositionIncrements=\"true\" />
<filter class=\"solr.SynonymFilterFactory\" synonyms=\"synonyms.txt\" ignoreCase=\"true\" expand=\"true\"/>
<filter class=\"solr.LowerCaseFilterFactory\"/>
</analyzer>
</fieldType>
</types>
</schema>">>).
-define(TEST_SCHEMA_UPGRADE,
<<"<schema name=\"test\" version=\"1.5\">
<fields>
<dynamicField name=\"*_foo_register\" type=\"_yz_str\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
<dynamicField name=\"*\" type=\"ignored\"/>
<field name=\"_yz_id\" type=\"_yz_str\" indexed=\"true\" stored=\"true\" required=\"true\" multiValued=\"false\"/>
<field name=\"_yz_ed\" type=\"_yz_str\" indexed=\"true\" multiValued=\"false\"/>
<field name=\"_yz_pn\" type=\"_yz_str\" indexed=\"true\" multiValued=\"false\"/>
<field name=\"_yz_fpn\" type=\"_yz_str\" indexed=\"true\" multiValued=\"false\"/>
<field name=\"_yz_vtag\" type=\"_yz_str\" indexed=\"true\" multiValued=\"false\"/>
<field name=\"_yz_rt\" type=\"_yz_str\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
<field name=\"_yz_rk\" type=\"_yz_str\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
<field name=\"_yz_rb\" type=\"_yz_str\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
<field name=\"_yz_err\" type=\"_yz_str\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
<field name=\"text\" type=\"text_general\" indexed=\"true\" stored=\"false\" multiValued=\"true\"/>
<field name=\"age\" type=\"int\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
<field name=\"host\" type=\"string\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
<field name=\"method\" type=\"string\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
</fields>
<uniqueKey>_yz_id</uniqueKey>
<types>
<fieldType name=\"ignored\" indexed=\"false\" stored=\"false\" multiValued=\"false\" class=\"solr.StrField\" />
<fieldType name=\"_yz_str\" class=\"solr.StrField\" sortMissingLast=\"true\" />
<fieldType name=\"string\" class=\"solr.StrField\" sortMissingLast=\"true\" />
<fieldType name=\"int\" class=\"solr.TrieIntField\" precisionStep=\"0\" positionIncrementGap=\"0\" />
<fieldType name=\"text_general\" class=\"solr.TextField\" positionIncrementGap=\"100\">
<analyzer type=\"index\">
<tokenizer class=\"solr.StandardTokenizerFactory\"/>
<filter class=\"solr.StopFilterFactory\" ignoreCase=\"true\" words=\"stopwords.txt\" enablePositionIncrements=\"true\" />
<filter class=\"solr.LowerCaseFilterFactory\"/>
</analyzer>
<analyzer type=\"query\">
<tokenizer class=\"solr.StandardTokenizerFactory\"/>
<filter class=\"solr.StopFilterFactory\" ignoreCase=\"true\" words=\"stopwords.txt\" enablePositionIncrements=\"true\" />
<filter class=\"solr.SynonymFilterFactory\" synonyms=\"synonyms.txt\" ignoreCase=\"true\" expand=\"true\"/>
<filter class=\"solr.LowerCaseFilterFactory\"/>
</analyzer>
</fieldType>
</types>
</schema>">>).
-define(YZ_CAP, {yokozuna, extractor_map_in_cmd}).
-define(GET_MAP_RING_MFA, {yz_extractor, get_map, 1}).
-define(GET_MAP_MFA, {yz_extractor, get_map, 0}).
@ -38,6 +123,8 @@
-define(YZ_META_EXTRACTORS, {yokozuna, extractors}).
-define(YZ_EXTRACTOR_MAP, yokozuna_extractor_map).
-define(NEW_EXTRACTOR, {"application/httpheader", yz_noop_extractor}).
-define(EXTRACTOR_CT, element(1, ?NEW_EXTRACTOR)).
-define(EXTRACTOR_MOD, element(2, ?NEW_EXTRACTOR)).
-define(DEFAULT_MAP, [{default, yz_noop_extractor},
{"application/json",yz_json_extractor},
{"application/riak_counter", yz_dt_extractor},
@ -51,6 +138,13 @@
-define(SEQMAX, 20).
-define(CFG,
[
{riak_kv,
[
%% allow AAE to build trees and exchange rapidly
{anti_entropy_build_limit, {100, 1000}},
{anti_entropy_concurrency, 8},
{anti_entropy_tick, 1000}
]},
{yokozuna,
[
{enabled, true}
@ -78,13 +172,12 @@ confirm() ->
rt:count_calls(Cluster, [?GET_MAP_RING_MFA, ?GET_MAP_MFA]),
yokozuna_rt:write_data(Cluster, OldPid, ?INDEX1, ?BUCKET1, GenKeys),
yokozuna_rt:write_data(Cluster, OldPid, ?INDEX1,
{?SCHEMANAME, ?TEST_SCHEMA}, ?BUCKET1, GenKeys),
yokozuna_rt:commit(Cluster, ?INDEX1),
ok = rt:stop_tracing(),
%% wait for solr soft commit
timer:sleep(1100),
{ok, BProps} = riakc_pb_socket:get_bucket(OldPid, ?BUCKET1),
N = proplists:get_value(n_val, BProps),
@ -108,9 +201,8 @@ confirm() ->
?assertEqual(?DEFAULT_MAP, get_map(Node)),
%% Custom Register
ExtractMap = register_extractor(Node, element(1, ?NEW_EXTRACTOR),
element(2, ?NEW_EXTRACTOR)),
%% %% Custom Register
ExtractMap = register_extractor(Node, ?EXTRACTOR_CT, ?EXTRACTOR_MOD),
?assertEqual(?EXTRACTMAPEXPECT, ExtractMap),
@ -118,7 +210,8 @@ confirm() ->
yokozuna_rt:rolling_upgrade(Cluster, current),
[rt:assert_capability(ANode, ?YZ_CAP, true) || ANode <- Cluster],
[rt:assert_supported(rt:capability(ANode, all), ?YZ_CAP, [true, false]) || ANode <- Cluster],
[rt:assert_supported(rt:capability(ANode, all), ?YZ_CAP, [true, false]) ||
ANode <- Cluster],
%% test query count again
yokozuna_rt:verify_num_found_query(Cluster, ?INDEX1, KeyCount),
@ -128,13 +221,13 @@ confirm() ->
rt:count_calls(Cluster, [?GET_MAP_RING_MFA, ?GET_MAP_MFA,
?GET_MAP_READTHROUGH_MFA]),
yokozuna_rt:write_data(Cluster, Pid, ?INDEX2, ?BUCKET2, GenKeys),
riakc_pb_socket:stop(Pid),
yokozuna_rt:write_data(Cluster, Pid, ?INDEX2, {?SCHEMANAME, ?TEST_SCHEMA},
?BUCKET2, GenKeys),
yokozuna_rt:commit(Cluster, ?INDEX2),
ok = rt:stop_tracing(),
%% wait for solr soft commit
timer:sleep(1100),
riakc_pb_socket:stop(Pid),
CurrGetMapRingCC = rt:get_call_count(Cluster, ?GET_MAP_RING_MFA),
CurrGetMapCC = rt:get_call_count(Cluster, ?GET_MAP_MFA),
@ -148,7 +241,7 @@ confirm() ->
?assert(CurrGetMapCC =< PrevGetMapCC),
lager:info("Number of calls to get_map_read_through/0: ~p~n, Number of calls to get_map/0: ~p~n",
[CurrGetMapRTCC, CurrGetMapCC]),
?assert(CurrGetMapRTCC < CurrGetMapCC),
?assert(CurrGetMapRTCC =< CurrGetMapCC),
{_RingVal2, MDVal2} = get_ring_and_cmd_vals(Node, ?YZ_META_EXTRACTORS,
?YZ_EXTRACTOR_MAP),
@ -156,17 +249,11 @@ confirm() ->
?assertEqual(?EXTRACTMAPEXPECT, MDVal2),
?assertEqual(?EXTRACTMAPEXPECT, get_map(Node)),
rt_intercept:add(Node, {yz_noop_extractor,
[{{extract, 1}, extract_httpheader}]}),
rt_intercept:wait_until_loaded(Node),
Packet = <<"GET http://www.google.com HTTP/1.1\n">>,
test_extractor_works(Cluster, Packet),
test_extractor_with_aae_expire(Cluster, ?INDEX2, ?BUCKET2, Packet),
ExpectedExtraction = [{method,'GET'},
{host,<<"www.google.com">>},
{uri,<<"/">>}],
?assertEqual(ExpectedExtraction,
verify_extractor(Node,
<<"GET http://www.google.com HTTP/1.1\n">>,
element(2, ?NEW_EXTRACTOR))),
rt:clean_cluster(Cluster),
pass.
@ -194,3 +281,65 @@ get_map(Node) ->
verify_extractor(Node, PacketData, Mod) ->
rpc:call(Node, yz_extractor, run, [PacketData, Mod]).
bucket_url({Host,Port}, BName, Key) ->
?FMT("http://~s:~B/buckets/~s/keys/~s",
[Host, Port, BName, Key]).
test_extractor_works(Cluster, Packet) ->
[rt_intercept:add(ANode, {yz_noop_extractor,
[{{extract, 1}, extract_httpheader}]}) ||
ANode <- Cluster],
[rt_intercept:wait_until_loaded(ANode) || ANode <- Cluster],
ExpectedExtraction = [{method, 'GET'},
{host, <<"www.google.com">>},
{uri, <<"/">>}],
?assertEqual(ExpectedExtraction,
verify_extractor(rt:select_random(Cluster), Packet, ?EXTRACTOR_MOD)).
test_extractor_with_aae_expire(Cluster, Index, Bucket, Packet) ->
%% Now make sure we register extractor across all nodes
[register_extractor(ANode, ?EXTRACTOR_CT, ?EXTRACTOR_MOD) ||
ANode <- Cluster],
Key = <<"google">>,
{Host, Port} = rt:select_random(yokozuna_rt:host_entries(
rt:connection_info(
Cluster))),
URL = bucket_url({Host, Port}, mochiweb_util:quote_plus(Bucket),
mochiweb_util:quote_plus(Key)),
CT = ?EXTRACTOR_CT,
{ok, "204", _, _} = ibrowse:send_req(
URL, [{"Content-Type", CT}], put, Packet),
yokozuna_rt:commit(Cluster, Index),
yokozuna_rt:search_expect({Host, Port}, Index, <<"host">>,
<<"www*">>, 1),
yokozuna_rt:expire_trees(Cluster),
yokozuna_rt:wait_for_full_exchange_round(Cluster, erlang:now()),
yokozuna_rt:search_expect({Host, Port}, Index, <<"host">>,
<<"www*">>, 1),
APid = rt:pbc(rt:select_random(Cluster)),
yokozuna_rt:override_schema(APid, Cluster, Index, ?SCHEMANAME,
?TEST_SCHEMA_UPGRADE),
{ok, "204", _, _} = ibrowse:send_req(
URL, [{"Content-Type", CT}], put, Packet),
yokozuna_rt:commit(Cluster, Index),
yokozuna_rt:search_expect({Host, Port}, Index, <<"method">>,
<<"GET">>, 1),
yokozuna_rt:expire_trees(Cluster),
yokozuna_rt:wait_for_full_exchange_round(Cluster, erlang:now()),
yokozuna_rt:search_expect({Host, Port}, Index, <<"method">>,
<<"GET">>, 1),
riakc_pb_socket:stop(APid).