mirror of
https://github.com/valitydev/thrift.git
synced 2024-11-07 18:58:51 +00:00
Implement NonblockingServer and add specs
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@668999 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
90a2cbe0a4
commit
e0fdddea44
129
lib/rb/lib/thrift/server/nonblockingserver.rb
Normal file
129
lib/rb/lib/thrift/server/nonblockingserver.rb
Normal file
@ -0,0 +1,129 @@
|
||||
require 'thrift/server'
|
||||
|
||||
# thrift/server already imports fastthread/thread
|
||||
|
||||
module Thrift
|
||||
# this class expects to always use a FramedTransport for reading messages
|
||||
#--
|
||||
# this isn't very pretty, but we're working around the fact that FramedTransport
|
||||
# and the processors are all written in a synchronous manner.
|
||||
# So lets read data off the wire ourselves, check if we have a full frame, and
|
||||
# only then hand it to the transport to parse
|
||||
#
|
||||
# we inherit from ThreadPoolServer for the initialize/rescuable_serve methods
|
||||
class NonblockingServer < ThreadPoolServer
|
||||
def serve
|
||||
@server_thread = Thread.current
|
||||
@serverTransport.listen
|
||||
|
||||
begin
|
||||
connections = {}
|
||||
running_connections = {}
|
||||
# the swapping_connections stuff is to ensure the thread doesn't
|
||||
# put the connection back into the regular list, then have the server
|
||||
# thread process it again, then have the first thread remove it from
|
||||
# the running_connections list
|
||||
swapping_connections = {}
|
||||
thread_group = ThreadGroup.new
|
||||
loop do
|
||||
break if @shutdown
|
||||
rd, = select([@serverTransport.handle, *connections.keys])
|
||||
next if rd.nil?
|
||||
rd.each do |socket|
|
||||
if socket == @serverTransport.handle
|
||||
client = @serverTransport.accept
|
||||
buffer = ''
|
||||
outtrans = @transportFactory.get_transport(client)
|
||||
outprot = @protocolFactory.get_protocol(outtrans)
|
||||
connections[client.handle] = [client, buffer, outtrans, outprot]
|
||||
else
|
||||
client, buffer, outtrans, outprot = connections[socket]
|
||||
if socket.eof?
|
||||
client.close
|
||||
connections.delete(socket)
|
||||
else
|
||||
buffer << client.read(4096, true)
|
||||
if has_full_frame?(buffer)
|
||||
running_connections[socket] = connections.delete(socket)
|
||||
@thread_q.push :token
|
||||
t = Thread.new(Thread.current) do |master|
|
||||
begin
|
||||
membuf = MemoryBuffer.new(buffer)
|
||||
intrans = @transportFactory.get_transport(membuf)
|
||||
inprot = @protocolFactory.get_protocol(intrans)
|
||||
@processor.process(inprot, outprot)
|
||||
if @shutdown
|
||||
client.close
|
||||
running_connections.delete(socket)
|
||||
else
|
||||
swapping_connections[socket] = running_connections.delete(socket)
|
||||
master.wakeup
|
||||
end
|
||||
rescue => e
|
||||
outtrans.close
|
||||
@exception_q.push e
|
||||
ensure
|
||||
running_connections.delete(socket)
|
||||
connections[socket] = swapping_connections.delete(socket) if swapping_connections.include? socket
|
||||
intrans.close
|
||||
@thread_q.pop
|
||||
end
|
||||
end
|
||||
thread_group.add t
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
if @shutdown
|
||||
@serverTransport.close
|
||||
connections.merge! running_connections
|
||||
connections.merge! swapping_connections
|
||||
connections.values.each do |client, buffer, outtrans, outprot|
|
||||
# can't close completely or we'll break active messages
|
||||
# but lets at least stop accepting input
|
||||
client.handle.close_read
|
||||
end
|
||||
start = Time.now.to_f
|
||||
until thread_group.list.empty?
|
||||
if @shutdown_timeout
|
||||
now = Time.now.to_f
|
||||
cur_timeout = @shutdown_timeout - (now - start)
|
||||
break if cur_timeout <= 0
|
||||
thread_group.list.first.join(cur_timeout)
|
||||
else
|
||||
thread_group.list.first.join
|
||||
end
|
||||
end
|
||||
thread_group.list.each { |t| t.kill } if @shutdown_kill
|
||||
# now kill connections completely if they still exists
|
||||
connections.values.each do |client, buffer, outtrans, outprot|
|
||||
client.close
|
||||
end
|
||||
end
|
||||
ensure
|
||||
@serverTransport.close
|
||||
end
|
||||
end
|
||||
|
||||
# Stop accepting new messages and wait for active messages to finish
|
||||
# If the given timeout passes without the active messages finishing,
|
||||
# control will exit from #serve and leave the remaining threads active.
|
||||
# If you pass true for kill, the remaining threads will be reaped instead.
|
||||
# A false timeout means wait indefinitely
|
||||
def shutdown(timeout = nil, kill = false)
|
||||
@shutdown_timeout = timeout
|
||||
@shutdown_kill = kill
|
||||
@shutdown = true
|
||||
@server_thread.wakeup
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def has_full_frame?(buf)
|
||||
return no unless buf.length >= 4
|
||||
size = buf.unpack('N').first
|
||||
size + 4 <= buf.length
|
||||
end
|
||||
end
|
||||
end
|
@ -16,3 +16,11 @@ struct Foo {
|
||||
struct BoolStruct {
|
||||
1: bool yesno = 1
|
||||
}
|
||||
|
||||
service NonblockingService {
|
||||
Hello greeting(1:bool english)
|
||||
bool block()
|
||||
async void unblock()
|
||||
async void shutdown()
|
||||
void sleep(1:double seconds)
|
||||
}
|
||||
|
192
lib/rb/spec/gen-rb/NonblockingService.rb
Normal file
192
lib/rb/spec/gen-rb/NonblockingService.rb
Normal file
@ -0,0 +1,192 @@
|
||||
#
|
||||
# Autogenerated by Thrift
|
||||
#
|
||||
# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
|
||||
#
|
||||
|
||||
require 'thrift/protocol'
|
||||
require 'thrift'
|
||||
require 'ThriftSpec_types'
|
||||
|
||||
module SpecNamespace
|
||||
module NonblockingService
|
||||
class Client
|
||||
include Thrift::Client
|
||||
|
||||
def greeting(english)
|
||||
send_greeting(english)
|
||||
return recv_greeting()
|
||||
end
|
||||
|
||||
def send_greeting(english)
|
||||
send_message('greeting', Greeting_args, :english => english)
|
||||
end
|
||||
|
||||
def recv_greeting()
|
||||
result = receive_message(Greeting_result)
|
||||
return result.success unless result.success.nil?
|
||||
raise Thrift::ApplicationException.new(Thrift::ApplicationException::MISSING_RESULT, 'greeting failed: unknown result')
|
||||
end
|
||||
|
||||
def block()
|
||||
send_block()
|
||||
return recv_block()
|
||||
end
|
||||
|
||||
def send_block()
|
||||
send_message('block', Block_args)
|
||||
end
|
||||
|
||||
def recv_block()
|
||||
result = receive_message(Block_result)
|
||||
return result.success unless result.success.nil?
|
||||
raise Thrift::ApplicationException.new(Thrift::ApplicationException::MISSING_RESULT, 'block failed: unknown result')
|
||||
end
|
||||
|
||||
def unblock()
|
||||
send_unblock()
|
||||
end
|
||||
|
||||
def send_unblock()
|
||||
send_message('unblock', Unblock_args)
|
||||
end
|
||||
def shutdown()
|
||||
send_shutdown()
|
||||
end
|
||||
|
||||
def send_shutdown()
|
||||
send_message('shutdown', Shutdown_args)
|
||||
end
|
||||
def sleep(seconds)
|
||||
send_sleep(seconds)
|
||||
recv_sleep()
|
||||
end
|
||||
|
||||
def send_sleep(seconds)
|
||||
send_message('sleep', Sleep_args, :seconds => seconds)
|
||||
end
|
||||
|
||||
def recv_sleep()
|
||||
result = receive_message(Sleep_result)
|
||||
return
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
class Processor
|
||||
include Thrift::Processor
|
||||
|
||||
def process_greeting(seqid, iprot, oprot)
|
||||
args = read_args(iprot, Greeting_args)
|
||||
result = Greeting_result.new()
|
||||
result.success = @handler.greeting(args.english)
|
||||
write_result(result, oprot, 'greeting', seqid)
|
||||
end
|
||||
|
||||
def process_block(seqid, iprot, oprot)
|
||||
args = read_args(iprot, Block_args)
|
||||
result = Block_result.new()
|
||||
result.success = @handler.block()
|
||||
write_result(result, oprot, 'block', seqid)
|
||||
end
|
||||
|
||||
def process_unblock(seqid, iprot, oprot)
|
||||
args = read_args(iprot, Unblock_args)
|
||||
@handler.unblock()
|
||||
return
|
||||
end
|
||||
|
||||
def process_shutdown(seqid, iprot, oprot)
|
||||
args = read_args(iprot, Shutdown_args)
|
||||
@handler.shutdown()
|
||||
return
|
||||
end
|
||||
|
||||
def process_sleep(seqid, iprot, oprot)
|
||||
args = read_args(iprot, Sleep_args)
|
||||
result = Sleep_result.new()
|
||||
@handler.sleep(args.seconds)
|
||||
write_result(result, oprot, 'sleep', seqid)
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
# HELPER FUNCTIONS AND STRUCTURES
|
||||
|
||||
class Greeting_args
|
||||
include Thrift::Struct
|
||||
attr_accessor :english
|
||||
FIELDS = {
|
||||
1 => {:type => Thrift::Types::BOOL, :name => 'english'}
|
||||
}
|
||||
end
|
||||
|
||||
class Greeting_result
|
||||
include Thrift::Struct
|
||||
attr_accessor :success
|
||||
FIELDS = {
|
||||
0 => {:type => Thrift::Types::STRUCT, :name => 'success', :class => Hello}
|
||||
}
|
||||
end
|
||||
|
||||
class Block_args
|
||||
include Thrift::Struct
|
||||
FIELDS = {
|
||||
|
||||
}
|
||||
end
|
||||
|
||||
class Block_result
|
||||
include Thrift::Struct
|
||||
attr_accessor :success
|
||||
FIELDS = {
|
||||
0 => {:type => Thrift::Types::BOOL, :name => 'success'}
|
||||
}
|
||||
end
|
||||
|
||||
class Unblock_args
|
||||
include Thrift::Struct
|
||||
FIELDS = {
|
||||
|
||||
}
|
||||
end
|
||||
|
||||
class Unblock_result
|
||||
include Thrift::Struct
|
||||
FIELDS = {
|
||||
|
||||
}
|
||||
end
|
||||
|
||||
class Shutdown_args
|
||||
include Thrift::Struct
|
||||
FIELDS = {
|
||||
|
||||
}
|
||||
end
|
||||
|
||||
class Shutdown_result
|
||||
include Thrift::Struct
|
||||
FIELDS = {
|
||||
|
||||
}
|
||||
end
|
||||
|
||||
class Sleep_args
|
||||
include Thrift::Struct
|
||||
attr_accessor :seconds
|
||||
FIELDS = {
|
||||
1 => {:type => Thrift::Types::DOUBLE, :name => 'seconds'}
|
||||
}
|
||||
end
|
||||
|
||||
class Sleep_result
|
||||
include Thrift::Struct
|
||||
FIELDS = {
|
||||
|
||||
}
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
end
|
166
lib/rb/spec/nonblockingserver_spec.rb
Normal file
166
lib/rb/spec/nonblockingserver_spec.rb
Normal file
@ -0,0 +1,166 @@
|
||||
require File.dirname(__FILE__) + '/spec_helper'
|
||||
require 'thrift/server/nonblockingserver'
|
||||
$:.unshift File.dirname(__FILE__) + '/gen-rb'
|
||||
require 'NonblockingService'
|
||||
|
||||
class ThriftNonblockingServerSpec < Spec::ExampleGroup
|
||||
include Thrift
|
||||
include SpecNamespace
|
||||
|
||||
class Handler
|
||||
def initialize
|
||||
@queue = Queue.new
|
||||
end
|
||||
|
||||
attr_accessor :server
|
||||
|
||||
def greeting(english)
|
||||
if english
|
||||
SpecNamespace::Hello.new
|
||||
else
|
||||
SpecNamespace::Hello.new(:greeting => "Aloha!")
|
||||
end
|
||||
end
|
||||
|
||||
def block
|
||||
@queue.pop
|
||||
end
|
||||
|
||||
def unblock
|
||||
@queue.num_waiting.times { @queue.push true }
|
||||
end
|
||||
|
||||
def sleep(time)
|
||||
Kernel.sleep time
|
||||
end
|
||||
|
||||
def shutdown
|
||||
@server.shutdown
|
||||
end
|
||||
end
|
||||
|
||||
before(:each) do
|
||||
@port = 43251
|
||||
handler = Handler.new
|
||||
processor = NonblockingService::Processor.new(handler)
|
||||
@transport = ServerSocket.new('localhost', @port)
|
||||
transportFactory = FramedTransportFactory.new
|
||||
@server = NonblockingServer.new(processor, @transport, transportFactory, nil, 5)
|
||||
handler.server = @server
|
||||
@server_thread = Thread.new do
|
||||
begin
|
||||
@server.serve
|
||||
rescue => e
|
||||
p e
|
||||
puts e.backtrace * "\n"
|
||||
raise e
|
||||
end
|
||||
end
|
||||
Thread.pass
|
||||
|
||||
@clients = []
|
||||
end
|
||||
|
||||
after(:each) do
|
||||
@clients.each { |client, trans| trans.close }
|
||||
@server_thread.kill
|
||||
@transport.close
|
||||
end
|
||||
|
||||
def setup_client
|
||||
transport = FramedTransport.new(Socket.new('localhost', @port))
|
||||
protocol = BinaryProtocol.new(transport)
|
||||
client = NonblockingService::Client.new(protocol)
|
||||
transport.open
|
||||
@clients << [client, transport]
|
||||
client
|
||||
end
|
||||
|
||||
def setup_client_thread(result)
|
||||
queue = Queue.new
|
||||
Thread.new do
|
||||
client = setup_client
|
||||
while (msg = queue.pop)
|
||||
case msg
|
||||
when :block
|
||||
result << client.block
|
||||
when :unblock
|
||||
client.unblock
|
||||
when :hello
|
||||
result << client.greeting(true) # ignore result
|
||||
when :sleep
|
||||
client.sleep(0.5)
|
||||
result << :slept
|
||||
when :shutdown
|
||||
client.shutdown
|
||||
when :exit
|
||||
result << :done
|
||||
break
|
||||
end
|
||||
end
|
||||
@clients.each { |c,t| t.close and break if c == client } #close the transport
|
||||
end
|
||||
queue
|
||||
end
|
||||
|
||||
it "should handle basic message passing" do
|
||||
client = setup_client
|
||||
client.greeting(true).should == Hello.new
|
||||
client.greeting(false).should == Hello.new(:greeting => 'Aloha!')
|
||||
end
|
||||
|
||||
it "should handle concurrent clients" do
|
||||
queue = Queue.new
|
||||
4.times { Thread.new { queue.push setup_client.block } }
|
||||
setup_client.unblock
|
||||
4.times { queue.pop.should be_true }
|
||||
end
|
||||
|
||||
it "should handle messages from more than 5 long-lived connections" do
|
||||
queues = []
|
||||
result = Queue.new
|
||||
7.times do |i|
|
||||
queues << setup_client_thread(result)
|
||||
Thread.pass if i == 4 # give the server time to accept connections
|
||||
end
|
||||
client = setup_client
|
||||
# block 4 connections
|
||||
4.times { |i| queues[i] << :block }
|
||||
queues[4] << :hello
|
||||
queues[5] << :hello
|
||||
queues[6] << :hello
|
||||
3.times { result.pop.should == Hello.new }
|
||||
client.greeting(true).should == Hello.new
|
||||
queues[5] << :unblock
|
||||
4.times { result.pop.should be_true }
|
||||
queues[2] << :hello
|
||||
result.pop.should == Hello.new
|
||||
client.greeting(false).should == Hello.new(:greeting => 'Aloha!')
|
||||
7.times { queues.shift << :exit }
|
||||
client.greeting(true).should == Hello.new
|
||||
end
|
||||
|
||||
it "should shut down when asked" do
|
||||
@server.shutdown
|
||||
@server_thread.join(2).should be_an_instance_of(Thread)
|
||||
end
|
||||
|
||||
it "should continue processing active messages when shutting down" do
|
||||
result = Queue.new
|
||||
client = setup_client_thread(result)
|
||||
client << :sleep
|
||||
sleep 0.1 # give the server time to start processing the client's message
|
||||
@server.shutdown
|
||||
@server_thread.join(2).should be_an_instance_of(Thread)
|
||||
result.pop.should == :slept
|
||||
end
|
||||
|
||||
it "should kill active messages when they don't expire while shutting down" do
|
||||
result = Queue.new
|
||||
client = setup_client_thread(result)
|
||||
client << :block
|
||||
sleep 0.1 # start processing the client's message
|
||||
@server.shutdown(1, true)
|
||||
@server_thread.join(3).should_not be_nil
|
||||
end
|
||||
end
|
Loading…
Reference in New Issue
Block a user