mirror of
https://github.com/valitydev/thrift.git
synced 2024-11-07 10:48:51 +00:00
rb: Implement Thrift::UNIXSocket and Thrift::UNIXServerSocket
In benchmarking it turns out these don't give any noticeable performance boost, but as I've already written them, somebody may want them for something. git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@669019 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
138c0e126f
commit
2ddd8ed40f
@ -2,6 +2,7 @@ require 'rubygems'
|
||||
$:.unshift File.dirname(__FILE__) + '/../lib'
|
||||
require 'thrift'
|
||||
require 'thrift/server/nonblockingserver'
|
||||
require 'thrift/transport/unixsocket'
|
||||
$:.unshift File.dirname(__FILE__) + "/gen-rb"
|
||||
require 'BenchmarkService'
|
||||
require 'thread'
|
||||
@ -29,10 +30,10 @@ module Server
|
||||
end
|
||||
end
|
||||
|
||||
def self.start_server(serverClass)
|
||||
def self.start_server(serverClass, trans = nil)
|
||||
handler = BenchmarkHandler.new
|
||||
processor = ThriftBenchmark::BenchmarkService::Processor.new(handler)
|
||||
transport = ServerSocket.new(HOST, PORT)
|
||||
transport = trans || ServerSocket.new(HOST, PORT)
|
||||
transportFactory = FramedTransportFactory.new
|
||||
args = [processor, transport, transportFactory, nil, 20]
|
||||
if serverClass == NonblockingServer
|
||||
@ -60,24 +61,13 @@ module Server
|
||||
end
|
||||
end
|
||||
|
||||
module Client
|
||||
include Thrift
|
||||
|
||||
def self.start_client(&block)
|
||||
transport = FramedTransport.new(Socket.new(HOST, PORT))
|
||||
protocol = BinaryProtocol.new(transport)
|
||||
client = ThriftBenchmark::BenchmarkService::Client.new(protocol)
|
||||
# transport.open
|
||||
Thread.new do
|
||||
block.call(client, transport)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
class BenchmarkManager
|
||||
def initialize(opts)
|
||||
@host = opts.fetch(:host, 'localhost')
|
||||
@port = opts.fetch(:port)
|
||||
@socket = opts.fetch(:socket) do
|
||||
@host = opts.fetch(:host, 'localhost')
|
||||
@port = opts.fetch(:port)
|
||||
nil
|
||||
end
|
||||
@num_processes = opts.fetch(:num_processes, 40)
|
||||
@clients_per_process = opts.fetch(:clients_per_process, 10)
|
||||
@calls_per_client = opts.fetch(:calls_per_client, 50)
|
||||
@ -104,7 +94,12 @@ class BenchmarkManager
|
||||
STDIN.close
|
||||
rd.close
|
||||
@clients_per_process.times do
|
||||
transport = Thrift::FramedTransport.new(Thrift::Socket.new(@host, @port))
|
||||
if @socket
|
||||
socket = Thrift::UNIXSocket.new(@socket)
|
||||
else
|
||||
socket = Thrift::Socket.new(@host, @port)
|
||||
end
|
||||
transport = Thrift::FramedTransport.new(socket)
|
||||
protocol = Thrift::BinaryProtocol.new(transport)
|
||||
client = ThriftBenchmark::BenchmarkService::Client.new(protocol)
|
||||
begin
|
||||
@ -128,6 +123,14 @@ class BenchmarkManager
|
||||
pid
|
||||
end
|
||||
|
||||
def socket_class
|
||||
if @socket
|
||||
Thrift::UNIXSocket
|
||||
else
|
||||
Thrift::Socket
|
||||
end
|
||||
end
|
||||
|
||||
def collect_output
|
||||
puts "Collecting output..."
|
||||
# read from @pool until all sockets are closed
|
||||
@ -208,6 +211,7 @@ class BenchmarkManager
|
||||
puts
|
||||
tabulate "%d",
|
||||
[["Server class", "%s"], Server.class],
|
||||
[["Socket class", "%s"], socket_class],
|
||||
["Number of processes", @num_processes],
|
||||
["Clients per process", @clients_per_process],
|
||||
["Calls per client", @calls_per_client],
|
||||
@ -219,7 +223,7 @@ class BenchmarkManager
|
||||
["Average time per client (%d calls)" % @calls_per_client, @report[:avg_clients]],
|
||||
["Total time for all calls", @report[:total_calls]],
|
||||
["Real time for benchmarking", @report[:total_benchmark_time]],
|
||||
["Longest call time", @report[:longest_call]],
|
||||
["Shortest call time", @report[:longest_call]],
|
||||
["Longest client time (%d calls)" % @calls_per_client, @report[:longest_client]]
|
||||
end
|
||||
|
||||
@ -240,10 +244,20 @@ end
|
||||
|
||||
puts "Starting server..."
|
||||
serverklass = resolve_const(ENV['THRIFT_SERVER']) || Thrift::NonblockingServer
|
||||
Server.start_server(serverklass)
|
||||
servertrans = nil
|
||||
if ENV['THRIFT_SOCKET']
|
||||
servertrans = Thrift::UNIXServerSocket.new(ENV['THRIFT_SOCKET'])
|
||||
end
|
||||
Server.start_server(serverklass, servertrans)
|
||||
|
||||
sleep 0.2 # give the server time to start
|
||||
|
||||
BenchmarkManager.new(:host => HOST, :port => PORT, :num_processes => 40, :clients_per_process => 5).run
|
||||
args = { :num_processes => 40, :clients_per_process => 5 }
|
||||
if ENV['THRIFT_SOCKET']
|
||||
args[:socket] = ENV['THRIFT_SOCKET']
|
||||
else
|
||||
args.merge!(:host => HOST, :port => PORT)
|
||||
end
|
||||
BenchmarkManager.new(args).run
|
||||
|
||||
Server.shutdown
|
||||
|
@ -14,6 +14,7 @@ module Thrift
|
||||
def initialize(host='localhost', port=9090)
|
||||
@host = host
|
||||
@port = port
|
||||
@desc = "#{host}:#{port}"
|
||||
@handle = nil
|
||||
end
|
||||
|
||||
@ -23,7 +24,7 @@ module Thrift
|
||||
begin
|
||||
@handle = TCPSocket.new(@host, @port)
|
||||
rescue StandardError
|
||||
raise TransportException.new(TransportException::NOT_OPEN, "Could not connect to #{@host}:#{@port}")
|
||||
raise TransportException.new(TransportException::NOT_OPEN, "Could not connect to #{@desc}")
|
||||
end
|
||||
end
|
||||
|
||||
@ -57,7 +58,7 @@ module Thrift
|
||||
raise TransportException.new(TransportException::NOT_OPEN, e.message)
|
||||
end
|
||||
if (data.nil? or data.length == 0)
|
||||
raise TransportException.new(TransportException::UNKNOWN, "Socket: Could not read #{sz} bytes from #{@host}:#{@port}")
|
||||
raise TransportException.new(TransportException::UNKNOWN, "Socket: Could not read #{sz} bytes from #{@desc}")
|
||||
end
|
||||
data
|
||||
end
|
||||
|
51
lib/rb/lib/thrift/transport/unixsocket.rb
Normal file
51
lib/rb/lib/thrift/transport/unixsocket.rb
Normal file
@ -0,0 +1,51 @@
|
||||
require 'thrift/transport'
|
||||
require 'socket'
|
||||
|
||||
module Thrift
|
||||
class UNIXSocket < Socket
|
||||
def initialize(path)
|
||||
@path = path
|
||||
@desc = @path # for read()'s error
|
||||
@handle = nil
|
||||
end
|
||||
|
||||
def open
|
||||
begin
|
||||
@handle = ::UNIXSocket.new(@path)
|
||||
rescue StandardError
|
||||
raise TransportException.new(TransportException::NOT_OPEN, "Could not open UNIX socket at #{@path}")
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
class UNIXServerSocket < ServerTransport
|
||||
def initialize(path)
|
||||
@path = path
|
||||
@handle = nil
|
||||
end
|
||||
|
||||
attr_accessor :handle
|
||||
|
||||
def listen
|
||||
@handle = ::UNIXServer.new(@path)
|
||||
end
|
||||
|
||||
def accept
|
||||
unless @handle.nil?
|
||||
sock = @handle.accept
|
||||
trans = UNIXSocket.new(nil)
|
||||
trans.handle = sock
|
||||
trans
|
||||
end
|
||||
end
|
||||
|
||||
def close
|
||||
if @handle
|
||||
@handle.close unless @handle.closed?
|
||||
@handle = nil
|
||||
# UNIXServer doesn't delete the socket file, so we have to do it ourselves
|
||||
File.delete(@path)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
@ -1,81 +1,33 @@
|
||||
require File.dirname(__FILE__) + '/spec_helper'
|
||||
require File.dirname(__FILE__) + "/socket_spec_shared"
|
||||
|
||||
class ThriftSocketSpec < Spec::ExampleGroup
|
||||
include Thrift
|
||||
|
||||
before(:each) do
|
||||
@socket = Socket.new
|
||||
@handle = mock("Handle", :closed? => false)
|
||||
@handle.stub!(:close)
|
||||
end
|
||||
|
||||
describe Socket do
|
||||
it "should open a TCPSocket" do
|
||||
TCPSocket.should_receive(:new).with('localhost', 9090).and_return(@handle)
|
||||
@socket.open.should == @handle
|
||||
before(:each) do
|
||||
@socket = Socket.new
|
||||
@handle = mock("Handle", :closed? => false)
|
||||
@handle.stub!(:close)
|
||||
TCPSocket.stub!(:new).and_return(@handle)
|
||||
end
|
||||
|
||||
it_should_behave_like "a socket"
|
||||
|
||||
it "should raise a TransportException when it cannot open a socket" do
|
||||
TCPSocket.should_receive(:new).and_raise(StandardError)
|
||||
lambda { @socket.open }.should raise_error(Thrift::TransportException) { |e| e.type.should == Thrift::TransportException::NOT_OPEN }
|
||||
end
|
||||
|
||||
it "should open a TCPSocket with default args" do
|
||||
TCPSocket.should_receive(:new).with('localhost', 9090)
|
||||
@socket.open
|
||||
end
|
||||
|
||||
it "should accept host/port options" do
|
||||
TCPSocket.should_receive(:new).with('my.domain', 1234)
|
||||
Socket.new('my.domain', 1234).open
|
||||
end
|
||||
|
||||
it "should raise a TransportException when it cannot open a socket" do
|
||||
TCPSocket.should_receive(:new).with('localhost', 9090).and_raise(StandardError)
|
||||
lambda { @socket.open }.should raise_error(TransportException, "Could not connect to localhost:9090") { |e| e.type.should == TransportException::NOT_OPEN }
|
||||
end
|
||||
|
||||
it "should be open whenever it has a handle" do
|
||||
@socket.should_not be_open
|
||||
TCPSocket.should_receive(:new).and_return(@handle)
|
||||
@socket.open
|
||||
@socket.should be_open
|
||||
@socket.handle = nil
|
||||
@socket.should_not be_open
|
||||
@socket.handle = @handle
|
||||
@handle.should_receive(:close)
|
||||
@socket.close
|
||||
@socket.should_not be_open
|
||||
end
|
||||
|
||||
it "should write data to the handle" do
|
||||
TCPSocket.should_receive(:new).and_return(@handle)
|
||||
@socket.open
|
||||
@handle.should_receive(:write).with("foobar")
|
||||
@socket.write("foobar")
|
||||
@handle.should_receive(:write).with("fail").and_raise(StandardError)
|
||||
lambda { @socket.write("fail") }.should raise_error(TransportException) { |e| e.type.should == TransportException::NOT_OPEN }
|
||||
end
|
||||
|
||||
it "should raise an error when it cannot read from the handle" do
|
||||
TCPSocket.should_receive(:new).and_return(@handle)
|
||||
@socket.open
|
||||
@handle.should_receive(:read).with(17).and_raise(StandardError)
|
||||
lambda { @socket.read(17) }.should raise_error(TransportException) { |e| e.type.should == TransportException::NOT_OPEN }
|
||||
end
|
||||
|
||||
it "should raise an error when it reads no data from the handle" do
|
||||
TCPSocket.should_receive(:new).and_return(@handle)
|
||||
@socket.open
|
||||
@handle.should_receive(:read).with(17).and_return("")
|
||||
lambda { @socket.read(17) }.should raise_error(TransportException, "Socket: Could not read 17 bytes from localhost:9090")
|
||||
end
|
||||
|
||||
it "should return the data read when reading from the handle works" do
|
||||
TCPSocket.should_receive(:new).and_return(@handle)
|
||||
@socket.open
|
||||
@handle.should_receive(:read).with(17).and_return("test data")
|
||||
@socket.read(17).should == "test data"
|
||||
end
|
||||
|
||||
it "should declare itself as closed when it has an error" do
|
||||
TCPSocket.should_receive(:new).and_return(@handle)
|
||||
@socket.open
|
||||
@handle.should_receive(:write).with("fail").and_raise(StandardError)
|
||||
@socket.should be_open
|
||||
lambda { @socket.write("fail") }.should raise_error
|
||||
@socket.should_not be_open
|
||||
end
|
||||
end
|
||||
|
||||
describe ServerSocket do
|
||||
@ -84,8 +36,8 @@ class ThriftSocketSpec < Spec::ExampleGroup
|
||||
end
|
||||
|
||||
it "should create a handle when calling listen" do
|
||||
TCPServer.should_receive(:new).with(nil, 1234)
|
||||
@socket.listen
|
||||
@socket.handle.should be_an_instance_of(TCPServer)
|
||||
end
|
||||
|
||||
it "should accept an optional host argument" do
|
||||
|
52
lib/rb/spec/socket_spec_shared.rb
Normal file
52
lib/rb/spec/socket_spec_shared.rb
Normal file
@ -0,0 +1,52 @@
|
||||
require File.dirname(__FILE__) + '/spec_helper'
|
||||
|
||||
shared_examples_for "a socket" do
|
||||
it "should open a socket" do
|
||||
@socket.open.should == @handle
|
||||
end
|
||||
|
||||
it "should be open whenever it has a handle" do
|
||||
@socket.should_not be_open
|
||||
@socket.open
|
||||
@socket.should be_open
|
||||
@socket.handle = nil
|
||||
@socket.should_not be_open
|
||||
@socket.handle = @handle
|
||||
@socket.close
|
||||
@socket.should_not be_open
|
||||
end
|
||||
|
||||
it "should write data to the handle" do
|
||||
@socket.open
|
||||
@handle.should_receive(:write).with("foobar")
|
||||
@socket.write("foobar")
|
||||
@handle.should_receive(:write).with("fail").and_raise(StandardError)
|
||||
lambda { @socket.write("fail") }.should raise_error(Thrift::TransportException) { |e| e.type.should == Thrift::TransportException::NOT_OPEN }
|
||||
end
|
||||
|
||||
it "should raise an error when it cannot read from the handle" do
|
||||
@socket.open
|
||||
@handle.should_receive(:read).with(17).and_raise(StandardError)
|
||||
lambda { @socket.read(17) }.should raise_error(Thrift::TransportException) { |e| e.type.should == Thrift::TransportException::NOT_OPEN }
|
||||
end
|
||||
|
||||
it "should raise an error when it reads no data from the handle" do
|
||||
@socket.open
|
||||
@handle.should_receive(:read).with(17).and_return("")
|
||||
lambda { @socket.read(17) }.should raise_error(Thrift::TransportException, "Socket: Could not read 17 bytes from #{@socket.instance_variable_get("@desc")}")
|
||||
end
|
||||
|
||||
it "should return the data read when reading from the handle works" do
|
||||
@socket.open
|
||||
@handle.should_receive(:read).with(17).and_return("test data")
|
||||
@socket.read(17).should == "test data"
|
||||
end
|
||||
|
||||
it "should declare itself as closed when it has an error" do
|
||||
@socket.open
|
||||
@handle.should_receive(:write).with("fail").and_raise(StandardError)
|
||||
@socket.should be_open
|
||||
lambda { @socket.write("fail") }.should raise_error
|
||||
@socket.should_not be_open
|
||||
end
|
||||
end
|
60
lib/rb/spec/unixsocket_spec.rb
Normal file
60
lib/rb/spec/unixsocket_spec.rb
Normal file
@ -0,0 +1,60 @@
|
||||
require File.dirname(__FILE__) + '/spec_helper'
|
||||
require 'thrift/transport/unixsocket'
|
||||
require File.dirname(__FILE__) + "/socket_spec_shared"
|
||||
|
||||
class ThriftUNIXSocketSpec < Spec::ExampleGroup
|
||||
include Thrift
|
||||
|
||||
describe UNIXSocket do
|
||||
before(:each) do
|
||||
@path = '/tmp/thrift_spec_socket'
|
||||
@socket = UNIXSocket.new(@path)
|
||||
@handle = mock("Handle", :closed? => false)
|
||||
@handle.stub!(:close)
|
||||
::UNIXSocket.stub!(:new).and_return(@handle)
|
||||
end
|
||||
|
||||
it_should_behave_like "a socket"
|
||||
|
||||
it "should raise a TransportException when it cannot open a socket" do
|
||||
::UNIXSocket.should_receive(:new).and_raise(StandardError)
|
||||
lambda { @socket.open }.should raise_error(Thrift::TransportException) { |e| e.type.should == Thrift::TransportException::NOT_OPEN }
|
||||
end
|
||||
end
|
||||
|
||||
describe UNIXServerSocket do
|
||||
before(:each) do
|
||||
@path = '/tmp/thrift_spec_socket'
|
||||
@socket = UNIXServerSocket.new(@path)
|
||||
end
|
||||
|
||||
it "should create a handle when calling listen" do
|
||||
UNIXServer.should_receive(:new).with(@path)
|
||||
@socket.listen
|
||||
end
|
||||
|
||||
it "should create a Thrift::UNIXSocket to wrap accepted sockets" do
|
||||
handle = mock("UNIXServer")
|
||||
UNIXServer.should_receive(:new).with(@path).and_return(handle)
|
||||
@socket.listen
|
||||
sock = mock("sock")
|
||||
handle.should_receive(:accept).and_return(sock)
|
||||
trans = mock("UNIXSocket")
|
||||
UNIXSocket.should_receive(:new).and_return(trans)
|
||||
trans.should_receive(:handle=).with(sock)
|
||||
@socket.accept.should == trans
|
||||
end
|
||||
|
||||
it "should close the handle when closed" do
|
||||
handle = mock("UNIXServer", :closed? => false)
|
||||
UNIXServer.should_receive(:new).with(@path).and_return(handle)
|
||||
@socket.listen
|
||||
handle.should_receive(:close)
|
||||
@socket.close
|
||||
end
|
||||
|
||||
it "should return nil when accepting if there is no handle" do
|
||||
@socket.accept.should be_nil
|
||||
end
|
||||
end
|
||||
end
|
Loading…
Reference in New Issue
Block a user