mirror of
https://github.com/valitydev/thrift.git
synced 2024-11-07 02:45:22 +00:00
26681fbdb7
Patch: Zezeng Wang This closes #2168
98 lines
3.0 KiB
C++
98 lines
3.0 KiB
C++
#include <tr1/functional>
|
|
#include <thrift/protocol/TBinaryProtocol.h>
|
|
#include <thrift/async/TAsyncProtocolProcessor.h>
|
|
#include <thrift/async/TEvhttpServer.h>
|
|
#include <thrift/async/TEvhttpClientChannel.h>
|
|
#include "Aggr.h"
|
|
|
|
using std::tr1::bind;
|
|
using std::tr1::placeholders::_1;
|
|
|
|
using apache::thrift::TException;
|
|
using apache::thrift::protocol::TBinaryProtocolFactory;
|
|
using apache::thrift::protocol::TProtocolFactory;
|
|
using apache::thrift::async::TEvhttpServer;
|
|
using apache::thrift::async::TAsyncProcessor;
|
|
using apache::thrift::async::TAsyncBufferProcessor;
|
|
using apache::thrift::async::TAsyncProtocolProcessor;
|
|
using apache::thrift::async::TAsyncChannel;
|
|
using apache::thrift::async::TEvhttpClientChannel;
|
|
|
|
class AggrAsyncHandler : public AggrCobSvIf {
|
|
protected:
|
|
struct RequestContext {
|
|
std::tr1::function<void(std::vector<int32_t> const& _return)> cob;
|
|
std::vector<int32_t> ret;
|
|
int pending_calls;
|
|
};
|
|
|
|
public:
|
|
AggrAsyncHandler()
|
|
: eb_(nullptr)
|
|
, pfact_(new TBinaryProtocolFactory())
|
|
{
|
|
leaf_ports_.push_back(8081);
|
|
leaf_ports_.push_back(8082);
|
|
}
|
|
|
|
void addValue(std::tr1::function<void()> cob, const int32_t value) {
|
|
// Silently drop writes to the aggrgator.
|
|
return cob();
|
|
}
|
|
|
|
void getValues(std::tr1::function<void(
|
|
std::vector<int32_t> const& _return)> cob,
|
|
std::tr1::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob) {
|
|
RequestContext* ctx = new RequestContext();
|
|
ctx->cob = cob;
|
|
ctx->pending_calls = leaf_ports_.size();
|
|
for (std::vector<int>::iterator it = leaf_ports_.begin();
|
|
it != leaf_ports_.end(); ++it) {
|
|
boost::shared_ptr<TAsyncChannel> channel(
|
|
new TEvhttpClientChannel(
|
|
"localhost", "/", "127.0.0.1", *it, eb_));
|
|
AggrCobClient* client = new AggrCobClient(channel, pfact_.get());
|
|
client->getValues(std::tr1::bind(&AggrAsyncHandler::clientReturn, this, ctx, _1));
|
|
}
|
|
}
|
|
|
|
void setEventBase(struct event_base* eb) {
|
|
eb_ = eb;
|
|
}
|
|
|
|
void clientReturn(RequestContext* ctx, AggrCobClient* client) {
|
|
ctx->pending_calls -= 1;
|
|
|
|
try {
|
|
std::vector<int32_t> subret;
|
|
client->recv_getValues(subret);
|
|
ctx->ret.insert(ctx->ret.end(), subret.begin(), subret.end());
|
|
} catch (TException& exn) {
|
|
// TODO: Log error
|
|
}
|
|
|
|
delete client;
|
|
|
|
if (ctx->pending_calls == 0) {
|
|
ctx->cob(ctx->ret);
|
|
delete ctx;
|
|
}
|
|
}
|
|
|
|
protected:
|
|
struct event_base* eb_;
|
|
std::vector<int> leaf_ports_;
|
|
boost::shared_ptr<TProtocolFactory> pfact_;
|
|
};
|
|
|
|
|
|
int main() {
|
|
boost::shared_ptr<AggrAsyncHandler> handler(new AggrAsyncHandler());
|
|
boost::shared_ptr<TAsyncProcessor> proc(new AggrAsyncProcessor(handler));
|
|
boost::shared_ptr<TProtocolFactory> pfact(new TBinaryProtocolFactory());
|
|
boost::shared_ptr<TAsyncBufferProcessor> bufproc(new TAsyncProtocolProcessor(proc, pfact));
|
|
boost::shared_ptr<TEvhttpServer> server(new TEvhttpServer(bufproc, 8080));
|
|
handler->setEventBase(server->getEventBase());
|
|
server->serve();
|
|
}
|