mirror of
https://github.com/valitydev/thrift.git
synced 2024-11-07 18:58:51 +00:00
042580f534
* make use of C++11 override keyword * added const specifier to TTransport::getOrigin() * added more const correctness to the compiler * make use of auto keyword * replaced usage of NULL with nullptr * make use of explicitly-defaulted function definition * extended changelog
243 lines
7.3 KiB
C++
243 lines
7.3 KiB
C++
/*
|
|
* Licensed to the Apache Software Foundation (ASF) under one
|
|
* or more contributor license agreements. See the NOTICE file
|
|
* distributed with this work for additional information
|
|
* regarding copyright ownership. The ASF licenses this file
|
|
* to you under the Apache License, Version 2.0 (the
|
|
* "License"); you may not use this file except in compliance
|
|
* with the License. You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing,
|
|
* software distributed under the License is distributed on an
|
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
* KIND, either express or implied. See the License for the
|
|
* specific language governing permissions and limitations
|
|
* under the License.
|
|
*/
|
|
|
|
#include <boost/test/auto_unit_test.hpp>
|
|
#include <boost/thread.hpp>
|
|
#include <iostream>
|
|
#include <climits>
|
|
#include <vector>
|
|
#include <thrift/concurrency/Monitor.h>
|
|
#include <thrift/protocol/TBinaryProtocol.h>
|
|
#include <thrift/protocol/TJSONProtocol.h>
|
|
#include <thrift/server/TThreadedServer.h>
|
|
#include <thrift/transport/THttpServer.h>
|
|
#include <thrift/transport/THttpClient.h>
|
|
#include <thrift/transport/TServerSocket.h>
|
|
#include <thrift/transport/TSocket.h>
|
|
#include <memory>
|
|
#include <thrift/transport/TBufferTransports.h>
|
|
#include "gen-cpp/OneWayService.h"
|
|
|
|
BOOST_AUTO_TEST_SUITE(OneWayHTTPTest)
|
|
|
|
using namespace apache::thrift;
|
|
using apache::thrift::protocol::TProtocol;
|
|
using apache::thrift::protocol::TBinaryProtocol;
|
|
using apache::thrift::protocol::TBinaryProtocolFactory;
|
|
using apache::thrift::protocol::TJSONProtocol;
|
|
using apache::thrift::protocol::TJSONProtocolFactory;
|
|
using apache::thrift::server::TThreadedServer;
|
|
using apache::thrift::server::TServerEventHandler;
|
|
using apache::thrift::transport::TTransport;
|
|
using apache::thrift::transport::THttpServer;
|
|
using apache::thrift::transport::THttpServerTransportFactory;
|
|
using apache::thrift::transport::THttpClient;
|
|
using apache::thrift::transport::TBufferedTransport;
|
|
using apache::thrift::transport::TBufferedTransportFactory;
|
|
using apache::thrift::transport::TMemoryBuffer;
|
|
using apache::thrift::transport::TServerSocket;
|
|
using apache::thrift::transport::TSocket;
|
|
using apache::thrift::transport::TTransportException;
|
|
using std::shared_ptr;
|
|
using std::cout;
|
|
using std::cerr;
|
|
using std::endl;
|
|
using std::string;
|
|
namespace utf = boost::unit_test;
|
|
|
|
// Define this env var to enable some logging (in case you need to debug)
|
|
#undef ENABLE_STDERR_LOGGING
|
|
|
|
class OneWayServiceHandler : public onewaytest::OneWayServiceIf {
|
|
public:
|
|
OneWayServiceHandler() = default;
|
|
|
|
void roundTripRPC() override {
|
|
#ifdef ENABLE_STDERR_LOGGING
|
|
cerr << "roundTripRPC()" << endl;
|
|
#endif
|
|
}
|
|
void oneWayRPC() override {
|
|
#ifdef ENABLE_STDERR_LOGGING
|
|
cerr << "oneWayRPC()" << std::endl ;
|
|
#endif
|
|
}
|
|
};
|
|
|
|
class OneWayServiceCloneFactory : virtual public onewaytest::OneWayServiceIfFactory {
|
|
public:
|
|
~OneWayServiceCloneFactory() override = default;
|
|
onewaytest::OneWayServiceIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo) override
|
|
{
|
|
(void)connInfo ;
|
|
return new OneWayServiceHandler;
|
|
}
|
|
void releaseHandler( onewaytest::OneWayServiceIf* handler) override {
|
|
delete handler;
|
|
}
|
|
};
|
|
|
|
class RPC0ThreadClass {
|
|
public:
|
|
RPC0ThreadClass(TThreadedServer& server) : server_(server) { } // Constructor
|
|
~RPC0ThreadClass() = default; // Destructor
|
|
|
|
void Run() {
|
|
server_.serve() ;
|
|
}
|
|
TThreadedServer& server_ ;
|
|
} ;
|
|
|
|
using apache::thrift::concurrency::Monitor;
|
|
using apache::thrift::concurrency::Mutex;
|
|
using apache::thrift::concurrency::Synchronized;
|
|
|
|
// copied from IntegrationTest
|
|
class TServerReadyEventHandler : public TServerEventHandler, public Monitor {
|
|
public:
|
|
TServerReadyEventHandler() : isListening_(false), accepted_(0) {}
|
|
~TServerReadyEventHandler() override = default;
|
|
void preServe() override {
|
|
Synchronized sync(*this);
|
|
isListening_ = true;
|
|
notify();
|
|
}
|
|
void* createContext(shared_ptr<TProtocol> input,
|
|
shared_ptr<TProtocol> output) override {
|
|
Synchronized sync(*this);
|
|
++accepted_;
|
|
notify();
|
|
|
|
(void)input;
|
|
(void)output;
|
|
return nullptr;
|
|
}
|
|
bool isListening() const { return isListening_; }
|
|
uint64_t acceptedCount() const { return accepted_; }
|
|
|
|
private:
|
|
bool isListening_;
|
|
uint64_t accepted_;
|
|
};
|
|
|
|
class TBlockableBufferedTransport : public TBufferedTransport {
|
|
public:
|
|
TBlockableBufferedTransport(std::shared_ptr<TTransport> transport)
|
|
: TBufferedTransport(transport, 10240),
|
|
blocked_(false) {
|
|
}
|
|
|
|
uint32_t write_buffer_length() {
|
|
auto have_bytes = static_cast<uint32_t>(wBase_ - wBuf_.get());
|
|
return have_bytes ;
|
|
}
|
|
|
|
void block() {
|
|
blocked_ = true ;
|
|
#ifdef ENABLE_STDERR_LOGGING
|
|
cerr << "block flushing\n" ;
|
|
#endif
|
|
}
|
|
void unblock() {
|
|
blocked_ = false ;
|
|
#ifdef ENABLE_STDERR_LOGGING
|
|
cerr << "unblock flushing, buffer is\n<<" << std::string((char *)wBuf_.get(), write_buffer_length()) << ">>\n" ;
|
|
#endif
|
|
}
|
|
|
|
void flush() override {
|
|
if (blocked_) {
|
|
#ifdef ENABLE_STDERR_LOGGING
|
|
cerr << "flush was blocked\n" ;
|
|
#endif
|
|
return ;
|
|
}
|
|
TBufferedTransport::flush() ;
|
|
}
|
|
|
|
bool blocked_ ;
|
|
} ;
|
|
|
|
BOOST_AUTO_TEST_CASE( JSON_BufferedHTTP )
|
|
{
|
|
std::shared_ptr<TServerSocket> ss = std::make_shared<TServerSocket>(0) ;
|
|
TThreadedServer server(
|
|
std::make_shared<onewaytest::OneWayServiceProcessorFactory>(std::make_shared<OneWayServiceCloneFactory>()),
|
|
ss, //port
|
|
std::make_shared<THttpServerTransportFactory>(),
|
|
std::make_shared<TJSONProtocolFactory>());
|
|
|
|
std::shared_ptr<TServerReadyEventHandler> pEventHandler(new TServerReadyEventHandler) ;
|
|
server.setServerEventHandler(pEventHandler);
|
|
|
|
#ifdef ENABLE_STDERR_LOGGING
|
|
cerr << "Starting the server...\n";
|
|
#endif
|
|
RPC0ThreadClass t(server) ;
|
|
boost::thread thread(&RPC0ThreadClass::Run, &t);
|
|
|
|
{
|
|
Synchronized sync(*(pEventHandler.get()));
|
|
while (!pEventHandler->isListening()) {
|
|
pEventHandler->wait();
|
|
}
|
|
}
|
|
|
|
int port = ss->getPort() ;
|
|
#ifdef ENABLE_STDERR_LOGGING
|
|
cerr << "port " << port << endl ;
|
|
#endif
|
|
|
|
{
|
|
std::shared_ptr<TSocket> socket(new TSocket("localhost", port));
|
|
socket->setRecvTimeout(10000) ; // 1000msec should be enough
|
|
std::shared_ptr<TBlockableBufferedTransport> blockable_transport(new TBlockableBufferedTransport(socket));
|
|
std::shared_ptr<TTransport> transport(new THttpClient(blockable_transport, "localhost", "/service"));
|
|
std::shared_ptr<TProtocol> protocol(new TJSONProtocol(transport));
|
|
onewaytest::OneWayServiceClient client(protocol);
|
|
|
|
|
|
transport->open();
|
|
client.roundTripRPC();
|
|
blockable_transport->block() ;
|
|
uint32_t size0 = blockable_transport->write_buffer_length() ;
|
|
client.send_oneWayRPC() ;
|
|
uint32_t size1 = blockable_transport->write_buffer_length() ;
|
|
client.send_oneWayRPC() ;
|
|
uint32_t size2 = blockable_transport->write_buffer_length() ;
|
|
BOOST_CHECK((size1 - size0) == (size2 - size1)) ;
|
|
blockable_transport->unblock() ;
|
|
client.send_roundTripRPC() ;
|
|
blockable_transport->flush() ;
|
|
try {
|
|
client.recv_roundTripRPC() ;
|
|
} catch (const TTransportException &e) {
|
|
BOOST_ERROR( "we should not get a transport exception -- this means we failed: " + std::string(e.what()) ) ;
|
|
}
|
|
transport->close();
|
|
}
|
|
server.stop();
|
|
thread.join() ;
|
|
#ifdef ENABLE_STDERR_LOGGING
|
|
cerr << "finished.\n";
|
|
#endif
|
|
}
|
|
|
|
BOOST_AUTO_TEST_SUITE_END()
|