gen_msgpack_rpc:call_async/3

This commit is contained in:
UENISHI Kota 2011-05-04 15:09:16 +09:00
parent 52cb1e90f4
commit b42eb42774
3 changed files with 44 additions and 58 deletions

View File

@ -49,9 +49,7 @@
%% external API
-export([start_link/5, stop/1, behaviour_info/1]).
-export([call/3,
call_async/3, call_async/4, cancel_async_call/1, cancel_async_call/2
]).
-export([call/3, call_async/3]).
%% internal: gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@ -99,32 +97,20 @@ stop(Id)->
call(Id, Method, Argv) ->
gen_server:call(Id, {call, Method, Argv}, infinity).
% TODO: write test code for call_async/3
% afterwards, the caller will receive the response {ok, ResCode, Result} as a message
-spec call_async(CallID::non_neg_integer(),
Method::atom(), Argv::list()) -> ok | {error, {atom(), any()}}.
call_async(CallID, Method, Argv)->
call_async(?SERVER, CallID, Method, Argv).
-spec call_async(Client::server_ref(), CallID::non_neg_integer(),
Method::atom(), Argv::list()) -> ok | {error, {atom(), any()}}.
call_async(Client, CallID, Method, Argv) when is_atom(Method), is_list(Argv)->
Meth = <<(atom_to_binary(Method,latin1))/binary>>,
% @doc
% the caller will receive the response {CallID, Result} as a message
% {ok, CallID} = gen_msgpack_rpc:call(Pid, add, [1,2]),
% receive
% {CallID, Result} -> do_sth
% after 1024 ->
% timeout
% end,
%
-spec call_async(Id::term(), Method::atom(), Argv::list()) -> {ok, CallID::non_neg_integer()}.
call_async(Id, Method, Argv)->
Pid = self(),
case msgpack:pack([?MP_TYPE_REQUEST,CallID,Meth,Argv]) of
{error, Reason}->
{error, Reason};
Pack ->
ok=gen_server:call(Client, {call, {CallID, Pid} ,Pack})
end.
% TODO: write test code for cancellation
-spec cancel_async_call(CallID::non_neg_integer())->ok.
cancel_async_call(CallID)-> cancel_async_call(?SERVER, CallID).
-spec cancel_async_call(Client::server_ref(), CallID::non_neg_integer())->ok.
cancel_async_call(Client, CallID)->
gen_server:call(Client, {cancel, CallID}).
{ok,CallID}=gen_server:call(Id, {call_async, Method, Argv, Pid}, infinity),
{ok,CallID}.
%%====================================================================
%% gen_server callbacks
@ -151,9 +137,14 @@ init([Mod, MPRC, _Options])->
%%--------------------------------------------------------------------
handle_call({call, Method, Argv}, From, #state{mprc=MPRC} = State)->
{ok, CallID} = mprc:call_async(MPRC, Method, Argv),
msgpack_util:insert({CallID,From}),
msgpack_util:insert({CallID,{reply,From}}),
{noreply, State};
handle_call({call_async, Method, Argv, Pid}, _From, #state{mprc=MPRC} = State)->
{ok, CallID} = mprc:call_async(MPRC, Method, Argv),
msgpack_util:insert({CallID,{send,Pid}}),
{reply, {ok, CallID}, State};
handle_call(stop, _From, State)->
{stop, normal, ok, State};
handle_call(_Request, _From, State) ->
@ -194,7 +185,9 @@ handle_info({tcp, _Socket, Pack}, #state{module=Mod,mprc=MPRC}=State)->
[]->
ok = mprc:active_once(MPRC),
{noreply, State#state{mprc=mprc:append_binary(MPRC,RemBin)}};
[{CallID,From}|_] ->
% /* needs refactor, just reply => reply, send => !
[{CallID,{reply,From}}|_] ->
msgpack_util:call_done(CallID),
case ResCode of
nil -> gen_server:reply(From, Result);
@ -203,6 +196,17 @@ handle_info({tcp, _Socket, Pack}, #state{module=Mod,mprc=MPRC}=State)->
NewMPRC = MPRC#mprc{carry = RemBin},
ok = mprc:active_once(MPRC),
{noreply, State#state{mprc=NewMPRC}};
[{CallID,{send,From}}|_] ->
msgpack_util:call_done(CallID),
case ResCode of
nil -> From ! {CallID, Result};
Error -> From ! {CallID, {Error,Result}}
end,
NewMPRC = MPRC#mprc{carry = RemBin},
ok = mprc:active_once(MPRC),
{noreply, State#state{mprc=NewMPRC}};
% */ needs refactor end
_Other -> % Error
io:format("error ~s~p: ~p~n", [?FILE, ?LINE, _Other]),
ok = mprc:active_once(MPRC),

View File

@ -97,9 +97,9 @@ join_(MPRC, [CallID|Remain], Got) when byte_size(MPRC#mprc.carry) > 0 ->
msgpack_util:call_done(CallID),
case Error of
nil ->
%?debugVal(Retval),
%?debugVal(Remain),
%?debugVal(ets:tab2list(?MODULE)),
%?debugVal(Retval),
%?debugVal(Remain),
%?debugVal(ets:tab2list(?MODULE)),
join_(MPRC0, Remain, [Retval|Got]);
_Other ->
join_(MPRC0, Remain, [{error, {Error,Retval}}|Got])
@ -123,7 +123,7 @@ join_(MPRC, [CallID|Remain], Got) when byte_size(MPRC#mprc.carry) > 0 ->
_Other ->
join_(MPRC, Remain, [{error, {Error,Retval}}|Got])
end
end;
end;
join_(MPRC, Remain, Got) ->
%?debugVal(Remain), %?debugVal(MPRC),
{ok, PackedMsg} = gen_tcp:recv(MPRC#mprc.s, 0),

View File

@ -48,6 +48,12 @@ easy_test()->
?assertEqual(3, gen_msgpack_rpc:call(Pid2, add, [1, 2])),
?assertEqual(3, gen_msgpack_rpc:call(?MODULE, add, [1, 2])),
{ok, CallID} = gen_msgpack_rpc:call_async(Pid2, add, [1,2]),
receive
{CallID, 3} -> ok;
_ -> ?assert(false)
end,
ok=gen_msgpack_rpc:stop(Pid2),
@ -56,19 +62,6 @@ easy_test()->
?assertEqual(ok,gen_server:call(Pid,stop)),
ok.
%% my_first_case(_Config) ->
%% {ok, _Pid}=mp_client:connect(localhost,65500),
%% {ok, Result}=mp_client:call(42, hello, []),
%% true=is_list(Result),
%% ok=mp_client:close().
%% my_second_case(_)->
%% {ok, _}=mp_client:connect({local, hoge}, localhost,65500),
%% {ok, Result}=mp_client:call(hoge,42, hello, []),
%% true=is_list(Result),
%% {ok, 7}=mp_client:call(hoge,43, add, [3,4]),
%% ok=mp_client:close(hoge).
%% case_add(_Config)->
%% Pairs=[{5,5}, {0,0}, {234, 2}, {213456789, -3}, {234, -23}, {-1,1}, {1,-1}, {-1,-1},
%% {-2000, 2000}, {2000, -2000}, {234, -234}],
@ -78,15 +71,4 @@ easy_test()->
%% {error, {<<"no such func">>,nil}}=mp_client:call(add, 890, no_such_func, []),
%% mp_client:close(add).
%% my_test()->
%% ok=sample_app:start(),
%% ok=my_first_case(nil),
%% ok=my_second_case(nil),
%% ok=case_add(nil),
%% ok=sample_app:stop().
%% {ok,Pid}=mp_client:connect(localhost,65500),
%% {ok,_Reply}=mp_client:call(Pid, hoge, []),
%% mp_client:close().
-endif.