More boosification of thrift driver, server, transport and protocol code

Modified TestServer to use thread-pool manager 


	


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664738 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Marc Slemko 2006-08-04 03:16:25 +00:00
parent 1669885931
commit 6be374b55b
3 changed files with 154 additions and 34 deletions

View File

@ -4,6 +4,10 @@
# Mark Slee <mcslee@facebook.com>
# Default target is everything
ifndef thrift_home
thrift_home=../../build
endif #thrift_home
target: all
# Tools
@ -11,6 +15,8 @@ THRIFT = thrift
CC = g++
LD = g++
include_flags = $(patsubst %,-I$(thrift_home)/include/%, thrift boost-1_33_1)
# Compiler flags
LIBS = ../../lib/cpp/src/server/TSimpleServer.cc \
../../lib/cpp/src/protocol/TBinaryProtocol.cc \
@ -18,8 +24,8 @@ LIBS = ../../lib/cpp/src/server/TSimpleServer.cc \
../../lib/cpp/src/transport/TChunkedTransport.cc \
../../lib/cpp/src/transport/TServerSocket.cc \
../../lib/cpp/src/transport/TSocket.cc
DCFL = -Wall -O3 -g -Igen-cpp -I../../lib/cpp/src $(LIBS)
CFL = -Wall -O3 -Igen-cpp -I../../lib/cpp/src -lthrift
DCFL = -Wall -O3 -g -Igen-cpp $(include_flags) $(LIBS)
CFL = -Wall -O3 -Igen-cpp $(include_flags) -L$(thrift_home)/lib -lthrift
all: server client

View File

@ -1,13 +1,20 @@
#include <stdio.h>
#include <unistd.h>
#include <sys/time.h>
#include "protocol/TBinaryProtocol.h"
#include "transport/TBufferedTransport.h"
#include "transport/TSocket.h"
#include "ThriftTest.h"
using namespace std;
#include <protocol/TBinaryProtocol.h>
#include <transport/TBufferedTransport.h>
#include <transport/TSocket.h>
extern uint32_t g_socket_syscalls;
#include <boost/shared_ptr.hpp>
#include "ThriftTest.h"
using namespace boost;
using namespace std;
using namespace facebook::thrift;
using namespace facebook::thrift::protocol;
using namespace facebook::thrift::transport;
//extern uint32_t g_socket_syscalls;
// Current time, microseconds since the epoch
uint64_t now()
@ -36,10 +43,10 @@ int main(int argc, char** argv) {
numTests = atoi(argv[3]);
}
TSocket socket(host, port);
TBufferedTransport bufferedSocket(&socket, 2048);
TBinaryProtocol binaryProtocol;
ThriftTestClient testClient(&bufferedSocket, &binaryProtocol);
shared_ptr<TSocket> socket(new TSocket(host, port));
shared_ptr<TBufferedTransport> bufferedSocket(new TBufferedTransport(socket, 2048));
shared_ptr<TBinaryProtocol> binaryProtocol(new TBinaryProtocol());
ThriftTestClient testClient(bufferedSocket, binaryProtocol);
int test = 0;
for (test = 0; test < numTests; ++test) {
@ -49,7 +56,7 @@ int main(int argc, char** argv) {
*/
printf("Test #%d, connect %s:%d\n", test+1, host.c_str(), port);
try {
bufferedSocket.open();
bufferedSocket->open();
} catch (TTransportException& ttx) {
printf("Connect failed: %s\n", ttx.getMessage().c_str());
continue;
@ -320,10 +327,10 @@ int main(int argc, char** argv) {
uint64_t stop = now();
printf("Total time: %lu us\n", stop-start);
bufferedSocket.close();
bufferedSocket->close();
}
printf("\nSocket syscalls: %u", g_socket_syscalls);
// printf("\nSocket syscalls: %u", g_socket_syscalls);
printf("\nAll tests done.\n");
return 0;
}

View File

@ -1,13 +1,25 @@
#include <stdio.h>
#include "protocol/TBinaryProtocol.h"
#include "server/TSimpleServer.h"
#include "transport/TServerSocket.h"
#include <concurrency/ThreadManager.h>
#include <concurrency/PosixThreadFactory.h>
#include <protocol/TBinaryProtocol.h>
#include <server/TSimpleServer.h>
#include <server/TThreadPoolServer.h>
#include <transport/TServerSocket.h>
#include "ThriftTest.h"
#include <iostream>
#include <stdexcept>
#include <sstream>
using namespace std;
using namespace facebook::thrift;
using namespace facebook::thrift::protocol;
using namespace facebook::thrift::transport;
using namespace facebook::thrift::server;
class TestServer : public ThriftTestServerIf {
public:
TestServer(TProtocol* protocol) :
TestServer(shared_ptr<TProtocol> protocol) :
ThriftTestServerIf(protocol) {}
void testVoid() {
@ -203,32 +215,127 @@ class TestServer : public ThriftTestServerIf {
return insane;
}
};
int main(int argc, char **argv) {
int port = 9090;
if (argc > 1) {
port = atoi(argv[1]);
string serverType = "simple";
string protocolType = "binary";
size_t workerCount = 4;
ostringstream usage;
usage <<
argv[0] << " [--port=<port number>] [--server-type=<server-type>] [--protocol-type=<protocol-type>] [--workers=<worker-count>]" << endl <<
"\t\tserver-type\t\ttype of server, \"simple-server\" or \"thread-pool\". Default is " << serverType << endl <<
"\t\tprotocol-type\t\ttype of protocol, \"binary\", \"ascii\", or \"xml\". Default is " << protocolType << endl <<
"\t\tworkers\t\tNumber of thread pools workers. Only valid for thread-pool server type. Default is " << workerCount << endl;
map<string, string> args;
for(int ix = 1; ix < argc; ix++) {
string arg(argv[ix]);
if(arg.compare(0,2, "--") == 0) {
size_t end = arg.find_first_of("=", 2);
if(end != string::npos) {
args[string(arg, 2, end - 2)] = string(arg, end + 1);
} else {
args[string(arg, 2, end - 2)] = "true";
}
ix++;
} else {
throw invalid_argument("Unexcepted command line token: "+arg);
}
}
try {
if(!args["port"].empty()) {
port = atoi(args["port"].c_str());
}
if(!args["server-type"].empty()) {
serverType = args["server-type"];
if(serverType == "simple") {
} else if(serverType == "thread-pool") {
} else {
throw invalid_argument("Unknown server type "+serverType);
}
}
if(!args["protocol-type"].empty()) {
protocolType = args["protocol-type"];
if(protocolType == "binary") {
} else if(protocolType == "ascii") {
throw invalid_argument("ASCII protocol not supported");
} else if(protocolType == "xml") {
throw invalid_argument("XML protocol not supported");
} else {
throw invalid_argument("Unknown protocol type "+protocolType);
}
}
if(!args["workers"].empty()) {
workerCount = atoi(args["workers"].c_str());
}
} catch(exception& e) {
cerr << e.what() << endl;
cerr << usage;
}
// Dispatcher
TBinaryProtocol binaryProtocol;
TestServer testServer(&binaryProtocol);
shared_ptr<TBinaryProtocol> binaryProtocol(new TBinaryProtocol);
shared_ptr<TestServer> testServer(new TestServer(binaryProtocol));
// Options
TServerOptions serverOptions;
shared_ptr<TServerOptions> serverOptions(new TServerOptions());
// Transport
TServerSocket serverSocket(port);
shared_ptr<TServerSocket> serverSocket(new TServerSocket(port));
// Server
TSimpleServer simpleServer(&testServer,
&serverOptions,
&serverSocket);
if(serverType == "simple") {
// Server
TSimpleServer simpleServer(testServer,
serverOptions,
serverSocket);
printf("Starting the server on port %d...\n", port);
simpleServer.run();
} else if(serverType == "thread-pool") {
shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount);
shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
threadManager->threadFactory(threadFactory);
threadManager->start();
TThreadPoolServer threadPoolServer(testServer,
serverOptions,
serverSocket,
threadManager);
printf("Starting the server on port %d...\n", port);
threadPoolServer.run();
}
printf("Starting the server on port %d...\n", port);
simpleServer.run();
printf("done.\n");
return 0;
}