mirror of
https://github.com/valitydev/thrift.git
synced 2024-11-07 10:48:51 +00:00
THRIFT-3081 consolidate client processing loop in Simple, Threaded, and Thread Pool servers
This commit is contained in:
parent
811d279d58
commit
5ec805b22b
@ -54,6 +54,7 @@ set( thriftcpp_SOURCES
|
||||
src/thrift/transport/TServerSocket.cpp
|
||||
src/thrift/transport/TTransportUtils.cpp
|
||||
src/thrift/transport/TBufferTransports.cpp
|
||||
src/thrift/server/TConnectedClient.cpp
|
||||
src/thrift/server/TServer.cpp
|
||||
src/thrift/server/TSimpleServer.cpp
|
||||
src/thrift/server/TThreadPoolServer.cpp
|
||||
|
@ -89,6 +89,7 @@ libthrift_la_SOURCES = src/thrift/Thrift.cpp \
|
||||
src/thrift/transport/TSSLServerSocket.cpp \
|
||||
src/thrift/transport/TTransportUtils.cpp \
|
||||
src/thrift/transport/TBufferTransports.cpp \
|
||||
src/thrift/server/TConnectedClient.cpp \
|
||||
src/thrift/server/TServer.cpp \
|
||||
src/thrift/server/TSimpleServer.cpp \
|
||||
src/thrift/server/TThreadPoolServer.cpp \
|
||||
|
121
lib/cpp/src/thrift/server/TConnectedClient.cpp
Normal file
121
lib/cpp/src/thrift/server/TConnectedClient.cpp
Normal file
@ -0,0 +1,121 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
#include <thrift/server/TConnectedClient.h>
|
||||
|
||||
namespace apache {
|
||||
namespace thrift {
|
||||
namespace server {
|
||||
|
||||
using apache::thrift::TProcessor;
|
||||
using apache::thrift::protocol::TProtocol;
|
||||
using apache::thrift::server::TServerEventHandler;
|
||||
using apache::thrift::transport::TTransport;
|
||||
using apache::thrift::transport::TTransportException;
|
||||
using boost::shared_ptr;
|
||||
using std::string;
|
||||
|
||||
TConnectedClient::TConnectedClient(const string& serverType,
|
||||
const shared_ptr<TProcessor>& processor,
|
||||
const shared_ptr<TProtocol>& inputProtocol,
|
||||
const shared_ptr<TProtocol>& outputProtocol,
|
||||
const shared_ptr<TServerEventHandler>& eventHandler,
|
||||
const shared_ptr<TTransport>& client)
|
||||
|
||||
: serverType_(serverType),
|
||||
processor_(processor),
|
||||
inputProtocol_(inputProtocol),
|
||||
outputProtocol_(outputProtocol),
|
||||
eventHandler_(eventHandler),
|
||||
client_(client),
|
||||
opaqueContext_(0) {}
|
||||
|
||||
TConnectedClient::~TConnectedClient() {}
|
||||
|
||||
void TConnectedClient::run() {
|
||||
if (eventHandler_) {
|
||||
opaqueContext_ = eventHandler_->createContext(inputProtocol_, outputProtocol_);
|
||||
}
|
||||
|
||||
for (;;) {
|
||||
if (eventHandler_) {
|
||||
eventHandler_->processContext(opaqueContext_, client_);
|
||||
}
|
||||
|
||||
try {
|
||||
if (!processor_->process(inputProtocol_, outputProtocol_, opaqueContext_)) {
|
||||
break;
|
||||
}
|
||||
} catch (const TTransportException& ttx) {
|
||||
if (ttx.getType() == TTransportException::TIMED_OUT) {
|
||||
// Receive timeout - continue processing.
|
||||
continue;
|
||||
} else if (ttx.getType() == TTransportException::END_OF_FILE ||
|
||||
ttx.getType() == TTransportException::INTERRUPTED) {
|
||||
// Client disconnected or was interrupted. No logging needed. Done.
|
||||
break;
|
||||
} else {
|
||||
// All other transport exceptions are logged.
|
||||
// State of connection is unknown. Done.
|
||||
string errStr = (serverType_ + " client died: ") + ttx.what();
|
||||
GlobalOutput(errStr.c_str());
|
||||
break;
|
||||
}
|
||||
} catch (const TException& tex) {
|
||||
// Some protocols throw this after they send an error response to the client
|
||||
// They should be trained to return true instead and if they want to log,
|
||||
// then they should log.
|
||||
string errStr = (serverType_ + " processing exception: ") + tex.what();
|
||||
GlobalOutput(errStr.c_str());
|
||||
// Continue processing
|
||||
}
|
||||
}
|
||||
|
||||
cleanup();
|
||||
}
|
||||
|
||||
void TConnectedClient::cleanup()
|
||||
{
|
||||
if (eventHandler_) {
|
||||
eventHandler_->deleteContext(opaqueContext_, inputProtocol_, outputProtocol_);
|
||||
}
|
||||
|
||||
try {
|
||||
inputProtocol_->getTransport()->close();
|
||||
} catch (const TTransportException& ttx) {
|
||||
string errStr = string(serverType_ + " input close failed: ") + ttx.what();
|
||||
GlobalOutput(errStr.c_str());
|
||||
}
|
||||
try {
|
||||
outputProtocol_->getTransport()->close();
|
||||
} catch (const TTransportException& ttx) {
|
||||
string errStr = string(serverType_ + " output close failed: ") + ttx.what();
|
||||
GlobalOutput(errStr.c_str());
|
||||
}
|
||||
try {
|
||||
client_->close();
|
||||
} catch (const TTransportException& ttx) {
|
||||
string errStr = string(serverType_ + " client close failed: ") + ttx.what();
|
||||
GlobalOutput(errStr.c_str());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
} // apache::thrift::server
|
116
lib/cpp/src/thrift/server/TConnectedClient.h
Normal file
116
lib/cpp/src/thrift/server/TConnectedClient.h
Normal file
@ -0,0 +1,116 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
#ifndef _THRIFT_SERVER_TCONNECTEDCLIENT_H_
|
||||
#define _THRIFT_SERVER_TCONNECTEDCLIENT_H_ 1
|
||||
|
||||
#include <boost/shared_ptr.hpp>
|
||||
#include <thrift/TProcessor.h>
|
||||
#include <thrift/protocol/TProtocol.h>
|
||||
#include <thrift/server/TServer.h>
|
||||
#include <thrift/transport/TTransport.h>
|
||||
|
||||
namespace apache {
|
||||
namespace thrift {
|
||||
namespace server {
|
||||
|
||||
/**
|
||||
* This represents a client connected to a TServer. The
|
||||
* processing loop for a client must provide some required
|
||||
* functionality common to all implementations so it is
|
||||
* encapsulated here.
|
||||
*/
|
||||
|
||||
class TConnectedClient : public apache::thrift::concurrency::Runnable
|
||||
{
|
||||
public:
|
||||
/**
|
||||
* Constructor.
|
||||
*
|
||||
* @param[in] serverType the server type as a string, used
|
||||
* for logging output.
|
||||
* @param[in] processor the TProcessor
|
||||
* @param[in] inputProtocol the input TProtocol
|
||||
* @param[in] outputProtocol the output TProtocol
|
||||
* @param[in] eventHandler the server event handler
|
||||
* @param[in] client the TTransport representing the client
|
||||
*/
|
||||
TConnectedClient(
|
||||
const std::string& serverType,
|
||||
const boost::shared_ptr<apache::thrift::TProcessor>& processor,
|
||||
const boost::shared_ptr<apache::thrift::protocol::TProtocol>& inputProtocol,
|
||||
const boost::shared_ptr<apache::thrift::protocol::TProtocol>& outputProtocol,
|
||||
const boost::shared_ptr<apache::thrift::server::TServerEventHandler>& eventHandler,
|
||||
const boost::shared_ptr<apache::thrift::transport::TTransport>& client);
|
||||
|
||||
/**
|
||||
* Destructor.
|
||||
*/
|
||||
virtual ~TConnectedClient();
|
||||
|
||||
/**
|
||||
* Drive the client until it is done.
|
||||
* The client processing loop is:
|
||||
*
|
||||
* [optional] call eventHandler->createContext once
|
||||
* [optional] call eventHandler->processContext per request
|
||||
* call processor->process per request
|
||||
* handle expected transport exceptions:
|
||||
* END_OF_FILE means the client is gone
|
||||
* INTERRUPTED means the client was interrupted
|
||||
* by TServerTransport::interruptChildren()
|
||||
* handle unexpected transport exceptions by logging
|
||||
* handle standard exceptions by logging
|
||||
* handle unexpected exceptions by logging
|
||||
* cleanup()
|
||||
*/
|
||||
virtual void run() /* override */;
|
||||
|
||||
protected:
|
||||
/**
|
||||
* Cleanup after a client. This happens if the client disconnects,
|
||||
* or if the server is stopped, or if an exception occurs.
|
||||
*
|
||||
* The cleanup processing is:
|
||||
* [optional] call eventHandler->deleteContext once
|
||||
* close the inputProtocol's TTransport
|
||||
* close the outputProtocol's TTransport
|
||||
* close the client
|
||||
*/
|
||||
virtual void cleanup();
|
||||
|
||||
private:
|
||||
std::string serverType_;
|
||||
boost::shared_ptr<apache::thrift::TProcessor> processor_;
|
||||
boost::shared_ptr<apache::thrift::protocol::TProtocol> inputProtocol_;
|
||||
boost::shared_ptr<apache::thrift::protocol::TProtocol> outputProtocol_;
|
||||
boost::shared_ptr<apache::thrift::server::TServerEventHandler> eventHandler_;
|
||||
boost::shared_ptr<apache::thrift::transport::TTransport> client_;
|
||||
|
||||
/**
|
||||
* Context acquired from the eventHandler_ if one exists.
|
||||
*/
|
||||
void *opaqueContext_;
|
||||
};
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#endif // #ifndef _THRIFT_SERVER_TCONNECTEDCLIENT_H_
|
@ -17,6 +17,7 @@
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
#include <thrift/server/TConnectedClient.h>
|
||||
#include <thrift/server/TSimpleServer.h>
|
||||
#include <thrift/transport/TTransportException.h>
|
||||
#include <string>
|
||||
@ -103,58 +104,9 @@ void TSimpleServer::serve() {
|
||||
break;
|
||||
}
|
||||
|
||||
// Get the processor
|
||||
shared_ptr<TProcessor> processor = getProcessor(inputProtocol, outputProtocol, client);
|
||||
|
||||
void* connectionContext = NULL;
|
||||
if (eventHandler_) {
|
||||
connectionContext = eventHandler_->createContext(inputProtocol, outputProtocol);
|
||||
}
|
||||
try {
|
||||
for (;;) {
|
||||
if (eventHandler_) {
|
||||
eventHandler_->processContext(connectionContext, client);
|
||||
}
|
||||
if (!processor->process(inputProtocol, outputProtocol, connectionContext) ||
|
||||
// Peek ahead, is the remote side closed?
|
||||
!inputProtocol->getTransport()->peek()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (const TTransportException& ttx) {
|
||||
if (ttx.getType() != TTransportException::END_OF_FILE &&
|
||||
ttx.getType() != TTransportException::INTERRUPTED)
|
||||
{
|
||||
string errStr = string("TSimpleServer client died: ") + ttx.what();
|
||||
GlobalOutput(errStr.c_str());
|
||||
}
|
||||
} catch (const std::exception& x) {
|
||||
GlobalOutput.printf("TSimpleServer exception: %s: %s", typeid(x).name(), x.what());
|
||||
} catch (...) {
|
||||
GlobalOutput("TSimpleServer uncaught exception.");
|
||||
}
|
||||
if (eventHandler_) {
|
||||
eventHandler_->deleteContext(connectionContext, inputProtocol, outputProtocol);
|
||||
}
|
||||
|
||||
try {
|
||||
inputTransport->close();
|
||||
} catch (const TTransportException& ttx) {
|
||||
string errStr = string("TSimpleServer input close failed: ") + ttx.what();
|
||||
GlobalOutput(errStr.c_str());
|
||||
}
|
||||
try {
|
||||
outputTransport->close();
|
||||
} catch (const TTransportException& ttx) {
|
||||
string errStr = string("TSimpleServer output close failed: ") + ttx.what();
|
||||
GlobalOutput(errStr.c_str());
|
||||
}
|
||||
try {
|
||||
client->close();
|
||||
} catch (const TTransportException& ttx) {
|
||||
string errStr = string("TSimpleServer client close failed: ") + ttx.what();
|
||||
GlobalOutput(errStr.c_str());
|
||||
}
|
||||
TConnectedClient("TSimpleServer",
|
||||
getProcessor(inputProtocol, outputProtocol, client),
|
||||
inputProtocol, outputProtocol, eventHandler_, client).run();
|
||||
}
|
||||
|
||||
if (stop_) {
|
||||
|
@ -94,7 +94,7 @@ public:
|
||||
void serve();
|
||||
|
||||
/**
|
||||
* Interrupt serve() so that it meets post-conditions.
|
||||
* Interrupt serve() so that it meets post-conditions and returns.
|
||||
*/
|
||||
void stop();
|
||||
|
||||
|
@ -19,12 +19,14 @@
|
||||
|
||||
#include <thrift/thrift-config.h>
|
||||
|
||||
#include <thrift/server/TConnectedClient.h>
|
||||
#include <thrift/server/TThreadPoolServer.h>
|
||||
#include <thrift/transport/TTransportException.h>
|
||||
#include <thrift/concurrency/Thread.h>
|
||||
#include <thrift/concurrency/ThreadManager.h>
|
||||
#include <string>
|
||||
#include <iostream>
|
||||
#include <boost/make_shared.hpp>
|
||||
|
||||
namespace apache {
|
||||
namespace thrift {
|
||||
@ -37,78 +39,6 @@ using namespace apache::thrift::concurrency;
|
||||
using namespace apache::thrift::protocol;
|
||||
using namespace apache::thrift::transport;
|
||||
|
||||
class TThreadPoolServer::Task : public Runnable {
|
||||
|
||||
public:
|
||||
Task(TThreadPoolServer& server,
|
||||
shared_ptr<TProcessor> processor,
|
||||
shared_ptr<TProtocol> input,
|
||||
shared_ptr<TProtocol> output,
|
||||
shared_ptr<TTransport> transport)
|
||||
: server_(server),
|
||||
processor_(processor),
|
||||
input_(input),
|
||||
output_(output),
|
||||
transport_(transport) {}
|
||||
|
||||
~Task() {}
|
||||
|
||||
void run() {
|
||||
boost::shared_ptr<TServerEventHandler> eventHandler = server_.getEventHandler();
|
||||
void* connectionContext = NULL;
|
||||
if (eventHandler) {
|
||||
connectionContext = eventHandler->createContext(input_, output_);
|
||||
}
|
||||
try {
|
||||
for (;;) {
|
||||
if (eventHandler) {
|
||||
eventHandler->processContext(connectionContext, transport_);
|
||||
}
|
||||
if (!processor_->process(input_, output_, connectionContext)
|
||||
|| !input_->getTransport()->peek()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (const TTransportException& ttx) {
|
||||
if (ttx.getType() != TTransportException::END_OF_FILE &&
|
||||
ttx.getType() != TTransportException::INTERRUPTED) {
|
||||
string errStr = string("TThreadPoolServer::Task client died: ") + ttx.what();
|
||||
GlobalOutput(errStr.c_str());
|
||||
}
|
||||
} catch (const std::exception& x) {
|
||||
GlobalOutput.printf("TThreadPoolServer exception %s: %s", typeid(x).name(), x.what());
|
||||
} catch (...) {
|
||||
GlobalOutput(
|
||||
"TThreadPoolServer, unexpected exception in "
|
||||
"TThreadPoolServer::Task::run()");
|
||||
}
|
||||
|
||||
if (eventHandler) {
|
||||
eventHandler->deleteContext(connectionContext, input_, output_);
|
||||
}
|
||||
|
||||
try {
|
||||
input_->getTransport()->close();
|
||||
} catch (TTransportException& ttx) {
|
||||
string errStr = string("TThreadPoolServer input close failed: ") + ttx.what();
|
||||
GlobalOutput(errStr.c_str());
|
||||
}
|
||||
try {
|
||||
output_->getTransport()->close();
|
||||
} catch (TTransportException& ttx) {
|
||||
string errStr = string("TThreadPoolServer output close failed: ") + ttx.what();
|
||||
GlobalOutput(errStr.c_str());
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
TServer& server_;
|
||||
shared_ptr<TProcessor> processor_;
|
||||
shared_ptr<TProtocol> input_;
|
||||
shared_ptr<TProtocol> output_;
|
||||
shared_ptr<TTransport> transport_;
|
||||
};
|
||||
|
||||
TThreadPoolServer::~TThreadPoolServer() {}
|
||||
|
||||
void TThreadPoolServer::serve() {
|
||||
@ -146,9 +76,13 @@ void TThreadPoolServer::serve() {
|
||||
shared_ptr<TProcessor> processor = getProcessor(inputProtocol, outputProtocol, client);
|
||||
|
||||
// Add to threadmanager pool
|
||||
shared_ptr<TThreadPoolServer::Task> task(
|
||||
new TThreadPoolServer::Task(*this, processor, inputProtocol, outputProtocol, client));
|
||||
threadManager_->add(task, timeout_, taskExpiration_);
|
||||
threadManager_->add(
|
||||
boost::make_shared<TConnectedClient>(
|
||||
"TThreadPoolServer",
|
||||
getProcessor(inputProtocol, outputProtocol, client),
|
||||
inputProtocol, outputProtocol, eventHandler_, client),
|
||||
timeout_,
|
||||
taskExpiration_);
|
||||
|
||||
} catch (TTransportException& ttx) {
|
||||
if (inputTransport) {
|
||||
|
@ -37,8 +37,6 @@ using apache::thrift::transport::TTransportFactory;
|
||||
|
||||
class TThreadPoolServer : public TServer {
|
||||
public:
|
||||
class Task;
|
||||
|
||||
template <typename ProcessorFactory>
|
||||
TThreadPoolServer(const boost::shared_ptr<ProcessorFactory>& processorFactory,
|
||||
const boost::shared_ptr<TServerTransport>& serverTransport,
|
||||
@ -107,8 +105,19 @@ public:
|
||||
|
||||
virtual ~TThreadPoolServer();
|
||||
|
||||
/**
|
||||
* Process all connections that arrive using a thread pool.
|
||||
* Call stop() on another thread to interrupt processing and
|
||||
* return control to the caller.
|
||||
* Post-conditions (return guarantees):
|
||||
* The serverTransport will be closed.
|
||||
* There will be no connected clients.
|
||||
*/
|
||||
virtual void serve();
|
||||
|
||||
/**
|
||||
* Interrupt serve() so that it meets post-conditions and returns.
|
||||
*/
|
||||
virtual void stop();
|
||||
|
||||
virtual int64_t getTimeout() const;
|
||||
|
@ -17,6 +17,8 @@
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
#include <boost/bind.hpp>
|
||||
#include <thrift/server/TConnectedClient.h>
|
||||
#include <thrift/server/TThreadedServer.h>
|
||||
#include <thrift/transport/TTransportException.h>
|
||||
#include <thrift/concurrency/PlatformThreadFactory.h>
|
||||
@ -39,94 +41,6 @@ using namespace apache::thrift::protocol;
|
||||
using namespace apache::thrift::transport;
|
||||
using namespace apache::thrift::concurrency;
|
||||
|
||||
class TThreadedServer::Task : public Runnable {
|
||||
|
||||
public:
|
||||
Task(TThreadedServer& server,
|
||||
shared_ptr<TProcessor> processor,
|
||||
shared_ptr<TProtocol> input,
|
||||
shared_ptr<TProtocol> output,
|
||||
shared_ptr<TTransport> transport)
|
||||
: server_(server),
|
||||
processor_(processor),
|
||||
input_(input),
|
||||
output_(output),
|
||||
transport_(transport) {}
|
||||
|
||||
~Task() {}
|
||||
|
||||
void run() {
|
||||
boost::shared_ptr<TServerEventHandler> eventHandler = server_.getEventHandler();
|
||||
void* connectionContext = NULL;
|
||||
if (eventHandler) {
|
||||
connectionContext = eventHandler->createContext(input_, output_);
|
||||
}
|
||||
try {
|
||||
for (;;) {
|
||||
if (eventHandler) {
|
||||
eventHandler->processContext(connectionContext, transport_);
|
||||
}
|
||||
if (!processor_->process(input_, output_, connectionContext)
|
||||
|| !input_->getTransport()->peek()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (const TTransportException& ttx) {
|
||||
if (ttx.getType() != TTransportException::END_OF_FILE &&
|
||||
ttx.getType() != TTransportException::INTERRUPTED) {
|
||||
string errStr = string("TThreadedServer client died: ") + ttx.what();
|
||||
GlobalOutput(errStr.c_str());
|
||||
}
|
||||
} catch (const std::exception& x) {
|
||||
GlobalOutput.printf("TThreadedServer exception: %s: %s", typeid(x).name(), x.what());
|
||||
} catch (...) {
|
||||
GlobalOutput("TThreadedServer uncaught exception.");
|
||||
}
|
||||
if (eventHandler) {
|
||||
eventHandler->deleteContext(connectionContext, input_, output_);
|
||||
}
|
||||
|
||||
try {
|
||||
input_->getTransport()->close();
|
||||
} catch (TTransportException& ttx) {
|
||||
string errStr = string("TThreadedServer input close failed: ") + ttx.what();
|
||||
GlobalOutput(errStr.c_str());
|
||||
}
|
||||
try {
|
||||
output_->getTransport()->close();
|
||||
} catch (TTransportException& ttx) {
|
||||
string errStr = string("TThreadedServer output close failed: ") + ttx.what();
|
||||
GlobalOutput(errStr.c_str());
|
||||
}
|
||||
|
||||
// Remove this task from parent bookkeeping
|
||||
{
|
||||
Synchronized s(server_.tasksMonitor_);
|
||||
server_.tasks_.erase(this);
|
||||
if (server_.tasks_.empty()) {
|
||||
server_.tasksMonitor_.notify();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
TThreadedServer& server_;
|
||||
friend class TThreadedServer;
|
||||
|
||||
shared_ptr<TProcessor> processor_;
|
||||
shared_ptr<TProtocol> input_;
|
||||
shared_ptr<TProtocol> output_;
|
||||
shared_ptr<TTransport> transport_;
|
||||
};
|
||||
|
||||
void TThreadedServer::init() {
|
||||
stop_ = false;
|
||||
|
||||
if (!threadFactory_) {
|
||||
threadFactory_.reset(new PlatformThreadFactory);
|
||||
}
|
||||
}
|
||||
|
||||
TThreadedServer::~TThreadedServer() {}
|
||||
|
||||
void TThreadedServer::serve() {
|
||||
@ -162,21 +76,19 @@ void TThreadedServer::serve() {
|
||||
inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
|
||||
outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
|
||||
|
||||
shared_ptr<TProcessor> processor = getProcessor(inputProtocol, outputProtocol, client);
|
||||
shared_ptr<TConnectedClient> pClient(
|
||||
new TConnectedClient("TThreadedServer",
|
||||
getProcessor(inputProtocol, outputProtocol, client),
|
||||
inputProtocol, outputProtocol, eventHandler_, client),
|
||||
boost::bind(&TThreadedServer::disposeClient, this, _1));
|
||||
|
||||
TThreadedServer::Task* task
|
||||
= new TThreadedServer::Task(*this, processor, inputProtocol, outputProtocol, client);
|
||||
|
||||
// Create a task
|
||||
shared_ptr<Runnable> runnable = shared_ptr<Runnable>(task);
|
||||
|
||||
// Create a thread for this task
|
||||
shared_ptr<Thread> thread = shared_ptr<Thread>(threadFactory_->newThread(runnable));
|
||||
// Create a thread for this client
|
||||
shared_ptr<Thread> thread = shared_ptr<Thread>(threadFactory_->newThread(pClient));
|
||||
|
||||
// Insert thread into the set of threads
|
||||
{
|
||||
Synchronized s(tasksMonitor_);
|
||||
tasks_.insert(task);
|
||||
Synchronized s(clientsMonitor_);
|
||||
clients_.insert(pClient.get());
|
||||
}
|
||||
|
||||
// Start the thread!
|
||||
@ -235,9 +147,9 @@ void TThreadedServer::serve() {
|
||||
GlobalOutput(errStr.c_str());
|
||||
}
|
||||
try {
|
||||
Synchronized s(tasksMonitor_);
|
||||
while (!tasks_.empty()) {
|
||||
tasksMonitor_.wait();
|
||||
Synchronized s(clientsMonitor_);
|
||||
while (!clients_.empty()) {
|
||||
clientsMonitor_.wait();
|
||||
}
|
||||
} catch (TException& tx) {
|
||||
string errStr = string("TThreadedServer: Exception joining workers: ") + tx.what();
|
||||
@ -254,6 +166,19 @@ void TThreadedServer::stop() {
|
||||
serverTransport_->interruptChildren();
|
||||
}
|
||||
}
|
||||
|
||||
void TThreadedServer::disposeClient(TConnectedClient *pClient) {
|
||||
// Remove this task from parent bookkeeping
|
||||
{
|
||||
Synchronized s(clientsMonitor_);
|
||||
clients_.erase(pClient);
|
||||
if (clients_.empty()) {
|
||||
clientsMonitor_.notify();
|
||||
}
|
||||
}
|
||||
delete pClient;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
} // apache::thrift::server
|
||||
|
@ -20,9 +20,11 @@
|
||||
#ifndef _THRIFT_SERVER_TTHREADEDSERVER_H_
|
||||
#define _THRIFT_SERVER_TTHREADEDSERVER_H_ 1
|
||||
|
||||
#include <set>
|
||||
#include <thrift/server/TServer.h>
|
||||
#include <thrift/transport/TServerTransport.h>
|
||||
#include <thrift/concurrency/Monitor.h>
|
||||
#include <thrift/concurrency/PlatformThreadFactory.h>
|
||||
#include <thrift/concurrency/Thread.h>
|
||||
|
||||
#include <boost/shared_ptr.hpp>
|
||||
@ -35,19 +37,22 @@ using apache::thrift::TProcessor;
|
||||
using apache::thrift::transport::TServerTransport;
|
||||
using apache::thrift::transport::TTransportFactory;
|
||||
using apache::thrift::concurrency::Monitor;
|
||||
using apache::thrift::concurrency::PlatformThreadFactory;
|
||||
using apache::thrift::concurrency::ThreadFactory;
|
||||
|
||||
class TConnectedClient;
|
||||
|
||||
class TThreadedServer : public TServer {
|
||||
|
||||
public:
|
||||
class Task;
|
||||
|
||||
template <typename ProcessorFactory>
|
||||
TThreadedServer(const boost::shared_ptr<ProcessorFactory>& processorFactory,
|
||||
const boost::shared_ptr<TServerTransport>& serverTransport,
|
||||
const boost::shared_ptr<TTransportFactory>& transportFactory,
|
||||
const boost::shared_ptr<TProtocolFactory>& protocolFactory,
|
||||
THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory));
|
||||
THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory))
|
||||
: TServer(processorFactory, serverTransport, transportFactory, protocolFactory),
|
||||
threadFactory_(new PlatformThreadFactory),
|
||||
stop_(false) {}
|
||||
|
||||
template <typename ProcessorFactory>
|
||||
TThreadedServer(const boost::shared_ptr<ProcessorFactory>& processorFactory,
|
||||
@ -55,14 +60,20 @@ public:
|
||||
const boost::shared_ptr<TTransportFactory>& transportFactory,
|
||||
const boost::shared_ptr<TProtocolFactory>& protocolFactory,
|
||||
const boost::shared_ptr<ThreadFactory>& threadFactory,
|
||||
THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory));
|
||||
THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory))
|
||||
: TServer(processorFactory, serverTransport, transportFactory, protocolFactory),
|
||||
threadFactory_(threadFactory),
|
||||
stop_(false) {}
|
||||
|
||||
template <typename Processor>
|
||||
TThreadedServer(const boost::shared_ptr<Processor>& processor,
|
||||
const boost::shared_ptr<TServerTransport>& serverTransport,
|
||||
const boost::shared_ptr<TTransportFactory>& transportFactory,
|
||||
const boost::shared_ptr<TProtocolFactory>& protocolFactory,
|
||||
THRIFT_OVERLOAD_IF(Processor, TProcessor));
|
||||
THRIFT_OVERLOAD_IF(Processor, TProcessor))
|
||||
: TServer(processor, serverTransport, transportFactory, protocolFactory),
|
||||
threadFactory_(new PlatformThreadFactory),
|
||||
stop_(false) {}
|
||||
|
||||
template <typename Processor>
|
||||
TThreadedServer(const boost::shared_ptr<Processor>& processor,
|
||||
@ -70,66 +81,43 @@ public:
|
||||
const boost::shared_ptr<TTransportFactory>& transportFactory,
|
||||
const boost::shared_ptr<TProtocolFactory>& protocolFactory,
|
||||
const boost::shared_ptr<ThreadFactory>& threadFactory,
|
||||
THRIFT_OVERLOAD_IF(Processor, TProcessor));
|
||||
THRIFT_OVERLOAD_IF(Processor, TProcessor))
|
||||
: TServer(processor, serverTransport, transportFactory, protocolFactory),
|
||||
threadFactory_(threadFactory),
|
||||
stop_(false) {}
|
||||
|
||||
virtual ~TThreadedServer();
|
||||
|
||||
/**
|
||||
* Process all connections that arrive, each on their own
|
||||
* dedicated thread. There is no limit to the number of
|
||||
* threads or connections (see THRIFT-3084).
|
||||
* Call stop() on another thread to interrupt processing and
|
||||
* return control to the caller.
|
||||
* Post-conditions (return guarantees):
|
||||
* The serverTransport will be closed.
|
||||
* There will be no connected clients.
|
||||
*/
|
||||
virtual void serve();
|
||||
void stop();
|
||||
|
||||
/**
|
||||
* Interrupt serve() so that it meets post-conditions and returns.
|
||||
*/
|
||||
virtual void stop();
|
||||
|
||||
protected:
|
||||
void init();
|
||||
/**
|
||||
* Smart pointer release method
|
||||
*/
|
||||
virtual void disposeClient(TConnectedClient *pClient);
|
||||
|
||||
boost::shared_ptr<ThreadFactory> threadFactory_;
|
||||
volatile bool stop_;
|
||||
|
||||
Monitor tasksMonitor_;
|
||||
std::set<Task*> tasks_;
|
||||
Monitor clientsMonitor_;
|
||||
std::set<TConnectedClient*> clients_;
|
||||
};
|
||||
|
||||
template <typename ProcessorFactory>
|
||||
TThreadedServer::TThreadedServer(const boost::shared_ptr<ProcessorFactory>& processorFactory,
|
||||
const boost::shared_ptr<TServerTransport>& serverTransport,
|
||||
const boost::shared_ptr<TTransportFactory>& transportFactory,
|
||||
const boost::shared_ptr<TProtocolFactory>& protocolFactory,
|
||||
THRIFT_OVERLOAD_IF_DEFN(ProcessorFactory, TProcessorFactory))
|
||||
: TServer(processorFactory, serverTransport, transportFactory, protocolFactory) {
|
||||
init();
|
||||
}
|
||||
|
||||
template <typename ProcessorFactory>
|
||||
TThreadedServer::TThreadedServer(const boost::shared_ptr<ProcessorFactory>& processorFactory,
|
||||
const boost::shared_ptr<TServerTransport>& serverTransport,
|
||||
const boost::shared_ptr<TTransportFactory>& transportFactory,
|
||||
const boost::shared_ptr<TProtocolFactory>& protocolFactory,
|
||||
const boost::shared_ptr<ThreadFactory>& threadFactory,
|
||||
THRIFT_OVERLOAD_IF_DEFN(ProcessorFactory, TProcessorFactory))
|
||||
: TServer(processorFactory, serverTransport, transportFactory, protocolFactory),
|
||||
threadFactory_(threadFactory) {
|
||||
init();
|
||||
}
|
||||
|
||||
template <typename Processor>
|
||||
TThreadedServer::TThreadedServer(const boost::shared_ptr<Processor>& processor,
|
||||
const boost::shared_ptr<TServerTransport>& serverTransport,
|
||||
const boost::shared_ptr<TTransportFactory>& transportFactory,
|
||||
const boost::shared_ptr<TProtocolFactory>& protocolFactory,
|
||||
THRIFT_OVERLOAD_IF_DEFN(Processor, TProcessor))
|
||||
: TServer(processor, serverTransport, transportFactory, protocolFactory) {
|
||||
init();
|
||||
}
|
||||
|
||||
template <typename Processor>
|
||||
TThreadedServer::TThreadedServer(const boost::shared_ptr<Processor>& processor,
|
||||
const boost::shared_ptr<TServerTransport>& serverTransport,
|
||||
const boost::shared_ptr<TTransportFactory>& transportFactory,
|
||||
const boost::shared_ptr<TProtocolFactory>& protocolFactory,
|
||||
const boost::shared_ptr<ThreadFactory>& threadFactory,
|
||||
THRIFT_OVERLOAD_IF_DEFN(Processor, TProcessor))
|
||||
: TServer(processor, serverTransport, transportFactory, protocolFactory),
|
||||
threadFactory_(threadFactory) {
|
||||
init();
|
||||
}
|
||||
}
|
||||
}
|
||||
} // apache::thrift::server
|
||||
|
Loading…
Reference in New Issue
Block a user