add UDP transport

This commit is contained in:
UENISHI Kota 2011-05-06 15:39:59 +09:00
parent 5958c32e23
commit e16446f88c
6 changed files with 397 additions and 73 deletions

View File

@ -75,17 +75,24 @@ connect(Address, Port, Options)->
-spec call(mprc(), Method::atom(), Argv::list()) ->
{ok, term()} | {error, {atom(), any()}}.
call(#mprc{transport=tcp}=MPRC, Method, Argv) when is_atom(Method), is_list(Argv) ->
mprc_tcp:call(MPRC, Method, Argv).
mprc_tcp:call(MPRC, Method, Argv);
call(#mprc{transport=udp}=MPRC, Method, Argv) when is_atom(Method), is_list(Argv) ->
mprc_udp:call(MPRC, Method, Argv).
call_async(#mprc{transport=tcp}=MPRC,Method,Argv)->
mprc_tcp:call_async(MPRC, Method, Argv).
mprc_tcp:call_async(MPRC, Method, Argv);
call_async(#mprc{transport=udp}=MPRC,Method,Argv)->
mprc_udp:call_async(MPRC, Method, Argv).
-spec join(mprc(), integer() | [integer()]) -> {term(), mprc} | {[term()], mprc()} | {error, term()}.
join(#mprc{transport=tcp}=MPRC, CallIDs)->
mprc_tcp:join(MPRC, CallIDs).
mprc_tcp:join(MPRC, CallIDs);
join(#mprc{transport=udp}=MPRC, CallIDs)->
mprc_udp:join(MPRC, CallIDs).
-spec close(mprc()) -> ok|{error,term()}.
close(#mprc{transport=tcp}=MPRC)-> mprc_tcp:close(MPRC).
close(#mprc{transport=tcp}=MPRC)-> mprc_tcp:close(MPRC);
close(#mprc{transport=udp}=MPRC)-> mprc_udp:close(MPRC).
-spec controlling_process(mprc(), pid())-> ok.
controlling_process(#mprc{transport=tcp}=MPRC, Pid)->
@ -104,6 +111,8 @@ append_binary(#mprc{transport=tcp}=MPRC, Bin)->
parse_options([], Options)-> Options;
parse_options([tcp|L], Options) ->
parse_options(L, Options#options{transport=tcp});
parse_options([udp|L], Options) ->
parse_options(L, Options#options{transport=udp});
parse_options([inet|L], Options) ->
parse_options(L, Options#options{ip=inet});
parse_options([{host,Host}|L],Options)->
@ -112,16 +121,18 @@ parse_options([{port,Port}|L], Options) when is_integer(Port), 0 < Port , Port <
parse_options(L, Options#options{port=Port});
parse_options([Opt|_], _)->
{error, {not_supported, Opt}}.
%% parse_options([udp|L], Options) ->
%% {error, {not_supported, udp}};
%% parse_options([sctp|L], Options) ->
%% {error, {not_supported, sctp}};
%% parse_options([uds|L], Options) ->
%% {error, {not_supported, uds}};
%% parse_options([
get_module(#options{transport=tcp}=_Opts)-> mprc_tcp.
get_module(#options{transport=tcp}=_Opts)-> mprc_tcp;
get_module(#options{transport=udp}=_Opts)-> mprc_udp.
make_options(#options{transport=udp}=Opt)->
[Opt#options.ip, binary, {active, false}];
make_options(Opt)->
% [{host,Opt#options.host}, {port,Opt#options.port},
[Opt#options.ip, {packet,0}, binary, {active, false}].

153
src/mprc_udp.erl Normal file
View File

@ -0,0 +1,153 @@
%%
%% MessagePack for Erlang
%%
%% Copyright (C) 2010-2011 UENISHI Kota
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%% @doc
%% easy client that can't recieve notify protocol.
%% <code>
%% sample()->
%% %just as a syntax sugar for start_link
%% %YourModule defines receiver-callback when notification came from server.
%% {ok, S}=mprc:connect(Address, Port, [tcp]),
%% mprc:call(S, somemethod, [1,2]), % returns 3
%% mprc:call_async(S, somemethod, [1,2]),
%% receive
%% {ok, Answer} -> ok;% maybe 3
%% _ -> error
%% after 1024 -> timeout end
%% mprc:close(Pid).
%% </code>
%%% @end
-module(mprc_udp).
-include("msgpack_rpc.hrl").
-include_lib("eunit/include/eunit.hrl").
%% external API
-export([start/0, stop/0,
connect/3, close/1, call/3, call_async/3, join/2,
controlling_process/2, active_once/1, append_binary/2]).
%%====================================================================
%% API
%%====================================================================
-spec start()-> ok.
start()->
msgpack_util:start().
-spec stop()-> ok.
stop()->
msgpack_util:stop().
-spec connect(gen_tcp:ip_address(), Port::(0..65535), Options::[term()])->
{ok, mprc()}|{error,Reason::term()}.
connect(Address, Port, Options)->
{ok,S}=gen_udp:open(0, Options),
{ok, #mprc{s=S, transport=udp, host=Address, port=Port}}.
% synchronous calls
% when method 'Method' doesn't exist in server implementation,
% it returns {error, {<<"no such method">>, nil}}
% user func error => {error, {<<"unexpected error">>, nil}}
-spec call(mprc(), Method::atom(), Argv::list()) ->
{ok, term()} | {error, {atom(), any()}}.
call(MPRC, Method, Argv) when is_atom(Method), is_list(Argv) ->
{ok,CallID}=call_async(MPRC,Method,Argv),
?MODULE:join(MPRC,CallID).
call_async(MPRC,Method,Argv)->
CallID = msgpack_util:get_callid(),
Meth = atom_to_binary(Method,latin1),
case msgpack:pack([?MP_TYPE_REQUEST,CallID,Meth,Argv]) of
{error, Reason}->
{error, Reason};
Pack ->
ok=gen_udp:send(MPRC#mprc.s, MPRC#mprc.host, MPRC#mprc.port, Pack),
{ok,CallID}
end.
-spec join(mprc(), integer() | [integer()]) -> {term(), mprc} | {[term()], mprc()} | {error, term()}.
join(MPRC, CallIDs) when is_list(CallIDs)-> join_(MPRC, CallIDs, []);
join(MPRC, CallID)->
{[Term], MPRC0} = join_(MPRC, [CallID], []),
{Term, MPRC0}.
join_(MPRC, [], Got)-> {Got, MPRC};
join_(MPRC, [CallID|Remain], Got) when byte_size(MPRC#mprc.carry) > 0 ->
case msgpack_util:lookup(CallID) of
[{CallID,CallID}|_] ->
case msgpack:unpack(MPRC#mprc.carry) of
{[?MP_TYPE_RESPONSE, CallID, Error, Retval], RemainBin}->
MPRC0 = MPRC#mprc{carry=RemainBin},
msgpack_util:call_done(CallID),
case {Error, Retval} of
{nil, Retval} ->
join_(MPRC0, Remain, [{ok,Retval}|Got]);
{Error,nil} ->
join_(MPRC0, Remain, [{error,Error}|Got]);
_Other -> % malformed message
throw({malform_msg, _Other})
end;
{[?MP_TYPE_RESPONSE, CallID0, Error, Retval], RemainBin}->
msgpack_util:insert({CallID0, Error, Retval}),
MPRC0 = MPRC#mprc{carry=RemainBin},
join_(MPRC0, [CallID|Remain], Got);
{error, incomplete} ->
{ok, {_Host,Port,PackedMsg}} = gen_udp:recv(MPRC#mprc.s, 65536),
% Host=MPRC#mprc.host,
Port=MPRC#mprc.port,
NewBin = <<(MPRC#mprc.carry)/binary, PackedMsg/binary>>,
join_(MPRC#mprc{carry=NewBin}, Remain, Got);
{error, Reason} ->
{error, Reason}
end;
[{CallID,Error,Retval}|_] ->
msgpack_util:call_done(CallID),
case {Error, Retval} of
{nil, Retval} ->
join_(MPRC, Remain, [{ok,Retval}|Got]);
{Error,nil} ->
join_(MPRC, Remain, [{error,Error}|Got]);
_Other -> % malformed message
throw({malform_msg, _Other})
end
end;
join_(MPRC, Remain, Got) ->
{ok, {_Host,Port,PackedMsg}} = gen_udp:recv(MPRC#mprc.s, 0),
% Host=MPRC#mprc.host,
Port=MPRC#mprc.port,
NewBin = <<(MPRC#mprc.carry)/binary, PackedMsg/binary>>,
join_(MPRC#mprc{carry=NewBin}, Remain, Got).
-spec close(mprc()) -> ok|{error,term()}.
close(Client)-> gen_udp:close(Client#mprc.s).
-spec controlling_process(mprc(), pid())-> ok.
controlling_process(MPRC, Pid)->
gen_udp:controlling_process(MPRC#mprc.s, Pid).
-spec active_once(mprc())-> ok.
active_once(MPRC)->
inet:setopts(MPRC#mprc.s, [{active,once}]).
append_binary(MPRC, Bin)->
NewBin = <<(MPRC#mprc.carry)/binary, Bin/binary>>,
MPRC#mprc{carry=NewBin}.
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------

View File

@ -38,6 +38,8 @@ stop(Pid)->
parse_options([], Options)-> Options;
parse_options([tcp|L], Options) ->
parse_options(L, Options#options{transport=tcp});
parse_options([udp|L], Options) ->
parse_options(L, Options#options{transport=udp});
parse_options([inet|L], Options) ->
parse_options(L, Options#options{ip=inet});
parse_options([{host,Host}|L],Options)->
@ -46,15 +48,14 @@ parse_options([{port,Port}|L], Options) when is_integer(Port), 0 < Port , Port <
parse_options(L, Options#options{port=Port});
parse_options([Opt|_], _)->
{error, {not_supported, Opt}}.
%% parse_options([udp|L], Options) ->
%% {error, {not_supported, udp}};
%% parse_options([sctp|L], Options) ->
%% {error, {not_supported, sctp}};
%% parse_options([uds|L], Options) ->
%% {error, {not_supported, uds}};
%% parse_options([
get_module(#options{transport=tcp}=_Opts)-> mprs_tcp.
get_module(#options{transport=tcp}=_Opts)-> mprs_tcp;
get_module(#options{transport=udp}=_Opts)-> mprs_udp.
make_options(Opt)->
[{host,Opt#options.host}, {port,Opt#options.port},

View File

@ -1,98 +1,225 @@
%%%-------------------------------------------------------------------
%%% File : mprs_udp.erl
%%% Author : UENISHI Kota <kuenishi@gmail.com>
%%% Description :
%%
%% MessagePack for Erlang
%%
%% Copyright (C) 2010-2011 UENISHI Kota
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%% @doc
%%%
%%% Created : 2 May 2011 by UENISHI Kota <kuenishi@gmail.com>
%%%-------------------------------------------------------------------
%%% @end
-module(mprs_udp).
-behaviour(gen_server).
-include("msgpack_rpc.hrl").
%% API
-export([start_link/0]).
-export([start_link/2, start_link/3]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-export([dispatch/5]).
-define(SERVER, ?MODULE).
-record(state, {}).
-record(state, {socket :: inet:socket(),
module :: atom()}).
-include_lib("eunit/include/eunit.hrl").
%%====================================================================
%% API
%%====================================================================
%%--------------------------------------------------------------------
%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
%% Description: Starts the server
%%--------------------------------------------------------------------
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%%====================================================================
%% gen_server callbacks
%%====================================================================
%%%===================================================================
%%% API
%%%===================================================================
%%--------------------------------------------------------------------
%% Function: init(Args) -> {ok, State} |
%% {ok, State, Timeout} |
%% ignore |
%% {stop, Reason}
%% Description: Initiates the server
%% @doc
%% Starts the server
%% TODO: maket start_link/3 for multiple listening
%% @end
%%--------------------------------------------------------------------
init([]) ->
{ok, #state{}}.
-spec start_link(Mod::atom(), [term()]) ->
{ok, Pid::pid()} | ignore | {error, Error::term()}.
start_link(Mod, Options) ->
start_link({local, ?SERVER}, Mod, Options).
-spec start_link(Id::atom(), Mod::atom(), [term()]) ->
{ok, Pid::pid()} | ignore | {error, Error::term()}.
start_link(Id, Mod, Options) ->
gen_server:start_link(Id, ?MODULE, [Mod, Options], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%%--------------------------------------------------------------------
%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
%% {reply, Reply, State, Timeout} |
%% {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, Reply, State} |
%% {stop, Reason, State}
%% Description: Handling call messages
%% @private
%% @doc
%% Initializes the server
%%
%% @spec init(Args) -> {ok, State} |
%% {ok, State, Timeout} |
%% ignore |
%% {stop, Reason}
%% @end
%%--------------------------------------------------------------------
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
init([Mod,Options]) ->
{Host,Options0} = msgpack_util:pppop(host,Options),
{Port,_Options1} = msgpack_util:pppop(port,Options0),
init_(Mod,Host,Port).
init_(Mod, Host, Port) when is_atom(Host)->
{ok,Addr} = inet:getaddr(Host, inet),
init_(Mod, Addr, Port);
init_(Mod, Host, Port) when is_list(Host)->
{ok,Addr} = inet:getaddr(Host, inet),
init_(Mod, Addr, Port);
init_(Mod, Addr, Port) when is_atom(Mod), 0<Port, Port<65535, is_tuple(Addr)->
Opts = [binary, {reuseaddr, true}, {active, false}, {ip, Addr}],
% ?debugVal(Opts),
case gen_udp:open(Port,Opts) of % FIXME: inet6
{ok, Socket} ->
%%Create first accepting process
inet:setopts(Socket,[{active,once}]),
{ok, #state{socket = Socket, module = Mod}};
{error, Reason} ->
?debugVal(Reason),
{stop, Reason}
end.
%%--------------------------------------------------------------------
%% Function: handle_cast(Msg, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% Description: Handling cast messages
%% @private
%% @doc
%% Handling call messages
%%
%% @spec handle_call(Request, From, State) ->
%% {reply, Reply, State} |
%% {reply, Reply, State, Timeout} |
%% {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, Reply, State} |
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
handle_call(stop, _, State)->
{stop, normal, ok, State};
handle_call(Request, _From, State) ->
{stop, {unknown_call, Request}, {ok, abnormal}, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handling cast messages
%%
%% @spec handle_cast(Msg, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
handle_cast(_Msg, State) ->
{noreply, State}.
%%--------------------------------------------------------------------
%% Function: handle_info(Info, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% Description: Handling all non call/cast messages
%% @private
%% @doc
%% Handling all non call/cast messages
%%
%% @spec handle_info(Info, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
handle_info({udp, Socket, IP, InPortNo, Packet}, #state{socket=Socket} = State) ->
spawn(?MODULE, dispatch, [self(),IP, InPortNo, Packet, State]),
% erlang:apply(fun ?MODULE:dispatch/5, [self(),IP, InPortNo, Packet, State]),
%% Signal the network driver that we are ready to accept another connection
inet:setopts(Socket, [{active,once}]),
{noreply, State};
handle_info({send, Socket, IP, InPortNo, Packet}, State) ->
ok=gen_udp:send(Socket, IP, InPortNo, Packet),
{noreply, State};
handle_info({'EXIT', _Pid, normal}, State) ->
% error_logger:error_report({?MODULE, ?LINE, {State}}),
{noreply, State};
handle_info({'EXIT', _Pid, Reason}, State) ->
%% If there was an unexpected error accepting, log and sleep.
error_logger:error_report({?MODULE, ?LINE, {error, Reason}}),
{noreply, State};
handle_info(_Info, State) ->
error_logger:error_report({?MODULE, ?LINE, {_Info}}),
{noreply, State}.
%%--------------------------------------------------------------------
%% Function: terminate(Reason, State) -> void()
%% Description: This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any necessary
%% cleaning up. When it returns, the gen_server terminates with Reason.
%% The return value is ignored.
%% @private
%% @doc
%% This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
%%
%% @spec terminate(Reason, State) -> void()
%% @end
%%--------------------------------------------------------------------
terminate(_Reason, _State) ->
ok.
terminate(_Reason, State) ->
gen_udp:close(State#state.socket).
%%--------------------------------------------------------------------
%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
%% Description: Convert process state when code is changed
%% @private
%% @doc
%% Convert process state when code is changed
%%
%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
%% @end
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%%%===================================================================
%%% Internal functions
%%--------------------------------------------------------------------
%%%===================================================================
dispatch(From, IP, InPortNo, Packet, #state{module=Module,socket=Socket}=State)->
case msgpack:unpack(Packet) of
{error, incomplete}->
error_logger:error_report([?MODULE, ?LINE, {IP,InPortNo,State, Packet}]);
{[?MP_TYPE_REQUEST,CallID,M,Argv], Remain}->
Method = binary_to_atom(M, latin1),
ReplyBin =
try
case erlang:apply(Module,Method,Argv) of
{reply, Result} ->
msgpack:pack([?MP_TYPE_RESPONSE, CallID, nil, Result]);
{error, Reason}->
msgpack:pack([?MP_TYPE_RESPONSE, CallID, Reason, nil])
end
catch
_:undef ->
error_logger:error_msg("~s:~p no such method: ~p:~s/~p~n",
[?FILE,?LINE,Module,binary_to_list(M),length(Argv)]),
Msg = << (<<"no such method: ">>)/binary, M/binary>>,
msgpack:pack([?MP_TYPE_RESPONSE, CallID, Msg, nil]);
_:What ->
Msg = [?MP_TYPE_RESPONSE, CallID, <<"unexpected error">>, nil],
error_logger:error_msg("unknown error: ~p (~p:~s/~p)~n",
[What, Module,binary_to_list(M),length(Argv)]),
msgpack:pack([?MP_TYPE_RESPONSE, CallID, Msg, nil])
end,
From ! {send, Socket, IP, InPortNo, ReplyBin},
case Remain of
<<>> -> ok;
_ -> dispatch(From, IP, InPortNo, Remain, State)
end
end.

View File

@ -25,15 +25,17 @@
-type global_name() :: term().
-type transport() :: tcp | udp. % sctp | snappy | zip | etc...
-type nport() :: (1..65535).
-record(mprc, { s :: inet:socket(),
carry = <<>> :: binary(),
transport = tcp :: transport()}).
transport = tcp :: transport(),
host :: inet:ip_address(),
port :: nport()
}).
-type mprc() :: #mprc{}.
-type server_name() :: {local, name()} | {global, global_name()}.
-type server_ref() :: pid() | name() | { name(), node() } | {global, global_name()}.
-type nport() :: (1..65535).
-endif.

View File

@ -78,3 +78,33 @@ conn_test()->
?assertEqual(ok,gen_server:call(Pid,stop)),
ok.
% UDP test
easy3_test()->
{ok,Pid} = mprs:start_link(sample_srv, [{host,localhost},{port,9199},udp]),
?assert(is_pid(Pid)),
ok = mprc:start(),
{ok,S} = mprc:connect(localhost,9199,[udp]),
{Ret, _} = mprc:call(S, hello, []),
?assertEqual(Ret, {ok,<<"hello, msgpack!">>}),
{Ret0, _} = mprc:call(S, add, [230,4]),
?assertEqual({ok,234}, Ret0),
A=2937845, B=238945029038453490, C=A+B,
{Ret1, _} = mprc:call(S, add, [A,B]),
?assertEqual({ok,C}, Ret1),
%% % TODO: make it exception thrown
%% % ?assertEqual(234, mprc:call(S, addo, [230,0])),
{ok, CallID0} = mprc:call_async(S, add, [23, -23]),
{ok, CallID1} = mprc:call_async(S, add, [23, 23]),
{Ans, _MPRC3} = mprc:join(S, [CallID0, CallID1]),
?assertEqual(Ans, [{ok,46},{ok,0}]), % maybe reversed
% ?debugVal(mprc:join(_MPRC3, CallID2)),
?assertEqual(ok, mprc:close(S)),
ok = mprc:stop(),
?assertEqual(ok,gen_server:call(Pid,stop)),
ok.