mirror of
https://github.com/valitydev/thrift.git
synced 2024-11-07 18:58:51 +00:00
rb: split up benchmark into separate server/client files and distinct interpreters
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@669025 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
fb5c0eb2b3
commit
d3cee029c9
@ -37,5 +37,5 @@ end
|
|||||||
|
|
||||||
desc 'Run benchmarking of NonblockingServer'
|
desc 'Run benchmarking of NonblockingServer'
|
||||||
task :benchmark do
|
task :benchmark do
|
||||||
ruby 'benchmark/fairness.rb'
|
ruby 'benchmark/benchmark.rb'
|
||||||
end
|
end
|
||||||
|
@ -3,68 +3,47 @@ $:.unshift File.dirname(__FILE__) + '/../lib'
|
|||||||
require 'thrift'
|
require 'thrift'
|
||||||
require 'thrift/server/nonblockingserver'
|
require 'thrift/server/nonblockingserver'
|
||||||
require 'thrift/transport/unixsocket'
|
require 'thrift/transport/unixsocket'
|
||||||
$:.unshift File.dirname(__FILE__) + "/gen-rb"
|
|
||||||
require 'BenchmarkService'
|
|
||||||
require 'thread'
|
|
||||||
require 'stringio'
|
require 'stringio'
|
||||||
|
|
||||||
HOST = 'localhost'
|
HOST = 'localhost'
|
||||||
PORT = 42587
|
PORT = 42587
|
||||||
|
|
||||||
Thread.abort_on_exception = true
|
|
||||||
|
|
||||||
###############
|
###############
|
||||||
## Server
|
## Server
|
||||||
###############
|
###############
|
||||||
|
|
||||||
module Server
|
class Server
|
||||||
include Thrift
|
attr_accessor :serverclass
|
||||||
|
attr_accessor :interpreter
|
||||||
|
attr_accessor :host
|
||||||
|
attr_accessor :port
|
||||||
|
|
||||||
class BenchmarkHandler
|
def initialize(opts)
|
||||||
# 1-based index into the fibonacci sequence
|
@serverclass = opts.fetch(:class, Thrift::NonblockingServer)
|
||||||
def fibonacci(n)
|
@interpreter = opts.fetch(:interpreter, "ruby")
|
||||||
seq = [1, 1]
|
@host = opts.fetch(:host, ::HOST)
|
||||||
3.upto(n) do
|
@port = opts.fetch(:port, ::PORT)
|
||||||
seq << seq[-1] + seq[-2]
|
|
||||||
end
|
|
||||||
seq[n-1] # n is 1-based
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def self.start_server(serverClass, trans = nil)
|
def start
|
||||||
return if serverClass == Object
|
return if @class == Object
|
||||||
handler = BenchmarkHandler.new
|
@pipe = IO.popen("#{@interpreter} #{File.dirname(__FILE__)}/server.rb #{@host} #{@port} #{@serverclass.name}", "r+")
|
||||||
processor = ThriftBenchmark::BenchmarkService::Processor.new(handler)
|
|
||||||
transport = trans || ServerSocket.new(HOST, PORT)
|
|
||||||
transportFactory = FramedTransportFactory.new
|
|
||||||
args = [processor, transport, transportFactory, nil, 20]
|
|
||||||
if serverClass == NonblockingServer
|
|
||||||
logger = Logger.new(STDERR)
|
|
||||||
logger.level = Logger::WARN
|
|
||||||
args << logger
|
|
||||||
end
|
|
||||||
server = serverClass.new(*args)
|
|
||||||
@server_thread = Thread.new do
|
|
||||||
server.serve
|
|
||||||
end
|
|
||||||
@server = server
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def self.shutdown
|
def shutdown
|
||||||
return if @server.nil?
|
return unless @pipe
|
||||||
if @server.respond_to? :shutdown
|
Marshal.dump(:shutdown, @pipe)
|
||||||
@server.shutdown
|
begin
|
||||||
else
|
@pipe.read(10) # block until the server shuts down
|
||||||
@server_thread.kill
|
rescue EOFError
|
||||||
end
|
end
|
||||||
end
|
@pipe.close
|
||||||
|
@pipe = nil
|
||||||
def self.class
|
|
||||||
@server and @server.class
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
class BenchmarkManager
|
class BenchmarkManager
|
||||||
def initialize(opts)
|
def initialize(opts, server)
|
||||||
@socket = opts.fetch(:socket) do
|
@socket = opts.fetch(:socket) do
|
||||||
@host = opts.fetch(:host, 'localhost')
|
@host = opts.fetch(:host, 'localhost')
|
||||||
@port = opts.fetch(:port)
|
@port = opts.fetch(:port)
|
||||||
@ -73,6 +52,8 @@ class BenchmarkManager
|
|||||||
@num_processes = opts.fetch(:num_processes, 40)
|
@num_processes = opts.fetch(:num_processes, 40)
|
||||||
@clients_per_process = opts.fetch(:clients_per_process, 10)
|
@clients_per_process = opts.fetch(:clients_per_process, 10)
|
||||||
@calls_per_client = opts.fetch(:calls_per_client, 50)
|
@calls_per_client = opts.fetch(:calls_per_client, 50)
|
||||||
|
@interpreter = opts.fetch(:interpreter, "ruby")
|
||||||
|
@server = server
|
||||||
end
|
end
|
||||||
|
|
||||||
def run
|
def run
|
||||||
@ -91,38 +72,8 @@ class BenchmarkManager
|
|||||||
end
|
end
|
||||||
|
|
||||||
def spawn
|
def spawn
|
||||||
rd, wr = IO.pipe
|
pipe = IO.popen("#{@interpreter} #{File.dirname(__FILE__)}/client.rb #{@host} #{@port} #{@clients_per_process} #{@calls_per_client}")
|
||||||
pid = fork do
|
@pool << pipe
|
||||||
STDIN.close
|
|
||||||
rd.close
|
|
||||||
@clients_per_process.times do
|
|
||||||
if @socket
|
|
||||||
socket = Thrift::UNIXSocket.new(@socket)
|
|
||||||
else
|
|
||||||
socket = Thrift::Socket.new(@host, @port)
|
|
||||||
end
|
|
||||||
transport = Thrift::FramedTransport.new(socket)
|
|
||||||
protocol = Thrift::BinaryProtocol.new(transport)
|
|
||||||
client = ThriftBenchmark::BenchmarkService::Client.new(protocol)
|
|
||||||
begin
|
|
||||||
transport.open
|
|
||||||
rescue
|
|
||||||
Marshal.dump [:connection_failure, Time.now], wr
|
|
||||||
else
|
|
||||||
Marshal.dump [:start, Time.now], wr
|
|
||||||
@calls_per_client.times do
|
|
||||||
Marshal.dump [:call_start, Time.now], wr
|
|
||||||
client.fibonacci(15)
|
|
||||||
Marshal.dump [:call_end, Time.now], wr
|
|
||||||
end
|
|
||||||
transport.close
|
|
||||||
Marshal.dump [:end, Time.now], wr
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
wr.close
|
|
||||||
@pool << rd
|
|
||||||
pid
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def socket_class
|
def socket_class
|
||||||
@ -212,7 +163,9 @@ class BenchmarkManager
|
|||||||
fmt = "%.4f seconds"
|
fmt = "%.4f seconds"
|
||||||
puts
|
puts
|
||||||
tabulate "%d",
|
tabulate "%d",
|
||||||
[["Server class", "%s"], Server.class],
|
[["Server class", "%s"], @server.serverclass],
|
||||||
|
[["Server interpreter", "%s"], @server.interpreter],
|
||||||
|
[["Client interpreter", "%s"], @interpreter],
|
||||||
[["Socket class", "%s"], socket_class],
|
[["Socket class", "%s"], socket_class],
|
||||||
["Number of processes", @num_processes],
|
["Number of processes", @num_processes],
|
||||||
["Clients per process", @clients_per_process],
|
["Clients per process", @clients_per_process],
|
||||||
@ -245,21 +198,16 @@ def resolve_const(const)
|
|||||||
end
|
end
|
||||||
|
|
||||||
puts "Starting server..."
|
puts "Starting server..."
|
||||||
serverklass = resolve_const(ENV['THRIFT_SERVER']) || Thrift::NonblockingServer
|
args = {}
|
||||||
servertrans = nil
|
args[:interpreter] = ENV['THRIFT_SERVER_INTERPRETER'] || ENV['THRIFT_INTERPRETER'] || "ruby"
|
||||||
if ENV['THRIFT_SOCKET']
|
args[:class] = resolve_const(ENV['THRIFT_SERVER']) || Thrift::NonblockingServer
|
||||||
servertrans = Thrift::UNIXServerSocket.new(ENV['THRIFT_SOCKET'])
|
server = Server.new(args)
|
||||||
end
|
server.start
|
||||||
Server.start_server(serverklass, servertrans)
|
|
||||||
|
|
||||||
sleep 0.2 # give the server time to start
|
sleep 0.2 # give the server time to start
|
||||||
|
|
||||||
args = { :num_processes => 40, :clients_per_process => 5 }
|
args = { :num_processes => 40, :clients_per_process => 5, :host => HOST, :port => PORT }
|
||||||
if ENV['THRIFT_SOCKET']
|
args[:interpreter] = ENV['THRIFT_CLIENT_INTERPRETER'] || ENV['THRIFT_INTERPRETER'] || "ruby"
|
||||||
args[:socket] = ENV['THRIFT_SOCKET']
|
BenchmarkManager.new(args, server).run
|
||||||
else
|
|
||||||
args.merge!(:host => HOST, :port => PORT)
|
|
||||||
end
|
|
||||||
BenchmarkManager.new(args).run
|
|
||||||
|
|
||||||
Server.shutdown
|
server.shutdown
|
41
lib/rb/benchmark/client.rb
Normal file
41
lib/rb/benchmark/client.rb
Normal file
@ -0,0 +1,41 @@
|
|||||||
|
$:.unshift File.dirname(__FILE__) + '/../lib'
|
||||||
|
require 'thrift'
|
||||||
|
require 'thrift/server/nonblockingserver'
|
||||||
|
$:.unshift File.dirname(__FILE__) + "/gen-rb"
|
||||||
|
require 'BenchmarkService'
|
||||||
|
|
||||||
|
class Client
|
||||||
|
def initialize(host, port, clients_per_process, calls_per_client)
|
||||||
|
@host = host
|
||||||
|
@port = port
|
||||||
|
@clients_per_process = clients_per_process
|
||||||
|
@calls_per_client = calls_per_client
|
||||||
|
end
|
||||||
|
|
||||||
|
def run
|
||||||
|
@clients_per_process.times do
|
||||||
|
socket = Thrift::Socket.new(@host, @port)
|
||||||
|
transport = Thrift::FramedTransport.new(socket)
|
||||||
|
protocol = Thrift::BinaryProtocol.new(transport)
|
||||||
|
client = ThriftBenchmark::BenchmarkService::Client.new(protocol)
|
||||||
|
begin
|
||||||
|
transport.open
|
||||||
|
rescue
|
||||||
|
Marshal.dump [:connection_failure, Time.now], STDOUT
|
||||||
|
else
|
||||||
|
Marshal.dump [:start, Time.now], STDOUT
|
||||||
|
@calls_per_client.times do
|
||||||
|
Marshal.dump [:call_start, Time.now], STDOUT
|
||||||
|
client.fibonacci(15)
|
||||||
|
Marshal.dump [:call_end, Time.now], STDOUT
|
||||||
|
end
|
||||||
|
transport.close
|
||||||
|
Marshal.dump [:end, Time.now], STDOUT
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
host, port, clients_per_process, calls_per_client = ARGV
|
||||||
|
|
||||||
|
Client.new(host, port.to_i, clients_per_process.to_i, calls_per_client.to_i).run
|
59
lib/rb/benchmark/server.rb
Normal file
59
lib/rb/benchmark/server.rb
Normal file
@ -0,0 +1,59 @@
|
|||||||
|
$:.unshift File.dirname(__FILE__) + '/../lib'
|
||||||
|
require 'thrift'
|
||||||
|
require 'thrift/server/nonblockingserver'
|
||||||
|
$:.unshift File.dirname(__FILE__) + "/gen-rb"
|
||||||
|
require 'BenchmarkService'
|
||||||
|
|
||||||
|
module Server
|
||||||
|
include Thrift
|
||||||
|
|
||||||
|
class BenchmarkHandler
|
||||||
|
# 1-based index into the fibonacci sequence
|
||||||
|
def fibonacci(n)
|
||||||
|
seq = [1, 1]
|
||||||
|
3.upto(n) do
|
||||||
|
seq << seq[-1] + seq[-2]
|
||||||
|
end
|
||||||
|
seq[n-1] # n is 1-based
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def self.start_server(host, port, serverClass)
|
||||||
|
handler = BenchmarkHandler.new
|
||||||
|
processor = ThriftBenchmark::BenchmarkService::Processor.new(handler)
|
||||||
|
transport = ServerSocket.new(host, port)
|
||||||
|
transportFactory = FramedTransportFactory.new
|
||||||
|
args = [processor, transport, transportFactory, nil, 20]
|
||||||
|
if serverClass == NonblockingServer
|
||||||
|
logger = Logger.new(STDERR)
|
||||||
|
logger.level = Logger::WARN
|
||||||
|
args << logger
|
||||||
|
end
|
||||||
|
server = serverClass.new(*args)
|
||||||
|
@server_thread = Thread.new do
|
||||||
|
server.serve
|
||||||
|
end
|
||||||
|
@server = server
|
||||||
|
end
|
||||||
|
|
||||||
|
def self.shutdown
|
||||||
|
return if @server.nil?
|
||||||
|
if @server.respond_to? :shutdown
|
||||||
|
@server.shutdown
|
||||||
|
else
|
||||||
|
@server_thread.kill
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def resolve_const(const)
|
||||||
|
const and const.split('::').inject(Object) { |k,c| k.const_get(c) }
|
||||||
|
end
|
||||||
|
|
||||||
|
host, port, serverklass = ARGV
|
||||||
|
|
||||||
|
Server.start_server(host, port.to_i, resolve_const(serverklass))
|
||||||
|
|
||||||
|
Marshal.load(STDIN)
|
||||||
|
|
||||||
|
Server.shutdown
|
Loading…
Reference in New Issue
Block a user