mirror of
https://github.com/valitydev/thrift.git
synced 2024-11-07 02:45:22 +00:00
Thrift TTransportFactory model for servers
Summary: Servers need to create bufferedtransports etc. around the transports they get in a user-definable way. So use a factory pattern to allow the user to supply an object to the server that defines this behavior. git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664792 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ffcddd688a
commit
d788b2e046
@ -1,7 +1,7 @@
|
||||
lib_LTLIBRARIES = libthrift.la
|
||||
|
||||
common_cxxflags = -Isrc $(BOOST_CPPFLAGS)
|
||||
common_ldflags = $(BOOST_LDFLAGS)
|
||||
common_cxxflags = -Wall -Isrc $(BOOST_CPPFLAGS)
|
||||
common_ldflags = -Wall $(BOOST_LDFLAGS)
|
||||
|
||||
# Define the source file for the module
|
||||
|
||||
@ -54,7 +54,9 @@ include_transport_HEADERS = \
|
||||
src/transport/TServerTransport.h \
|
||||
src/transport/TSocket.h \
|
||||
src/transport/TTransport.h \
|
||||
src/transport/TTransportException.h
|
||||
src/transport/TTransportException.h \
|
||||
src/transport/TTransportFactory.h \
|
||||
src/transport/TBufferedTransportFactory.h
|
||||
|
||||
include_serverdir = $(include_thriftdir)/server
|
||||
include_server_HEADERS = \
|
||||
|
@ -23,7 +23,8 @@ class TProcessor {
|
||||
public:
|
||||
virtual ~TProcessor() {}
|
||||
virtual bool process(shared_ptr<TTransport> in, shared_ptr<TTransport> out) = 0;
|
||||
virtual bool process(shared_ptr<TTransport> io) { return process(io, io); }
|
||||
bool process(shared_ptr<TTransport> io) { return process(io, io); }
|
||||
|
||||
protected:
|
||||
TProcessor() {}
|
||||
};
|
||||
|
@ -1,7 +1,9 @@
|
||||
#ifndef T_SERVER_H
|
||||
#define T_SERVER_H
|
||||
#ifndef _THRIFT_SERVER_TSERVER_H_
|
||||
#define _THRIFT_SERVER_TSERVER_H_ 1
|
||||
|
||||
#include <TProcessor.h>
|
||||
#include <transport/TServerTransport.h>
|
||||
#include <transport/TTransportFactory.h>
|
||||
#include <concurrency/Thread.h>
|
||||
|
||||
#include <boost/shared_ptr.hpp>
|
||||
@ -9,6 +11,7 @@
|
||||
namespace facebook { namespace thrift { namespace server {
|
||||
|
||||
using namespace facebook::thrift;
|
||||
using namespace facebook::thrift::transport;
|
||||
using namespace boost;
|
||||
|
||||
class TServerOptions;
|
||||
@ -24,10 +27,22 @@ public:
|
||||
virtual void run() = 0;
|
||||
|
||||
protected:
|
||||
TServer(shared_ptr<TProcessor> processor, shared_ptr<TServerOptions> options) :
|
||||
TServer(shared_ptr<TProcessor> processor,
|
||||
shared_ptr<TServerTransport> serverTransport,
|
||||
shared_ptr<TTransportFactory> transportFactory,
|
||||
shared_ptr<TServerOptions> options) :
|
||||
processor_(processor),
|
||||
serverTransport_(serverTransport),
|
||||
transportFactory_(transportFactory),
|
||||
options_(options) {}
|
||||
|
||||
TServer(shared_ptr<TProcessor> processor,
|
||||
shared_ptr<TServerOptions> options) :
|
||||
processor_(processor), options_(options) {}
|
||||
|
||||
|
||||
shared_ptr<TProcessor> processor_;
|
||||
shared_ptr<TServerTransport> serverTransport_;
|
||||
shared_ptr<TTransportFactory> transportFactory_;
|
||||
shared_ptr<TServerOptions> options_;
|
||||
};
|
||||
|
||||
@ -35,12 +50,12 @@ protected:
|
||||
* Class to encapsulate all generic server options.
|
||||
*/
|
||||
class TServerOptions {
|
||||
public:
|
||||
public:
|
||||
// TODO(mcslee): Fill in getters/setters here
|
||||
protected:
|
||||
protected:
|
||||
// TODO(mcslee): Fill data members in here
|
||||
};
|
||||
|
||||
}}} // facebook::thrift::server
|
||||
|
||||
#endif
|
||||
#endif // #ifndef _THRIFT_SERVER_TSERVER_H_
|
||||
|
@ -1,5 +1,4 @@
|
||||
#include "server/TSimpleServer.h"
|
||||
#include "transport/TBufferedTransport.h"
|
||||
#include "transport/TTransportException.h"
|
||||
#include <string>
|
||||
#include <iostream>
|
||||
@ -15,6 +14,7 @@ namespace facebook { namespace thrift { namespace server {
|
||||
void TSimpleServer::run() {
|
||||
|
||||
shared_ptr<TTransport> client;
|
||||
pair<shared_ptr<TTransport>,shared_ptr<TTransport> > io;
|
||||
|
||||
try {
|
||||
// Start the server listening
|
||||
@ -25,26 +25,21 @@ void TSimpleServer::run() {
|
||||
}
|
||||
|
||||
// Fetch client from server
|
||||
while (true) {
|
||||
try {
|
||||
try {
|
||||
while (true) {
|
||||
client = serverTransport_->accept();
|
||||
if (client != NULL) {
|
||||
// Process for as long as we can keep the processor happy!
|
||||
shared_ptr<TBufferedTransport> bufferedClient(new TBufferedTransport(client));
|
||||
while (processor_->process(bufferedClient)) {}
|
||||
}
|
||||
} catch (TTransportException& ttx) {
|
||||
if (client != NULL) {
|
||||
io = transportFactory_->getIOTransports(client);
|
||||
try {
|
||||
while (processor_->process(io.first, io.second)) {}
|
||||
} catch (TTransportException& ttx) {
|
||||
cerr << "TSimpleServer client died: " << ttx.getMessage() << endl;
|
||||
}
|
||||
}
|
||||
|
||||
// Clean up the client
|
||||
if (client != NULL) {
|
||||
|
||||
// Ensure no resource leaks
|
||||
io.first->close();
|
||||
io.second->close();
|
||||
client->close();
|
||||
}
|
||||
}
|
||||
} catch (TTransportException& ttx) {
|
||||
cerr << "TServerTransport died on accept: " << ttx.getMessage() << endl;
|
||||
}
|
||||
|
||||
// TODO(mcslee): Could this be a timeout case? Or always the real thing?
|
||||
|
@ -1,5 +1,5 @@
|
||||
#ifndef T_SIMPLE_SERVER_H
|
||||
#define T_SIMPLE_SERVER_H
|
||||
#ifndef _THRIFT_SERVER_TSIMPLESERVER_H_
|
||||
#define _THRIFT_SERVER_TSIMPLESERVER_H_ 1
|
||||
|
||||
#include "server/TServer.h"
|
||||
#include "transport/TServerTransport.h"
|
||||
@ -17,18 +17,17 @@ namespace facebook { namespace thrift { namespace server {
|
||||
class TSimpleServer : public TServer {
|
||||
public:
|
||||
TSimpleServer(shared_ptr<TProcessor> processor,
|
||||
shared_ptr<TServerOptions> options,
|
||||
shared_ptr<TServerTransport> serverTransport) :
|
||||
TServer(processor, options), serverTransport_(serverTransport) {}
|
||||
shared_ptr<TServerTransport> serverTransport,
|
||||
shared_ptr<TTransportFactory> transportFactory,
|
||||
shared_ptr<TServerOptions> options) :
|
||||
TServer(processor, serverTransport, transportFactory, options) {}
|
||||
|
||||
~TSimpleServer() {}
|
||||
|
||||
void run();
|
||||
|
||||
protected:
|
||||
shared_ptr<TServerTransport> serverTransport_;
|
||||
};
|
||||
|
||||
}}} // facebook::thrift::server
|
||||
|
||||
#endif
|
||||
#endif // #ifndef _THRIFT_SERVER_TSIMPLESERVER_H_
|
||||
|
@ -1,5 +1,4 @@
|
||||
#include "server/TThreadPoolServer.h"
|
||||
#include "transport/TBufferedTransport.h"
|
||||
#include "transport/TTransportException.h"
|
||||
#include "concurrency/Thread.h"
|
||||
#include "concurrency/ThreadManager.h"
|
||||
@ -15,54 +14,52 @@ using namespace facebook::thrift::transport;
|
||||
class TThreadPoolServer::Task: public Runnable {
|
||||
|
||||
shared_ptr<TProcessor> _processor;
|
||||
shared_ptr<TTransport> _transport;
|
||||
shared_ptr<TBufferedTransport> _bufferedTransport;
|
||||
shared_ptr<TTransport> _input;
|
||||
shared_ptr<TTransport> _output;
|
||||
|
||||
public:
|
||||
|
||||
Task(shared_ptr<TProcessor> processor,
|
||||
shared_ptr<TTransport> transport) :
|
||||
shared_ptr<TTransport> input,
|
||||
shared_ptr<TTransport> output) :
|
||||
_processor(processor),
|
||||
_transport(transport),
|
||||
_bufferedTransport(new TBufferedTransport(transport)) {
|
||||
_input(input),
|
||||
_output(output) {
|
||||
}
|
||||
|
||||
~Task() {}
|
||||
|
||||
void run() {
|
||||
|
||||
void run() {
|
||||
while(true) {
|
||||
|
||||
try {
|
||||
_processor->process(_bufferedTransport);
|
||||
|
||||
_processor->process(_input, _output);
|
||||
} catch (TTransportException& ttx) {
|
||||
|
||||
break;
|
||||
|
||||
break;
|
||||
} catch(...) {
|
||||
|
||||
break;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
_bufferedTransport->close();
|
||||
_input->close();
|
||||
_output->close();
|
||||
}
|
||||
};
|
||||
|
||||
TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
|
||||
shared_ptr<TServerOptions> options,
|
||||
shared_ptr<TServerTransport> serverTransport,
|
||||
shared_ptr<ThreadManager> threadManager) :
|
||||
TServer(processor, options),
|
||||
serverTransport_(serverTransport),
|
||||
shared_ptr<TServerTransport> serverTransport,
|
||||
shared_ptr<TTransportFactory> transportFactory,
|
||||
shared_ptr<ThreadManager> threadManager,
|
||||
shared_ptr<TServerOptions> options) :
|
||||
TServer(processor, serverTransport, transportFactory, options),
|
||||
threadManager_(threadManager) {
|
||||
}
|
||||
|
||||
|
||||
TThreadPoolServer::~TThreadPoolServer() {}
|
||||
|
||||
void TThreadPoolServer::run() {
|
||||
|
||||
shared_ptr<TTransport> client;
|
||||
pair<shared_ptr<TTransport>,shared_ptr<TTransport> > io;
|
||||
|
||||
try {
|
||||
// Start the server listening
|
||||
serverTransport_->listen();
|
||||
@ -71,15 +68,14 @@ void TThreadPoolServer::run() {
|
||||
return;
|
||||
}
|
||||
|
||||
// Fetch client from server
|
||||
|
||||
while (true) {
|
||||
|
||||
while (true) {
|
||||
try {
|
||||
|
||||
threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(processor_,
|
||||
shared_ptr<TTransport>(serverTransport_->accept()))));
|
||||
|
||||
// Fetch client from server
|
||||
client = serverTransport_->accept();
|
||||
// Make IO transports
|
||||
io = transportFactory_->getIOTransports(client);
|
||||
// Add to threadmanager pool
|
||||
threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(processor_, io.first, io.second)));
|
||||
} catch (TTransportException& ttx) {
|
||||
break;
|
||||
}
|
||||
|
@ -1,5 +1,5 @@
|
||||
#ifndef T_THREADPOOL_SERVER_H
|
||||
#define T_THREADPOOL_SERVER_H
|
||||
#ifndef _THRIFT_SERVER_TTHREADPOOLSERVER_H_
|
||||
#define _THRIFT_SERVER_TTHREADPOOLSERVER_H_ 1
|
||||
|
||||
#include <concurrency/ThreadManager.h>
|
||||
#include <server/TServer.h>
|
||||
@ -19,9 +19,10 @@ public:
|
||||
class Task;
|
||||
|
||||
TThreadPoolServer(shared_ptr<TProcessor> processor,
|
||||
shared_ptr<TServerOptions> options,
|
||||
shared_ptr<TServerTransport> serverTransport,
|
||||
shared_ptr<ThreadManager> threadManager);
|
||||
shared_ptr<TTransportFactory> transportFactory,
|
||||
shared_ptr<ThreadManager> threadManager,
|
||||
shared_ptr<TServerOptions> options);
|
||||
|
||||
virtual ~TThreadPoolServer();
|
||||
|
||||
@ -29,11 +30,10 @@ public:
|
||||
|
||||
protected:
|
||||
|
||||
shared_ptr<TServerTransport> serverTransport_;
|
||||
shared_ptr<ThreadManager> threadManager_;
|
||||
|
||||
};
|
||||
|
||||
}}} // facebook::thrift::server
|
||||
|
||||
#endif
|
||||
#endif // #ifndef _THRIFT_SERVER_TTHREADPOOLSERVER_H_
|
||||
|
33
lib/cpp/src/transport/TBufferedTransportFactory.h
Normal file
33
lib/cpp/src/transport/TBufferedTransportFactory.h
Normal file
@ -0,0 +1,33 @@
|
||||
#ifndef _THRIFT_TRANSPORT_TBUFFEREDTRANSPORTFACTORY_H_
|
||||
#define _THRIFT_TRANSPORT_TBUFFEREDTRANSPORTFACTORY_H_ 1
|
||||
|
||||
#include <transport/TTransportFactory.h>
|
||||
#include <transport/TBufferedTransport.h>
|
||||
#include <boost/shared_ptr.hpp>
|
||||
|
||||
namespace facebook { namespace thrift { namespace transport {
|
||||
|
||||
/**
|
||||
* Wraps a transport into a buffered one.
|
||||
*
|
||||
* @author Mark Slee <mcslee@facebook.com>
|
||||
*/
|
||||
class TBufferedTransportFactory : public TTransportFactory {
|
||||
public:
|
||||
TBufferedTransportFactory() {}
|
||||
|
||||
virtual ~TBufferedTransportFactory() {}
|
||||
|
||||
/**
|
||||
* Wraps the transport into a buffered one.
|
||||
*/
|
||||
virtual std::pair<boost::shared_ptr<TTransport>, boost::shared_ptr<TTransport> > getIOTransports(boost::shared_ptr<TTransport> trans) {
|
||||
boost::shared_ptr<TTransport> buffered(new TBufferedTransport(trans));
|
||||
return std::make_pair(buffered, buffered);
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
}}}
|
||||
|
||||
#endif // #ifndef _THRIFT_TRANSPORT_TTRANSPORTFACTORY_H_
|
@ -1,7 +1,7 @@
|
||||
#ifndef _THRIFT_TRANSPORT_TTRANSPORT_H_
|
||||
#define _THRIFT_TRANSPORT_TTRANSPORT_H_ 1
|
||||
|
||||
#include "TTransportException.h"
|
||||
#include <transport/TTransportException.h>
|
||||
#include <string>
|
||||
|
||||
namespace facebook { namespace thrift { namespace transport {
|
||||
|
33
lib/cpp/src/transport/TTransportFactory.h
Normal file
33
lib/cpp/src/transport/TTransportFactory.h
Normal file
@ -0,0 +1,33 @@
|
||||
#ifndef _THRIFT_TRANSPORT_TTRANSPORTFACTORY_H_
|
||||
#define _THRIFT_TRANSPORT_TTRANSPORTFACTORY_H_ 1
|
||||
|
||||
#include <transport/TTransport.h>
|
||||
#include <boost/shared_ptr.hpp>
|
||||
|
||||
namespace facebook { namespace thrift { namespace transport {
|
||||
|
||||
/**
|
||||
* Generic factory class to make an input and output transport out of a
|
||||
* source transport. Commonly used inside servers to make input and output
|
||||
* streams out of raw clients.
|
||||
*
|
||||
* @author Mark Slee <mcslee@facebook.com>
|
||||
*/
|
||||
class TTransportFactory {
|
||||
public:
|
||||
TTransportFactory() {}
|
||||
|
||||
virtual ~TTransportFactory() {}
|
||||
|
||||
/**
|
||||
* Default implementation does nothing, just returns the transport given.
|
||||
*/
|
||||
virtual std::pair<boost::shared_ptr<TTransport>, boost::shared_ptr<TTransport> > getIOTransports(boost::shared_ptr<TTransport> trans) {
|
||||
return std::make_pair(trans, trans);
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
}}}
|
||||
|
||||
#endif // #ifndef _THRIFT_TRANSPORT_TTRANSPORTFACTORY_H_
|
@ -1,6 +1,9 @@
|
||||
package com.facebook.thrift.server;
|
||||
|
||||
import com.facebook.thrift.TProcessor;
|
||||
import com.facebook.thrift.transport.TServerTransport;
|
||||
import com.facebook.thrift.transport.TTransportFactory;
|
||||
import com.facebook.thrift.transport.TBaseTransportFactory;
|
||||
|
||||
/**
|
||||
* Generic interface for a Thrift server.
|
||||
@ -17,24 +20,64 @@ public abstract class TServer {
|
||||
public Options() {}
|
||||
}
|
||||
|
||||
/** Core processor */
|
||||
/**
|
||||
* Core processor
|
||||
*/
|
||||
protected TProcessor processor_;
|
||||
|
||||
/** Server options */
|
||||
/**
|
||||
* Server options
|
||||
*/
|
||||
protected Options options_;
|
||||
|
||||
/**
|
||||
* Default options constructor
|
||||
* Server transport
|
||||
*/
|
||||
protected TServer(TProcessor processor) {
|
||||
this(processor, new Options());
|
||||
}
|
||||
protected TServerTransport serverTransport_;
|
||||
|
||||
/**
|
||||
* Default constructor, all servers take a processor and some options.
|
||||
* Transport Factory
|
||||
*/
|
||||
protected TServer(TProcessor processor, Options options) {
|
||||
protected TTransportFactory transportFactory_;
|
||||
|
||||
/**
|
||||
* Default constructors.
|
||||
*/
|
||||
|
||||
protected TServer(TProcessor processor,
|
||||
TServerTransport serverTransport) {
|
||||
this(processor,
|
||||
serverTransport,
|
||||
new TBaseTransportFactory(),
|
||||
new Options());
|
||||
}
|
||||
|
||||
protected TServer(TProcessor processor,
|
||||
TServerTransport serverTransport,
|
||||
TTransportFactory transportFactory) {
|
||||
this(processor,
|
||||
serverTransport,
|
||||
transportFactory,
|
||||
new Options());
|
||||
}
|
||||
|
||||
|
||||
protected TServer(TProcessor processor,
|
||||
TServerTransport serverTransport,
|
||||
Options options) {
|
||||
this(processor,
|
||||
serverTransport,
|
||||
new TBaseTransportFactory(),
|
||||
options);
|
||||
}
|
||||
|
||||
protected TServer(TProcessor processor,
|
||||
TServerTransport serverTransport,
|
||||
TTransportFactory transportFactory,
|
||||
Options options) {
|
||||
processor_ = processor;
|
||||
serverTransport_ = serverTransport;
|
||||
transportFactory_ = transportFactory;
|
||||
options_ = options;
|
||||
}
|
||||
|
||||
|
@ -13,19 +13,9 @@ import com.facebook.thrift.transport.TTransportException;
|
||||
*/
|
||||
public class TSimpleServer extends TServer {
|
||||
|
||||
private TServerTransport serverTransport_;
|
||||
|
||||
public TSimpleServer(TProcessor processor,
|
||||
TServerTransport serverTransport) {
|
||||
this(processor, new TServer.Options(), serverTransport);
|
||||
}
|
||||
|
||||
|
||||
public TSimpleServer(TProcessor processor,
|
||||
TServer.Options options,
|
||||
TServerTransport serverTransport) {
|
||||
super(processor, options);
|
||||
serverTransport_ = serverTransport;
|
||||
super(processor, serverTransport);
|
||||
}
|
||||
|
||||
public void run() {
|
||||
@ -38,18 +28,24 @@ public class TSimpleServer extends TServer {
|
||||
|
||||
while (true) {
|
||||
TTransport client = null;
|
||||
TTransport[] io = null;
|
||||
try {
|
||||
client = serverTransport_.accept();
|
||||
if (client != null) {
|
||||
while (processor_.process(client, client));
|
||||
io = transportFactory_.getIOTransports(client);
|
||||
while (processor_.process(io[0], io[1]));
|
||||
}
|
||||
} catch (TException tx) {
|
||||
tx.printStackTrace();
|
||||
}
|
||||
|
||||
if (client != null) {
|
||||
client.close();
|
||||
client = null;
|
||||
if (io != null) {
|
||||
if (io[0] != null) {
|
||||
io[0].close();
|
||||
}
|
||||
if (io[1] != null) {
|
||||
io[1].close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -5,6 +5,8 @@ import com.facebook.thrift.TProcessor;
|
||||
import com.facebook.thrift.transport.TServerTransport;
|
||||
import com.facebook.thrift.transport.TTransport;
|
||||
import com.facebook.thrift.transport.TTransportException;
|
||||
import com.facebook.thrift.transport.TTransportFactory;
|
||||
import com.facebook.thrift.transport.TBaseTransportFactory;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
@ -20,28 +22,28 @@ import java.util.concurrent.TimeUnit;
|
||||
*/
|
||||
public class TThreadPoolServer extends TServer {
|
||||
|
||||
// Server transport
|
||||
private TServerTransport serverTransport_;
|
||||
|
||||
// Executor service for handling client connections
|
||||
private ExecutorService executorService_;
|
||||
|
||||
// Customizable server options
|
||||
public static class Options extends TServer.Options {
|
||||
public int port = 9190;
|
||||
public int minWorkerThreads = 5;
|
||||
public int maxWorkerThreads = Integer.MAX_VALUE;
|
||||
}
|
||||
|
||||
public TThreadPoolServer(TProcessor processor,
|
||||
TServerTransport serverTransport) {
|
||||
this(processor, new Options(), serverTransport);
|
||||
this(processor,
|
||||
serverTransport,
|
||||
new TBaseTransportFactory(),
|
||||
new Options());
|
||||
}
|
||||
|
||||
|
||||
public TThreadPoolServer(TProcessor processor,
|
||||
Options options,
|
||||
TServerTransport serverTransport) {
|
||||
super(processor, options);
|
||||
TServerTransport serverTransport,
|
||||
TTransportFactory transportFactory,
|
||||
Options options) {
|
||||
super(processor, serverTransport, transportFactory, options);
|
||||
serverTransport_ = serverTransport;
|
||||
executorService_ = null;
|
||||
|
||||
@ -95,12 +97,22 @@ public class TThreadPoolServer extends TServer {
|
||||
* Loops on processing a client forever
|
||||
*/
|
||||
public void run() {
|
||||
TTransport[] io = null;
|
||||
try {
|
||||
while (processor_.process(client_, client_)) {}
|
||||
io = transportFactory_.getIOTransports(client_);
|
||||
while (processor_.process(io[0], io[1])) {}
|
||||
} catch (TException tx) {
|
||||
tx.printStackTrace();
|
||||
}
|
||||
client_.close();
|
||||
|
||||
if (io != null) {
|
||||
if (io[0] != null) {
|
||||
io[0].close();
|
||||
}
|
||||
if (io[1] != null) {
|
||||
io[1].close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
23
lib/java/src/transport/TBaseTransportFactory.java
Normal file
23
lib/java/src/transport/TBaseTransportFactory.java
Normal file
@ -0,0 +1,23 @@
|
||||
package com.facebook.thrift.transport;
|
||||
|
||||
/**
|
||||
* Base transport factory just returns the arg transport.
|
||||
*
|
||||
* @author Mark Slee <mcslee@facebook.com>
|
||||
*/
|
||||
public class TBaseTransportFactory implements TTransportFactory {
|
||||
|
||||
/**
|
||||
* Returns a list of two transports (input, output) from a simple
|
||||
* Transport.
|
||||
*
|
||||
* @param in The base transport
|
||||
* @returns Array of two transports, first for input, second for output
|
||||
*/
|
||||
public TTransport[] getIOTransports(TTransport in) {
|
||||
TTransport[] out = new TTransport[2];
|
||||
out[0] = out[1] = in;
|
||||
return out;
|
||||
}
|
||||
|
||||
}
|
21
lib/java/src/transport/TTransportFactory.java
Normal file
21
lib/java/src/transport/TTransportFactory.java
Normal file
@ -0,0 +1,21 @@
|
||||
package com.facebook.thrift.transport;
|
||||
|
||||
/**
|
||||
* Factory class used to create an input and output transport out of a simple
|
||||
* transport. This is used primarily in servers, which get Transports from
|
||||
* a ServerTransport and then may want to mutate them.
|
||||
*
|
||||
* @author Mark Slee <mcslee@facebook.com>
|
||||
*/
|
||||
public interface TTransportFactory {
|
||||
|
||||
/**
|
||||
* Returns a list of two transports (input, output) from a simple
|
||||
* Transport.
|
||||
*
|
||||
* @param in The base transport
|
||||
* @returns Array of two transports, first for input, second for output
|
||||
*/
|
||||
public TTransport[] getIOTransports(TTransport in);
|
||||
|
||||
}
|
@ -8,8 +8,13 @@ class TServer:
|
||||
|
||||
"""Base interface for a server, which must have a run method."""
|
||||
|
||||
def __init__(self, proc):
|
||||
self.processor = proc
|
||||
def __init__(self, processor, serverTransport, transportFactory=None):
|
||||
self.processor = processor
|
||||
self.serverTransport = serverTransport
|
||||
if transportFactory == None:
|
||||
self.transportFactory = TTransport.TTransportFactoryBase()
|
||||
else:
|
||||
self.transportFactory = transportFactory
|
||||
|
||||
def run(self):
|
||||
pass
|
||||
@ -18,18 +23,20 @@ class TSimpleServer(TServer):
|
||||
|
||||
"""Simple single-threaded server that just pumps around one transport."""
|
||||
|
||||
def __init__(self, proc, trans):
|
||||
TServer.__init__(self, proc)
|
||||
self.transport = trans
|
||||
def __init__(self, processor, serverTransport, transportFactory=None):
|
||||
TServer.__init__(self, processor, serverTransport, transportFactory)
|
||||
|
||||
def run(self):
|
||||
self.transport.listen()
|
||||
self.serverTransport.listen()
|
||||
while True:
|
||||
client = TTransport.TBufferedTransport(self.transport.accept())
|
||||
client = self.serverTransport.accept()
|
||||
(input, output) = self.transportFactory.getIOTransports(client)
|
||||
try:
|
||||
while True:
|
||||
self.processor.process(client, client)
|
||||
self.processor.process(input, output)
|
||||
except Exception, x:
|
||||
print '%s, %s, %s' % (type(x), x, traceback.format_exc())
|
||||
print 'Client died.'
|
||||
client.close()
|
||||
|
||||
input.close()
|
||||
output.close()
|
||||
|
@ -21,8 +21,9 @@ class TSocket(TTransportBase):
|
||||
self.handle.connect((self.host, self.port))
|
||||
|
||||
def close(self):
|
||||
self.handle.close()
|
||||
self.handle = None
|
||||
if self.handle != None:
|
||||
self.handle.close()
|
||||
self.handle = None
|
||||
|
||||
def readAll(self, sz):
|
||||
buff = ''
|
||||
|
@ -36,6 +36,22 @@ class TServerTransportBase:
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
class TTransportFactoryBase:
|
||||
|
||||
"""Base class for a Transport Factory"""
|
||||
|
||||
def getIOTransports(self, trans):
|
||||
return (trans, trans)
|
||||
|
||||
class TBufferedTransportFactory:
|
||||
|
||||
"""Factory transport that builds buffered transports"""
|
||||
|
||||
def getIOTransports(self, trans):
|
||||
buffered = TBufferedTransport(trans)
|
||||
return (buffered, buffered)
|
||||
|
||||
|
||||
class TBufferedTransport(TTransportBase):
|
||||
|
||||
"""Class that wraps another transport and buffers its I/O."""
|
||||
|
@ -45,23 +45,28 @@ int main(int argc, char** argv) {
|
||||
}
|
||||
|
||||
shared_ptr<TSocket> socket(new TSocket(host, port));
|
||||
shared_ptr<TBufferedTransport> bufferedSocket(new TBufferedTransport(socket, 2048));
|
||||
shared_ptr<TBufferedTransport> bufferedSocket(new TBufferedTransport(socket));
|
||||
shared_ptr<TBinaryProtocol> binaryProtocol(new TBinaryProtocol());
|
||||
ThriftTestClient testClient(bufferedSocket, binaryProtocol);
|
||||
|
||||
|
||||
uint64_t time_min = 0;
|
||||
uint64_t time_max = 0;
|
||||
uint64_t time_tot = 0;
|
||||
|
||||
int test = 0;
|
||||
for (test = 0; test < numTests; ++test) {
|
||||
|
||||
/**
|
||||
* CONNECT TEST
|
||||
*/
|
||||
printf("Test #%d, connect %s:%d\n", test+1, host.c_str(), port);
|
||||
try {
|
||||
bufferedSocket->open();
|
||||
} catch (TTransportException& ttx) {
|
||||
printf("Connect failed: %s\n", ttx.getMessage().c_str());
|
||||
continue;
|
||||
}
|
||||
|
||||
/**
|
||||
* CONNECT TEST
|
||||
*/
|
||||
printf("Test #%d, connect %s:%d\n", test+1, host.c_str(), port);
|
||||
|
||||
uint64_t start = now();
|
||||
|
||||
@ -379,12 +384,29 @@ int main(int argc, char** argv) {
|
||||
}
|
||||
|
||||
uint64_t stop = now();
|
||||
uint64_t tot = stop-start;
|
||||
|
||||
printf("Total time: %lu us\n", stop-start);
|
||||
|
||||
time_tot += tot;
|
||||
if (time_min == 0 || tot < time_min) {
|
||||
time_min = tot;
|
||||
}
|
||||
if (tot > time_max) {
|
||||
time_max = tot;
|
||||
}
|
||||
|
||||
bufferedSocket->close();
|
||||
}
|
||||
|
||||
// printf("\nSocket syscalls: %u", g_socket_syscalls);
|
||||
printf("\nAll tests done.\n");
|
||||
|
||||
uint64_t time_avg = time_tot / numTests;
|
||||
|
||||
printf("Min time: %lu us\n", time_min);
|
||||
printf("Max time: %lu us\n", time_max);
|
||||
printf("Avg time: %lu us\n", time_avg);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -4,6 +4,7 @@
|
||||
#include <server/TSimpleServer.h>
|
||||
#include <server/TThreadPoolServer.h>
|
||||
#include <transport/TServerSocket.h>
|
||||
#include <transport/TBufferedTransportFactory.h>
|
||||
#include "ThriftTest.h"
|
||||
|
||||
#include <iostream>
|
||||
@ -53,23 +54,13 @@ class TestHandler : public ThriftTestIf {
|
||||
}
|
||||
|
||||
Xtruct testStruct(Xtruct thing) {
|
||||
printf("testStruct({\"%s\", %d, %d, %ld})\n",
|
||||
thing.string_thing.c_str(),
|
||||
(int)thing.byte_thing,
|
||||
thing.i32_thing,
|
||||
thing.i64_thing);
|
||||
printf("testStruct({\"%s\", %d, %d, %ld})\n", thing.string_thing.c_str(), (int)thing.byte_thing, thing.i32_thing, thing.i64_thing);
|
||||
return thing;
|
||||
}
|
||||
|
||||
Xtruct2 testNest(Xtruct2 nest) {
|
||||
Xtruct thing = nest.struct_thing;
|
||||
printf("testNest({%d, {\"%s\", %d, %d, %ld}, %d})\n",
|
||||
(int)nest.byte_thing,
|
||||
thing.string_thing.c_str(),
|
||||
(int)thing.byte_thing,
|
||||
thing.i32_thing,
|
||||
thing.i64_thing,
|
||||
nest.i32_thing);
|
||||
printf("testNest({%d, {\"%s\", %d, %d, %ld}, %d})\n", (int)nest.byte_thing, thing.string_thing.c_str(), (int)thing.byte_thing, thing.i32_thing, thing.i64_thing, nest.i32_thing);
|
||||
return nest;
|
||||
}
|
||||
|
||||
@ -205,11 +196,7 @@ class TestHandler : public ThriftTestIf {
|
||||
list<Xtruct>::const_iterator x;
|
||||
printf("{");
|
||||
for (x = xtructs.begin(); x != xtructs.end(); ++x) {
|
||||
printf("{\"%s\", %d, %d, %ld}, ",
|
||||
x->string_thing.c_str(),
|
||||
(int)x->byte_thing,
|
||||
x->i32_thing,
|
||||
x->i64_thing);
|
||||
printf("{\"%s\", %d, %d, %ld}, ", x->string_thing.c_str(), (int)x->byte_thing, x->i32_thing, x->i64_thing);
|
||||
}
|
||||
printf("}");
|
||||
|
||||
@ -347,18 +334,23 @@ int main(int argc, char **argv) {
|
||||
|
||||
shared_ptr<ThriftTestServer> testServer(new ThriftTestServer(testHandler, binaryProtocol));
|
||||
|
||||
// Options
|
||||
shared_ptr<TServerOptions> serverOptions(new TServerOptions());
|
||||
|
||||
// Transport
|
||||
shared_ptr<TServerSocket> serverSocket(new TServerSocket(port));
|
||||
|
||||
// Factory
|
||||
shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
|
||||
|
||||
// Options
|
||||
shared_ptr<TServerOptions> serverOptions(new TServerOptions());
|
||||
|
||||
if (serverType == "simple") {
|
||||
|
||||
// Server
|
||||
TSimpleServer simpleServer(testServer,
|
||||
serverOptions,
|
||||
serverSocket);
|
||||
serverSocket,
|
||||
transportFactory,
|
||||
serverOptions
|
||||
);
|
||||
|
||||
printf("Starting the server on port %d...\n", port);
|
||||
simpleServer.run();
|
||||
@ -376,9 +368,10 @@ int main(int argc, char **argv) {
|
||||
threadManager->start();
|
||||
|
||||
TThreadPoolServer threadPoolServer(testServer,
|
||||
serverOptions,
|
||||
serverSocket,
|
||||
threadManager);
|
||||
transportFactory,
|
||||
threadManager,
|
||||
serverOptions);
|
||||
|
||||
printf("Starting the server on port %d...\n", port);
|
||||
threadPoolServer.run();
|
||||
|
@ -42,13 +42,16 @@ public class TestClient {
|
||||
ThriftTest.Client testClient =
|
||||
new ThriftTest.Client(tSocket, binaryProtocol);
|
||||
|
||||
long timeMin = 0;
|
||||
long timeMax = 0;
|
||||
long timeTot = 0;
|
||||
|
||||
for (int test = 0; test < numTests; ++test) {
|
||||
|
||||
/**
|
||||
* CONNECT TEST
|
||||
*/
|
||||
System.out.println("Test #" + (test+1) + ", " +
|
||||
"connect " + host + ":" + port);
|
||||
System.out.println("Test #" + (test+1) + ", " + "connect " + host + ":" + port);
|
||||
try {
|
||||
tSocket.open();
|
||||
} catch (TTransportException ttx) {
|
||||
@ -56,7 +59,7 @@ public class TestClient {
|
||||
continue;
|
||||
}
|
||||
|
||||
long start = System.currentTimeMillis();
|
||||
long start = System.nanoTime();
|
||||
|
||||
/**
|
||||
* VOID TEST
|
||||
@ -110,11 +113,7 @@ public class TestClient {
|
||||
out.i32_thing = -3;
|
||||
out.i64_thing = -5;
|
||||
Xtruct in = testClient.testStruct(out);
|
||||
System.out.print(" = {" +
|
||||
"\"" + in.string_thing + "\", " +
|
||||
in.byte_thing + ", " +
|
||||
in.i32_thing + ", " +
|
||||
in.i64_thing + "}\n");
|
||||
System.out.print(" = {" + "\"" + in.string_thing + "\", " + in.byte_thing + ", " + in.i32_thing + ", " + in.i64_thing + "}\n");
|
||||
|
||||
/**
|
||||
* NESTED STRUCT TEST
|
||||
@ -126,13 +125,7 @@ public class TestClient {
|
||||
out2.i32_thing = 5;
|
||||
Xtruct2 in2 = testClient.testNest(out2);
|
||||
in = in2.struct_thing;
|
||||
System.out.print(" = {" +
|
||||
in2.byte_thing + ", {" +
|
||||
"\"" + in.string_thing + "\", " +
|
||||
in.byte_thing + ", " +
|
||||
in.i32_thing + ", " +
|
||||
in.i64_thing + "}, " +
|
||||
in2.i32_thing + "}\n");
|
||||
System.out.print(" = {" + in2.byte_thing + ", {" + "\"" + in.string_thing + "\", " + in.byte_thing + ", " + in.i32_thing + ", " + in.i64_thing + "}, " + in2.i32_thing + "}\n");
|
||||
|
||||
/**
|
||||
* MAP TEST
|
||||
@ -299,19 +292,14 @@ public class TestClient {
|
||||
HashMap<Integer, Long> userMap = v2.userMap;
|
||||
System.out.print("{");
|
||||
for (int k3 : userMap.keySet()) {
|
||||
System.out.print(k3 + " => " +
|
||||
userMap.get(k3) + ", ");
|
||||
System.out.print(k3 + " => " + userMap.get(k3) + ", ");
|
||||
}
|
||||
System.out.print("}, ");
|
||||
|
||||
ArrayList<Xtruct> xtructs = v2.xtructs;
|
||||
System.out.print("{");
|
||||
for (Xtruct x : xtructs) {
|
||||
System.out.print("{" +
|
||||
"\"" + x.string_thing + "\", " +
|
||||
x.byte_thing + ", " +
|
||||
x.i32_thing + ", "+
|
||||
x.i64_thing + "}, ");
|
||||
System.out.print("{" + "\"" + x.string_thing + "\", " + x.byte_thing + ", " + x.i32_thing + ", "+ x.i64_thing + "}, ");
|
||||
}
|
||||
System.out.print("}");
|
||||
|
||||
@ -321,14 +309,32 @@ public class TestClient {
|
||||
}
|
||||
System.out.print("}\n");
|
||||
|
||||
long stop = System.currentTimeMillis();
|
||||
System.out.println("Total time: " + (stop-start) + "ms");
|
||||
long stop = System.nanoTime();
|
||||
long tot = stop-start;
|
||||
|
||||
System.out.println("Total time: " + tot/1000 + "us");
|
||||
|
||||
if (timeMin == 0 || tot < timeMin) {
|
||||
timeMin = tot;
|
||||
}
|
||||
if (tot > timeMax) {
|
||||
timeMax = tot;
|
||||
}
|
||||
timeTot += tot;
|
||||
|
||||
tSocket.close();
|
||||
}
|
||||
|
||||
long timeAvg = timeTot / numTests;
|
||||
|
||||
System.out.println("Min time: " + timeMin/1000 + "us");
|
||||
System.out.println("Max time: " + timeMax/1000 + "us");
|
||||
System.out.println("Avg time: " + timeAvg/1000 + "us");
|
||||
|
||||
} catch (Exception x) {
|
||||
x.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -5,6 +5,7 @@ sys.path.append('./gen-py')
|
||||
|
||||
import ThriftTest
|
||||
from ThriftTest_types import *
|
||||
from thrift.transport import TTransport
|
||||
from thrift.transport import TSocket
|
||||
from thrift.protocol import TBinaryProtocol
|
||||
from thrift.server import TServer
|
||||
@ -54,5 +55,6 @@ transport = TSocket.TServerSocket(9090)
|
||||
protocol = TBinaryProtocol.TBinaryProtocol()
|
||||
handler = TestHandler()
|
||||
iface = ThriftTest.Server(handler, protocol)
|
||||
server = TServer.TSimpleServer(iface, transport)
|
||||
factory = TTransport.TBufferedTransportFactory()
|
||||
server = TServer.TSimpleServer(iface, transport, factory)
|
||||
server.run()
|
||||
|
Loading…
Reference in New Issue
Block a user