diff --git a/src/gen_msgpack_rpc.erl b/src/gen_msgpack_rpc.erl index 83fed35..0ac3f09 100644 --- a/src/gen_msgpack_rpc.erl +++ b/src/gen_msgpack_rpc.erl @@ -49,9 +49,7 @@ %% external API -export([start_link/5, stop/1, behaviour_info/1]). --export([call/3, - call_async/3, call_async/4, cancel_async_call/1, cancel_async_call/2 - ]). +-export([call/3, call_async/3]). %% internal: gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -99,32 +97,20 @@ stop(Id)-> call(Id, Method, Argv) -> gen_server:call(Id, {call, Method, Argv}, infinity). -% TODO: write test code for call_async/3 -% afterwards, the caller will receive the response {ok, ResCode, Result} as a message --spec call_async(CallID::non_neg_integer(), - Method::atom(), Argv::list()) -> ok | {error, {atom(), any()}}. -call_async(CallID, Method, Argv)-> - call_async(?SERVER, CallID, Method, Argv). - --spec call_async(Client::server_ref(), CallID::non_neg_integer(), - Method::atom(), Argv::list()) -> ok | {error, {atom(), any()}}. -call_async(Client, CallID, Method, Argv) when is_atom(Method), is_list(Argv)-> - Meth = <<(atom_to_binary(Method,latin1))/binary>>, +% @doc +% the caller will receive the response {CallID, Result} as a message +% {ok, CallID} = gen_msgpack_rpc:call(Pid, add, [1,2]), +% receive +% {CallID, Result} -> do_sth +% after 1024 -> +% timeout +% end, +% +-spec call_async(Id::term(), Method::atom(), Argv::list()) -> {ok, CallID::non_neg_integer()}. +call_async(Id, Method, Argv)-> Pid = self(), - case msgpack:pack([?MP_TYPE_REQUEST,CallID,Meth,Argv]) of - {error, Reason}-> - {error, Reason}; - Pack -> - ok=gen_server:call(Client, {call, {CallID, Pid} ,Pack}) - end. - -% TODO: write test code for cancellation --spec cancel_async_call(CallID::non_neg_integer())->ok. -cancel_async_call(CallID)-> cancel_async_call(?SERVER, CallID). - --spec cancel_async_call(Client::server_ref(), CallID::non_neg_integer())->ok. -cancel_async_call(Client, CallID)-> - gen_server:call(Client, {cancel, CallID}). + {ok,CallID}=gen_server:call(Id, {call_async, Method, Argv, Pid}, infinity), + {ok,CallID}. %%==================================================================== %% gen_server callbacks @@ -151,9 +137,14 @@ init([Mod, MPRC, _Options])-> %%-------------------------------------------------------------------- handle_call({call, Method, Argv}, From, #state{mprc=MPRC} = State)-> {ok, CallID} = mprc:call_async(MPRC, Method, Argv), - msgpack_util:insert({CallID,From}), + msgpack_util:insert({CallID,{reply,From}}), {noreply, State}; +handle_call({call_async, Method, Argv, Pid}, _From, #state{mprc=MPRC} = State)-> + {ok, CallID} = mprc:call_async(MPRC, Method, Argv), + msgpack_util:insert({CallID,{send,Pid}}), + {reply, {ok, CallID}, State}; + handle_call(stop, _From, State)-> {stop, normal, ok, State}; handle_call(_Request, _From, State) -> @@ -194,7 +185,9 @@ handle_info({tcp, _Socket, Pack}, #state{module=Mod,mprc=MPRC}=State)-> []-> ok = mprc:active_once(MPRC), {noreply, State#state{mprc=mprc:append_binary(MPRC,RemBin)}}; - [{CallID,From}|_] -> + + % /* needs refactor, just reply => reply, send => ! + [{CallID,{reply,From}}|_] -> msgpack_util:call_done(CallID), case ResCode of nil -> gen_server:reply(From, Result); @@ -203,6 +196,17 @@ handle_info({tcp, _Socket, Pack}, #state{module=Mod,mprc=MPRC}=State)-> NewMPRC = MPRC#mprc{carry = RemBin}, ok = mprc:active_once(MPRC), {noreply, State#state{mprc=NewMPRC}}; + [{CallID,{send,From}}|_] -> + msgpack_util:call_done(CallID), + case ResCode of + nil -> From ! {CallID, Result}; + Error -> From ! {CallID, {Error,Result}} + end, + NewMPRC = MPRC#mprc{carry = RemBin}, + ok = mprc:active_once(MPRC), + {noreply, State#state{mprc=NewMPRC}}; + % */ needs refactor end + _Other -> % Error io:format("error ~s~p: ~p~n", [?FILE, ?LINE, _Other]), ok = mprc:active_once(MPRC), diff --git a/src/mprc.erl b/src/mprc.erl index 54c590e..c667046 100644 --- a/src/mprc.erl +++ b/src/mprc.erl @@ -97,9 +97,9 @@ join_(MPRC, [CallID|Remain], Got) when byte_size(MPRC#mprc.carry) > 0 -> msgpack_util:call_done(CallID), case Error of nil -> - %?debugVal(Retval), - %?debugVal(Remain), - %?debugVal(ets:tab2list(?MODULE)), + %?debugVal(Retval), + %?debugVal(Remain), + %?debugVal(ets:tab2list(?MODULE)), join_(MPRC0, Remain, [Retval|Got]); _Other -> join_(MPRC0, Remain, [{error, {Error,Retval}}|Got]) @@ -123,7 +123,7 @@ join_(MPRC, [CallID|Remain], Got) when byte_size(MPRC#mprc.carry) > 0 -> _Other -> join_(MPRC, Remain, [{error, {Error,Retval}}|Got]) end - end; + end; join_(MPRC, Remain, Got) -> %?debugVal(Remain), %?debugVal(MPRC), {ok, PackedMsg} = gen_tcp:recv(MPRC#mprc.s, 0), diff --git a/test/gen_msgpack_rpc_test.erl b/test/gen_msgpack_rpc_test.erl index d8674ef..9a6d808 100644 --- a/test/gen_msgpack_rpc_test.erl +++ b/test/gen_msgpack_rpc_test.erl @@ -48,6 +48,12 @@ easy_test()-> ?assertEqual(3, gen_msgpack_rpc:call(Pid2, add, [1, 2])), ?assertEqual(3, gen_msgpack_rpc:call(?MODULE, add, [1, 2])), + + {ok, CallID} = gen_msgpack_rpc:call_async(Pid2, add, [1,2]), + receive + {CallID, 3} -> ok; + _ -> ?assert(false) + end, ok=gen_msgpack_rpc:stop(Pid2), @@ -56,19 +62,6 @@ easy_test()-> ?assertEqual(ok,gen_server:call(Pid,stop)), ok. -%% my_first_case(_Config) -> -%% {ok, _Pid}=mp_client:connect(localhost,65500), -%% {ok, Result}=mp_client:call(42, hello, []), -%% true=is_list(Result), -%% ok=mp_client:close(). - -%% my_second_case(_)-> -%% {ok, _}=mp_client:connect({local, hoge}, localhost,65500), -%% {ok, Result}=mp_client:call(hoge,42, hello, []), -%% true=is_list(Result), -%% {ok, 7}=mp_client:call(hoge,43, add, [3,4]), -%% ok=mp_client:close(hoge). - %% case_add(_Config)-> %% Pairs=[{5,5}, {0,0}, {234, 2}, {213456789, -3}, {234, -23}, {-1,1}, {1,-1}, {-1,-1}, %% {-2000, 2000}, {2000, -2000}, {234, -234}], @@ -78,15 +71,4 @@ easy_test()-> %% {error, {<<"no such func">>,nil}}=mp_client:call(add, 890, no_such_func, []), %% mp_client:close(add). -%% my_test()-> -%% ok=sample_app:start(), -%% ok=my_first_case(nil), -%% ok=my_second_case(nil), -%% ok=case_add(nil), -%% ok=sample_app:stop(). - -%% {ok,Pid}=mp_client:connect(localhost,65500), -%% {ok,_Reply}=mp_client:call(Pid, hoge, []), -%% mp_client:close(). - -endif.