* spawn each time request has come

* notify in UDP
This commit is contained in:
UENISHI Kota 2011-06-13 04:30:21 +09:00
parent dd274ced46
commit 4f3a2a8f11
3 changed files with 82 additions and 68 deletions

View File

@ -139,6 +139,10 @@ handle_info({tcp_closed, Socket}, #state{socket=Socket} = State) ->
% error_logger:debug_msg("~p Client ~p disconnected.\n", [self(), hoge]),
{stop, normal, State};
handle_info({do_reply, ReplyBin}, #state{socket=Socket} = State) ->
ok = gen_tcp:send(Socket, ReplyBin),
{noreply, State};
handle_info(_Info, State) ->
error_logger:info_msg("~p: unknown message: ~p .\n", [self(), _Info]),
{noreply, State}.
@ -169,20 +173,12 @@ code_change(OldVsn, #state{module=Module, context=Context} = State, Extra) ->
process_binary(#state{socket=Socket, module=Module, context=Context, carry=Bin} = State)->
case msgpack:unpack(Bin) of
{[Req,M,Argv], Remain} ->
handle_notify(Req, Module, M, Argv),
{[?MP_TYPE_NOTIFY,M,Argv], Remain} ->
handle_notify(Module, M, Argv),
process_binary(State#state{context=Context, carry=Remain});
{[?MP_TYPE_REQUEST,CallID,M,Argv], Remain} ->
spawn_request_handler(CallID, Module, M, Argv),
process_binary(State#state{context=Context, carry=Remain});
{[Req,CallID,M,Argv], Remain} ->
case handle_request(Req,CallID,Module,M,Argv,Socket,Context) of
{ok, NextContext}->
% Flow control: enable forwarding of next TCP message
process_binary(State#state{context=NextContext, carry=Remain});
{stop, Reason}->
{stop, Reason};
_Other->
error_logger:error_msg("failed unpack: ~p result: ~p~n", [Bin, _Other]),
{noreply, State}
end;
{error, incomplete} -> % carry over
ok=inet:setopts(Socket, [{active,once}]),
{noreply, State};
@ -191,50 +187,69 @@ process_binary(#state{socket=Socket, module=Module, context=Context, carry=Bin}
{stop, Reason, State}
end.
handle_notify(?MP_TYPE_NOTIFY, Module, M, Argv)->
handle_notify(Module, M, Argv)->
spawn(fun()->
Method = binary_to_existing_atom(M, latin1),
erlang:apply(Module, Method, Argv)
end).
handle_request(?MP_TYPE_REQUEST, CallID, Module, M, Argv,Socket, Context) when is_integer(CallID), is_binary(M) ->
try
Method = binary_to_atom(M, latin1),
Prefix = [?MP_TYPE_RESPONSE, CallID],
Spam = erlang:apply(Module,Method,Argv),
case Spam of
{reply, Result} when is_atom(Result) ->
ReplyBin = msgpack:pack(Prefix++[nil, atom_to_binary(Result,latin1)]),
% ?debugVal(ReplyBin),
ok=gen_tcp:send(Socket,ReplyBin),
{ok, Context};
{reply, Result} ->
ReplyBin = msgpack:pack(Prefix++[nil, Result]),
ok=gen_tcp:send(Socket,ReplyBin),
{ok, Context};
{noreply, _Result}-> {ok, Context};
{stop_reply, Result, Reason}->
ReplyBin = msgpack:pack(Prefix++[ nil, Result]),
ok=gen_tcp:send(Socket,ReplyBin),
{stop, Reason};
{stop, Reason}-> {stop, Reason};
{error, Reason}->
ReplyBin = msgpack:pack(Prefix++[Reason, nil]),
ok=gen_tcp:send(Socket,ReplyBin),
{ok,Context}
end
catch
_:undef ->
erlang:display(Module:module_info()),
error_logger:error_msg("~s:~p no such method: ~p:~s/~p~n",
[?FILE,?LINE,Module,binary_to_list(M),length(Argv)]),
Msg = << (<<"no such method: ">>)/binary, M/binary>>,
ok=gen_tcp:send(Socket, msgpack:pack([?MP_TYPE_RESPONSE, CallID, Msg, nil])),
{ok, Context};
_:What ->
Msg = [?MP_TYPE_RESPONSE, CallID, <<"unexpected error">>, nil],
error_logger:error_msg("unknown error: ~p (~p:~s/~p)~n", [What, Module,binary_to_list(M),length(Argv)]),
ok=gen_tcp:send(Socket, msgpack:pack(Msg)),
{ok, Context}
end.
spawn_request_handler(CallID, Module, M, Argv)->
Self = self(),
F= fun()->
Method = binary_to_existing_atom(M, latin1),
Prefix = [?MP_TYPE_RESPONSE, CallID],
Ret = case erlang:apply(Module,Method,Argv) of
{reply, Result} when is_atom(Result) ->
msgpack:pack(Prefix++[nil, atom_to_binary(Result,latin1)]);
{reply, Result} ->
msgpack:pack(Prefix++[nil, Result]);
{error, Reason} when is_atom(Reason)->
msgpack:pack(Prefix++[Reason, nil]);
{error, Reason} ->
msgpack:pack(Prefix++[Reason, nil])
end,
Self ! {do_reply, Ret}
end,
spawn_link(F).
%% handle_request(?MP_TYPE_REQUEST, CallID, Module, M, Argv,Socket, Context) when is_integer(CallID), is_binary(M) ->
%% try
%% Method = binary_to_atom(M, latin1),
%% Prefix = [?MP_TYPE_RESPONSE, CallID],
%% Spam = erlang:apply(Module,Method,Argv),
%% case Spam of
%% {reply, Result} when is_atom(Result) ->
%% ReplyBin = msgpack:pack(Prefix++[nil, atom_to_binary(Result,latin1)]),
%% % ?debugVal(ReplyBin),
%% ok=gen_tcp:send(Socket,ReplyBin),
%% {ok, Context};
%% {reply, Result} ->
%% ReplyBin = msgpack:pack(Prefix++[nil, Result]),
%% ok=gen_tcp:send(Socket,ReplyBin),
%% {ok, Context};
%% {noreply, _Result}-> {ok, Context};
%% {stop_reply, Result, Reason}->
%% ReplyBin = msgpack:pack(Prefix++[ nil, Result]),
%% ok=gen_tcp:send(Socket,ReplyBin),
%% {stop, Reason};
%% {stop, Reason}-> {stop, Reason};
%% {error, Reason}->
%% ReplyBin = msgpack:pack(Prefix++[Reason, nil]),
%% ok=gen_tcp:send(Socket,ReplyBin),
%% {ok,Context}
%% end
%% catch
%% _:undef ->
%% erlang:display(Module:module_info()),
%% error_logger:error_msg("~s:~p no such method: ~p:~s/~p~n",
%% [?FILE,?LINE,Module,binary_to_list(M),length(Argv)]),
%% Msg = << (<<"no such method: ">>)/binary, M/binary>>,
%% ok=gen_tcp:send(Socket, msgpack:pack([?MP_TYPE_RESPONSE, CallID, Msg, nil])),
%% {ok, Context};
%% _:What ->
%% Msg = [?MP_TYPE_RESPONSE, CallID, <<"unexpected error">>, nil],
%% error_logger:error_msg("unknown error: ~p (~p:~s/~p)~n", [What, Module,binary_to_list(M),length(Argv)]),
%% ok=gen_tcp:send(Socket, msgpack:pack(Msg)),
%% {ok, Context}
%% end.

View File

@ -195,8 +195,11 @@ dispatch(From, IP, InPortNo, Packet, #state{module=Module,socket=Socket}=State)-
case msgpack:unpack(Packet) of
{error, incomplete}->
error_logger:error_report([?MODULE, ?LINE, {IP,InPortNo,State, Packet}]);
{[?MP_TYPE_REQUEST,CallID,M,Argv], Remain}->
Method = binary_to_atom(M, latin1),
{[?MP_TYPE_NOTIFY,M, Argv], _}->
Method = binary_to_existing_atom(M, latin1),
erlang:apply(Module, Method, Argv);
{[?MP_TYPE_REQUEST,CallID,M,Argv], _}->
Method = binary_to_existing_atom(M, latin1),
ReplyBin =
try
case erlang:apply(Module,Method,Argv) of
@ -217,9 +220,5 @@ dispatch(From, IP, InPortNo, Packet, #state{module=Module,socket=Socket}=State)-
[What, Module,binary_to_list(M),length(Argv)]),
msgpack:pack([?MP_TYPE_RESPONSE, CallID, Msg, nil])
end,
From ! {send, Socket, IP, InPortNo, ReplyBin},
case Remain of
<<>> -> ok;
_ -> dispatch(From, IP, InPortNo, Remain, State)
end
From ! {send, Socket, IP, InPortNo, ReplyBin}
end.

View File

@ -91,13 +91,13 @@ notify_test()->
?assertEqual({ok,<<"hello, msgpack!">>}, gen_msgpack_rpc:call(Pid2, hello, [])),
?assertEqual({ok,4}, gen_msgpack_rpc:call(Pid2, add, [2,2])),
Pid3 = self(),
?assertEqual({ok,<<"ok">>}, gen_msgpack_rpc:call(Pid2, send_notify, [512, term_to_binary(Pid3)])),
%% Pid3 = self(),
%% ?assertEqual({ok,<<"ok">>}, gen_msgpack_rpc:call(Pid2, send_notify, [512, term_to_binary(Pid3)])),
receive
got_notify -> ok;
_ -> ?assert(false)
end,
%% receive
%% got_notify -> ok;
%% _ -> ?assert(false)
%% end,
ok=gen_msgpack_rpc:stop(Pid2),
ok=mprc:stop(),