mirror of
https://github.com/valitydev/elixir-thrift.git
synced 2024-11-06 18:25:16 +00:00
Remove the client-side retry logic (#303)
In addition to the moderate state-management complexity introduced by this feature, we've also found that implementing message-level retry logic directly within a Thrift client to not be ideal. Instead, retries are better implemented by either application logic (which has complete understanding of the message payloads, idempotence, etc.) or closer to the wire level, which can make stronger guarantees about whether the original message was or was not delivered. We also no longer attempt to reconnect from the disconnect/2 path. Instead, we :stop with a clear error reason.
This commit is contained in:
parent
04debd652d
commit
c8f12b1047
@ -176,7 +176,6 @@ Name | Type | Description
|
||||
-----------------|------|---------------
|
||||
`:timeout` | positive integer | The default timeout for reading from, writing to, and connecting to sockets.
|
||||
`send_timeout` | positive integer | The amount of time in milliseconds to wait before sending data fails.
|
||||
`retry` | boolean | Whether or not to retry if the server closes the connection (default false). If the client detects that the server has closed the connection, the last message will be retried. This is helpful when using the client in IEx, or in a situation where the client won't be used for a while, but can result in duplicate messages being sent to the server. Due to the subtleties of `gen_tcp`, oneway messages are not retried.
|
||||
|
||||
##### GenServer Opts
|
||||
Name | Type | Description
|
||||
@ -189,11 +188,10 @@ Name | Type | Description
|
||||
```elixir
|
||||
alias Thrift.Test.UserService.Binary.Framed.Client
|
||||
{:ok, client} = Client.start_link("localhost", 2345,
|
||||
tcp_opts: [retry: true], gen_server_opts: [timeout: 10_000])
|
||||
tcp_opts: [], gen_server_opts: [timeout: 10_000])
|
||||
|
||||
```
|
||||
In the above example, the client will retry once if the remote server severs the connection.
|
||||
These options also set the GenServer timeout to be ten seconds, which means the remote
|
||||
These options set the GenServer timeout to be ten seconds, which means the remote
|
||||
side can take its time to reply.
|
||||
|
||||
|
||||
|
@ -40,18 +40,14 @@ defmodule Thrift.Binary.Framed.Client do
|
||||
tcp_opts: Client.tcp_opts,
|
||||
timeout: integer,
|
||||
sock: pid,
|
||||
seq_id: integer,
|
||||
retry: boolean,
|
||||
last_message: any
|
||||
seq_id: integer
|
||||
}
|
||||
defstruct host: nil,
|
||||
port: nil,
|
||||
tcp_opts: nil,
|
||||
timeout: 5000,
|
||||
sock: nil,
|
||||
seq_id: 0,
|
||||
retry: false,
|
||||
last_message: nil
|
||||
seq_id: 0
|
||||
end
|
||||
|
||||
require Logger
|
||||
@ -61,13 +57,11 @@ defmodule Thrift.Binary.Framed.Client do
|
||||
tcp_opts = Keyword.get(opts, :tcp_opts, [])
|
||||
|
||||
{timeout, tcp_opts} = Keyword.pop(tcp_opts, :timeout, 5000)
|
||||
{should_retry, tcp_opts} = Keyword.pop(tcp_opts, :retry, false)
|
||||
|
||||
s = %State{host: to_host(host),
|
||||
port: port,
|
||||
tcp_opts: tcp_opts,
|
||||
timeout: timeout,
|
||||
retry: should_retry}
|
||||
timeout: timeout}
|
||||
|
||||
{:connect, :init, s}
|
||||
end
|
||||
@ -87,8 +81,6 @@ defmodule Thrift.Binary.Framed.Client do
|
||||
|
||||
- `send_timeout`: An integer that governs how long our connection waits when sending data.
|
||||
|
||||
- `retry`: A boolean that tells the client whether or not it should retry on failures.
|
||||
|
||||
`gen_server_opts`: A keyword list of options that control the gen_server behaviour.
|
||||
|
||||
- `timeout`: The amount of time to wait (in milliseconds) for a reply from a `GenServer` call.
|
||||
@ -105,15 +97,14 @@ defmodule Thrift.Binary.Framed.Client do
|
||||
def close(conn), do: Connection.call(conn, :close)
|
||||
|
||||
@doc false
|
||||
def connect(info, %{sock: nil, host: host, port: port, tcp_opts: opts, timeout: timeout} = s) do
|
||||
def connect(_info, %{sock: nil, host: host, port: port, tcp_opts: opts, timeout: timeout} = s) do
|
||||
opts = opts
|
||||
|> Keyword.merge(@immutable_tcp_opts)
|
||||
|> Keyword.put_new(:send_timeout, 1000)
|
||||
|
||||
case :gen_tcp.connect(host, port, opts, timeout) do
|
||||
{:ok, sock} ->
|
||||
new_state = %{s | sock: sock}
|
||||
retry_failed_message(info, new_state)
|
||||
{:ok, %{s | sock: sock}}
|
||||
|
||||
{:error, _} = error ->
|
||||
Logger.error("Failed to connect to #{host}:#{port} due to #{inspect error}")
|
||||
@ -121,30 +112,8 @@ defmodule Thrift.Binary.Framed.Client do
|
||||
end
|
||||
end
|
||||
|
||||
defp retry_failed_message({:retry, _}, %{retry: true, last_message: {call_args, caller}} = state) do
|
||||
case handle_call(call_args, caller, state) do
|
||||
{:reply, response, state} ->
|
||||
Connection.reply(caller, response)
|
||||
{:ok, %{state | last_message: nil}}
|
||||
|
||||
other ->
|
||||
:gen_tcp.close(state.sock)
|
||||
{:close, {:error, other}, nil}
|
||||
end
|
||||
end
|
||||
|
||||
defp retry_failed_message({:retry, orig_error}, %{retry: false, sock: sock}) do
|
||||
:gen_tcp.close(sock)
|
||||
|
||||
{:stop, orig_error, nil}
|
||||
end
|
||||
|
||||
defp retry_failed_message(_, state) do
|
||||
{:ok, state}
|
||||
end
|
||||
|
||||
@doc false
|
||||
def disconnect(info, %{sock: sock} = s) do
|
||||
def disconnect(info, %{sock: sock}) do
|
||||
:ok = :gen_tcp.close(sock)
|
||||
|
||||
case info do
|
||||
@ -152,23 +121,14 @@ defmodule Thrift.Binary.Framed.Client do
|
||||
Connection.reply(from, :ok)
|
||||
{:stop, :normal, nil}
|
||||
|
||||
{:error, :closed} ->
|
||||
{:error, :closed} = error ->
|
||||
Logger.error("Connection closed")
|
||||
{:connect, info, %{s | sock: nil}}
|
||||
{:stop, error, nil}
|
||||
|
||||
{:error, reason} ->
|
||||
{:error, reason} = error ->
|
||||
reason = :inet.format_error(reason)
|
||||
Logger.error("Connection error: #{reason}")
|
||||
{:connect, info, %{s | sock: nil}}
|
||||
|
||||
{:retry, _} ->
|
||||
case s.last_message do
|
||||
{type, rpc_name, _, _} ->
|
||||
Logger.info("Retrying #{type} #{rpc_name}")
|
||||
_ ->
|
||||
Logger.info("Retrying failed call")
|
||||
end
|
||||
{:connect, info, %{s | sock: nil}}
|
||||
{:stop, error, nil}
|
||||
end
|
||||
end
|
||||
|
||||
@ -180,7 +140,6 @@ defmodule Thrift.Binary.Framed.Client do
|
||||
"""
|
||||
def oneway(conn, rpc_name, serialized_args, _opts) do
|
||||
:ok = Connection.cast(conn, {:oneway, rpc_name, serialized_args})
|
||||
:ok
|
||||
end
|
||||
|
||||
@spec call(pid, String.t, iodata, module, options) :: protocol_response
|
||||
@ -232,7 +191,7 @@ defmodule Thrift.Binary.Framed.Client do
|
||||
{:reply, {:error, :closed}, s}
|
||||
end
|
||||
|
||||
def handle_call({:call, rpc_name, serialized_args, tcp_opts} = call_args, caller,
|
||||
def handle_call({:call, rpc_name, serialized_args, tcp_opts}, _,
|
||||
%{sock: sock, seq_id: seq_id, timeout: default_timeout} = s) do
|
||||
|
||||
s = %{s | seq_id: seq_id + 1}
|
||||
@ -247,8 +206,8 @@ defmodule Thrift.Binary.Framed.Client do
|
||||
{:error, :timeout} = timeout ->
|
||||
{:reply, timeout, s}
|
||||
|
||||
{:error, :closed} = err ->
|
||||
{:disconnect, {:retry, err}, %{s |last_message: {call_args, caller}}}
|
||||
{:error, :closed} = error ->
|
||||
{:disconnect, error, s}
|
||||
|
||||
{:error, _} = error ->
|
||||
{:disconnect, error, error, s}
|
||||
|
@ -343,7 +343,7 @@ defmodule Thrift.Generator.ServiceTest do
|
||||
refute Process.alive?(ctx.client)
|
||||
end
|
||||
|
||||
thrift_test "clients don't retry by default" do
|
||||
thrift_test "clients exit on connection timeout" do
|
||||
Process.flag(:trap_exit, true)
|
||||
|
||||
new_server_port = random_port()
|
||||
@ -359,41 +359,13 @@ defmodule Thrift.Generator.ServiceTest do
|
||||
end
|
||||
end
|
||||
|
||||
thrift_test "clients retry when making an RPC on a closed server when retry is true" do
|
||||
new_server_port = random_port()
|
||||
{:ok, server} = start_server(new_server_port, 20)
|
||||
{:ok, client} = Client.start_link("127.0.0.1", new_server_port, [tcp_opts: [retry: true]])
|
||||
|
||||
:timer.sleep(50)
|
||||
|
||||
ServerSpy.set_reply([9, 8, 7, 6])
|
||||
|
||||
assert {:ok, [9, 8, 7, 6]} = Client.friend_ids_of(client, 1234)
|
||||
on_exit fn ->
|
||||
stop_server(server)
|
||||
end
|
||||
end
|
||||
|
||||
thrift_test "clients retry when making a oneway call on a closed server when retry is true" do
|
||||
new_server_port = random_port()
|
||||
{:ok, server} = start_server(new_server_port, 20)
|
||||
{:ok, client} = Client.start_link("127.0.0.1", new_server_port, [tcp_opts: [retry: true]])
|
||||
:timer.sleep(50)
|
||||
|
||||
assert {:ok, nil} = Client.do_some_work(client, "Do the work!")
|
||||
|
||||
on_exit fn ->
|
||||
stop_server(server)
|
||||
end
|
||||
end
|
||||
|
||||
thrift_test "clients exit if they try to use a closed client", ctx do
|
||||
Process.flag(:trap_exit, true)
|
||||
stop_server(ctx.server)
|
||||
assert {{:error, :econnrefused}, _} = catch_exit(Client.friend_ids_of(ctx.client, 1234))
|
||||
end
|
||||
|
||||
thrift_test "clients retry if the server dies handling a message", ctx do
|
||||
thrift_test "clients exit if the server dies handling a message", ctx do
|
||||
ref = Process.monitor(ctx.server)
|
||||
|
||||
ServerSpy.set_reply({:sleep, 5000, 1234})
|
||||
@ -418,7 +390,7 @@ defmodule Thrift.Generator.ServiceTest do
|
||||
|
||||
client = ctx.client
|
||||
assert_receive {:DOWN, ^ref, _, _, _}
|
||||
assert_receive {:EXIT, ^client, {:error, :econnrefused}}
|
||||
assert_receive {:EXIT, ^client, {:error, :closed}}
|
||||
end
|
||||
|
||||
thrift_test "it returns :ok on void oneway functions if the server dies", ctx do
|
||||
|
Loading…
Reference in New Issue
Block a user