thrift/contrib/async-test/test-server.cpp
Roger Meier 49ff8b123a THRIFT-1552 Include paths for c/c++ should be prefixed with 'thrift/'
To ensure there are no include path collisions, the C and C++ header
include paths should use 'thrift' as the root directory. This avoids
having to add /usr/include/thrift to the compiler's header search
path, which might otherwise result in the compiler accidentally
picking up headers that it shouldn't.

e.g. #include <foo/bar.h> should be #include <thrift/foo/bar.h>

Change-Id: I48f2b0f549bda0fc81e85506ac857adc800b98a1

git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1325674 13f79535-47bb-0310-9956-ffa450edef68
2012-04-13 09:12:31 +00:00

98 lines
3.0 KiB
C++

#include <tr1/functional>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/async/TAsyncProtocolProcessor.h>
#include <thrift/async/TEvhttpServer.h>
#include <thrift/async/TEvhttpClientChannel.h>
#include "Aggr.h"
using std::tr1::bind;
using std::tr1::placeholders::_1;
using apache::thrift::TException;
using apache::thrift::protocol::TBinaryProtocolFactory;
using apache::thrift::protocol::TProtocolFactory;
using apache::thrift::async::TEvhttpServer;
using apache::thrift::async::TAsyncProcessor;
using apache::thrift::async::TAsyncBufferProcessor;
using apache::thrift::async::TAsyncProtocolProcessor;
using apache::thrift::async::TAsyncChannel;
using apache::thrift::async::TEvhttpClientChannel;
class AggrAsyncHandler : public AggrCobSvIf {
protected:
struct RequestContext {
std::tr1::function<void(std::vector<int32_t> const& _return)> cob;
std::vector<int32_t> ret;
int pending_calls;
};
public:
AggrAsyncHandler()
: eb_(NULL)
, pfact_(new TBinaryProtocolFactory())
{
leaf_ports_.push_back(8081);
leaf_ports_.push_back(8082);
}
void addValue(std::tr1::function<void()> cob, const int32_t value) {
// Silently drop writes to the aggrgator.
return cob();
}
void getValues(std::tr1::function<void(
std::vector<int32_t> const& _return)> cob,
std::tr1::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob) {
RequestContext* ctx = new RequestContext();
ctx->cob = cob;
ctx->pending_calls = leaf_ports_.size();
for (std::vector<int>::iterator it = leaf_ports_.begin();
it != leaf_ports_.end(); ++it) {
boost::shared_ptr<TAsyncChannel> channel(
new TEvhttpClientChannel(
"localhost", "/", "127.0.0.1", *it, eb_));
AggrCobClient* client = new AggrCobClient(channel, pfact_.get());
client->getValues(std::tr1::bind(&AggrAsyncHandler::clientReturn, this, ctx, _1));
}
}
void setEventBase(struct event_base* eb) {
eb_ = eb;
}
void clientReturn(RequestContext* ctx, AggrCobClient* client) {
ctx->pending_calls -= 1;
try {
std::vector<int32_t> subret;
client->recv_getValues(subret);
ctx->ret.insert(ctx->ret.end(), subret.begin(), subret.end());
} catch (TException& exn) {
// TODO: Log error
}
delete client;
if (ctx->pending_calls == 0) {
ctx->cob(ctx->ret);
delete ctx;
}
}
protected:
struct event_base* eb_;
std::vector<int> leaf_ports_;
boost::shared_ptr<TProtocolFactory> pfact_;
};
int main() {
boost::shared_ptr<AggrAsyncHandler> handler(new AggrAsyncHandler());
boost::shared_ptr<TAsyncProcessor> proc(new AggrAsyncProcessor(handler));
boost::shared_ptr<TProtocolFactory> pfact(new TBinaryProtocolFactory());
boost::shared_ptr<TAsyncBufferProcessor> bufproc(new TAsyncProtocolProcessor(proc, pfact));
boost::shared_ptr<TEvhttpServer> server(new TEvhttpServer(bufproc, 8080));
handler->setEventBase(server->getEventBase());
server->serve();
}