diff --git a/compiler/cpp/src/generate/t_erl_generator.cc b/compiler/cpp/src/generate/t_erl_generator.cc index 30003ab02..a5b6b9cdf 100644 --- a/compiler/cpp/src/generate/t_erl_generator.cc +++ b/compiler/cpp/src/generate/t_erl_generator.cc @@ -649,8 +649,8 @@ void t_erl_generator::generate_service_interface(t_service* tservice) { << "_thrift:function_info(Function, InfoType)." << endl; indent_down(); } else { - // Dummy function_info so we don't worry about the ;s - indent(f_service_) << "function_info(xxx, dummy) -> dummy." << endl; + // Use a special return code for nonexistent functions + indent(f_service_) << "function_info(_Func, _Info) -> no_function." << endl; } indent(f_service_) << endl; diff --git a/lib/erl/README b/lib/erl/README index ddb6946f3..667c549cd 100644 --- a/lib/erl/README +++ b/lib/erl/README @@ -25,32 +25,19 @@ Example Example session using thrift_client: -118> f(), {ok, C} = thrift_client:start_link("localhost", 9090, thriftTest_thrif -t). -{ok,<0.271.0>} -119> thrift_client:call(C, testVoid, []). +1> {ok, C0} = thrift_client_util:new("localhost", 9090, thriftTest_thrift, []), ok. +ok +2> {C1, R1} = thrift_client:call(C0, testVoid, []), R1. {ok,ok} -120> thrift_client:call(C, testVoid, [asdf]). +3> {C2, R2} = thrift_client:call(C1, testVoid, [asdf]), R2. {error,{bad_args,testVoid,[asdf]}} -121> thrift_client:call(C, testI32, [123]). +4> {C3, R3} = thrift_client:call(C2, testI32, [123]), R3. {ok,123} -122> thrift_client:call(C, testOneway, [1]). +5> {C4, R4} = thrift_client:call(C3, testOneway, [1]), R4. {ok,ok} -123> catch thrift_client:call(C, testXception, ["foo"]). +6> {C5, R5} = thrift_client:call(C4, testXception, ["foo"]), R5. {error,{no_function,testXception}} -124> catch thrift_client:call(C, testException, ["foo"]). +7> {C6, R6} = thrift_client:call(C5, testException, ["foo"]), R6. {ok,ok} -125> catch thrift_client:call(C, testException, ["Xception"]). -{xception,1001,"This is an Xception"} -126> thrift_client:call(C, testException, ["Xception"]). - -=ERROR REPORT==== 24-Feb-2008::23:00:23 === -Error in process <0.269.0> with exit value: {{nocatch,{xception,1001,"This is an - Xception"}},[{thrift_client,call,3},{erl_eval,do_apply,5},{shell,exprs,6},{shel -l,eval_loop,3}]} - -** exited: {{nocatch,{xception,1001,"This is an Xception"}}, - [{thrift_client,call,3}, - {erl_eval,do_apply,5}, - {shell,exprs,6}, - {shell,eval_loop,3}]} ** +8> {C7, R7} = (catch thrift_client:call(C6, testException, ["Xception"])), R7. +{exception,{xception,1001,<<"Xception">>}} diff --git a/lib/erl/build/otp.mk b/lib/erl/build/otp.mk index 1d16e2c83..0e0381eca 100644 --- a/lib/erl/build/otp.mk +++ b/lib/erl/build/otp.mk @@ -25,7 +25,6 @@ OS_TYPE=${shell uname} # MHOST is the host where this Makefile runs. MHOST=${shell hostname -s} -ERL_COMPILE_FLAGS+=-W0 # The location of the erlang runtime system. ifndef ERL_RUN_TOP diff --git a/lib/erl/include/thrift_protocol.hrl b/lib/erl/include/thrift_protocol.hrl index f4e1901f7..f85f4552a 100644 --- a/lib/erl/include/thrift_protocol.hrl +++ b/lib/erl/include/thrift_protocol.hrl @@ -18,7 +18,7 @@ %% -ifndef(THRIFT_PROTOCOL_INCLUDED). --define(THRIFT_PROTOCOL_INCLUDED, yea). +-define(THRIFT_PROTOCOL_INCLUDED, true). -record(protocol_message_begin, {name, type, seqid}). -record(protocol_struct_begin, {name}). @@ -27,5 +27,40 @@ -record(protocol_list_begin, {etype, size}). -record(protocol_set_begin, {etype, size}). +-type tprot_header_val() :: #protocol_message_begin{} + | #protocol_struct_begin{} + | #protocol_field_begin{} + | #protocol_map_begin{} + | #protocol_list_begin{} + | #protocol_set_begin{} + . +-type tprot_empty_tag() :: message_end + | struct_begin + | struct_end + | field_end + | map_end + | list_end + | set_end + . +-type tprot_header_tag() :: message_begin + | field_begin + | map_begin + | list_begin + | set_begin + . +-type tprot_data_tag() :: ui32 + | bool + | byte + | i16 + | i32 + | i64 + | double + | string + . +-type tprot_cont_tag() :: {list, _Type} + | {map, _KType, _VType} + | {set, _Type} + . + -endif. diff --git a/lib/erl/src/test_service.erl b/lib/erl/include/thrift_protocol_behaviour.hrl similarity index 52% rename from lib/erl/src/test_service.erl rename to lib/erl/include/thrift_protocol_behaviour.hrl index 7aa4827f8..b75bb4a43 100644 --- a/lib/erl/src/test_service.erl +++ b/lib/erl/include/thrift_protocol_behaviour.hrl @@ -17,13 +17,21 @@ %% under the License. %% --module(test_service). -% -% Test service definition +%% Signature specifications for protocol implementations. --export([function_info/2]). +-ifndef(THRIFT_PROTOCOL_BEHAVIOUR_INCLUDED). +-define(THRIFT_PROTOCOL_BEHAVIOUR_INCLUDED, true). -function_info(add, params_type) -> - {struct, [{1, i32}, - {2, i32}]}; -function_info(add, reply_type) -> i32. +-spec flush_transport(state()) -> {state(), ok | {error, _Reason}}. +-spec close_transport(state()) -> {state(), ok | {error, _Reason}}. + +-spec write(state(), term()) -> {state(), ok | {error, _Reason}}. + +%% NOTE: Keep this in sync with thrift_protocol:read and read_specific. +-spec read + (state(), tprot_empty_tag()) -> {state(), ok | {error, _Reason}}; + (state(), tprot_header_tag()) -> {state(), tprot_header_val() | {error, _Reason}}; + (state(), tprot_data_tag()) -> {state(), {ok, term()} | {error, _Reason}}. + + +-endif. diff --git a/lib/erl/src/test_handler.erl b/lib/erl/include/thrift_transport_behaviour.hrl similarity index 64% rename from lib/erl/src/test_handler.erl rename to lib/erl/include/thrift_transport_behaviour.hrl index 28a3acd30..dbc05aacf 100644 --- a/lib/erl/src/test_handler.erl +++ b/lib/erl/include/thrift_transport_behaviour.hrl @@ -17,10 +17,15 @@ %% under the License. %% --module(test_handler). +%% Signature specifications for transport implementations. --export([handle_function/2]). +-ifndef(THRIFT_TRANSPORT_BEHAVIOUR_INCLUDED). +-define(THRIFT_TRANSPORT_BEHAVIOUR_INCLUDED, true). -handle_function(add, Params = {A, B}) -> - io:format("Got params: ~p~n", [Params]), - {reply, A + B}. +-spec write(state(), iolist() | binary()) -> {state(), ok | {error, _Reason}}. +-spec read(state(), non_neg_integer()) -> {state(), {ok, binary()} | {error, _Reason}}. +-spec flush(state()) -> {state(), ok | {error, _Reason}}. +-spec close(state()) -> {state(), ok | {error, _Reason}}. + + +-endif. diff --git a/lib/erl/src/Makefile b/lib/erl/src/Makefile index 980af8122..78af14f68 100644 --- a/lib/erl/src/Makefile +++ b/lib/erl/src/Makefile @@ -27,6 +27,7 @@ INSTALL_DST = $(ERLANG_OTP)/lib/$(APP_NAME)-$(VSN) MODULES = $(shell find . -name \*.erl | sed 's:^\./::' | sed 's/\.erl//') MODULES_STRING_LIST = $(shell find . -name \*.erl | sed 's:^\./:":' | sed 's/\.erl/",/') +BEHAV_MODULES = $(shell find . -name \*.erl | xargs grep -l behaviour_info | sed 's:^\./::' | sed 's/\.erl//') HRL_FILES= INTERNAL_HRL_FILES= $(APP_NAME).hrl @@ -43,7 +44,8 @@ APP_TARGET= $(EBIN)/$(APP_FILE) APPUP_TARGET= $(EBIN)/$(APPUP_FILE) BEAMS= $(MODULES:%=$(EBIN)/%.$(EMULATOR)) -TARGET_FILES= $(BEAMS) $(APP_TARGET) $(APPUP_TARGET) +BEHAV_BEAMS= $(BEHAV_MODULES:%=$(EBIN)/%.$(EMULATOR)) +TARGET_FILES= $(BEHAV_BEAMS) $(BEAMS) $(APP_TARGET) $(APPUP_TARGET) WEB_TARGET=/var/yaws/www/$(APP_NAME) @@ -53,7 +55,8 @@ WEB_TARGET=/var/yaws/www/$(APP_NAME) ERL_FLAGS += ERL_INCLUDE = -I../include -I../../fslib/include -I../../system_status/include -ERL_COMPILE_FLAGS += $(ERL_INCLUDE) +ERL_BEHAV_PATH = -pz ../ebin +ERL_COMPILE_FLAGS += $(ERL_INCLUDE) $(ERL_BEHAV_PATH) # ---------------------------------------------------- # Targets diff --git a/lib/erl/src/thrift_base64_transport.erl b/lib/erl/src/thrift_base64_transport.erl index 9d13151c6..d31f2bacf 100644 --- a/lib/erl/src/thrift_base64_transport.erl +++ b/lib/erl/src/thrift_base64_transport.erl @@ -29,30 +29,35 @@ %% State -record(b64_transport, {wrapped}). +-type state() :: #b64_transport{}. +-include("thrift_transport_behaviour.hrl"). new(Wrapped) -> State = #b64_transport{wrapped = Wrapped}, thrift_transport:new(?MODULE, State). -write(#b64_transport{wrapped = Wrapped}, Data) -> - thrift_transport:write(Wrapped, base64:encode(iolist_to_binary(Data))). +write(This = #b64_transport{wrapped = Wrapped}, Data) -> + {NewWrapped, Result} = thrift_transport:write(Wrapped, base64:encode(iolist_to_binary(Data))), + {This#b64_transport{wrapped = NewWrapped}, Result}. %% base64 doesn't support reading quite yet since it would involve %% nasty buffering and such -read(#b64_transport{wrapped = Wrapped}, Data) -> - {error, no_reads_allowed}. +read(This = #b64_transport{}, _Data) -> + {This, {error, no_reads_allowed}}. -flush(#b64_transport{wrapped = Wrapped}) -> - thrift_transport:write(Wrapped, <<"\n">>), - thrift_transport:flush(Wrapped). +flush(This = #b64_transport{wrapped = Wrapped0}) -> + {Wrapped1, ok} = thrift_transport:write(Wrapped0, <<"\n">>), + {Wrapped2, ok} = thrift_transport:flush(Wrapped1), + {This#b64_transport{wrapped = Wrapped2}, ok}. -close(Me = #b64_transport{wrapped = Wrapped}) -> - flush(Me), - thrift_transport:close(Wrapped). +close(This0) -> + {This1 = #b64_transport{wrapped = Wrapped}, ok} = flush(This0), + {NewWrapped, ok} = thrift_transport:close(Wrapped), + {This1#b64_transport{wrapped = NewWrapped}, ok}. %%%% FACTORY GENERATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% diff --git a/lib/erl/src/thrift_binary_protocol.erl b/lib/erl/src/thrift_binary_protocol.erl index ad5338422..800fd8ea8 100644 --- a/lib/erl/src/thrift_binary_protocol.erl +++ b/lib/erl/src/thrift_binary_protocol.erl @@ -19,7 +19,7 @@ -module(thrift_binary_protocol). --behavior(thrift_protocol). +-behaviour(thrift_protocol). -include("thrift_constants.hrl"). -include("thrift_protocol.hrl"). @@ -37,6 +37,8 @@ strict_read=true, strict_write=true }). +-type state() :: #binary_protocol{}. +-include("thrift_protocol_behaviour.hrl"). -define(VERSION_MASK, 16#FFFF0000). -define(VERSION_1, 16#80010000). @@ -58,79 +60,81 @@ parse_options([{strict_write, Bool} | Rest], State) when is_boolean(Bool) -> parse_options(Rest, State#binary_protocol{strict_write=Bool}). -flush_transport(#binary_protocol{transport = Transport}) -> - thrift_transport:flush(Transport). +flush_transport(This = #binary_protocol{transport = Transport}) -> + {NewTransport, Result} = thrift_transport:flush(Transport), + {This#binary_protocol{transport = NewTransport}, Result}. -close_transport(#binary_protocol{transport = Transport}) -> - thrift_transport:close(Transport). +close_transport(This = #binary_protocol{transport = Transport}) -> + {NewTransport, Result} = thrift_transport:close(Transport), + {This#binary_protocol{transport = NewTransport}, Result}. %%% %%% instance methods %%% -write(This, #protocol_message_begin{ +write(This0, #protocol_message_begin{ name = Name, type = Type, seqid = Seqid}) -> - case This#binary_protocol.strict_write of + case This0#binary_protocol.strict_write of true -> - write(This, {i32, ?VERSION_1 bor Type}), - write(This, {string, Name}), - write(This, {i32, Seqid}); + {This1, ok} = write(This0, {i32, ?VERSION_1 bor Type}), + {This2, ok} = write(This1, {string, Name}), + {This3, ok} = write(This2, {i32, Seqid}), + {This3, ok}; false -> - write(This, {string, Name}), - write(This, {byte, Type}), - write(This, {i32, Seqid}) - end, - ok; + {This1, ok} = write(This0, {string, Name}), + {This2, ok} = write(This1, {byte, Type}), + {This3, ok} = write(This2, {i32, Seqid}), + {This3, ok} + end; -write(This, message_end) -> ok; +write(This, message_end) -> {This, ok}; -write(This, #protocol_field_begin{ +write(This0, #protocol_field_begin{ name = _Name, type = Type, id = Id}) -> - write(This, {byte, Type}), - write(This, {i16, Id}), - ok; + {This1, ok} = write(This0, {byte, Type}), + {This2, ok} = write(This1, {i16, Id}), + {This2, ok}; write(This, field_stop) -> - write(This, {byte, ?tType_STOP}), - ok; + write(This, {byte, ?tType_STOP}); -write(This, field_end) -> ok; +write(This, field_end) -> {This, ok}; -write(This, #protocol_map_begin{ +write(This0, #protocol_map_begin{ ktype = Ktype, vtype = Vtype, size = Size}) -> - write(This, {byte, Ktype}), - write(This, {byte, Vtype}), - write(This, {i32, Size}), - ok; + {This1, ok} = write(This0, {byte, Ktype}), + {This2, ok} = write(This1, {byte, Vtype}), + {This3, ok} = write(This2, {i32, Size}), + {This3, ok}; -write(This, map_end) -> ok; +write(This, map_end) -> {This, ok}; -write(This, #protocol_list_begin{ +write(This0, #protocol_list_begin{ etype = Etype, size = Size}) -> - write(This, {byte, Etype}), - write(This, {i32, Size}), - ok; + {This1, ok} = write(This0, {byte, Etype}), + {This2, ok} = write(This1, {i32, Size}), + {This2, ok}; -write(This, list_end) -> ok; +write(This, list_end) -> {This, ok}; -write(This, #protocol_set_begin{ +write(This0, #protocol_set_begin{ etype = Etype, size = Size}) -> - write(This, {byte, Etype}), - write(This, {i32, Size}), - ok; + {This1, ok} = write(This0, {byte, Etype}), + {This2, ok} = write(This1, {i32, Size}), + {This2, ok}; -write(This, set_end) -> ok; +write(This, set_end) -> {This, ok}; -write(This, #protocol_struct_begin{}) -> ok; -write(This, struct_end) -> ok; +write(This, #protocol_struct_begin{}) -> {This, ok}; +write(This, struct_end) -> {This, ok}; write(This, {bool, true}) -> write(This, {byte, 1}); write(This, {bool, false}) -> write(This, {byte, 0}); @@ -150,152 +154,166 @@ write(This, {i64, I64}) -> write(This, {double, Double}) -> write(This, <>); -write(This, {string, Str}) when is_list(Str) -> - write(This, {i32, length(Str)}), - write(This, list_to_binary(Str)); +write(This0, {string, Str}) when is_list(Str) -> + {This1, ok} = write(This0, {i32, length(Str)}), + {This2, ok} = write(This1, list_to_binary(Str)), + {This2, ok}; -write(This, {string, Bin}) when is_binary(Bin) -> - write(This, {i32, size(Bin)}), - write(This, Bin); +write(This0, {string, Bin}) when is_binary(Bin) -> + {This1, ok} = write(This0, {i32, size(Bin)}), + {This2, ok} = write(This1, Bin), + {This2, ok}; %% Data :: iolist() -write(This, Data) -> - thrift_transport:write(This#binary_protocol.transport, Data). +write(This = #binary_protocol{transport = Trans}, Data) -> + {NewTransport, Result} = thrift_transport:write(Trans, Data), + {This#binary_protocol{transport = NewTransport}, Result}. %% -read(This, message_begin) -> - case read(This, ui32) of +read(This0, message_begin) -> + {This1, Initial} = read(This0, ui32), + case Initial of {ok, Sz} when Sz band ?VERSION_MASK =:= ?VERSION_1 -> %% we're at version 1 - {ok, Name} = read(This, string), - Type = Sz band ?TYPE_MASK, - {ok, SeqId} = read(This, i32), - #protocol_message_begin{name = binary_to_list(Name), - type = Type, - seqid = SeqId}; + {This2, {ok, Name}} = read(This1, string), + {This3, {ok, SeqId}} = read(This2, i32), + Type = Sz band ?TYPE_MASK, + {This3, #protocol_message_begin{name = binary_to_list(Name), + type = Type, + seqid = SeqId}}; {ok, Sz} when Sz < 0 -> %% there's a version number but it's unexpected - {error, {bad_binary_protocol_version, Sz}}; + {This1, {error, {bad_binary_protocol_version, Sz}}}; - {ok, Sz} when This#binary_protocol.strict_read =:= true -> + {ok, _Sz} when This1#binary_protocol.strict_read =:= true -> %% strict_read is true and there's no version header; that's an error - {error, no_binary_protocol_version}; + {This1, {error, no_binary_protocol_version}}; - {ok, Sz} when This#binary_protocol.strict_read =:= false -> + {ok, Sz} when This1#binary_protocol.strict_read =:= false -> %% strict_read is false, so just read the old way - {ok, Name} = read(This, Sz), - {ok, Type} = read(This, byte), - {ok, SeqId} = read(This, i32), - #protocol_message_begin{name = binary_to_list(Name), - type = Type, - seqid = SeqId}; + {This2, {ok, Name}} = read_data(This1, Sz), + {This3, {ok, Type}} = read(This2, byte), + {This4, {ok, SeqId}} = read(This3, i32), + {This4, #protocol_message_begin{name = binary_to_list(Name), + type = Type, + seqid = SeqId}}; - Err = {error, closed} -> Err; - Err = {error, timeout}-> Err; - Err = {error, ebadf} -> Err + Else -> + {This1, Else} end; -read(This, message_end) -> ok; +read(This, message_end) -> {This, ok}; -read(This, struct_begin) -> ok; -read(This, struct_end) -> ok; +read(This, struct_begin) -> {This, ok}; +read(This, struct_end) -> {This, ok}; -read(This, field_begin) -> - case read(This, byte) of +read(This0, field_begin) -> + {This1, Result} = read(This0, byte), + case Result of {ok, Type = ?tType_STOP} -> - #protocol_field_begin{type = Type}; + {This1, #protocol_field_begin{type = Type}}; {ok, Type} -> - {ok, Id} = read(This, i16), - #protocol_field_begin{type = Type, - id = Id} + {This2, {ok, Id}} = read(This1, i16), + {This2, #protocol_field_begin{type = Type, + id = Id}} end; -read(This, field_end) -> ok; +read(This, field_end) -> {This, ok}; -read(This, map_begin) -> - {ok, Ktype} = read(This, byte), - {ok, Vtype} = read(This, byte), - {ok, Size} = read(This, i32), - #protocol_map_begin{ktype = Ktype, - vtype = Vtype, - size = Size}; -read(This, map_end) -> ok; +read(This0, map_begin) -> + {This1, {ok, Ktype}} = read(This0, byte), + {This2, {ok, Vtype}} = read(This1, byte), + {This3, {ok, Size}} = read(This2, i32), + {This3, #protocol_map_begin{ktype = Ktype, + vtype = Vtype, + size = Size}}; +read(This, map_end) -> {This, ok}; -read(This, list_begin) -> - {ok, Etype} = read(This, byte), - {ok, Size} = read(This, i32), - #protocol_list_begin{etype = Etype, - size = Size}; -read(This, list_end) -> ok; +read(This0, list_begin) -> + {This1, {ok, Etype}} = read(This0, byte), + {This2, {ok, Size}} = read(This1, i32), + {This2, #protocol_list_begin{etype = Etype, + size = Size}}; +read(This, list_end) -> {This, ok}; -read(This, set_begin) -> - {ok, Etype} = read(This, byte), - {ok, Size} = read(This, i32), - #protocol_set_begin{etype = Etype, - size = Size}; -read(This, set_end) -> ok; +read(This0, set_begin) -> + {This1, {ok, Etype}} = read(This0, byte), + {This2, {ok, Size}} = read(This1, i32), + {This2, #protocol_set_begin{etype = Etype, + size = Size}}; +read(This, set_end) -> {This, ok}; -read(This, field_stop) -> - {ok, ?tType_STOP} = read(This, byte), - ok; +read(This0, field_stop) -> + {This1, {ok, ?tType_STOP}} = read(This0, byte), + {This1, ok}; %% -read(This, bool) -> - case read(This, byte) of - {ok, Byte} -> {ok, Byte /= 0}; - Else -> Else +read(This0, bool) -> + {This1, Result} = read(This0, byte), + case Result of + {ok, Byte} -> {This1, {ok, Byte /= 0}}; + Else -> {This1, Else} end; -read(This, byte) -> - case read(This, 1) of - {ok, <>} -> {ok, Val}; - Else -> Else +read(This0, byte) -> + {This1, Bytes} = read_data(This0, 1), + case Bytes of + {ok, <>} -> {This1, {ok, Val}}; + Else -> {This1, Else} end; -read(This, i16) -> - case read(This, 2) of - {ok, <>} -> {ok, Val}; - Else -> Else +read(This0, i16) -> + {This1, Bytes} = read_data(This0, 2), + case Bytes of + {ok, <>} -> {This1, {ok, Val}}; + Else -> {This1, Else} end; -read(This, i32) -> - case read(This, 4) of - {ok, <>} -> {ok, Val}; - Else -> Else +read(This0, i32) -> + {This1, Bytes} = read_data(This0, 4), + case Bytes of + {ok, <>} -> {This1, {ok, Val}}; + Else -> {This1, Else} end; %% unsigned ints aren't used by thrift itself, but it's used for the parsing %% of the packet version header. Without this special function BEAM works fine %% but hipe thinks it received a bad version header. -read(This, ui32) -> - case read(This, 4) of - {ok, <>} -> {ok, Val}; - Else -> Else +read(This0, ui32) -> + {This1, Bytes} = read_data(This0, 4), + case Bytes of + {ok, <>} -> {This1, {ok, Val}}; + Else -> {This1, Else} end; -read(This, i64) -> - case read(This, 8) of - {ok, <>} -> {ok, Val}; - Else -> Else +read(This0, i64) -> + {This1, Bytes} = read_data(This0, 8), + case Bytes of + {ok, <>} -> {This1, {ok, Val}}; + Else -> {This1, Else} end; -read(This, double) -> - case read(This, 8) of - {ok, <>} -> {ok, Val}; - Else -> Else +read(This0, double) -> + {This1, Bytes} = read_data(This0, 8), + case Bytes of + {ok, <>} -> {This1, {ok, Val}}; + Else -> {This1, Else} end; % returns a binary directly, call binary_to_list if necessary -read(This, string) -> - {ok, Sz} = read(This, i32), - {ok, Bin} = read(This, Sz); +read(This0, string) -> + {This1, {ok, Sz}} = read(This0, i32), + read_data(This1, Sz). -read(This, 0) -> {ok, <<>>}; -read(This, Len) when is_integer(Len), Len >= 0 -> - thrift_transport:read(This#binary_protocol.transport, Len). +-spec read_data(#binary_protocol{}, non_neg_integer()) -> + {#binary_protocol{}, {ok, binary()} | {error, _Reason}}. +read_data(This, 0) -> {This, {ok, <<>>}}; +read_data(This = #binary_protocol{transport = Trans}, Len) when is_integer(Len) andalso Len > 0 -> + {NewTransport, Result} = thrift_transport:read(Trans, Len), + {This#binary_protocol{transport = NewTransport}, Result}. %%%% FACTORY GENERATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% diff --git a/lib/erl/src/thrift_buffered_transport.erl b/lib/erl/src/thrift_buffered_transport.erl index ebc16bd65..d4d614eb8 100644 --- a/lib/erl/src/thrift_buffered_transport.erl +++ b/lib/erl/src/thrift_buffered_transport.erl @@ -19,154 +19,51 @@ -module(thrift_buffered_transport). --behaviour(gen_server). -behaviour(thrift_transport). %% API -export([new/1, new_transport_factory/1]). -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - %% thrift_transport callbacks -export([write/2, read/2, flush/1, close/1]). -record(buffered_transport, {wrapped, % a thrift_transport write_buffer % iolist() }). +-type state() :: #buffered_transport{}. +-include("thrift_transport_behaviour.hrl"). + -%%==================================================================== -%% API -%%==================================================================== -%%-------------------------------------------------------------------- -%% Function: start_link() -> {ok,Pid} | ignore | {error,Error} -%% Description: Starts the server -%%-------------------------------------------------------------------- new(WrappedTransport) -> - case gen_server:start_link(?MODULE, [WrappedTransport], []) of - {ok, Pid} -> - thrift_transport:new(?MODULE, Pid); - Else -> - Else - end. + State = #buffered_transport{wrapped = WrappedTransport, + write_buffer = []}, + thrift_transport:new(?MODULE, State). +%% Writes data into the buffer +write(State = #buffered_transport{write_buffer = WBuf}, Data) -> + {State#buffered_transport{write_buffer = [WBuf, Data]}, ok}. -%%-------------------------------------------------------------------- -%% Function: write(Transport, Data) -> ok -%% -%% Data = iolist() -%% -%% Description: Writes data into the buffer -%%-------------------------------------------------------------------- -write(Transport, Data) -> - gen_server:call(Transport, {write, Data}). +%% Flushes the buffer through to the wrapped transport +flush(State = #buffered_transport{write_buffer = WBuf, + wrapped = Wrapped0}) -> + {Wrapped1, Response} = thrift_transport:write(Wrapped0, WBuf), + {Wrapped2, _} = thrift_transport:flush(Wrapped1), + NewState = State#buffered_transport{write_buffer = [], + wrapped = Wrapped2}, + {NewState, Response}. -%%-------------------------------------------------------------------- -%% Function: flush(Transport) -> ok -%% -%% Description: Flushes the buffer through to the wrapped transport -%%-------------------------------------------------------------------- -flush(Transport) -> - gen_server:call(Transport, flush). +%% Closes the transport and the wrapped transport +close(State = #buffered_transport{wrapped = Wrapped0}) -> + {Wrapped1, Result} = thrift_transport:close(Wrapped0), + NewState = State#buffered_transport{wrapped = Wrapped1}, + {NewState, Result}. -%%-------------------------------------------------------------------- -%% Function: close(Transport) -> ok -%% -%% Description: Closes the transport and the wrapped transport -%%-------------------------------------------------------------------- -close(Transport) -> - gen_server:cast(Transport, close). - -%%-------------------------------------------------------------------- -%% Function: Read(Transport, Len) -> {ok, Data} -%% -%% Data = binary() -%% -%% Description: Reads data through from the wrapped transoprt -%%-------------------------------------------------------------------- -read(Transport, Len) when is_integer(Len) -> - gen_server:call(Transport, {read, Len}, _Timeout=10000). - -%%==================================================================== -%% gen_server callbacks -%%==================================================================== - -%%-------------------------------------------------------------------- -%% Function: init(Args) -> {ok, State} | -%% {ok, State, Timeout} | -%% ignore | -%% {stop, Reason} -%% Description: Initiates the server -%%-------------------------------------------------------------------- -init([Wrapped]) -> - {ok, #buffered_transport{wrapped = Wrapped, - write_buffer = []}}. - -%%-------------------------------------------------------------------- -%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | -%% {reply, Reply, State, Timeout} | -%% {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, Reply, State} | -%% {stop, Reason, State} -%% Description: Handling call messages -%%-------------------------------------------------------------------- -handle_call({write, Data}, _From, State = #buffered_transport{write_buffer = WBuf}) -> - {reply, ok, State#buffered_transport{write_buffer = [WBuf, Data]}}; - -handle_call({read, Len}, _From, State = #buffered_transport{wrapped = Wrapped}) -> - Response = thrift_transport:read(Wrapped, Len), - {reply, Response, State}; - -handle_call(flush, _From, State = #buffered_transport{write_buffer = WBuf, - wrapped = Wrapped}) -> - Response = thrift_transport:write(Wrapped, WBuf), - thrift_transport:flush(Wrapped), - {reply, Response, State#buffered_transport{write_buffer = []}}. - -%%-------------------------------------------------------------------- -%% Function: handle_cast(Msg, State) -> {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, State} -%% Description: Handling cast messages -%%-------------------------------------------------------------------- -handle_cast(close, State = #buffered_transport{write_buffer = WBuf, - wrapped = Wrapped}) -> - thrift_transport:write(Wrapped, WBuf), - %% Wrapped is closed by terminate/2 - %% error_logger:info_msg("thrift_buffered_transport ~p: closing", [self()]), - {stop, normal, State}; -handle_cast(Msg, State=#buffered_transport{}) -> - {noreply, State}. - -%%-------------------------------------------------------------------- -%% Function: handle_info(Info, State) -> {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, State} -%% Description: Handling all non call/cast messages -%%-------------------------------------------------------------------- -handle_info(_Info, State) -> - {noreply, State}. - -%%-------------------------------------------------------------------- -%% Function: terminate(Reason, State) -> void() -%% Description: This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any necessary -%% cleaning up. When it returns, the gen_server terminates with Reason. -%% The return value is ignored. -%%-------------------------------------------------------------------- -terminate(_Reason, State = #buffered_transport{wrapped=Wrapped}) -> - thrift_transport:close(Wrapped), - ok. - -%%-------------------------------------------------------------------- -%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState} -%% Description: Convert process state when code is changed -%%-------------------------------------------------------------------- -code_change(_OldVsn, State, _Extra) -> - {ok, State}. +%% Reads data through from the wrapped transport +read(State = #buffered_transport{wrapped = Wrapped0}, Len) when is_integer(Len) -> + {Wrapped1, Response} = thrift_transport:read(Wrapped0, Len), + NewState = State#buffered_transport{wrapped = Wrapped1}, + {NewState, Response}. %%-------------------------------------------------------------------- %%% Internal functions diff --git a/lib/erl/src/thrift_client.erl b/lib/erl/src/thrift_client.erl index d5bb146af..f7701b908 100644 --- a/lib/erl/src/thrift_client.erl +++ b/lib/erl/src/thrift_client.erl @@ -19,366 +19,127 @@ -module(thrift_client). --behaviour(gen_server). - %% API --export([start_link/2, start_link/3, start_link/4, - start/3, start/4, - call/3, send_call/3, close/1]). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - +-export([new/2, call/3, send_call/3, close/1]). -include("thrift_constants.hrl"). -include("thrift_protocol.hrl"). --record(state, {service, protocol, seqid}). - -%%==================================================================== -%% API -%%==================================================================== -%%-------------------------------------------------------------------- -%% Function: start_link() -> {ok,Pid} | ignore | {error,Error} -%% Description: Starts the server as a linked process. -%%-------------------------------------------------------------------- -start_link(Host, Port, Service) when is_integer(Port), is_atom(Service) -> - start_link(Host, Port, Service, []). - -start_link(Host, Port, Service, Options) -> - start(Host, Port, Service, [{monitor, link} | Options]). - -start_link(ProtocolFactory, Service) -> - start(ProtocolFactory, Service, [{monitor, link}]). - -%% -%% Splits client options into protocol options and transport options -%% -%% split_options([Options...]) -> {ProtocolOptions, TransportOptions} -%% -split_options(Options) -> - split_options(Options, [], [], []). - -split_options([], ClientIn, ProtoIn, TransIn) -> - {ClientIn, ProtoIn, TransIn}; - -split_options([Opt = {OptKey, _} | Rest], ClientIn, ProtoIn, TransIn) - when OptKey =:= monitor -> - split_options(Rest, [Opt | ClientIn], ProtoIn, TransIn); - -split_options([Opt = {OptKey, _} | Rest], ClientIn, ProtoIn, TransIn) - when OptKey =:= strict_read; - OptKey =:= strict_write -> - split_options(Rest, ClientIn, [Opt | ProtoIn], TransIn); - -split_options([Opt = {OptKey, _} | Rest], ClientIn, ProtoIn, TransIn) - when OptKey =:= framed; - OptKey =:= connect_timeout; - OptKey =:= sockopts -> - split_options(Rest, ClientIn, ProtoIn, [Opt | TransIn]). +-record(tclient, {service, protocol, seqid}). -%%-------------------------------------------------------------------- -%% Function: start() -> {ok,Pid} | ignore | {error,Error} -%% Description: Starts the server as an unlinked process. -%%-------------------------------------------------------------------- +new(Protocol, Service) + when is_atom(Service) -> + {ok, #tclient{protocol = Protocol, + service = Service, + seqid = 0}}. -%% Backwards-compatible starter for the common-case of socket transports -start(Host, Port, Service, Options) - when is_integer(Port), is_atom(Service), is_list(Options) -> - {ClientOpts, ProtoOpts, TransOpts} = split_options(Options), - - {ok, TransportFactory} = - thrift_socket_transport:new_transport_factory(Host, Port, TransOpts), - - {ok, ProtocolFactory} = thrift_binary_protocol:new_protocol_factory( - TransportFactory, ProtoOpts), - - start(ProtocolFactory, Service, ClientOpts). - - -%% ProtocolFactory :: fun() -> thrift_protocol() -start(ProtocolFactory, Service, ClientOpts) - when is_function(ProtocolFactory), is_atom(Service) -> - {Starter, Opts} = - case lists:keysearch(monitor, 1, ClientOpts) of - {value, {monitor, link}} -> - {start_link, []}; - {value, {monitor, tether}} -> - {start, [{tether, self()}]}; - _ -> - {start, []} - end, - - Connect = - case lists:keysearch(connect, 1, ClientOpts) of - {value, {connect, Choice}} -> - Choice; - _ -> - %% By default, connect at creation-time. - true - end, - - - Started = gen_server:Starter(?MODULE, [Service, Opts], []), - - if - Connect -> - case Started of - {ok, Pid} -> - case gen_server:call(Pid, {connect, ProtocolFactory}) of - ok -> - {ok, Pid}; - Error -> - Error - end; - Else -> - Else - end; - true -> - Started +-spec call(#tclient{}, atom(), list()) -> {#tclient{}, {ok, term()} | {error, term()}}. +call(Client = #tclient{}, Function, Args) + when is_atom(Function), is_list(Args) -> + case send_function_call(Client, Function, Args) of + {Client1, ok} -> + receive_function_result(Client1, Function); + Else -> + Else end. -call(Client, Function, Args) - when is_pid(Client), is_atom(Function), is_list(Args) -> - case gen_server:call(Client, {call, Function, Args}) of - R = {ok, _} -> R; - R = {error, _} -> R; - {exception, Exception} -> throw(Exception) - end. - -cast(Client, Function, Args) - when is_pid(Client), is_atom(Function), is_list(Args) -> - gen_server:cast(Client, {call, Function, Args}). %% Sends a function call but does not read the result. This is useful %% if you're trying to log non-oneway function calls to write-only %% transports like thrift_disk_log_transport. -send_call(Client, Function, Args) - when is_pid(Client), is_atom(Function), is_list(Args) -> - gen_server:call(Client, {send_call, Function, Args}). +-spec send_call(#tclient{}, atom(), list()) -> {#tclient{}, ok}. +send_call(Client = #tclient{}, Function, Args) + when is_atom(Function), is_list(Args) -> + send_function_call(Client, Function, Args). -close(Client) when is_pid(Client) -> - gen_server:cast(Client, close). +-spec close(#tclient{}) -> ok. +close(#tclient{protocol=Protocol}) -> + thrift_protocol:close_transport(Protocol). -%%==================================================================== -%% gen_server callbacks -%%==================================================================== - -%%-------------------------------------------------------------------- -%% Function: init(Args) -> {ok, State} | -%% {ok, State, Timeout} | -%% ignore | -%% {stop, Reason} -%% Description: Initiates the server -%%-------------------------------------------------------------------- -init([Service, Opts]) -> - case lists:keysearch(tether, 1, Opts) of - {value, {tether, Pid}} -> - erlang:monitor(process, Pid); - _Else -> - ok - end, - {ok, #state{service = Service}}. - -%%-------------------------------------------------------------------- -%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | -%% {reply, Reply, State, Timeout} | -%% {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, Reply, State} | -%% {stop, Reason, State} -%% Description: Handling call messages -%%-------------------------------------------------------------------- -handle_call({connect, ProtocolFactory}, _From, - State = #state{service = Service}) -> - case ProtocolFactory() of - {ok, Protocol} -> - {reply, ok, State#state{protocol = Protocol, - seqid = 0}}; - Error -> - {stop, normal, Error, State} - end; - -handle_call({call, Function, Args}, _From, State = #state{service = Service}) -> - Result = catch_function_exceptions( - fun() -> - ok = send_function_call(State, Function, Args), - receive_function_result(State, Function) - end, - Service), - {reply, Result, State}; - - -handle_call({send_call, Function, Args}, _From, State = #state{service = Service}) -> - Result = catch_function_exceptions( - fun() -> - send_function_call(State, Function, Args) - end, - Service), - {reply, Result, State}. - - -%% Helper function that catches exceptions thrown by sending or receiving -%% a function and returns the correct response for call or send_only above. -catch_function_exceptions(Fun, Service) -> - try - Fun() - catch - throw:{return, Return} -> - Return; - error:function_clause -> - ST = erlang:get_stacktrace(), - case hd(ST) of - {Service, function_info, [Function, _]} -> - {error, {no_function, Function}}; - _ -> throw({error, {function_clause, ST}}) - end - end. - - -%%-------------------------------------------------------------------- -%% Function: handle_cast(Msg, State) -> {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, State} -%% Description: Handling cast messages -%%-------------------------------------------------------------------- -handle_cast({call, Function, Args}, State = #state{service = Service, - protocol = Protocol, - seqid = SeqId}) -> - _Result = - try - ok = send_function_call(State, Function, Args), - receive_function_result(State, Function) - catch - Class:Reason -> - error_logger:error_msg("error ignored in handle_cast({cast,...},...): ~p:~p~n", [Class, Reason]) - end, - - {noreply, State}; - -handle_cast(close, State=#state{protocol = Protocol}) -> -%% error_logger:info_msg("thrift_client ~p received close", [self()]), - {stop,normal,State}; -handle_cast(_Msg, State) -> - {noreply, State}. - -%%-------------------------------------------------------------------- -%% Function: handle_info(Info, State) -> {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, State} -%% Description: Handling all non call/cast messages -%%-------------------------------------------------------------------- -handle_info({'DOWN', MonitorRef, process, Pid, _Info}, State) - when is_reference(MonitorRef), is_pid(Pid) -> - %% We don't actually verify the correctness of the DOWN message. - {stop, parent_died, State}; - -handle_info(_Info, State) -> - {noreply, State}. - -%%-------------------------------------------------------------------- -%% Function: terminate(Reason, State) -> void() -%% Description: This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any necessary -%% cleaning up. When it returns, the gen_server terminates with Reason. -%% The return value is ignored. -%%-------------------------------------------------------------------- -terminate(Reason, State = #state{protocol=undefined}) -> - ok; -terminate(Reason, State = #state{protocol=Protocol}) -> - thrift_protocol:close_transport(Protocol), - ok. - -%%-------------------------------------------------------------------- -%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState} -%% Description: Convert process state when code is changed -%%-------------------------------------------------------------------- -code_change(_OldVsn, State, _Extra) -> - {ok, State}. %%-------------------------------------------------------------------- %%% Internal functions %%-------------------------------------------------------------------- -send_function_call(#state{protocol = Proto, - service = Service, - seqid = SeqId}, +-spec send_function_call(#tclient{}, atom(), list()) -> {#tclient{}, ok | {error, term()}}. +send_function_call(Client = #tclient{protocol = Proto0, + service = Service, + seqid = SeqId}, Function, Args) -> Params = Service:function_info(Function, params_type), - {struct, PList} = Params, - if - length(PList) =/= length(Args) -> - throw({return, {error, {bad_args, Function, Args}}}); - true -> ok - end, - - Begin = #protocol_message_begin{name = atom_to_list(Function), - type = ?tMessageType_CALL, - seqid = SeqId}, - ok = thrift_protocol:write(Proto, Begin), - ok = thrift_protocol:write(Proto, {Params, list_to_tuple([Function | Args])}), - ok = thrift_protocol:write(Proto, message_end), - thrift_protocol:flush_transport(Proto), - ok. - -receive_function_result(State = #state{protocol = Proto, - service = Service}, - Function) -> - ResultType = Service:function_info(Function, reply_type), - read_result(State, Function, ResultType). - -read_result(_State, - _Function, - oneway_void) -> - {ok, ok}; - -read_result(State = #state{protocol = Proto, - seqid = SeqId}, - Function, - ReplyType) -> - case thrift_protocol:read(Proto, message_begin) of - #protocol_message_begin{seqid = RetSeqId} when RetSeqId =/= SeqId -> - {error, {bad_seq_id, SeqId}}; - - #protocol_message_begin{type = ?tMessageType_EXCEPTION} -> - handle_application_exception(State); - - #protocol_message_begin{type = ?tMessageType_REPLY} -> - handle_reply(State, Function, ReplyType) + case Params of + no_function -> + {Client, {error, {no_function, Function}}}; + {struct, PList} when length(PList) =/= length(Args) -> + {Client, {error, {bad_args, Function, Args}}}; + {struct, _PList} -> + Begin = #protocol_message_begin{name = atom_to_list(Function), + type = ?tMessageType_CALL, + seqid = SeqId}, + {Proto1, ok} = thrift_protocol:write(Proto0, Begin), + {Proto2, ok} = thrift_protocol:write(Proto1, {Params, list_to_tuple([Function | Args])}), + {Proto3, ok} = thrift_protocol:write(Proto2, message_end), + {Proto4, ok} = thrift_protocol:flush_transport(Proto3), + {Client#tclient{protocol = Proto4}, ok} end. -handle_reply(State = #state{protocol = Proto, - service = Service}, +-spec receive_function_result(#tclient{}, atom()) -> {#tclient{}, {ok, term()} | {error, term()}}. +receive_function_result(Client = #tclient{service = Service}, Function) -> + ResultType = Service:function_info(Function, reply_type), + read_result(Client, Function, ResultType). + +read_result(Client, _Function, oneway_void) -> + {Client, {ok, ok}}; + +read_result(Client = #tclient{protocol = Proto0, + seqid = SeqId}, + Function, + ReplyType) -> + {Proto1, MessageBegin} = thrift_protocol:read(Proto0, message_begin), + NewClient = Client#tclient{protocol = Proto1}, + case MessageBegin of + #protocol_message_begin{seqid = RetSeqId} when RetSeqId =/= SeqId -> + {NewClient, {error, {bad_seq_id, SeqId}}}; + + #protocol_message_begin{type = ?tMessageType_EXCEPTION} -> + handle_application_exception(NewClient); + + #protocol_message_begin{type = ?tMessageType_REPLY} -> + handle_reply(NewClient, Function, ReplyType) + end. + + +handle_reply(Client = #tclient{protocol = Proto0, + service = Service}, Function, ReplyType) -> {struct, ExceptionFields} = Service:function_info(Function, exceptions), ReplyStructDef = {struct, [{0, ReplyType}] ++ ExceptionFields}, - {ok, Reply} = thrift_protocol:read(Proto, ReplyStructDef), + {Proto1, {ok, Reply}} = thrift_protocol:read(Proto0, ReplyStructDef), + {Proto2, ok} = thrift_protocol:read(Proto1, message_end), + NewClient = Client#tclient{protocol = Proto2}, ReplyList = tuple_to_list(Reply), true = length(ReplyList) == length(ExceptionFields) + 1, ExceptionVals = tl(ReplyList), Thrown = [X || X <- ExceptionVals, X =/= undefined], - Result = - case Thrown of - [] when ReplyType == {struct, []} -> - {ok, ok}; - [] -> - {ok, hd(ReplyList)}; - [Exception] -> - {exception, Exception} - end, - ok = thrift_protocol:read(Proto, message_end), - Result. + case Thrown of + [] when ReplyType == {struct, []} -> + {NewClient, {ok, ok}}; + [] -> + {NewClient, {ok, hd(ReplyList)}}; + [Exception] -> + throw({NewClient, {exception, Exception}}) + end. -handle_application_exception(State = #state{protocol = Proto}) -> - {ok, Exception} = thrift_protocol:read(Proto, - ?TApplicationException_Structure), - ok = thrift_protocol:read(Proto, message_end), +handle_application_exception(Client = #tclient{protocol = Proto0}) -> + {Proto1, {ok, Exception}} = + thrift_protocol:read(Proto0, ?TApplicationException_Structure), + {Proto2, ok} = thrift_protocol:read(Proto1, message_end), XRecord = list_to_tuple( ['TApplicationException' | tuple_to_list(Exception)]), error_logger:error_msg("X: ~p~n", [XRecord]), true = is_record(XRecord, 'TApplicationException'), - {exception, XRecord}. + NewClient = Client#tclient{protocol = Proto2}, + throw({NewClient, {exception, XRecord}}). diff --git a/lib/erl/src/thrift_client_util.erl b/lib/erl/src/thrift_client_util.erl new file mode 100644 index 000000000..c52bb8b68 --- /dev/null +++ b/lib/erl/src/thrift_client_util.erl @@ -0,0 +1,61 @@ +%% +%% Licensed to the Apache Software Foundation (ASF) under one +%% or more contributor license agreements. See the NOTICE file +%% distributed with this work for additional information +%% regarding copyright ownership. The ASF licenses this file +%% to you under the Apache License, Version 2.0 (the +%% "License"); you may not use this file except in compliance +%% with the License. You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% + +-module(thrift_client_util). + +-export([new/4]). + +%% +%% Splits client options into client, protocol, and transport options +%% +%% split_options([Options...]) -> {ProtocolOptions, TransportOptions} +%% +split_options(Options) -> + split_options(Options, [], []). + +split_options([], ProtoIn, TransIn) -> + {ProtoIn, TransIn}; + +split_options([Opt = {OptKey, _} | Rest], ProtoIn, TransIn) + when OptKey =:= strict_read; + OptKey =:= strict_write -> + split_options(Rest, [Opt | ProtoIn], TransIn); + +split_options([Opt = {OptKey, _} | Rest], ProtoIn, TransIn) + when OptKey =:= framed; + OptKey =:= connect_timeout; + OptKey =:= sockopts -> + split_options(Rest, ProtoIn, [Opt | TransIn]). + + +%% Client constructor for the common-case of socket transports +%% with the binary protocol +new(Host, Port, Service, Options) + when is_integer(Port), is_atom(Service), is_list(Options) -> + {ProtoOpts, TransOpts} = split_options(Options), + + {ok, TransportFactory} = + thrift_socket_transport:new_transport_factory(Host, Port, TransOpts), + + {ok, ProtocolFactory} = thrift_binary_protocol:new_protocol_factory( + TransportFactory, ProtoOpts), + + {ok, Protocol} = ProtocolFactory(), + + thrift_client:new(Protocol, Service). diff --git a/lib/erl/src/thrift_disk_log_transport.erl b/lib/erl/src/thrift_disk_log_transport.erl index 761fa3097..de8ee417b 100644 --- a/lib/erl/src/thrift_disk_log_transport.erl +++ b/lib/erl/src/thrift_disk_log_transport.erl @@ -35,6 +35,8 @@ close_on_close = false, sync_every = infinity, sync_tref}). +-type state() :: #dl_transport{}. +-include("thrift_transport_behaviour.hrl"). %% Create a transport attached to an already open log. @@ -47,7 +49,7 @@ new(LogName, Opts) when is_atom(LogName), is_list(Opts) -> State2 = case State#dl_transport.sync_every of N when is_integer(N), N > 0 -> - {ok, TRef} = timer:apply_interval(N, ?MODULE, force_flush, State), + {ok, TRef} = timer:apply_interval(N, ?MODULE, force_flush, [State]), State#dl_transport{sync_tref = TRef}; _ -> State end, @@ -58,38 +60,41 @@ new(LogName, Opts) when is_atom(LogName), is_list(Opts) -> parse_opts([], State) -> State; parse_opts([{close_on_close, Bool} | Rest], State) when is_boolean(Bool) -> - State#dl_transport{close_on_close = Bool}; + parse_opts(Rest, State#dl_transport{close_on_close = Bool}); parse_opts([{sync_every, Int} | Rest], State) when is_integer(Int), Int > 0 -> - State#dl_transport{sync_every = Int}. + parse_opts(Rest, State#dl_transport{sync_every = Int}). %%%% TRANSPORT IMPLENTATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% disk_log_transport is write-only -read(_State, Len) -> - {error, no_read_from_disk_log}. +read(State, _Len) -> + {State, {error, no_read_from_disk_log}}. -write(#dl_transport{log = Log}, Data) -> - disk_log:balog(Log, erlang:iolist_to_binary(Data)). +write(This = #dl_transport{log = Log}, Data) -> + {This, disk_log:balog(Log, erlang:iolist_to_binary(Data))}. force_flush(#dl_transport{log = Log}) -> error_logger:info_msg("~p syncing~n", [?MODULE]), disk_log:sync(Log). -flush(#dl_transport{log = Log, sync_every = SE}) -> +flush(This = #dl_transport{log = Log, sync_every = SE}) -> case SE of undefined -> % no time-based sync disk_log:sync(Log); _Else -> % sync will happen automagically ok - end. + end, + {This, ok}. + + %% On close, close the underlying log if we're configured to do so. -close(#dl_transport{close_on_close = false}) -> - ok; -close(#dl_transport{log = Log}) -> - disk_log:lclose(Log). +close(This = #dl_transport{close_on_close = false}) -> + {This, ok}; +close(This = #dl_transport{log = Log}) -> + {This, disk_log:lclose(Log)}. %%%% FACTORY GENERATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -109,10 +114,10 @@ factory_impl(Name, ExtraLogOpts, TransportOpts) -> ExtraLogOpts], Log = case disk_log:open(LogOpts) of - {ok, Log} -> - Log; - {repaired, Log, Info1, Info2} -> - error_logger:info_msg("Disk log ~p repaired: ~p, ~p~n", [Log, Info1, Info2]), - Log + {ok, LogS} -> + LogS; + {repaired, LogS, Info1, Info2} -> + error_logger:info_msg("Disk log ~p repaired: ~p, ~p~n", [LogS, Info1, Info2]), + LogS end, new(Log, TransportOpts). diff --git a/lib/erl/src/thrift_file_transport.erl b/lib/erl/src/thrift_file_transport.erl index 5ac2dbe1f..ba3aa8983 100644 --- a/lib/erl/src/thrift_file_transport.erl +++ b/lib/erl/src/thrift_file_transport.erl @@ -29,6 +29,8 @@ -record(t_file_transport, {device, should_close = true, mode = write}). +-type state() :: #t_file_transport{}. +-include("thrift_transport_behaviour.hrl"). %%%% CONSTRUCTION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -63,25 +65,25 @@ parse_opts([], State) -> %%%% TRANSPORT IMPL %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -write(#t_file_transport{device = Device, mode = write}, Data) -> - file:write(Device, Data); -write(_T, _D) -> - {error, read_mode}. +write(This = #t_file_transport{device = Device, mode = write}, Data) -> + {This, file:write(Device, Data)}; +write(This, _D) -> + {This, {error, read_mode}}. -read(#t_file_transport{device = Device, mode = read}, Len) +read(This = #t_file_transport{device = Device, mode = read}, Len) when is_integer(Len), Len >= 0 -> - file:read(Device, Len); -read(_T, _D) -> - {error, read_mode}. + {This, file:read(Device, Len)}; +read(This, _D) -> + {This, {error, read_mode}}. -flush(#t_file_transport{device = Device, mode = write}) -> - file:sync(Device). +flush(This = #t_file_transport{device = Device, mode = write}) -> + {This, file:sync(Device)}. -close(#t_file_transport{device = Device, should_close = SC}) -> +close(This = #t_file_transport{device = Device, should_close = SC}) -> case SC of true -> - file:close(Device); + {This, file:close(Device)}; false -> - ok + {This, ok} end. diff --git a/lib/erl/src/thrift_framed_transport.erl b/lib/erl/src/thrift_framed_transport.erl index 01bab70ba..9b90112c9 100644 --- a/lib/erl/src/thrift_framed_transport.erl +++ b/lib/erl/src/thrift_framed_transport.erl @@ -19,16 +19,11 @@ -module(thrift_framed_transport). --behaviour(gen_server). -behaviour(thrift_transport). %% API -export([new/1]). -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - %% thrift_transport callbacks -export([write/2, read/2, flush/1, close/1]). @@ -36,102 +31,55 @@ read_buffer, % iolist() write_buffer % iolist() }). +-type state() :: #framed_transport{}. +-include("thrift_transport_behaviour.hrl"). -%%==================================================================== -%% API -%%==================================================================== -%%-------------------------------------------------------------------- -%% Function: start_link() -> {ok,Pid} | ignore | {error,Error} -%% Description: Starts the server -%%-------------------------------------------------------------------- new(WrappedTransport) -> - case gen_server:start_link(?MODULE, [WrappedTransport], []) of - {ok, Pid} -> - thrift_transport:new(?MODULE, Pid); - Else -> - Else - end. + State = #framed_transport{wrapped = WrappedTransport, + read_buffer = [], + write_buffer = []}, + thrift_transport:new(?MODULE, State). -%%-------------------------------------------------------------------- -%% Function: write(Transport, Data) -> ok -%% -%% Data = iolist() -%% -%% Description: Writes data into the buffer -%%-------------------------------------------------------------------- -write(Transport, Data) -> - gen_server:call(Transport, {write, Data}). +%% Writes data into the buffer +write(State = #framed_transport{write_buffer = WBuf}, Data) -> + {State#framed_transport{write_buffer = [WBuf, Data]}, ok}. -%%-------------------------------------------------------------------- -%% Function: flush(Transport) -> ok -%% -%% Description: Flushes the buffer through to the wrapped transport -%%-------------------------------------------------------------------- -flush(Transport) -> - gen_server:call(Transport, flush). +%% Flushes the buffer through to the wrapped transport +flush(State0 = #framed_transport{write_buffer = Buffer, + wrapped = Wrapped0}) -> + FrameLen = iolist_size(Buffer), + Data = [<>, Buffer], -%%-------------------------------------------------------------------- -%% Function: close(Transport) -> ok -%% -%% Description: Closes the transport and the wrapped transport -%%-------------------------------------------------------------------- -close(Transport) -> - gen_server:cast(Transport, close). + {Wrapped1, Response} = thrift_transport:write(Wrapped0, Data), -%%-------------------------------------------------------------------- -%% Function: Read(Transport, Len) -> {ok, Data} -%% -%% Data = binary() -%% -%% Description: Reads data through from the wrapped transoprt -%%-------------------------------------------------------------------- -read(Transport, Len) when is_integer(Len) -> - gen_server:call(Transport, {read, Len}). + {Wrapped2, _} = thrift_transport:flush(Wrapped1), -%%==================================================================== -%% gen_server callbacks -%%==================================================================== + State1 = State0#framed_transport{wrapped = Wrapped2, write_buffer = []}, + {State1, Response}. -%%-------------------------------------------------------------------- -%% Function: init(Args) -> {ok, State} | -%% {ok, State, Timeout} | -%% ignore | -%% {stop, Reason} -%% Description: Initiates the server -%%-------------------------------------------------------------------- -init([Wrapped]) -> - {ok, #framed_transport{wrapped = Wrapped, - read_buffer = [], - write_buffer = []}}. +%% Closes the transport and the wrapped transport +close(State = #framed_transport{wrapped = Wrapped0}) -> + {Wrapped1, Result} = thrift_transport:close(Wrapped0), + NewState = State#framed_transport{wrapped = Wrapped1}, + {NewState, Result}. -%%-------------------------------------------------------------------- -%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | -%% {reply, Reply, State, Timeout} | -%% {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, Reply, State} | -%% {stop, Reason, State} -%% Description: Handling call messages -%%-------------------------------------------------------------------- -handle_call({write, Data}, _From, State = #framed_transport{write_buffer = WBuf}) -> - {reply, ok, State#framed_transport{write_buffer = [WBuf, Data]}}; - -handle_call({read, Len}, _From, State = #framed_transport{wrapped = Wrapped, - read_buffer = RBuf}) -> - {RBuf1, RBuf1Size} = +%% Reads data through from the wrapped transport +read(State0 = #framed_transport{wrapped = Wrapped0, read_buffer = RBuf}, + Len) when is_integer(Len) -> + {Wrapped1, {RBuf1, RBuf1Size}} = %% if the read buffer is empty, read another frame %% otherwise, just read from what's left in the buffer case iolist_size(RBuf) of 0 -> %% read the frame length - {ok, <>} = - thrift_transport:read(Wrapped, 4), + {WrappedS1, {ok, <>}} = + thrift_transport:read(Wrapped0, 4), %% then read the data - {ok, Bin} = - thrift_transport:read(Wrapped, FrameLen), - {Bin, erlang:byte_size(Bin)}; + {WrappedS2, {ok, Bin}} = + thrift_transport:read(WrappedS1, FrameLen), + {WrappedS2, {Bin, erlang:byte_size(Bin)}}; Sz -> - {RBuf, Sz} + {Wrapped0, {RBuf, Sz}} end, %% pull off Give bytes, return them to the user, leave the rest in the buffer @@ -139,69 +87,13 @@ handle_call({read, Len}, _From, State = #framed_transport{wrapped = Wrapped, <> = iolist_to_binary(RBuf1), Response = {ok, Data}, - State1 = State#framed_transport{read_buffer=RBuf2}, + State1 = State0#framed_transport{wrapped = Wrapped1, read_buffer=RBuf2}, - {reply, Response, State1}; - -handle_call(flush, _From, State) -> - {Response, State1} = do_flush(State), - {reply, Response, State1}. + {State1, Response}. %%-------------------------------------------------------------------- -%% Function: handle_cast(Msg, State) -> {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, State} -%% Description: Handling cast messages +%% Internal functions %%-------------------------------------------------------------------- -handle_cast(close, State) -> - {_, State1} = do_flush(State), - %% Wrapped is closed by terminate/2 - %% error_logger:info_msg("thrift_framed_transport ~p: closing", [self()]), - {stop, normal, State}; -handle_cast(Msg, State=#framed_transport{}) -> - {noreply, State}. - -%%-------------------------------------------------------------------- -%% Function: handle_info(Info, State) -> {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, State} -%% Description: Handling all non call/cast messages -%%-------------------------------------------------------------------- -handle_info(_Info, State) -> - {noreply, State}. - -%%-------------------------------------------------------------------- -%% Function: terminate(Reason, State) -> void() -%% Description: This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any necessary -%% cleaning up. When it returns, the gen_server terminates with Reason. -%% The return value is ignored. -%%-------------------------------------------------------------------- -terminate(_Reason, State = #framed_transport{wrapped=Wrapped}) -> - thrift_transport:close(Wrapped), - ok. - -%%-------------------------------------------------------------------- -%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState} -%% Description: Convert process state when code is changed -%%-------------------------------------------------------------------- -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%-------------------------------------------------------------------- -%%% Internal functions -%%-------------------------------------------------------------------- -do_flush(State = #framed_transport{write_buffer = Buffer, - wrapped = Wrapped}) -> - FrameLen = iolist_size(Buffer), - Data = [<>, Buffer], - - Response = thrift_transport:write(Wrapped, Data), - - thrift_transport:flush(Wrapped), - - State1 = State#framed_transport{write_buffer = []}, - {Response, State1}. min(A,B) when A A; min(_,B) -> B. diff --git a/lib/erl/src/thrift_http_transport.erl b/lib/erl/src/thrift_http_transport.erl index f8c182773..09113cc25 100644 --- a/lib/erl/src/thrift_http_transport.erl +++ b/lib/erl/src/thrift_http_transport.erl @@ -19,20 +19,11 @@ -module(thrift_http_transport). --behaviour(gen_server). -behaviour(thrift_transport). %% API -export([new/2, new/3]). -%% gen_server callbacks --export([init/1, - handle_call/3, - handle_cast/2, - handle_info/2, - terminate/2, - code_change/3]). - %% thrift_transport callbacks -export([write/2, read/2, flush/1, close/1]). @@ -43,14 +34,9 @@ http_options, % see http(3) extra_headers % [{str(), str()}, ...] }). +-type state() :: pid(). +-include("thrift_transport_behaviour.hrl"). -%%==================================================================== -%% API -%%==================================================================== -%%-------------------------------------------------------------------- -%% Function: new() -> {ok, Transport} | ignore | {error,Error} -%% Description: Starts the server -%%-------------------------------------------------------------------- new(Host, Path) -> new(Host, Path, _Options = []). @@ -60,54 +46,6 @@ new(Host, Path) -> %% {extra_headers, ExtraHeaders} = List of extra HTTP headers %%-------------------------------------------------------------------- new(Host, Path, Options) -> - case gen_server:start_link(?MODULE, {Host, Path, Options}, []) of - {ok, Pid} -> - thrift_transport:new(?MODULE, Pid); - Else -> - Else - end. - -%%-------------------------------------------------------------------- -%% Function: write(Transport, Data) -> ok -%% -%% Data = iolist() -%% -%% Description: Writes data into the buffer -%%-------------------------------------------------------------------- -write(Transport, Data) -> - gen_server:call(Transport, {write, Data}). - -%%-------------------------------------------------------------------- -%% Function: flush(Transport) -> ok -%% -%% Description: Flushes the buffer, making a request -%%-------------------------------------------------------------------- -flush(Transport) -> - gen_server:call(Transport, flush). - -%%-------------------------------------------------------------------- -%% Function: close(Transport) -> ok -%% -%% Description: Closes the transport -%%-------------------------------------------------------------------- -close(Transport) -> - gen_server:cast(Transport, close). - -%%-------------------------------------------------------------------- -%% Function: Read(Transport, Len) -> {ok, Data} -%% -%% Data = binary() -%% -%% Description: Reads data through from the wrapped transoprt -%%-------------------------------------------------------------------- -read(Transport, Len) when is_integer(Len) -> - gen_server:call(Transport, {read, Len}). - -%%==================================================================== -%% gen_server callbacks -%%==================================================================== - -init({Host, Path, Options}) -> State1 = #http_transport{host = Host, path = Path, read_buffer = [], @@ -127,50 +65,17 @@ init({Host, Path, Options}) -> end, case lists:foldl(ApplyOption, State1, Options) of State2 = #http_transport{} -> - {ok, State2}; + thrift_transport:new(?MODULE, State2); Else -> - {stop, Else} + {error, Else} end. -handle_call({write, Data}, _From, State = #http_transport{write_buffer = WBuf}) -> - {reply, ok, State#http_transport{write_buffer = [WBuf, Data]}}; +%% Writes data into the buffer +write(State = #http_transport{write_buffer = WBuf}, Data) -> + {State#http_transport{write_buffer = [WBuf, Data]}, ok}. -handle_call({read, Len}, _From, State = #http_transport{read_buffer = RBuf}) -> - %% Pull off Give bytes, return them to the user, leave the rest in the buffer. - Give = min(iolist_size(RBuf), Len), - case iolist_to_binary(RBuf) of - <> -> - Response = {ok, Data}, - State1 = State#http_transport{read_buffer=RBuf1}, - {reply, Response, State1}; - _ -> - {reply, {error, 'EOF'}, State} - end; - -handle_call(flush, _From, State) -> - {Response, State1} = do_flush(State), - {reply, Response, State1}. - -handle_cast(close, State) -> - {_, State1} = do_flush(State), - {stop, normal, State1}; - -handle_cast(_Msg, State=#http_transport{}) -> - {noreply, State}. - -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%-------------------------------------------------------------------- -%%% Internal functions -%%-------------------------------------------------------------------- -do_flush(State = #http_transport{host = Host, +%% Flushes the buffer, making a request +flush(State = #http_transport{host = Host, path = Path, read_buffer = Rbuf, write_buffer = Wbuf, @@ -179,7 +84,7 @@ do_flush(State = #http_transport{host = Host, case iolist_to_binary(Wbuf) of <<>> -> %% Don't bother flushing empty buffers. - {ok, State}; + {State, ok}; WBinary -> {ok, {{_Version, 200, _ReasonPhrase}, _Headers, Body}} = http:request(post, @@ -192,7 +97,22 @@ do_flush(State = #http_transport{host = Host, State1 = State#http_transport{read_buffer = [Rbuf, Body], write_buffer = []}, - {ok, State1} + {State1, ok} + end. + +close(State) -> + {State, ok}. + +read(State = #http_transport{read_buffer = RBuf}, Len) when is_integer(Len) -> + %% Pull off Give bytes, return them to the user, leave the rest in the buffer. + Give = min(iolist_size(RBuf), Len), + case iolist_to_binary(RBuf) of + <> -> + Response = {ok, Data}, + State1 = State#http_transport{read_buffer=RBuf1}, + {State1, Response}; + _ -> + {State, {error, 'EOF'}} end. min(A,B) when A A; diff --git a/lib/erl/src/thrift_memory_buffer.erl b/lib/erl/src/thrift_memory_buffer.erl index b4f607a95..c44449ec5 100644 --- a/lib/erl/src/thrift_memory_buffer.erl +++ b/lib/erl/src/thrift_memory_buffer.erl @@ -19,145 +19,43 @@ -module(thrift_memory_buffer). --behaviour(gen_server). -behaviour(thrift_transport). %% API -export([new/0, new_transport_factory/0]). -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - %% thrift_transport callbacks -export([write/2, read/2, flush/1, close/1]). -record(memory_buffer, {buffer}). +-type state() :: #memory_buffer{}. +-include("thrift_transport_behaviour.hrl"). -%%==================================================================== -%% API -%%==================================================================== new() -> - case gen_server:start_link(?MODULE, [], []) of - {ok, Pid} -> - thrift_transport:new(?MODULE, Pid); - Else -> - Else - end. + State = #memory_buffer{buffer = []}, + thrift_transport:new(?MODULE, State). new_transport_factory() -> {ok, fun() -> new() end}. -%%-------------------------------------------------------------------- -%% Function: write(Transport, Data) -> ok -%% -%% Data = iolist() -%% -%% Description: Writes data into the buffer -%%-------------------------------------------------------------------- -write(Transport, Data) -> - gen_server:call(Transport, {write, Data}). +%% Writes data into the buffer +write(State = #memory_buffer{buffer = Buf}, Data) -> + {State#memory_buffer{buffer = [Buf, Data]}, ok}. -%%-------------------------------------------------------------------- -%% Function: flush(Transport) -> ok -%% -%% Description: Flushes the buffer through to the wrapped transport -%%-------------------------------------------------------------------- -flush(Transport) -> - gen_server:call(Transport, flush). +flush(State) -> + {State, ok}. -%%-------------------------------------------------------------------- -%% Function: close(Transport) -> ok -%% -%% Description: Closes the transport and the wrapped transport -%%-------------------------------------------------------------------- -close(Transport) -> - gen_server:cast(Transport, close). +close(State) -> + {State, ok}. -%%-------------------------------------------------------------------- -%% Function: Read(Transport, Len) -> {ok, Data} -%% -%% Data = binary() -%% -%% Description: Reads data through from the wrapped transoprt -%%-------------------------------------------------------------------- -read(Transport, Len) when is_integer(Len) -> - gen_server:call(Transport, {read, Len}). - -%%==================================================================== -%% gen_server callbacks -%%==================================================================== - -%%-------------------------------------------------------------------- -%% Function: init(Args) -> {ok, State} | -%% {ok, State, Timeout} | -%% ignore | -%% {stop, Reason} -%% Description: Initiates the server -%%-------------------------------------------------------------------- -init([]) -> - {ok, #memory_buffer{buffer = []}}. - -%%-------------------------------------------------------------------- -%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | -%% {reply, Reply, State, Timeout} | -%% {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, Reply, State} | -%% {stop, Reason, State} -%% Description: Handling call messages -%%-------------------------------------------------------------------- -handle_call({write, Data}, _From, State = #memory_buffer{buffer = Buf}) -> - {reply, ok, State#memory_buffer{buffer = [Buf, Data]}}; - -handle_call({read, Len}, _From, State = #memory_buffer{buffer = Buf}) -> +read(State = #memory_buffer{buffer = Buf}, Len) when is_integer(Len) -> Binary = iolist_to_binary(Buf), Give = min(iolist_size(Binary), Len), {Result, Remaining} = split_binary(Binary, Give), - {reply, {ok, Result}, State#memory_buffer{buffer = Remaining}}; - -handle_call(flush, _From, State) -> - {reply, ok, State}. + {State#memory_buffer{buffer = Remaining}, {ok, Result}}. %%-------------------------------------------------------------------- -%% Function: handle_cast(Msg, State) -> {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, State} -%% Description: Handling cast messages -%%-------------------------------------------------------------------- -handle_cast(close, State) -> - {stop, normal, State}; -handle_cast(Msg, State=#memory_buffer{}) -> - {noreply, State}. - -%%-------------------------------------------------------------------- -%% Function: handle_info(Info, State) -> {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, State} -%% Description: Handling all non call/cast messages -%%-------------------------------------------------------------------- -handle_info(_Info, State) -> - {noreply, State}. - -%%-------------------------------------------------------------------- -%% Function: terminate(Reason, State) -> void() -%% Description: This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any necessary -%% cleaning up. When it returns, the gen_server terminates with Reason. -%% The return value is ignored. -%%-------------------------------------------------------------------- -terminate(_Reason, _State) -> - ok. - -%%-------------------------------------------------------------------- -%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState} -%% Description: Convert process state when code is changed -%%-------------------------------------------------------------------- -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%-------------------------------------------------------------------- -%%% Internal functions +%% Internal functions %%-------------------------------------------------------------------- min(A,B) when A A; min(_,B) -> B. diff --git a/lib/erl/src/thrift_processor.erl b/lib/erl/src/thrift_processor.erl index e26fb3303..431550501 100644 --- a/lib/erl/src/thrift_processor.erl +++ b/lib/erl/src/thrift_processor.erl @@ -24,55 +24,54 @@ -include("thrift_constants.hrl"). -include("thrift_protocol.hrl"). --record(thrift_processor, {handler, in_protocol, out_protocol, service}). +-record(thrift_processor, {handler, protocol, service}). -init({Server, ProtoGen, Service, Handler}) when is_function(ProtoGen, 0) -> - {ok, IProt, OProt} = ProtoGen(), - loop(#thrift_processor{in_protocol = IProt, - out_protocol = OProt, +init({_Server, ProtoGen, Service, Handler}) when is_function(ProtoGen, 0) -> + {ok, Proto} = ProtoGen(), + loop(#thrift_processor{protocol = Proto, service = Service, handler = Handler}). -loop(State = #thrift_processor{in_protocol = IProto, - out_protocol = OProto}) -> - case thrift_protocol:read(IProto, message_begin) of +loop(State0 = #thrift_processor{protocol = Proto0}) -> + {Proto1, MessageBegin} = thrift_protocol:read(Proto0, message_begin), + State1 = State0#thrift_processor{protocol = Proto1}, + case MessageBegin of #protocol_message_begin{name = Function, type = ?tMessageType_CALL} -> - ok = handle_function(State, list_to_atom(Function)), - loop(State); + {State2, ok} = handle_function(State1, list_to_atom(Function)), + loop(State2); #protocol_message_begin{name = Function, type = ?tMessageType_ONEWAY} -> - ok = handle_function(State, list_to_atom(Function)), - loop(State); + {State2, ok} = handle_function(State1, list_to_atom(Function)), + loop(State2); {error, timeout} -> - thrift_protocol:close_transport(OProto), + thrift_protocol:close_transport(Proto1), ok; {error, closed} -> %% error_logger:info_msg("Client disconnected~n"), - thrift_protocol:close_transport(OProto), + thrift_protocol:close_transport(Proto1), exit(shutdown) end. -handle_function(State=#thrift_processor{in_protocol = IProto, - out_protocol = OProto, - handler = Handler, - service = Service}, +handle_function(State0=#thrift_processor{protocol = Proto0, + handler = Handler, + service = Service}, Function) -> InParams = Service:function_info(Function, params_type), - {ok, Params} = thrift_protocol:read(IProto, InParams), + {Proto1, {ok, Params}} = thrift_protocol:read(Proto0, InParams), + State1 = State0#thrift_processor{protocol = Proto1}, try Result = Handler:handle_function(Function, Params), %% {Micro, Result} = better_timer(Handler, handle_function, [Function, Params]), %% error_logger:info_msg("Processed ~p(~p) in ~.4fms~n", %% [Function, Params, Micro/1000.0]), - handle_success(State, Function, Result) + handle_success(State1, Function, Result) catch - Type:Data -> - handle_function_catch(State, Function, Type, Data) - end, - after_reply(OProto). + Type:Data when Type =:= throw orelse Type =:= error -> + handle_function_catch(State1, Function, Type, Data) + end. handle_function_catch(State = #thrift_processor{service = Service}, Function, ErrType, ErrData) -> @@ -84,39 +83,37 @@ handle_function_catch(State = #thrift_processor{service = Service}, error_logger:warning_msg( "oneway void ~p threw error which must be ignored: ~p", [Function, {ErrType, ErrData, Stack}]), - ok; + {State, ok}; {throw, Exception} when is_tuple(Exception), size(Exception) > 0 -> - error_logger:warning_msg("~p threw exception: ~p~n", [Function, Exception]), - handle_exception(State, Function, Exception), - ok; % we still want to accept more requests from this client + %error_logger:warning_msg("~p threw exception: ~p~n", [Function, Exception]), + handle_exception(State, Function, Exception); + % we still want to accept more requests from this client {error, Error} -> - ok = handle_error(State, Function, Error) + handle_error(State, Function, Error) end. -handle_success(State = #thrift_processor{out_protocol = OProto, - service = Service}, +handle_success(State = #thrift_processor{service = Service}, Function, Result) -> ReplyType = Service:function_info(Function, reply_type), StructName = atom_to_list(Function) ++ "_result", - ok = case Result of - {reply, ReplyData} -> - Reply = {{struct, [{0, ReplyType}]}, {StructName, ReplyData}}, - send_reply(OProto, Function, ?tMessageType_REPLY, Reply); + case Result of + {reply, ReplyData} -> + Reply = {{struct, [{0, ReplyType}]}, {StructName, ReplyData}}, + send_reply(State, Function, ?tMessageType_REPLY, Reply); - ok when ReplyType == {struct, []} -> - send_reply(OProto, Function, ?tMessageType_REPLY, {ReplyType, {StructName}}); + ok when ReplyType == {struct, []} -> + send_reply(State, Function, ?tMessageType_REPLY, {ReplyType, {StructName}}); - ok when ReplyType == oneway_void -> - %% no reply for oneway void - ok - end. + ok when ReplyType == oneway_void -> + %% no reply for oneway void + {State, ok} + end. -handle_exception(State = #thrift_processor{out_protocol = OProto, - service = Service}, +handle_exception(State = #thrift_processor{service = Service}, Function, Exception) -> ExceptionType = element(1, Exception), @@ -141,9 +138,9 @@ handle_exception(State = #thrift_processor{out_protocol = OProto, % Make sure we got at least one defined case lists:all(fun(X) -> X =:= undefined end, ExceptionList) of true -> - ok = handle_unknown_exception(State, Function, Exception); + handle_unknown_exception(State, Function, Exception); false -> - ok = send_reply(OProto, Function, ?tMessageType_REPLY, {ReplySpec, ExceptionTuple}) + send_reply(State, Function, ?tMessageType_REPLY, {ReplySpec, ExceptionTuple}) end. %% @@ -154,7 +151,7 @@ handle_unknown_exception(State, Function, Exception) -> handle_error(State, Function, {exception_not_declared_as_thrown, Exception}). -handle_error(#thrift_processor{out_protocol = OProto}, Function, Error) -> +handle_error(State, Function, Error) -> Stack = erlang:get_stacktrace(), error_logger:error_msg("~p had an error: ~p~n", [Function, {Error, Stack}]), @@ -170,19 +167,14 @@ handle_error(#thrift_processor{out_protocol = OProto}, Function, Error) -> #'TApplicationException'{ message = Message, type = ?TApplicationException_UNKNOWN}}, - send_reply(OProto, Function, ?tMessageType_EXCEPTION, Reply). + send_reply(State, Function, ?tMessageType_EXCEPTION, Reply). -send_reply(OProto, Function, ReplyMessageType, Reply) -> - ok = thrift_protocol:write(OProto, #protocol_message_begin{ - name = atom_to_list(Function), - type = ReplyMessageType, - seqid = 0}), - ok = thrift_protocol:write(OProto, Reply), - ok = thrift_protocol:write(OProto, message_end), - ok = thrift_protocol:flush_transport(OProto), - ok. - -after_reply(OProto) -> - ok = thrift_protocol:flush_transport(OProto) - %% ok = thrift_protocol:close_transport(OProto) - . +send_reply(State = #thrift_processor{protocol = Proto0}, Function, ReplyMessageType, Reply) -> + {Proto1, ok} = thrift_protocol:write(Proto0, #protocol_message_begin{ + name = atom_to_list(Function), + type = ReplyMessageType, + seqid = 0}), + {Proto2, ok} = thrift_protocol:write(Proto1, Reply), + {Proto3, ok} = thrift_protocol:write(Proto2, message_end), + {Proto4, ok} = thrift_protocol:flush_transport(Proto3), + {State#thrift_processor{protocol = Proto4}, ok}. diff --git a/lib/erl/src/thrift_protocol.erl b/lib/erl/src/thrift_protocol.erl index 1bfb0a426..bb499ce7b 100644 --- a/lib/erl/src/thrift_protocol.erl +++ b/lib/erl/src/thrift_protocol.erl @@ -49,10 +49,13 @@ new(Module, Data) when is_atom(Module) -> {ok, #protocol{module = Module, data = Data}}. -flush_transport(#protocol{module = Module, - data = Data}) -> - Module:flush_transport(Data). +-spec flush_transport(#protocol{}) -> {#protocol{}, ok}. +flush_transport(Proto = #protocol{module = Module, + data = Data}) -> + {NewData, Result} = Module:flush_transport(Data), + {Proto#protocol{data = NewData}, Result}. +-spec close_transport(#protocol{}) -> ok. close_transport(#protocol{module = Module, data = Data}) -> Module:close_transport(Data). @@ -86,7 +89,8 @@ term_to_typeid({list, _}) -> ?tType_LIST. %% Structure is like: %% [{Fid, Type}, ...] -read(IProto, {struct, Structure}, Tag) +-spec read(#protocol{}, {struct, _StructDef}, atom()) -> {#protocol{}, {ok, tuple()}}. +read(IProto0, {struct, Structure}, Tag) when is_list(Structure), is_atom(Tag) -> % If we want a tagged tuple, we need to offset all the tuple indices @@ -103,14 +107,23 @@ read(IProto, {struct, Structure}, Tag) % Fid -> {Type, Index} SDict = dict:from_list(SWithIndices), - ok = read(IProto, struct_begin), + {IProto1, ok} = read(IProto0, struct_begin), RTuple0 = erlang:make_tuple(length(Structure) + Offset, undefined), RTuple1 = if Tag =/= undefined -> setelement(1, RTuple0, Tag); true -> RTuple0 end, - RTuple2 = read_struct_loop(IProto, SDict, RTuple1), - {ok, RTuple2}. + {IProto2, RTuple2} = read_struct_loop(IProto1, SDict, RTuple1), + {IProto2, {ok, RTuple2}}. + + +%% NOTE: Keep this in sync with thrift_protocol_behaviour:read +-spec read + (#protocol{}, {struct, _Info}) -> {#protocol{}, {ok, tuple()} | {error, _Reason}}; + (#protocol{}, tprot_cont_tag()) -> {#protocol{}, {ok, term()} | {error, _Reason}}; + (#protocol{}, tprot_empty_tag()) -> {#protocol{}, ok | {error, _Reason}}; + (#protocol{}, tprot_header_tag()) -> {#protocol{}, tprot_header_val() | {error, _Reason}}; + (#protocol{}, tprot_data_tag()) -> {#protocol{}, {ok, term()} | {error, _Reason}}. read(IProto, {struct, {Module, StructureName}}) when is_atom(Module), is_atom(StructureName) -> @@ -119,137 +132,165 @@ read(IProto, {struct, {Module, StructureName}}) when is_atom(Module), read(IProto, S={struct, Structure}) when is_list(Structure) -> read(IProto, S, undefined); -read(IProto, {list, Type}) -> - #protocol_list_begin{etype = EType, size = Size} = - read(IProto, list_begin), - List = [Result || {ok, Result} <- - [read(IProto, Type) || _X <- lists:duplicate(Size, 0)]], - ok = read(IProto, list_end), - {ok, List}; +read(IProto0, {list, Type}) -> + {IProto1, #protocol_list_begin{etype = EType, size = Size}} = + read(IProto0, list_begin), + {EType, EType} = {term_to_typeid(Type), EType}, + {List, IProto2} = lists:mapfoldl(fun(_, ProtoS0) -> + {ProtoS1, {ok, Item}} = read(ProtoS0, Type), + {Item, ProtoS1} + end, + IProto1, + lists:duplicate(Size, 0)), + {IProto3, ok} = read(IProto2, list_end), + {IProto3, {ok, List}}; -read(IProto, {map, KeyType, ValType}) -> - #protocol_map_begin{size = Size} = - read(IProto, map_begin), +read(IProto0, {map, KeyType, ValType}) -> + {IProto1, #protocol_map_begin{size = Size, ktype = KType, vtype = VType}} = + read(IProto0, map_begin), + {KType, KType} = {term_to_typeid(KeyType), KType}, + {VType, VType} = {term_to_typeid(ValType), VType}, + {List, IProto2} = lists:mapfoldl(fun(_, ProtoS0) -> + {ProtoS1, {ok, Key}} = read(ProtoS0, KeyType), + {ProtoS2, {ok, Val}} = read(ProtoS1, ValType), + {{Key, Val}, ProtoS2} + end, + IProto1, + lists:duplicate(Size, 0)), + {IProto3, ok} = read(IProto2, map_end), + {IProto3, {ok, dict:from_list(List)}}; - List = [{Key, Val} || {{ok, Key}, {ok, Val}} <- - [{read(IProto, KeyType), - read(IProto, ValType)} || _X <- lists:duplicate(Size, 0)]], - ok = read(IProto, map_end), - {ok, dict:from_list(List)}; +read(IProto0, {set, Type}) -> + {IProto1, #protocol_set_begin{etype = EType, size = Size}} = + read(IProto0, set_begin), + {EType, EType} = {term_to_typeid(Type), EType}, + {List, IProto2} = lists:mapfoldl(fun(_, ProtoS0) -> + {ProtoS1, {ok, Item}} = read(ProtoS0, Type), + {Item, ProtoS1} + end, + IProto1, + lists:duplicate(Size, 0)), + {IProto3, ok} = read(IProto2, set_end), + {IProto3, {ok, sets:from_list(List)}}; -read(IProto, {set, Type}) -> - #protocol_set_begin{etype = _EType, - size = Size} = - read(IProto, set_begin), - List = [Result || {ok, Result} <- - [read(IProto, Type) || _X <- lists:duplicate(Size, 0)]], - ok = read(IProto, set_end), - {ok, sets:from_list(List)}; +read(Protocol, ProtocolType) -> + read_specific(Protocol, ProtocolType). -read(#protocol{module = Module, - data = ModuleData}, ProtocolType) -> - Module:read(ModuleData, ProtocolType). +%% NOTE: Keep this in sync with thrift_protocol_behaviour:read +-spec read_specific + (#protocol{}, tprot_empty_tag()) -> {#protocol{}, ok | {error, _Reason}}; + (#protocol{}, tprot_header_tag()) -> {#protocol{}, tprot_header_val() | {error, _Reason}}; + (#protocol{}, tprot_data_tag()) -> {#protocol{}, {ok, term()} | {error, _Reason}}. +read_specific(Proto = #protocol{module = Module, + data = ModuleData}, ProtocolType) -> + {NewData, Result} = Module:read(ModuleData, ProtocolType), + {Proto#protocol{data = NewData}, Result}. -read_struct_loop(IProto, SDict, RTuple) -> - #protocol_field_begin{type = FType, id = Fid, name = Name} = - thrift_protocol:read(IProto, field_begin), +read_struct_loop(IProto0, SDict, RTuple) -> + {IProto1, #protocol_field_begin{type = FType, id = Fid}} = + thrift_protocol:read(IProto0, field_begin), case {FType, Fid} of {?tType_STOP, _} -> - RTuple; + {IProto1, RTuple}; _Else -> case dict:find(Fid, SDict) of {ok, {Type, Index}} -> case term_to_typeid(Type) of FType -> - {ok, Val} = read(IProto, Type), - thrift_protocol:read(IProto, field_end), + {IProto2, {ok, Val}} = read(IProto1, Type), + {IProto3, ok} = thrift_protocol:read(IProto2, field_end), NewRTuple = setelement(Index, RTuple, Val), - read_struct_loop(IProto, SDict, NewRTuple); + read_struct_loop(IProto3, SDict, NewRTuple); Expected -> error_logger:info_msg( "Skipping field ~p with wrong type (~p != ~p)~n", [Fid, FType, Expected]), - skip_field(FType, IProto, SDict, RTuple) + skip_field(FType, IProto1, SDict, RTuple) end; _Else2 -> error_logger:info_msg("Skipping field ~p with unknown fid~n", [Fid]), - skip_field(FType, IProto, SDict, RTuple) + skip_field(FType, IProto1, SDict, RTuple) end end. -skip_field(FType, IProto, SDict, RTuple) -> +skip_field(FType, IProto0, SDict, RTuple) -> FTypeAtom = thrift_protocol:typeid_to_atom(FType), - thrift_protocol:skip(IProto, FTypeAtom), - read(IProto, field_end), - read_struct_loop(IProto, SDict, RTuple). + {IProto1, ok} = thrift_protocol:skip(IProto0, FTypeAtom), + {IProto2, ok} = read(IProto1, field_end), + read_struct_loop(IProto2, SDict, RTuple). + +-spec skip(#protocol{}, term()) -> {#protocol{}, ok}. + +skip(Proto0, struct) -> + {Proto1, ok} = read(Proto0, struct_begin), + {Proto2, ok} = skip_struct_loop(Proto1), + {Proto3, ok} = read(Proto2, struct_end), + {Proto3, ok}; + +skip(Proto0, map) -> + {Proto1, Map} = read(Proto0, map_begin), + {Proto2, ok} = skip_map_loop(Proto1, Map), + {Proto3, ok} = read(Proto2, map_end), + {Proto3, ok}; + +skip(Proto0, set) -> + {Proto1, Set} = read(Proto0, set_begin), + {Proto2, ok} = skip_set_loop(Proto1, Set), + {Proto3, ok} = read(Proto2, set_end), + {Proto3, ok}; + +skip(Proto0, list) -> + {Proto1, List} = read(Proto0, list_begin), + {Proto2, ok} = skip_list_loop(Proto1, List), + {Proto3, ok} = read(Proto2, list_end), + {Proto3, ok}; + +skip(Proto0, Type) when is_atom(Type) -> + {Proto1, _Ignore} = read(Proto0, Type), + {Proto1, ok}. -skip(Proto, struct) -> - ok = read(Proto, struct_begin), - ok = skip_struct_loop(Proto), - ok = read(Proto, struct_end); - -skip(Proto, map) -> - Map = read(Proto, map_begin), - ok = skip_map_loop(Proto, Map), - ok = read(Proto, map_end); - -skip(Proto, set) -> - Set = read(Proto, set_begin), - ok = skip_set_loop(Proto, Set), - ok = read(Proto, set_end); - -skip(Proto, list) -> - List = read(Proto, list_begin), - ok = skip_list_loop(Proto, List), - ok = read(Proto, list_end); - -skip(Proto, Type) when is_atom(Type) -> - _Ignore = read(Proto, Type), - ok. - - -skip_struct_loop(Proto) -> - #protocol_field_begin{type = Type} = read(Proto, field_begin), +skip_struct_loop(Proto0) -> + {Proto1, #protocol_field_begin{type = Type}} = read(Proto0, field_begin), case Type of ?tType_STOP -> - ok; + {Proto1, ok}; _Else -> - skip(Proto, Type), - ok = read(Proto, field_end), - skip_struct_loop(Proto) + {Proto2, ok} = skip(Proto1, Type), + {Proto3, ok} = read(Proto2, field_end), + skip_struct_loop(Proto3) end. -skip_map_loop(Proto, Map = #protocol_map_begin{ktype = Ktype, - vtype = Vtype, - size = Size}) -> +skip_map_loop(Proto0, Map = #protocol_map_begin{ktype = Ktype, + vtype = Vtype, + size = Size}) -> case Size of N when N > 0 -> - skip(Proto, Ktype), - skip(Proto, Vtype), - skip_map_loop(Proto, + {Proto1, ok} = skip(Proto0, Ktype), + {Proto2, ok} = skip(Proto1, Vtype), + skip_map_loop(Proto2, Map#protocol_map_begin{size = Size - 1}); - 0 -> ok + 0 -> {Proto0, ok} end. -skip_set_loop(Proto, Map = #protocol_set_begin{etype = Etype, - size = Size}) -> +skip_set_loop(Proto0, Map = #protocol_set_begin{etype = Etype, + size = Size}) -> case Size of N when N > 0 -> - skip(Proto, Etype), - skip_set_loop(Proto, + {Proto1, ok} = skip(Proto0, Etype), + skip_set_loop(Proto1, Map#protocol_set_begin{size = Size - 1}); - 0 -> ok + 0 -> {Proto0, ok} end. -skip_list_loop(Proto, Map = #protocol_list_begin{etype = Etype, - size = Size}) -> +skip_list_loop(Proto0, Map = #protocol_list_begin{etype = Etype, + size = Size}) -> case Size of N when N > 0 -> - skip(Proto, Etype), - skip_list_loop(Proto, + {Proto1, ok} = skip(Proto0, Etype), + skip_list_loop(Proto1, Map#protocol_list_begin{size = Size - 1}); - 0 -> ok + 0 -> {Proto0, ok} end. @@ -271,86 +312,91 @@ skip_list_loop(Proto, Map = #protocol_list_begin{etype = Etype, %% %% Description: %%-------------------------------------------------------------------- -write(Proto, {{struct, StructDef}, Data}) +-spec write(#protocol{}, term()) -> {#protocol{}, ok | {error, _Reason}}. + +write(Proto0, {{struct, StructDef}, Data}) when is_list(StructDef), is_tuple(Data), length(StructDef) == size(Data) - 1 -> [StructName | Elems] = tuple_to_list(Data), - ok = write(Proto, #protocol_struct_begin{name = StructName}), - ok = struct_write_loop(Proto, StructDef, Elems), - ok = write(Proto, struct_end), - ok; + {Proto1, ok} = write(Proto0, #protocol_struct_begin{name = StructName}), + {Proto2, ok} = struct_write_loop(Proto1, StructDef, Elems), + {Proto3, ok} = write(Proto2, struct_end), + {Proto3, ok}; write(Proto, {{struct, {Module, StructureName}}, Data}) when is_atom(Module), is_atom(StructureName), element(1, Data) =:= StructureName -> - StructType = Module:struct_info(StructureName), write(Proto, {Module:struct_info(StructureName), Data}); -write(Proto, {{list, Type}, Data}) +write(Proto0, {{list, Type}, Data}) when is_list(Data) -> - ok = write(Proto, + {Proto1, ok} = write(Proto0, #protocol_list_begin{ etype = term_to_typeid(Type), size = length(Data) }), - lists:foreach(fun(Elem) -> - ok = write(Proto, {Type, Elem}) - end, - Data), - ok = write(Proto, list_end), - ok; + Proto2 = lists:foldl(fun(Elem, ProtoIn) -> + {ProtoOut, ok} = write(ProtoIn, {Type, Elem}), + ProtoOut + end, + Proto1, + Data), + {Proto3, ok} = write(Proto2, list_end), + {Proto3, ok}; -write(Proto, {{map, KeyType, ValType}, Data}) -> - ok = write(Proto, - #protocol_map_begin{ - ktype = term_to_typeid(KeyType), - vtype = term_to_typeid(ValType), - size = dict:size(Data) - }), - dict:fold(fun(KeyData, ValData, _Acc) -> - ok = write(Proto, {KeyType, KeyData}), - ok = write(Proto, {ValType, ValData}) - end, - _AccO = ok, - Data), - ok = write(Proto, map_end), - ok; +write(Proto0, {{map, KeyType, ValType}, Data}) -> + {Proto1, ok} = write(Proto0, + #protocol_map_begin{ + ktype = term_to_typeid(KeyType), + vtype = term_to_typeid(ValType), + size = dict:size(Data) + }), + Proto2 = dict:fold(fun(KeyData, ValData, ProtoS0) -> + {ProtoS1, ok} = write(ProtoS0, {KeyType, KeyData}), + {ProtoS2, ok} = write(ProtoS1, {ValType, ValData}), + ProtoS2 + end, + Proto1, + Data), + {Proto3, ok} = write(Proto2, map_end), + {Proto3, ok}; -write(Proto, {{set, Type}, Data}) -> +write(Proto0, {{set, Type}, Data}) -> true = sets:is_set(Data), - ok = write(Proto, - #protocol_set_begin{ - etype = term_to_typeid(Type), - size = sets:size(Data) - }), - sets:fold(fun(Elem, _Acc) -> - ok = write(Proto, {Type, Elem}) - end, - _Acc0 = ok, - Data), - ok = write(Proto, set_end), - ok; + {Proto1, ok} = write(Proto0, + #protocol_set_begin{ + etype = term_to_typeid(Type), + size = sets:size(Data) + }), + Proto2 = sets:fold(fun(Elem, ProtoIn) -> + {ProtoOut, ok} = write(ProtoIn, {Type, Elem}), + ProtoOut + end, + Proto1, + Data), + {Proto3, ok} = write(Proto2, set_end), + {Proto3, ok}; -write(#protocol{module = Module, - data = ModuleData}, Data) -> - Module:write(ModuleData, Data). +write(Proto = #protocol{module = Module, + data = ModuleData}, Data) -> + {NewData, Result} = Module:write(ModuleData, Data), + {Proto#protocol{data = NewData}, Result}. -struct_write_loop(Proto, [{Fid, Type} | RestStructDef], [Data | RestData]) -> - case Data of - undefined -> - % null fields are skipped in response - skip; - _ -> - ok = write(Proto, - #protocol_field_begin{ - type = term_to_typeid(Type), - id = Fid - }), - ok = write(Proto, {Type, Data}), - ok = write(Proto, field_end) - end, - struct_write_loop(Proto, RestStructDef, RestData); +struct_write_loop(Proto0, [{Fid, Type} | RestStructDef], [Data | RestData]) -> + NewProto = case Data of + undefined -> + Proto0; % null fields are skipped in response + _ -> + {Proto1, ok} = write(Proto0, + #protocol_field_begin{ + type = term_to_typeid(Type), + id = Fid + }), + {Proto2, ok} = write(Proto1, {Type, Data}), + {Proto3, ok} = write(Proto2, field_end), + Proto3 + end, + struct_write_loop(NewProto, RestStructDef, RestData); struct_write_loop(Proto, [], []) -> - ok = write(Proto, field_stop), - ok. + write(Proto, field_stop). diff --git a/lib/erl/src/thrift_server.erl b/lib/erl/src/thrift_server.erl index 5d0012ba2..5012e168f 100644 --- a/lib/erl/src/thrift_server.erl +++ b/lib/erl/src/thrift_server.erl @@ -126,7 +126,7 @@ handle_info({inet_async, ListenSocket, Ref, {ok, ClientSocket}}, {stop, Reason, State} end; -handle_info({inet_async, ListenSocket, Ref, Error}, State) -> +handle_info({inet_async, _ListenSocket, _Ref, Error}, State) -> error_logger:error_msg("Error in acceptor: ~p~n", [Error]), {stop, Error, State}; @@ -177,7 +177,7 @@ start_processor(Socket, Service, Handler) -> {ok, SocketTransport} = thrift_socket_transport:new(Socket), {ok, BufferedTransport} = thrift_buffered_transport:new(SocketTransport), {ok, Protocol} = thrift_binary_protocol:new(BufferedTransport), - {ok, Protocol, Protocol} + {ok, Protocol} end, spawn(thrift_processor, init, [{Server, ProtoGen, Service, Handler}]). diff --git a/lib/erl/src/thrift_socket_server.erl b/lib/erl/src/thrift_socket_server.erl index 6794e6301..f7c7a028f 100644 --- a/lib/erl/src/thrift_socket_server.erl +++ b/lib/erl/src/thrift_socket_server.erl @@ -166,13 +166,12 @@ gen_tcp_listen(Port, Opts, State) -> new_acceptor(State=#thrift_socket_server{max=0}) -> error_logger:error_msg("Not accepting new connections"), State#thrift_socket_server{acceptor=null}; -new_acceptor(State=#thrift_socket_server{acceptor=OldPid, listen=Listen, +new_acceptor(State=#thrift_socket_server{listen=Listen, service=Service, handler=Handler, socket_opts=Opts, framed=Framed }) -> Pid = proc_lib:spawn_link(?MODULE, acceptor_loop, [{self(), Listen, Service, Handler, Opts, Framed}]), -%% error_logger:info_msg("Spawning new acceptor: ~p => ~p", [OldPid, Pid]), State#thrift_socket_server{acceptor=Pid}. acceptor_loop({Server, Listen, Service, Handler, SocketOpts, Framed}) @@ -188,7 +187,7 @@ acceptor_loop({Server, Listen, Service, Handler, SocketOpts, Framed}) false -> thrift_buffered_transport:new(SocketTransport) end, {ok, Protocol} = thrift_binary_protocol:new(Transport), - {ok, IProt=Protocol, OProt=Protocol} + {ok, Protocol} end, thrift_processor:init({Server, ProtoGen, Service, Handler}); {error, closed} -> diff --git a/lib/erl/src/thrift_socket_transport.erl b/lib/erl/src/thrift_socket_transport.erl index fcd69449e..5e1ef0263 100644 --- a/lib/erl/src/thrift_socket_transport.erl +++ b/lib/erl/src/thrift_socket_transport.erl @@ -29,6 +29,8 @@ -record(data, {socket, recv_timeout=infinity}). +-type state() :: #data{}. +-include("thrift_transport_behaviour.hrl"). new(Socket) -> new(Socket, []). @@ -45,25 +47,26 @@ new(Socket, Opts) when is_list(Opts) -> thrift_transport:new(?MODULE, State). %% Data :: iolist() -write(#data{socket = Socket}, Data) -> - gen_tcp:send(Socket, Data). +write(This = #data{socket = Socket}, Data) -> + {This, gen_tcp:send(Socket, Data)}. -read(#data{socket=Socket, recv_timeout=Timeout}, Len) +read(This = #data{socket=Socket, recv_timeout=Timeout}, Len) when is_integer(Len), Len >= 0 -> case gen_tcp:recv(Socket, Len, Timeout) of Err = {error, timeout} -> error_logger:info_msg("read timeout: peer conn ~p", [inet:peername(Socket)]), gen_tcp:close(Socket), - Err; - Data -> Data + {This, Err}; + Data -> + {This, Data} end. %% We can't really flush - everything is flushed when we write -flush(_) -> - ok. +flush(This) -> + {This, ok}. -close(#data{socket = Socket}) -> - gen_tcp:close(Socket). +close(This = #data{socket = Socket}) -> + {This, gen_tcp:close(Socket)}. %%%% FACTORY GENERATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% diff --git a/lib/erl/src/thrift_transport.erl b/lib/erl/src/thrift_transport.erl index 20c4b5dc3..39f8c056d 100644 --- a/lib/erl/src/thrift_transport.erl +++ b/lib/erl/src/thrift_transport.erl @@ -37,21 +37,42 @@ behaviour_info(callbacks) -> -record(transport, {module, data}). -new(Module, Data) when is_atom(Module) -> - {ok, #transport{module = Module, - data = Data}}. +-ifdef(transport_wrapper_module). +-define(debug_wrap(Transport), + case Transport#transport.module of + ?transport_wrapper_module -> + Transport; + _Else -> + {ok, Result} = ?transport_wrapper_module:new(Transport), + Result + end). +-else. +-define(debug_wrap(Transport), Transport). +-endif. -%% Data :: iolist() +new(Module, Data) when is_atom(Module) -> + Transport0 = #transport{module = Module, data = Data}, + Transport1 = ?debug_wrap(Transport0), + {ok, Transport1}. + +-spec write(#transport{}, iolist() | binary()) -> {#transport{}, ok | {error, _Reason}}. write(Transport, Data) -> Module = Transport#transport.module, - Module:write(Transport#transport.data, Data). + {NewTransData, Result} = Module:write(Transport#transport.data, Data), + {Transport#transport{data = NewTransData}, Result}. +-spec read(#transport{}, non_neg_integer()) -> {#transport{}, {ok, binary()} | {error, _Reason}}. read(Transport, Len) when is_integer(Len) -> Module = Transport#transport.module, - Module:read(Transport#transport.data, Len). + {NewTransData, Result} = Module:read(Transport#transport.data, Len), + {Transport#transport{data = NewTransData}, Result}. -flush(#transport{module = Module, data = Data}) -> - Module:flush(Data). +-spec flush(#transport{}) -> {#transport{}, ok | {error, _Reason}}. +flush(Transport = #transport{module = Module, data = Data}) -> + {NewTransData, Result} = Module:flush(Data), + {Transport#transport{data = NewTransData}, Result}. -close(#transport{module = Module, data = Data}) -> - Module:close(Data). +-spec close(#transport{}) -> {#transport{}, ok | {error, _Reason}}. +close(Transport = #transport{module = Module, data = Data}) -> + {NewTransData, Result} = Module:close(Data), + {Transport#transport{data = NewTransData}, Result}. diff --git a/lib/erl/src/thrift_transport_state_test.erl b/lib/erl/src/thrift_transport_state_test.erl new file mode 100644 index 000000000..e83a44d26 --- /dev/null +++ b/lib/erl/src/thrift_transport_state_test.erl @@ -0,0 +1,117 @@ +%% +%% Licensed to the Apache Software Foundation (ASF) under one +%% or more contributor license agreements. See the NOTICE file +%% distributed with this work for additional information +%% regarding copyright ownership. The ASF licenses this file +%% to you under the Apache License, Version 2.0 (the +%% "License"); you may not use this file except in compliance +%% with the License. You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% + +-module(thrift_transport_state_test). + +-behaviour(gen_server). +-behaviour(thrift_transport). + +%% API +-export([new/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +%% thrift_transport callbacks +-export([write/2, read/2, flush/1, close/1]). + +-record(trans, {wrapped, % #thrift_transport{} + version :: integer(), + counter :: pid() + }). +-type state() :: #trans{}. +-include("thrift_transport_behaviour.hrl"). + +-record(state, {cversion :: integer()}). + + +new(WrappedTransport) -> + case gen_server:start_link(?MODULE, [], []) of + {ok, Pid} -> + Trans = #trans{wrapped = WrappedTransport, + version = 0, + counter = Pid}, + thrift_transport:new(?MODULE, Trans); + Else -> + Else + end. + +%%==================================================================== +%% thrift_transport callbacks +%%==================================================================== + +write(Transport0 = #trans{wrapped = Wrapped0}, Data) -> + Transport1 = check_version(Transport0), + {Wrapped1, Result} = thrift_transport:write(Wrapped0, Data), + Transport2 = Transport1#trans{wrapped = Wrapped1}, + {Transport2, Result}. + +flush(Transport0 = #trans{wrapped = Wrapped0}) -> + Transport1 = check_version(Transport0), + {Wrapped1, Result} = thrift_transport:flush(Wrapped0), + Transport2 = Transport1#trans{wrapped = Wrapped1}, + {Transport2, Result}. + +close(Transport0 = #trans{wrapped = Wrapped0}) -> + Transport1 = check_version(Transport0), + shutdown_counter(Transport1), + {Wrapped1, Result} = thrift_transport:close(Wrapped0), + Transport2 = Transport1#trans{wrapped = Wrapped1}, + {Transport2, Result}. + +read(Transport0 = #trans{wrapped = Wrapped0}, Len) -> + Transport1 = check_version(Transport0), + {Wrapped1, Result} = thrift_transport:read(Wrapped0, Len), + Transport2 = Transport1#trans{wrapped = Wrapped1}, + {Transport2, Result}. + + +%%==================================================================== +%% gen_server callbacks +%%==================================================================== + +init([]) -> + {ok, #state{cversion = 0}}. + +handle_call(check_version, _From, State = #state{cversion = Version}) -> + {reply, Version, State#state{cversion = Version+1}}. + +handle_cast(shutdown, State) -> + {stop, normal, State}. + +handle_info(_Info, State) -> {noreply, State}. +code_change(_OldVsn, State, _Extra) -> {ok, State}. +terminate(_Reason, _State) -> ok. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +check_version(Transport = #trans{version = Version, counter = Counter}) -> + case gen_server:call(Counter, check_version) of + Version -> + Transport#trans{version = Version+1}; + _Else -> + % State wasn't propagated properly. Die. + erlang:error(state_not_propagated) + end. + +shutdown_counter(#trans{counter = Counter}) -> + gen_server:cast(Counter, shutdown). diff --git a/test/erl/Makefile b/test/erl/Makefile index 212603722..a6b5ae611 100644 --- a/test/erl/Makefile +++ b/test/erl/Makefile @@ -29,7 +29,7 @@ SRCDIR=src ALL_INCLUDEDIR=$(GEN_INCLUDEDIR) $(INCLUDEDIR) ../../lib/erl/include INCLUDEFLAGS=$(patsubst %,-I%, ${ALL_INCLUDEDIR}) -MODULES = stress_server test_server test_disklog test_membuffer test_tether +MODULES = stress_server test_server test_client test_disklog test_membuffer INCLUDES = TARGETS = $(patsubst %,${TARGETDIR}/%.beam,${MODULES}) @@ -55,11 +55,11 @@ ${GENDIR}/: ${RPCFILE} ${GEN_TARGETDIR}/: ${GENDIR}/ rm -rf ${GEN_TARGETDIR} mkdir -p ${GEN_TARGETDIR} - erlc ${INCLUDEFLAGS} -o ${GEN_TARGETDIR} ${GEN_SRCDIR}/*.erl + erlc ${ERLC_FLAGS} ${INCLUDEFLAGS} -o ${GEN_TARGETDIR} ${GEN_SRCDIR}/*.erl $(TARGETS): ${TARGETDIR}/%.beam: ${SRCDIR}/%.erl ${GEN_INCLUDEDIR}/ ${HEADERS} mkdir -p ${TARGETDIR} - erlc ${INCLUDEFLAGS} -o ${TARGETDIR} $< + erlc ${ERLC_FLAGS} ${INCLUDEFLAGS} -o ${TARGETDIR} $< clean: rm -f ${TARGETDIR}/*.beam diff --git a/test/erl/src/test_client.erl b/test/erl/src/test_client.erl new file mode 100644 index 000000000..a26467f0c --- /dev/null +++ b/test/erl/src/test_client.erl @@ -0,0 +1,132 @@ +%% +%% Licensed to the Apache Software Foundation (ASF) under one +%% or more contributor license agreements. See the NOTICE file +%% distributed with this work for additional information +%% regarding copyright ownership. The ASF licenses this file +%% to you under the Apache License, Version 2.0 (the +%% "License"); you may not use this file except in compliance +%% with the License. You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% + +-module(test_client). + +-export([start/0, start/1]). + +-include("thriftTest_types.hrl"). + +-record(options, {port = 9090, + client_opts = []}). + +parse_args(Args) -> parse_args(Args, #options{}). +parse_args([], Opts) -> Opts; +parse_args([Head | Rest], Opts) -> + NewOpts = + case catch list_to_integer(Head) of + Port when is_integer(Port) -> + Opts#options{port = Port}; + _Else -> + case Head of + "framed" -> + Opts#options{client_opts = [{framed, true} | Opts#options.client_opts]}; + "" -> + Opts; + _Else -> + erlang:error({bad_arg, Head}) + end + end, + parse_args(Rest, NewOpts). + + +start() -> start([]). +start(Args) -> + #options{port = Port, client_opts = ClientOpts} = parse_args(Args), + {ok, Client0} = thrift_client_util:new( + "127.0.0.1", Port, thriftTest_thrift, ClientOpts), + + DemoXtruct = #xtruct{ + string_thing = <<"Zero">>, + byte_thing = 1, + i32_thing = 9128361, + i64_thing = 9223372036854775807}, + + DemoNest = #xtruct2{ + byte_thing = 7, + struct_thing = DemoXtruct, + % Note that we don't set i32_thing, it will come back as undefined + % from the Python server, but 0 from the C++ server, since it is not + % optional + i32_thing = 2}, + + % Is it safe to match these things? + DemoDict = dict:from_list([ {Key, Key-10} || Key <- lists:seq(0,10) ]), + DemoSet = sets:from_list([ Key || Key <- lists:seq(-3,3) ]), + + %DemoInsane = #insanity{ + % userMap = dict:from_list([{?thriftTest_FIVE, 5000}]), + % xtructs = [#xtruct{ string_thing = <<"Truck">>, byte_thing = 8, i32_thing = 8, i64_thing = 8}]}, + + {Client01, {ok, ok}} = thrift_client:call(Client0, testVoid, []), + + {Client02, {ok, <<"Test">>}} = thrift_client:call(Client01, testString, ["Test"]), + {Client03, {ok, <<"Test">>}} = thrift_client:call(Client02, testString, [<<"Test">>]), + {Client04, {ok, 63}} = thrift_client:call(Client03, testByte, [63]), + {Client05, {ok, -1}} = thrift_client:call(Client04, testI32, [-1]), + {Client06, {ok, 0}} = thrift_client:call(Client05, testI32, [0]), + {Client07, {ok, -34359738368}} = thrift_client:call(Client06, testI64, [-34359738368]), + {Client08, {ok, -5.2098523}} = thrift_client:call(Client07, testDouble, [-5.2098523]), + {Client09, {ok, DemoXtruct}} = thrift_client:call(Client08, testStruct, [DemoXtruct]), + {Client10, {ok, DemoNest}} = thrift_client:call(Client09, testNest, [DemoNest]), + {Client11, {ok, DemoDict}} = thrift_client:call(Client10, testMap, [DemoDict]), + {Client12, {ok, DemoSet}} = thrift_client:call(Client11, testSet, [DemoSet]), + {Client13, {ok, [-1,2,3]}} = thrift_client:call(Client12, testList, [[-1,2,3]]), + {Client14, {ok, 1}} = thrift_client:call(Client13, testEnum, [?thriftTest_ONE]), + {Client15, {ok, 309858235082523}} = thrift_client:call(Client14, testTypedef, [309858235082523]), + + % No python implementation, but works with C++ and Erlang. + %{Client16, {ok, InsaneResult}} = thrift_client:call(Client15, testInsanity, [DemoInsane]), + %io:format("~p~n", [InsaneResult]), + Client16 = Client15, + + {Client17, {ok, #xtruct{string_thing = <<"Message">>}}} = + thrift_client:call(Client16, testMultiException, ["Safe", "Message"]), + + Client18 = + try + {ClientS1, Result1} = thrift_client:call(Client17, testMultiException, ["Xception", "Message"]), + io:format("Unexpected return! ~p~n", [Result1]), + ClientS1 + catch + throw:{ClientS2, {exception, ExnS1 = #xception{}}} -> + #xception{errorCode = 1001, message = <<"This is an Xception">>} = ExnS1, + ClientS2; + throw:{ClientS2, {exception, _ExnS1 = #xception2{}}} -> + io:format("Wrong exception type!~n", []), + ClientS2 + end, + + Client19 = + try + {ClientS3, Result2} = thrift_client:call(Client18, testMultiException, ["Xception2", "Message"]), + io:format("Unexpected return! ~p~n", [Result2]), + ClientS3 + catch + throw:{ClientS4, {exception, _ExnS2 = #xception{}}} -> + io:format("Wrong exception type!~n", []), + ClientS4; + throw:{ClientS4, {exception, ExnS2 = #xception2{}}} -> + #xception2{errorCode = 2002, + struct_thing = #xtruct{ + string_thing = <<"This is an Xception2">>}} = ExnS2, + ClientS4 + end, + + thrift_client:close(Client19). diff --git a/test/erl/src/test_disklog.erl b/test/erl/src/test_disklog.erl index 7b0be72db..fc0dcf867 100644 --- a/test/erl/src/test_disklog.erl +++ b/test/erl/src/test_disklog.erl @@ -29,20 +29,21 @@ t() -> {size, {1024*1024, 10}}]), {ok, ProtocolFactory} = thrift_binary_protocol:new_protocol_factory( TransportFactory, []), - {ok, Client} = thrift_client:start_link(ProtocolFactory, thriftTest_thrift), + {ok, Proto} = ProtocolFactory(), + {ok, Client0} = thrift_client:new(Proto, thriftTest_thrift), io:format("Client started~n"), % We have to make oneway calls into this client only since otherwise it will try % to read from the disklog and go boom. - {ok, ok} = thrift_client:call(Client, testOneway, [16#deadbeef]), + {Client1, {ok, ok}} = thrift_client:call(Client0, testOneway, [16#deadbeef]), io:format("Call written~n"), % Use the send_call method to write a non-oneway call into the log - ok = thrift_client:send_call(Client, testString, [<<"hello world">>]), + {Client2, ok} = thrift_client:send_call(Client1, testString, [<<"hello world">>]), io:format("Non-oneway call sent~n"), - ok = thrift_client:close(Client), + {_Client3, ok} = thrift_client:close(Client2), io:format("Client closed~n"), ok. @@ -61,21 +62,22 @@ t_base64() -> thrift_buffered_transport:new_transport_factory(B64Factory), {ok, ProtocolFactory} = thrift_binary_protocol:new_protocol_factory( BufFactory, []), - {ok, Client} = thrift_client:start_link(ProtocolFactory, thriftTest_thrift), + {ok, Proto} = ProtocolFactory(), + {ok, Client0} = thrift_client:new(Proto, thriftTest_thrift), io:format("Client started~n"), % We have to make oneway calls into this client only since otherwise it will try % to read from the disklog and go boom. - {ok, ok} = thrift_client:call(Client, testOneway, [16#deadbeef]), + {Client1, {ok, ok}} = thrift_client:call(Client0, testOneway, [16#deadbeef]), io:format("Call written~n"), % Use the send_call method to write a non-oneway call into the log - ok = thrift_client:send_call(Client, testString, [<<"hello world">>]), + {Client2, ok} = thrift_client:send_call(Client1, testString, [<<"hello world">>]), io:format("Non-oneway call sent~n"), - ok = thrift_client:close(Client), + {_Client3, ok} = thrift_client:close(Client2), io:format("Client closed~n"), ok. - + diff --git a/test/erl/src/test_membuffer.erl b/test/erl/src/test_membuffer.erl index 7bd23a0f1..19ac5277e 100644 --- a/test/erl/src/test_membuffer.erl +++ b/test/erl/src/test_membuffer.erl @@ -30,12 +30,12 @@ test_data() -> t1() -> {ok, Transport} = thrift_memory_buffer:new(), - {ok, Protocol} = thrift_binary_protocol:new(Transport), + {ok, Protocol0} = thrift_binary_protocol:new(Transport), TestData = test_data(), - ok = thrift_protocol:write(Protocol, + {Protocol1, ok} = thrift_protocol:write(Protocol0, {{struct, element(2, thriftTest_types:struct_info('xtruct'))}, TestData}), - {ok, Result} = thrift_protocol:read(Protocol, + {_Protocol2, {ok, Result}} = thrift_protocol:read(Protocol1, {struct, element(2, thriftTest_types:struct_info('xtruct'))}, 'xtruct'), @@ -44,12 +44,12 @@ t1() -> t2() -> {ok, Transport} = thrift_memory_buffer:new(), - {ok, Protocol} = thrift_binary_protocol:new(Transport), + {ok, Protocol0} = thrift_binary_protocol:new(Transport), TestData = test_data(), - ok = thrift_protocol:write(Protocol, + {Protocol1, ok} = thrift_protocol:write(Protocol0, {{struct, element(2, thriftTest_types:struct_info('xtruct'))}, TestData}), - {ok, Result} = thrift_protocol:read(Protocol, + {_Protocol2, {ok, Result}} = thrift_protocol:read(Protocol1, {struct, element(2, thriftTest_types:struct_info('xtruct3'))}, 'xtruct3'), @@ -61,12 +61,12 @@ t2() -> t3() -> {ok, Transport} = thrift_memory_buffer:new(), - {ok, Protocol} = thrift_binary_protocol:new(Transport), + {ok, Protocol0} = thrift_binary_protocol:new(Transport), TestData = #bools{im_true = true, im_false = false}, - ok = thrift_protocol:write(Protocol, + {Protocol1, ok} = thrift_protocol:write(Protocol0, {{struct, element(2, thriftTest_types:struct_info('bools'))}, TestData}), - {ok, Result} = thrift_protocol:read(Protocol, + {_Protocol2, {ok, Result}} = thrift_protocol:read(Protocol1, {struct, element(2, thriftTest_types:struct_info('bools'))}, 'bools'), @@ -74,8 +74,23 @@ t3() -> true = TestData#bools.im_false =:= Result#bools.im_false. +t4() -> + {ok, Transport} = thrift_memory_buffer:new(), + {ok, Protocol0} = thrift_binary_protocol:new(Transport), + TestData = #insanity{xtructs=[]}, + {Protocol1, ok} = thrift_protocol:write(Protocol0, + {{struct, element(2, thriftTest_types:struct_info('insanity'))}, + TestData}), + {_Protocol2, {ok, Result}} = thrift_protocol:read(Protocol1, + {struct, element(2, thriftTest_types:struct_info('insanity'))}, + 'insanity'), + + TestData = Result. + + t() -> t1(), t2(), - t3(). + t3(), + t4(). diff --git a/test/erl/src/test_server.erl b/test/erl/src/test_server.erl index cd439ccdf..28d47b161 100644 --- a/test/erl/src/test_server.erl +++ b/test/erl/src/test_server.erl @@ -19,12 +19,42 @@ -module(test_server). --export([start_link/1, handle_function/2]). +-export([go/0, go/1, start_link/2, handle_function/2]). -include("thriftTest_types.hrl"). -start_link(Port) -> - thrift_server:start_link(Port, thriftTest_thrift, ?MODULE). +-record(options, {port = 9090, + server_opts = []}). + +parse_args(Args) -> parse_args(Args, #options{}). +parse_args([], Opts) -> Opts; +parse_args([Head | Rest], Opts) -> + NewOpts = + case catch list_to_integer(Head) of + Port when is_integer(Port) -> + Opts#options{port = Port}; + _Else -> + case Head of + "framed" -> + Opts#options{server_opts = [{framed, true} | Opts#options.server_opts]}; + "" -> + Opts; + _Else -> + erlang:error({bad_arg, Head}) + end + end, + parse_args(Rest, NewOpts). + +go() -> go([]). +go(Args) -> + #options{port = Port, server_opts = ServerOpts} = parse_args(Args), + spawn(fun() -> start_link(Port, ServerOpts), receive after infinity -> ok end end). + +start_link(Port, ServerOpts) -> + thrift_socket_server:start([{handler, ?MODULE}, + {service, thriftTest_thrift}, + {port, Port}] ++ + ServerOpts). handle_function(testVoid, {}) -> @@ -124,12 +154,12 @@ handle_function(testInsanity, {Insanity}) when is_record(Insanity, insanity) -> {?thriftTest_THREE, Crazy}]), SecondMap = dict:from_list([{?thriftTest_SIX, Looney}]), - + Insane = dict:from_list([{1, FirstMap}, {2, SecondMap}]), - + io:format("Return = ~p~n", [Insane]), - + {reply, Insane}; handle_function(testMulti, Args = {Arg0, Arg1, Arg2, _Arg3, Arg4, Arg5}) @@ -150,7 +180,7 @@ handle_function(testException, {String}) when is_binary(String) -> case String of <<"Xception">> -> throw(#xception{errorCode = 1001, - message = <<"This is an Xception">>}); + message = String}); _ -> ok end; diff --git a/test/erl/src/test_tether.erl b/test/erl/src/test_tether.erl deleted file mode 100644 index dc11a9a9a..000000000 --- a/test/erl/src/test_tether.erl +++ /dev/null @@ -1,186 +0,0 @@ -%% -%% Licensed to the Apache Software Foundation (ASF) under one -%% or more contributor license agreements. See the NOTICE file -%% distributed with this work for additional information -%% regarding copyright ownership. The ASF licenses this file -%% to you under the Apache License, Version 2.0 (the -%% "License"); you may not use this file except in compliance -%% with the License. You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, -%% software distributed under the License is distributed on an -%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -%% KIND, either express or implied. See the License for the -%% specific language governing permissions and limitations -%% under the License. -%% -%% Tests the behavior of clients in the face of transport errors. -%% Makes sure start, start_linked, and start_tethered work as expected. - --module(test_tether). - --compile(export_all). - - -t() -> - io:format("Beginning transport error test.~n"), - Pid1 = erlang:spawn(?MODULE, t_sub, [2]), - wait_for(Pid1), - io:format("Beginning protocol error test.~n"), - Pid2 = erlang:spawn(?MODULE, t_sub, [22]), - wait_for(Pid2), - ok. - -t_sub(Port) -> - io:format("Starting.~n", []), - register(tester, self()), - - Pid1 = erlang:spawn(?MODULE, test_start, [Port]), - receive after 200 -> ok end, % Wait for completion. - case is_up(Pid1) of - true -> - io:format("PASS. Unlinked owner still alive.~n"); - false -> - io:format("FAIL. Unlinked owner is dead.~n") - end, - - Pid2 = erlang:spawn(?MODULE, test_linked, [Port]), - receive after 200 -> ok end, % Wait for completion. - case is_up(Pid2) of - true -> - io:format("FAIL. Linked owner still alive.~n"); - false -> - io:format("PASS. Linked owner is dead.~n") - end, - - Pid3 = erlang:spawn(?MODULE, test_tethered, [Port]), - receive after 200 -> ok end, % Wait for completion. - case is_up(Pid3) of - true -> - io:format("PASS. Tethered owner still alive.~n"); - false -> - io:format("FAIL. Tethered owner is dead.~n") - end, - - check_extras(3). - -is_up(Pid) -> - MonitorRef = erlang:monitor(process, Pid), - receive - {'DOWN', MonitorRef, process, Pid, _Info} -> - false - after - 50 -> - erlang:demonitor(MonitorRef), - true - end. - -wait_for(Pid) -> - MonitorRef = erlang:monitor(process, Pid), - receive - {'DOWN', MonitorRef, process, Pid, _Info} -> - ok - end. - -check_extras(0) -> ok; -check_extras(N) -> - receive - {client, Type, Pid} -> - case {Type, is_up(Pid)} of - {unlinked, true} -> - io:format("PASS. Unlinked client still alive.~n"); - {unlinked, false} -> - io:format("FAIL. Unlinked client dead.~n"); - {linked, true} -> - io:format("FAIL. Linked client still alive.~n"); - {linked, false} -> - io:format("PASS. Linked client dead.~n"); - {tethered, true} -> - io:format("FAIL. Tethered client still alive.~n"); - {tethered, false} -> - io:format("PASS. Tethered client dead.~n") - end, - check_extras(N-1) - after - 500 -> - io:format("FAIL. Expected ~p more clients.~n", [N]) - end. - -make_thrift_client(Opts) -> - thrift_client:start(fun()->ok end, thriftTest_thrift, Opts). - -make_protocol_factory(Port) -> - {ok, TransportFactory} = - thrift_socket_transport:new_transport_factory( - "127.0.0.1", Port, []), - {ok, ProtocolFactory} = - thrift_binary_protocol:new_protocol_factory( - TransportFactory, []), - ProtocolFactory. - - -test_start(Port) -> - {ok, Client1} = make_thrift_client([{connect, false}]), - tester ! {client, unlinked, Client1}, - {ok, Client2} = make_thrift_client([{connect, false}]), - io:format("PASS. Unlinked clients created.~n"), - try - gen_server:call(Client2, {connect, make_protocol_factory(Port)}), - thrift_client:call(Client2, testVoid, []), - io:format("FAIL. Unlinked client connected and called.~n", []) - catch - Kind:Info -> - io:format("PASS. Caught unlinked error. ~p:~p~n", [Kind, Info]) - end, - receive after 100 -> - io:format("PASS. Still alive after unlinked death.~n"), - %% Hang around a little longer so our parent can verify. - receive after 200 -> ok end - end, - %% Exit abnormally to not kill our unlinked extra client. - exit(die). - -test_linked(Port) -> - {ok, Client1} = make_thrift_client([{connect, false}, {monitor, link}]), - tester ! {client, linked, Client1}, - {ok, Client2} = make_thrift_client([{connect, false}, {monitor, link}]), - io:format("PASS. Linked clients created.~n"), - try - gen_server:call(Client2, {connect, make_protocol_factory(Port)}), - thrift_client:call(Client2, testVoid, []), - io:format("FAIL. Linked client connected and called.~n", []) - catch - Kind:Info -> - io:format("FAIL. Caught linked error. ~p:~p~n", [Kind, Info]) - end, - receive after 100 -> - io:format("FAIL. Still alive after linked death.~n"), - % Hang around a little longer so our parent can verify. - receive after 200 -> ok end - end, - %% Exit abnormally to kill our linked extra client. - %% But we should never get here. - exit(die). - -test_tethered(Port) -> - {ok, Client1} = make_thrift_client([{connect, false}, {monitor, tether}]), - tester ! {client, tethered, Client1}, - {ok, Client2} = make_thrift_client([{connect, false}, {monitor, tether}]), - io:format("PASS. Tethered clients created.~n"), - try - gen_server:call(Client2, {connect, make_protocol_factory(Port)}), - thrift_client:call(Client2, testVoid, []), - io:format("FAIL. Tethered client connected and called.~n", []) - catch - Kind:Info -> - io:format("PASS. Caught tethered error. ~p:~p~n", [Kind, Info]) - end, - receive after 100 -> - io:format("PASS. Still alive after tethered death.~n"), - % Hang around a little longer so our parent can verify. - receive after 200 -> ok end - end, - %% Exit abnormally to kill our tethered extra client. - exit(die). diff --git a/tutorial/erl/client.erl b/tutorial/erl/client.erl index 978033496..adaebe424 100644 --- a/tutorial/erl/client.erl +++ b/tutorial/erl/client.erl @@ -29,46 +29,50 @@ p(X) -> t() -> Port = 9999, - - {ok, Client} = thrift_client:start_link("127.0.0.1", - Port, - calculator_thrift), - thrift_client:call(Client, ping, []), + {ok, Client0} = thrift_client_util:new("127.0.0.1", + Port, + calculator_thrift, + []), + + {Client1, {ok, ok}} = thrift_client:call(Client0, ping, []), io:format("ping~n", []), - {ok, Sum} = thrift_client:call(Client, add, [1, 1]), + {Client2, {ok, Sum}} = thrift_client:call(Client1, add, [1, 1]), io:format("1+1=~p~n", [Sum]), - {ok, Sum1} = thrift_client:call(Client, add, [1, 4]), + {Client3, {ok, Sum1}} = thrift_client:call(Client2, add, [1, 4]), io:format("1+4=~p~n", [Sum1]), Work = #work{op=?tutorial_SUBTRACT, num1=15, num2=10}, - {ok, Diff} = thrift_client:call(Client, calculate, [1, Work]), + {Client4, {ok, Diff}} = thrift_client:call(Client3, calculate, [1, Work]), io:format("15-10=~p~n", [Diff]), - {ok, Log} = thrift_client:call(Client, getStruct, [1]), + {Client5, {ok, Log}} = thrift_client:call(Client4, getStruct, [1]), io:format("Log: ~p~n", [Log]), - try - Work1 = #work{op=?tutorial_DIVIDE, - num1=1, - num2=0}, - {ok, _Quot} = thrift_client:call(Client, calculate, [2, Work1]), + Client6 = + try + Work1 = #work{op=?tutorial_DIVIDE, + num1=1, + num2=0}, + {ClientS1, {ok, _Quot}} = thrift_client:call(Client5, calculate, [2, Work1]), - io:format("LAME: exception handling is broken~n", []) - catch - Z -> - io:format("Got exception where expecting - the " ++ - "following is NOT a problem!!!~n"), - p(Z) - end, + io:format("LAME: exception handling is broken~n", []), + ClientS1 + catch + throw:{ClientS2, Z} -> + io:format("Got exception where expecting - the " ++ + "following is NOT a problem!!!~n"), + p(Z), + ClientS2 + end, - {ok, ok} = thrift_client:call(Client, zip, []), + {Client7, {ok, ok}} = thrift_client:call(Client6, zip, []), io:format("zip~n", []), - ok = thrift_client:close(Client), + {_Client8, ok} = thrift_client:close(Client7), ok.