return spec of gen_msgpack_rpc:call and mprc:call changed.

gen_msgpack_rpc:call_async/3 working.
This commit is contained in:
UENISHI Kota 2011-05-04 16:17:15 +09:00
parent febb0e130c
commit 01192af706
4 changed files with 84 additions and 59 deletions

View File

@ -166,9 +166,21 @@ handle_cast(_Msg, State) ->
%% {stop, Reason, State}
%% Description: Handling all non call/cast messages
%%--------------------------------------------------------------------
handle_info({tcp, _Socket, Pack}, #state{module=Mod,mprc=MPRC}=State)->
handle_info({tcp, _Socket, Pack}, #state{mprc=MPRC}=State)->
Bin = <<(MPRC#mprc.carry)/binary, Pack/binary>>,
NewMPRC = MPRC#mprc{carry = <<>>},
decode_all(Bin, State#state{mprc=NewMPRC});
handle_info(_Info, State) ->
error_logger:error_msg("error ~s~p: ~p~n", [?FILE, ?LINE, _Info]),
{noreply, State}.
decode_all(<<>>, #state{mprc=MPRC}=State)->
ok = mprc:active_once(MPRC),
{noreply, State};
decode_all(Bin, #state{module=Mod,mprc=MPRC}=State)->
case msgpack:unpack(Bin) of
{[?MP_TYPE_NOTIFICATION,Method,Params],RemBin}->
% maybe we need string whitelist for atom-attack
Meth = binary_to_atom(Method, latin1),
@ -177,51 +189,47 @@ handle_info({tcp, _Socket, Pack}, #state{module=Mod,mprc=MPRC}=State)->
catch _:_Other ->
error_logger:error_msg("error ~s~p: ~p~n", [?FILE, ?LINE, _Other])
end,
NewMPRC = MPRC#mprc{carry = RemBin},
ok = mprc:active_once(MPRC),
{noreply, State#state{mprc=NewMPRC}};
decode_all(RemBin, State);
{[?MP_TYPE_RESPONSE,CallID,ResCode,Result],RemBin} ->
case msgpack_util:lookup(CallID) of
[]->
ok = mprc:active_once(MPRC),
{noreply, State#state{mprc=mprc:append_binary(MPRC,RemBin)}};
% /* needs refactor, just reply => reply, send => !
[{CallID,{reply,From}}|_] ->
[{CallID,{reply,From}}|_] -> % sync call
msgpack_util:call_done(CallID),
case ResCode of
nil -> gen_server:reply(From, Result);
Error -> gen_server:reply(From, {Error,Result})
case {ResCode, Result} of
{nil, Result} -> gen_server:reply(From, {ok,Result});
{Error, nil} -> gen_server:reply(From, {error,Error});
_Other ->
error_logger:error_msg("error ~s~p: ~p~n", [?FILE, ?LINE, _Other])
end,
NewMPRC = MPRC#mprc{carry = RemBin},
ok = mprc:active_once(MPRC),
{noreply, State#state{mprc=NewMPRC}};
[{CallID,{send,From}}|_] ->
decode_all(RemBin, State);
[{CallID,{send,From}}|_] -> % async_call
msgpack_util:call_done(CallID),
case ResCode of
nil -> From ! {CallID, Result};
Error -> From ! {CallID, {Error,Result}}
case {ResCode, Result} of
{nil,Result} -> From ! {CallID, {ok,Result}};
{Error,nil} -> From ! {CallID, {error,Error}};
_Other ->
error_logger:error_msg("error ~s~p: ~p~n", [?FILE, ?LINE, _Other])
end,
NewMPRC = MPRC#mprc{carry = RemBin},
ok = mprc:active_once(MPRC),
{noreply, State#state{mprc=NewMPRC}};
decode_all(RemBin, State);
% */ needs refactor end
_Other -> % Error
io:format("error ~s~p: ~p~n", [?FILE, ?LINE, _Other]),
ok = mprc:active_once(MPRC),
{noreply, State}
{noreply, State#state{mprc=mprc:append_binary(MPRC,Bin)}}
end;
{error, incomplete} ->
ok = mprc:active_once(MPRC),
{noreply, State#state{mprc=mprc:append_binary(MPRC,Pack)}};
{noreply, State#state{mprc=mprc:append_binary(MPRC,Bin)}};
{error, _Reason} ->
error_logger:error_msg("error ~s~p: ~p~n", [?FILE, ?LINE, _Reason]),
ok = mprc:active_once(MPRC),
{noreply, State}
end;
handle_info(_Info, State) ->
{noreply, State}.
{noreply, State#state{mprc=mprc:append_binary(MPRC,Bin)}}
end.
%% @private
%% Function: terminate(Reason, State) -> void()

View File

@ -95,14 +95,16 @@ join_(MPRC, [CallID|Remain], Got) when byte_size(MPRC#mprc.carry) > 0 ->
{[?MP_TYPE_RESPONSE, CallID, Error, Retval], RemainBin}->
MPRC0 = MPRC#mprc{carry=RemainBin},
msgpack_util:call_done(CallID),
case Error of
nil ->
case {Error, Retval} of
{nil, Retval} ->
%?debugVal(Retval),
%?debugVal(Remain),
%?debugVal(ets:tab2list(?MODULE)),
join_(MPRC0, Remain, [Retval|Got]);
_Other ->
join_(MPRC0, Remain, [{error, {Error,Retval}}|Got])
join_(MPRC0, Remain, [{ok,Retval}|Got]);
{Error,nil} ->
join_(MPRC0, Remain, [{error,Error}|Got]);
_Other -> % malformed message
throw({malform_msg, _Other})
end;
{[?MP_TYPE_RESPONSE, CallID0, Error, Retval], RemainBin}->
msgpack_util:insert({CallID0, Error, Retval}),
@ -117,11 +119,16 @@ join_(MPRC, [CallID|Remain], Got) when byte_size(MPRC#mprc.carry) > 0 ->
end;
[{CallID,Error,Retval}|_] ->
msgpack_util:call_done(CallID),
case Error of
nil ->
join_(MPRC, Remain, [Retval|Got]);
_Other ->
join_(MPRC, Remain, [{error, {Error,Retval}}|Got])
case {Error, Retval} of
{nil, Retval} ->
%?debugVal(Retval),
%?debugVal(Remain),
%?debugVal(ets:tab2list(?MODULE)),
join_(MPRC, Remain, [{ok,Retval}|Got]);
{Error,nil} ->
join_(MPRC, Remain, [{error,Error}|Got]);
_Other -> % malformed message
throw({malform_msg, _Other})
end
end;
join_(MPRC, Remain, Got) ->

View File

@ -46,12 +46,12 @@ easy_test()->
{ok, Pid2}=gen_msgpack_rpc:start_link({local,?MODULE},?MODULE,localhost,9199,[tcp]),
?assertEqual(3, gen_msgpack_rpc:call(Pid2, add, [1, 2])),
?assertEqual(3, gen_msgpack_rpc:call(?MODULE, add, [1, 2])),
?assertEqual({ok,3}, gen_msgpack_rpc:call(Pid2, add, [1, 2])),
?assertEqual({ok,3}, gen_msgpack_rpc:call(?MODULE, add, [1, 2])),
{ok, CallID} = gen_msgpack_rpc:call_async(Pid2, add, [1,2]),
receive
{CallID, 3} -> ok;
{CallID, {ok,3}} -> ok;
_ -> ?assert(false)
end,
@ -62,6 +62,15 @@ easy_test()->
?assertEqual(ok,gen_server:call(Pid,stop)),
ok.
loop_receive(0, List)-> lists:keysort(1,List);
loop_receive(N, List)->
receive
{CID, {ok,S}} ->
loop_receive(N-1, [{CID,S}|List]);
_R -> ?debugVal(_R), ?assert(false), List
end.
easy2_test()->
{ok,Pid} = mprs_tcp:start_link(sample_srv, [{host,localhost},{port,9199}]),
?assert(is_pid(Pid)),
@ -73,22 +82,22 @@ easy2_test()->
Pairs=[{5,5}, {0,0}, {234, 2}, {213456789, -3}, {234, -23}, {-1,1}, {1,-1}, {-1,-1},
{-2000, 2000}, {2000, -2000}, {234, -234}],
lists:map( fun({L,R})->
?assertEqual(L+R, gen_msgpack_rpc:call(Pid2, add, [L,R]))
?assertEqual({ok,L+R}, gen_msgpack_rpc:call(Pid2, add, [L,R]))
end, Pairs ),
%% {error, {<<"no such func">>,nil}}=mp_client:call(add, 890, no_such_func, []),
%% mp_client:close(add).
{ok, CallID} = gen_msgpack_rpc:call_async(Pid2, add, [1,2]),
receive
{CallID, 3} -> ok;
_ -> ?assert(false)
end,
ok=gen_msgpack_rpc:stop(Pid2),
CallIDs = lists:map( fun({L,R})->
{ok,CallID}=gen_msgpack_rpc:call_async(Pid2, add, [L,R]),
{CallID, L+R}
end, Pairs ),
Results = loop_receive(length(CallIDs), []),
?assertEqual(lists:keysort(1,CallIDs), Results),
?assertEqual({error, <<"no such func">>}, gen_msgpack_rpc:call(Pid2, addhage, [])),
ok=gen_msgpack_rpc:stop(Pid2),
ok=mprc:stop(),
?assertEqual(ok,gen_server:call(Pid,stop)),
ok.
%% case_add(_Config)->
-endif.

View File

@ -51,19 +51,19 @@ easy_test()->
{ok,S} = mprc:connect(localhost,9199,[]),
{Ret,MPRC0} = mprc:call(S, hello, []),
?assertEqual(Ret, "hello, msgpack!"),
?assertEqual(Ret, {ok,"hello, msgpack!"}),
{Ret0, MPRC1} = mprc:call(MPRC0, add, [230,4]),
?assertEqual(234, Ret0),
?assertEqual({ok,234}, Ret0),
A=2937845, B=238945029038453490, C=A+B,
{Ret1, MPRC2} = mprc:call(MPRC1, add, [A,B]),
?assertEqual(C, Ret1),
?assertEqual({ok,C}, Ret1),
% TODO: make it exception thrown
% ?assertEqual(234, mprc:call(S, addo, [230,0])),
{ok, CallID0} = mprc:call_async(MPRC2, add, [23, -23]),
{ok, CallID1} = mprc:call_async(MPRC2, add, [23, 23]),
{Ans, _MPRC3} = mprc:join(MPRC2, [CallID0, CallID1]),
?assertEqual([46,0], Ans),
?assertEqual([{ok,46},{ok,0}], Ans),
% %?debugVal(mprc:join(_MPRC3, CallID2)),
?assertEqual(ok, mprc:close(S)),
@ -83,8 +83,9 @@ easy2_test()->
{ok, CallID1} = mprc:call_async(S, add, [2, 23]),
{ok, CallID2} = mprc:call_async(S, add, [2, 1]),
{ok, CallID3} = mprc:call_async(S, add, [2, 1]),
{_Ans, _MPRC3} = mprc:join(S, [CallID0, CallID1, CallID2,CallID3]),
%% ?assertEqual([213, 46,0], Ans),
{Ans, _MPRC3} = mprc:join(S, [CallID0, CallID1, CallID2,CallID3]),
Ans0 = lists:sort(lists:map(fun({ok,R})->R end, Ans)),
?assertEqual(lists:sort([-21,3,3,25]), Ans0),
?assertEqual(ok, mprc:close(S)),
ok = mprc:stop(),
@ -103,7 +104,7 @@ conn_test()->
lists:map(fun(MPRC)->
{Ret,_MPRC0} = mprc:call(MPRC, add, [234, -34]),
?assertEqual(200, Ret)
?assertEqual({ok,200}, Ret)
end, MPRCs),
lists:map(fun(MPRC)-> ?assertEqual(ok, mprc:close(MPRC)) end, MPRCs),