mirror of
https://github.com/valitydev/thrift.git
synced 2024-11-07 10:48:51 +00:00
allow configurable recv_timeouts
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@666435 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
919a801a28
commit
b7c8802d36
@ -27,7 +27,9 @@
|
||||
max=2048,
|
||||
ip=any,
|
||||
listen=null,
|
||||
acceptor=null}).
|
||||
acceptor=null,
|
||||
socket_opts=[{recv_timeout, 500}]
|
||||
}).
|
||||
|
||||
start(State=#thrift_socket_server{}) ->
|
||||
io:format("~p~n", [State]),
|
||||
@ -78,6 +80,8 @@ parse_options([{ip, Ip} | Rest], State) ->
|
||||
IpTuple
|
||||
end,
|
||||
parse_options(Rest, State#thrift_socket_server{ip=ParsedIp});
|
||||
parse_options([{socket_opts, L} | Rest], State) when is_list(L), length(L) > 0 ->
|
||||
parse_options(Rest, State#thrift_socket_server{socket_opts=L});
|
||||
parse_options([{handler, Handler} | Rest], State) ->
|
||||
parse_options(Rest, State#thrift_socket_server{handler=Handler});
|
||||
parse_options([{service, Service} | Rest], State) ->
|
||||
@ -152,19 +156,22 @@ gen_tcp_listen(Port, Opts, State) ->
|
||||
new_acceptor(State=#thrift_socket_server{max=0}) ->
|
||||
error_logger:error_msg("Not accepting new connections"),
|
||||
State#thrift_socket_server{acceptor=null};
|
||||
new_acceptor(State=#thrift_socket_server{acceptor=OldPid, listen=Listen,service=Service, handler=Handler}) ->
|
||||
new_acceptor(State=#thrift_socket_server{acceptor=OldPid, listen=Listen,
|
||||
service=Service, handler=Handler,
|
||||
socket_opts=Opts
|
||||
}) ->
|
||||
Pid = proc_lib:spawn_link(?MODULE, acceptor_loop,
|
||||
[{self(), Listen, Service, Handler}]),
|
||||
[{self(), Listen, Service, Handler, Opts}]),
|
||||
%% error_logger:info_msg("Spawning new acceptor: ~p => ~p", [OldPid, Pid]),
|
||||
State#thrift_socket_server{acceptor=Pid}.
|
||||
|
||||
acceptor_loop({Server, Listen, Service, Handler})
|
||||
when is_pid(Server) ->
|
||||
case catch gen_tcp:accept(Listen) of
|
||||
acceptor_loop({Server, Listen, Service, Handler, SocketOpts})
|
||||
when is_pid(Server), is_list(SocketOpts) ->
|
||||
case catch gen_tcp:accept(Listen) of % infiinite timeout
|
||||
{ok, Socket} ->
|
||||
gen_server:cast(Server, {accepted, self()}),
|
||||
ProtoGen = fun() ->
|
||||
{ok, SocketTransport} = thrift_socket_transport:new(Socket),
|
||||
{ok, SocketTransport} = thrift_socket_transport:new(Socket, SocketOpts),
|
||||
{ok, BufferedTransport} = thrift_buffered_transport:new(SocketTransport),
|
||||
{ok, Protocol} = thrift_binary_protocol:new(BufferedTransport),
|
||||
{ok, IProt=Protocol, OProt=Protocol}
|
||||
|
@ -3,18 +3,33 @@
|
||||
-behaviour(thrift_transport).
|
||||
|
||||
-export([new/1,
|
||||
new/2,
|
||||
write/2, read/2, flush/1, close/1]).
|
||||
|
||||
-record(data, {socket}).
|
||||
-record(data, {socket,
|
||||
recv_timeout=infinity}).
|
||||
|
||||
new(Socket) ->
|
||||
thrift_transport:new(?MODULE, #data{socket = Socket}).
|
||||
new(Socket, []).
|
||||
|
||||
write(#data{socket = Socket}, Data) when is_binary(Data) ->
|
||||
new(Socket, Opts) when is_list(Opts) ->
|
||||
State =
|
||||
case lists:keysearch(recv_timeout, 1, Opts) of
|
||||
{value, {recv_timeout, Timeout}}
|
||||
when is_integer(Timeout), Timeout > 0 ->
|
||||
#data{socket=Socket, recv_timeout=Timeout};
|
||||
_ ->
|
||||
#data{socket=Socket}
|
||||
end,
|
||||
thrift_transport:new(?MODULE, State).
|
||||
|
||||
write(#data{socket = Socket}, Data)
|
||||
when is_binary(Data) ->
|
||||
gen_tcp:send(Socket, Data).
|
||||
|
||||
read(#data{socket = Socket}, Len) when is_integer(Len), Len >= 0 ->
|
||||
gen_tcp:recv(Socket, Len).
|
||||
read(D = #data{socket=Socket, recv_timeout=Timeout}, Len)
|
||||
when is_integer(Len), Len >= 0 ->
|
||||
gen_tcp:recv(Socket, Len, Timeout).
|
||||
|
||||
%% We can't really flush - everything is flushed when we write
|
||||
flush(_) ->
|
||||
|
Loading…
Reference in New Issue
Block a user