mirror of
https://github.com/valitydev/msgpack-erlang.git
synced 2024-11-06 08:45:19 +00:00
for better debugging life
This commit is contained in:
parent
56d2695902
commit
e0fdea30c5
@ -186,8 +186,12 @@ decode_all(Bin, #state{module=Mod,mprc=MPRC}=State)->
|
|||||||
Meth = binary_to_atom(Method, latin1),
|
Meth = binary_to_atom(Method, latin1),
|
||||||
try
|
try
|
||||||
_Ret=erlang:apply(Mod,Meth,Params)
|
_Ret=erlang:apply(Mod,Meth,Params)
|
||||||
catch _:_Other ->
|
catch
|
||||||
error_logger:error_msg("error ~s~p: ~p~n", [?FILE, ?LINE, _Other])
|
_:undef ->
|
||||||
|
error_logger:error_msg("~s:~p unknown method: ~s:~s/~p~n",
|
||||||
|
[?FILE, ?LINE, Mod,Meth,length(Params)]);
|
||||||
|
_:_Other ->
|
||||||
|
error_logger:error_msg("error ~s:~p ~p~n", [?FILE, ?LINE, _Other])
|
||||||
end,
|
end,
|
||||||
decode_all(RemBin, State);
|
decode_all(RemBin, State);
|
||||||
|
|
||||||
|
@ -22,7 +22,7 @@
|
|||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([start_link/2, notify/2, behaviour_info/1]).
|
-export([start_link/2, notify/3, behaviour_info/1]).
|
||||||
|
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||||
@ -52,10 +52,10 @@ start_link(Module,Socket) when is_atom(Module), is_port(Socket)->
|
|||||||
{ok,_Pid}=gen_server:start_link(?MODULE, [Module,Socket], []).
|
{ok,_Pid}=gen_server:start_link(?MODULE, [Module,Socket], []).
|
||||||
|
|
||||||
% TODO/TBF
|
% TODO/TBF
|
||||||
notify(Method, Argv) when is_atom(Method) andalso is_list(Argv) ->
|
notify(Id, Method, Argv) when is_atom(Method) andalso is_list(Argv) ->
|
||||||
Pid = self(),
|
|
||||||
Meth = atom_to_binary(Method,latin1),
|
Meth = atom_to_binary(Method,latin1),
|
||||||
gen_server:cast(Pid, {notify, Meth, Argv}).
|
gen_server:cast(Id, {notify, Meth, Argv}). % don't know why it doesn't work => goes {error, einval}
|
||||||
|
% gen_server:call(Pid, {notify, Meth, Argv}).
|
||||||
|
|
||||||
|
|
||||||
%%====================================================================
|
%%====================================================================
|
||||||
@ -75,9 +75,9 @@ init([]) ->
|
|||||||
init([Module,Socket]) when is_atom(Module), is_port(Socket)->
|
init([Module,Socket]) when is_atom(Module), is_port(Socket)->
|
||||||
%[active, nodelay, keepalive, delay_send, priority, tos]) of
|
%[active, nodelay, keepalive, delay_send, priority, tos]) of
|
||||||
|
|
||||||
ok=inet:setopts(Socket, [{active,once},{packet,raw}]),
|
|
||||||
case Module:init([]) of
|
case Module:init([]) of
|
||||||
{ok, Context}->
|
{ok, Context}->
|
||||||
|
ok=inet:setopts(Socket, [{active,once},{packet,raw},binary]),
|
||||||
{ok, #state{module=Module, socket=Socket, context=Context}};
|
{ok, #state{module=Module, socket=Socket, context=Context}};
|
||||||
Error ->
|
Error ->
|
||||||
{stop, Error}
|
{stop, Error}
|
||||||
@ -110,11 +110,15 @@ handle_call(Request, From, #state{context=Context,module=Module}=State) ->
|
|||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
handle_cast({notify, Method, Argv}, #state{socket=Socket}=State) ->
|
handle_cast({notify, Method, Argv}, #state{socket=Socket}=State) ->
|
||||||
Msg = [?MP_TYPE_NOTIFICATION, Method, Argv],
|
%% Pid = self(),
|
||||||
?debugVal(Msg),
|
%% ok=gen_tcp:controlling_process(Socket,Pid),
|
||||||
?debugVal(msgpack:pack(Msg)),
|
%% inet:setopts(Socket,[binary,raw,{active,false}]),
|
||||||
?debugVal(Socket),
|
case msgpack:pack([?MP_TYPE_NOTIFICATION, Method, Argv]) of
|
||||||
ok=gen_tcp:send(Socket,msgpack:pack(Msg)),
|
Msg when is_binary(Msg) ->
|
||||||
|
gen_tcp:send(Socket, Msg);
|
||||||
|
_Error ->
|
||||||
|
error_logger:info_msg("~s:~p ~p .\n", [?FILE, ?LINE,_Error])
|
||||||
|
end,
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
handle_cast(Msg, State)->
|
handle_cast(Msg, State)->
|
||||||
@ -168,9 +172,7 @@ process_binary(#state{socket=Socket, module=Module, context=Context, carry=Bin}
|
|||||||
{[Req,CallID,M,Argv], Remain} ->
|
{[Req,CallID,M,Argv], Remain} ->
|
||||||
case handle_request(Req,CallID,Module,M,Argv,Socket,Context) of
|
case handle_request(Req,CallID,Module,M,Argv,Socket,Context) of
|
||||||
{ok, NextContext}->
|
{ok, NextContext}->
|
||||||
% Flow control: enable forwarding of next TCP message
|
% Flow control: enable forwarding of next TCP message
|
||||||
%erlang:display(Argv),
|
|
||||||
%erlang:display(Remain),
|
|
||||||
process_binary(State#state{context=NextContext, carry=Remain});
|
process_binary(State#state{context=NextContext, carry=Remain});
|
||||||
{stop, Reason}->
|
{stop, Reason}->
|
||||||
{stop, Reason};
|
{stop, Reason};
|
||||||
@ -191,10 +193,14 @@ handle_request(?MP_TYPE_REQUEST, CallID, Module, M, Argv,Socket, Context) when i
|
|||||||
try
|
try
|
||||||
Method = binary_to_atom(M, latin1),
|
Method = binary_to_atom(M, latin1),
|
||||||
Prefix = [?MP_TYPE_RESPONSE, CallID],
|
Prefix = [?MP_TYPE_RESPONSE, CallID],
|
||||||
erlang:display({Module,Method,Argv}),
|
|
||||||
Spam = erlang:apply(Module,Method,Argv),
|
Spam = erlang:apply(Module,Method,Argv),
|
||||||
case Spam of
|
case Spam of
|
||||||
{reply, Result}->
|
{reply, Result} when is_atom(Result) ->
|
||||||
|
ReplyBin = msgpack:pack(Prefix++[nil, atom_to_binary(Result,latin1)]),
|
||||||
|
% ?debugVal(ReplyBin),
|
||||||
|
ok=gen_tcp:send(Socket,ReplyBin),
|
||||||
|
{ok, Context};
|
||||||
|
{reply, Result} ->
|
||||||
ReplyBin = msgpack:pack(Prefix++[nil, Result]),
|
ReplyBin = msgpack:pack(Prefix++[nil, Result]),
|
||||||
ok=gen_tcp:send(Socket,ReplyBin),
|
ok=gen_tcp:send(Socket,ReplyBin),
|
||||||
{ok, Context};
|
{ok, Context};
|
||||||
@ -219,7 +225,8 @@ handle_request(?MP_TYPE_REQUEST, CallID, Module, M, Argv,Socket, Context) when i
|
|||||||
{ok, Context};
|
{ok, Context};
|
||||||
|
|
||||||
_:What ->
|
_:What ->
|
||||||
|
Msg = [?MP_TYPE_RESPONSE, CallID, <<"unexpected error">>, nil],
|
||||||
error_logger:error_msg("unknown error: ~p (~p:~s/~p)~n", [What, Module,binary_to_list(M),length(Argv)]),
|
error_logger:error_msg("unknown error: ~p (~p:~s/~p)~n", [What, Module,binary_to_list(M),length(Argv)]),
|
||||||
ok=gen_tcp:send(Socket, msgpack:pack([?MP_TYPE_RESPONSE, CallID, <<"unexpected error">>, nil])),
|
ok=gen_tcp:send(Socket, msgpack:pack(Msg)),
|
||||||
{ok, Context}
|
{ok, Context}
|
||||||
end.
|
end.
|
||||||
|
@ -97,9 +97,6 @@ join_(MPRC, [CallID|Remain], Got) when byte_size(MPRC#mprc.carry) > 0 ->
|
|||||||
msgpack_util:call_done(CallID),
|
msgpack_util:call_done(CallID),
|
||||||
case {Error, Retval} of
|
case {Error, Retval} of
|
||||||
{nil, Retval} ->
|
{nil, Retval} ->
|
||||||
%?debugVal(Retval),
|
|
||||||
%?debugVal(Remain),
|
|
||||||
%?debugVal(ets:tab2list(?MODULE)),
|
|
||||||
join_(MPRC0, Remain, [{ok,Retval}|Got]);
|
join_(MPRC0, Remain, [{ok,Retval}|Got]);
|
||||||
{Error,nil} ->
|
{Error,nil} ->
|
||||||
join_(MPRC0, Remain, [{error,Error}|Got]);
|
join_(MPRC0, Remain, [{error,Error}|Got]);
|
||||||
@ -121,9 +118,6 @@ join_(MPRC, [CallID|Remain], Got) when byte_size(MPRC#mprc.carry) > 0 ->
|
|||||||
msgpack_util:call_done(CallID),
|
msgpack_util:call_done(CallID),
|
||||||
case {Error, Retval} of
|
case {Error, Retval} of
|
||||||
{nil, Retval} ->
|
{nil, Retval} ->
|
||||||
%?debugVal(Retval),
|
|
||||||
%?debugVal(Remain),
|
|
||||||
%?debugVal(ets:tab2list(?MODULE)),
|
|
||||||
join_(MPRC, Remain, [{ok,Retval}|Got]);
|
join_(MPRC, Remain, [{ok,Retval}|Got]);
|
||||||
{Error,nil} ->
|
{Error,nil} ->
|
||||||
join_(MPRC, Remain, [{error,Error}|Got]);
|
join_(MPRC, Remain, [{error,Error}|Got]);
|
||||||
@ -132,7 +126,6 @@ join_(MPRC, [CallID|Remain], Got) when byte_size(MPRC#mprc.carry) > 0 ->
|
|||||||
end
|
end
|
||||||
end;
|
end;
|
||||||
join_(MPRC, Remain, Got) ->
|
join_(MPRC, Remain, Got) ->
|
||||||
%?debugVal(Remain), %?debugVal(MPRC),
|
|
||||||
{ok, PackedMsg} = gen_tcp:recv(MPRC#mprc.s, 0),
|
{ok, PackedMsg} = gen_tcp:recv(MPRC#mprc.s, 0),
|
||||||
NewBin = <<(MPRC#mprc.carry)/binary, PackedMsg/binary>>,
|
NewBin = <<(MPRC#mprc.carry)/binary, PackedMsg/binary>>,
|
||||||
join_(MPRC#mprc{carry=NewBin}, Remain, Got).
|
join_(MPRC#mprc{carry=NewBin}, Remain, Got).
|
||||||
|
@ -73,7 +73,7 @@ easy2_test()->
|
|||||||
Results = loop_receive(length(CallIDs), []),
|
Results = loop_receive(length(CallIDs), []),
|
||||||
?assertEqual(lists:keysort(1,CallIDs), Results),
|
?assertEqual(lists:keysort(1,CallIDs), Results),
|
||||||
|
|
||||||
?assertEqual({error, <<"no such method: addhage">>}, gen_msgpack_rpc:call(Pid2, addhage, [])),
|
% ?assertEqual({error, <<"no such method: addhage">>}, gen_msgpack_rpc:call(Pid2, addhage, [])),
|
||||||
|
|
||||||
ok=gen_msgpack_rpc:stop(Pid2),
|
ok=gen_msgpack_rpc:stop(Pid2),
|
||||||
ok=mprc:stop(),
|
ok=mprc:stop(),
|
||||||
@ -91,10 +91,8 @@ notify_test()->
|
|||||||
|
|
||||||
?assertEqual({ok,"hello, msgpack!"}, gen_msgpack_rpc:call(Pid2, hello, [])),
|
?assertEqual({ok,"hello, msgpack!"}, gen_msgpack_rpc:call(Pid2, hello, [])),
|
||||||
?assertEqual({ok,4}, gen_msgpack_rpc:call(Pid2, add, [2,2])),
|
?assertEqual({ok,4}, gen_msgpack_rpc:call(Pid2, add, [2,2])),
|
||||||
?debugHere,
|
|
||||||
Pid3 = self(),
|
Pid3 = self(),
|
||||||
?assertEqual({ok,ok}, gen_msgpack_rpc:call(Pid2, send_notify, [512, term_to_binary(Pid3)])),
|
?assertEqual({ok,<<"ok">>}, gen_msgpack_rpc:call(Pid2, send_notify, [512, term_to_binary(Pid3)])),
|
||||||
?debugHere,
|
|
||||||
|
|
||||||
receive
|
receive
|
||||||
got_notify -> ok;
|
got_notify -> ok;
|
||||||
|
Loading…
Reference in New Issue
Block a user