mirror of
https://github.com/valitydev/thrift.git
synced 2024-11-07 18:58:51 +00:00
THRIFT-1348 C++ Qt bindings
Patch: Doug Rosvick qt-cleanup.patch applied git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1243124 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0bab154d9f
commit
19a991528f
@ -1,155 +1,157 @@
|
||||
#include "TQIODeviceTransport.h"
|
||||
|
||||
#include <QAbstractSocket>
|
||||
#include <QIODevice>
|
||||
|
||||
#include <transport/TBufferTransports.h>
|
||||
|
||||
#include <QAbstractSocket>
|
||||
|
||||
#include <iostream>
|
||||
|
||||
using boost::shared_ptr;
|
||||
|
||||
namespace apache { namespace thrift { namespace transport {
|
||||
using boost::shared_ptr;
|
||||
|
||||
TQIODeviceTransport::TQIODeviceTransport(shared_ptr<QIODevice> dev)
|
||||
: dev_(dev)
|
||||
{
|
||||
TQIODeviceTransport::TQIODeviceTransport(shared_ptr<QIODevice> dev)
|
||||
: dev_(dev)
|
||||
{
|
||||
}
|
||||
|
||||
TQIODeviceTransport::~TQIODeviceTransport()
|
||||
{
|
||||
dev_->close();
|
||||
}
|
||||
|
||||
void TQIODeviceTransport::open()
|
||||
{
|
||||
if (!isOpen()) {
|
||||
throw TTransportException(TTransportException::NOT_OPEN,
|
||||
"open(): underlying QIODevice isn't open");
|
||||
}
|
||||
}
|
||||
|
||||
TQIODeviceTransport::~TQIODeviceTransport()
|
||||
{
|
||||
dev_->close();
|
||||
}
|
||||
bool TQIODeviceTransport::isOpen()
|
||||
{
|
||||
return dev_->isOpen();
|
||||
}
|
||||
|
||||
void TQIODeviceTransport::open()
|
||||
{
|
||||
if (!isOpen()) {
|
||||
throw TTransportException(TTransportException::NOT_OPEN,
|
||||
"open(): underlying QIODevice isn't open");
|
||||
}
|
||||
}
|
||||
bool TQIODeviceTransport::peek()
|
||||
{
|
||||
return dev_->bytesAvailable() > 0;
|
||||
}
|
||||
|
||||
bool TQIODeviceTransport::isOpen()
|
||||
{
|
||||
return dev_->isOpen();
|
||||
}
|
||||
void TQIODeviceTransport::close()
|
||||
{
|
||||
dev_->close();
|
||||
}
|
||||
|
||||
bool TQIODeviceTransport::peek()
|
||||
{
|
||||
return dev_->bytesAvailable() > 0;
|
||||
}
|
||||
|
||||
void TQIODeviceTransport::close()
|
||||
{
|
||||
dev_->close();
|
||||
}
|
||||
|
||||
uint32_t TQIODeviceTransport::readAll(uint8_t* buf, uint32_t len) {
|
||||
uint32_t requestLen = len;
|
||||
while (len) {
|
||||
uint32_t readSize;
|
||||
try {
|
||||
readSize = read(buf, len);
|
||||
} catch (...) {
|
||||
if (len != requestLen) {
|
||||
// something read already
|
||||
return requestLen - len;
|
||||
}
|
||||
// error but nothing read yet
|
||||
throw;
|
||||
}
|
||||
if (readSize == 0) {
|
||||
// dev_->waitForReadyRead(50);
|
||||
} else {
|
||||
buf += readSize;
|
||||
len -= readSize;
|
||||
uint32_t TQIODeviceTransport::readAll(uint8_t* buf, uint32_t len)
|
||||
{
|
||||
uint32_t requestLen = len;
|
||||
while (len) {
|
||||
uint32_t readSize;
|
||||
try {
|
||||
readSize = read(buf, len);
|
||||
} catch (...) {
|
||||
if (len != requestLen) {
|
||||
// something read already
|
||||
return requestLen - len;
|
||||
}
|
||||
// error but nothing read yet
|
||||
throw;
|
||||
}
|
||||
return requestLen;
|
||||
}
|
||||
|
||||
uint32_t TQIODeviceTransport::read(uint8_t* buf, uint32_t len)
|
||||
{
|
||||
uint32_t actualSize;
|
||||
qint64 readSize;
|
||||
|
||||
if (!dev_->isOpen()) {
|
||||
throw TTransportException(TTransportException::NOT_OPEN,
|
||||
"read(): underlying QIODevice is not open");
|
||||
}
|
||||
|
||||
actualSize = (uint32_t)std::min((qint64)len, dev_->bytesAvailable());
|
||||
readSize = dev_->read(reinterpret_cast<char *>(buf), len);
|
||||
|
||||
if (readSize < 0) {
|
||||
QAbstractSocket* socket;
|
||||
if ((socket = qobject_cast<QAbstractSocket* >(dev_.get()))) {
|
||||
throw TTransportException(TTransportException::UNKNOWN,
|
||||
"Failed to read() from QAbstractSocket",
|
||||
socket->error());
|
||||
}
|
||||
throw TTransportException(TTransportException::UNKNOWN,
|
||||
"Failed to read from from QIODevice");
|
||||
}
|
||||
|
||||
return (uint32_t)readSize;
|
||||
}
|
||||
|
||||
void TQIODeviceTransport::write(const uint8_t* buf, uint32_t len)
|
||||
{
|
||||
while (len) {
|
||||
uint32_t written = write_partial(buf, len);
|
||||
len -= written;
|
||||
// dev_->waitForBytesWritten(50);
|
||||
}
|
||||
}
|
||||
|
||||
uint32_t TQIODeviceTransport::write_partial(const uint8_t* buf, uint32_t len)
|
||||
{
|
||||
qint64 written;
|
||||
|
||||
if (!dev_->isOpen()) {
|
||||
throw TTransportException(TTransportException::NOT_OPEN,
|
||||
"write_partial(): underlying QIODevice is not open");
|
||||
}
|
||||
|
||||
written = dev_->write(reinterpret_cast<const char*>(buf), len);
|
||||
if (written < 0) {
|
||||
QAbstractSocket* socket;
|
||||
if ((socket = qobject_cast<QAbstractSocket*>(dev_.get()))) {
|
||||
throw TTransportException(TTransportException::UNKNOWN,
|
||||
"write_partial(): failed to write to QAbstractSocket", socket->error());
|
||||
}
|
||||
|
||||
throw TTransportException(TTransportException::UNKNOWN,
|
||||
"write_partial(): failed to write to underlying QIODevice");
|
||||
}
|
||||
|
||||
return (uint32_t)written;
|
||||
}
|
||||
|
||||
void TQIODeviceTransport::flush()
|
||||
{
|
||||
if (!dev_->isOpen()) {
|
||||
throw TTransportException(TTransportException::NOT_OPEN,
|
||||
"flush(): underlying QIODevice is not open");
|
||||
}
|
||||
|
||||
QAbstractSocket* socket;
|
||||
|
||||
if ((socket = qobject_cast<QAbstractSocket*>(dev_.get()))) {
|
||||
socket->flush();
|
||||
if (readSize == 0) {
|
||||
dev_->waitForReadyRead(50);
|
||||
} else {
|
||||
dev_->waitForBytesWritten(1);
|
||||
buf += readSize;
|
||||
len -= readSize;
|
||||
}
|
||||
}
|
||||
return requestLen;
|
||||
}
|
||||
|
||||
uint8_t* TQIODeviceTransport::borrow(uint8_t* buf, uint32_t* len)
|
||||
{
|
||||
return NULL;
|
||||
uint32_t TQIODeviceTransport::read(uint8_t* buf, uint32_t len)
|
||||
{
|
||||
uint32_t actualSize;
|
||||
qint64 readSize;
|
||||
|
||||
if (!dev_->isOpen()) {
|
||||
throw TTransportException(TTransportException::NOT_OPEN,
|
||||
"read(): underlying QIODevice is not open");
|
||||
}
|
||||
|
||||
void TQIODeviceTransport::consume(uint32_t len)
|
||||
{
|
||||
throw TTransportException(TTransportException::UNKNOWN);
|
||||
actualSize = (uint32_t)std::min((qint64)len, dev_->bytesAvailable());
|
||||
readSize = dev_->read(reinterpret_cast<char *>(buf), actualSize);
|
||||
|
||||
if (readSize < 0) {
|
||||
QAbstractSocket* socket;
|
||||
if ((socket = qobject_cast<QAbstractSocket* >(dev_.get()))) {
|
||||
throw TTransportException(TTransportException::UNKNOWN,
|
||||
"Failed to read() from QAbstractSocket",
|
||||
socket->error());
|
||||
}
|
||||
throw TTransportException(TTransportException::UNKNOWN,
|
||||
"Failed to read from from QIODevice");
|
||||
}
|
||||
|
||||
return (uint32_t)readSize;
|
||||
}
|
||||
|
||||
void TQIODeviceTransport::write(const uint8_t* buf, uint32_t len)
|
||||
{
|
||||
while (len) {
|
||||
uint32_t written = write_partial(buf, len);
|
||||
len -= written;
|
||||
dev_->waitForBytesWritten(50);
|
||||
}
|
||||
}
|
||||
|
||||
uint32_t TQIODeviceTransport::write_partial(const uint8_t* buf, uint32_t len)
|
||||
{
|
||||
qint64 written;
|
||||
|
||||
if (!dev_->isOpen()) {
|
||||
throw TTransportException(TTransportException::NOT_OPEN,
|
||||
"write_partial(): underlying QIODevice is not open");
|
||||
}
|
||||
|
||||
written = dev_->write(reinterpret_cast<const char*>(buf), len);
|
||||
if (written < 0) {
|
||||
QAbstractSocket* socket;
|
||||
if ((socket = qobject_cast<QAbstractSocket*>(dev_.get()))) {
|
||||
throw TTransportException(TTransportException::UNKNOWN,
|
||||
"write_partial(): failed to write to QAbstractSocket", socket->error());
|
||||
}
|
||||
|
||||
throw TTransportException(TTransportException::UNKNOWN,
|
||||
"write_partial(): failed to write to underlying QIODevice");
|
||||
}
|
||||
|
||||
return (uint32_t)written;
|
||||
}
|
||||
|
||||
void TQIODeviceTransport::flush()
|
||||
{
|
||||
if (!dev_->isOpen()) {
|
||||
throw TTransportException(TTransportException::NOT_OPEN,
|
||||
"flush(): underlying QIODevice is not open");
|
||||
}
|
||||
|
||||
QAbstractSocket* socket;
|
||||
|
||||
if ((socket = qobject_cast<QAbstractSocket*>(dev_.get()))) {
|
||||
socket->flush();
|
||||
} else {
|
||||
dev_->waitForBytesWritten(1);
|
||||
}
|
||||
}
|
||||
|
||||
uint8_t* TQIODeviceTransport::borrow(uint8_t* buf, uint32_t* len)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void TQIODeviceTransport::consume(uint32_t len)
|
||||
{
|
||||
throw TTransportException(TTransportException::UNKNOWN);
|
||||
}
|
||||
|
||||
}}} // apache::thrift::transport
|
||||
|
||||
|
@ -1,47 +1,44 @@
|
||||
#ifndef _THRIFT_ASYNC_TQIODEVICE_TRANSPORT_H_
|
||||
#define _THRIFT_ASYNC_TQIODEVICE_TRANSPORT_H_ 1
|
||||
|
||||
#include <QIODevice>
|
||||
|
||||
#include <boost/shared_ptr.hpp>
|
||||
|
||||
#include <transport/TVirtualTransport.h>
|
||||
|
||||
namespace apache { namespace thrift { namespace protocol {
|
||||
class TProtocol;
|
||||
}}} // apache::thrift::protocol
|
||||
class QIODevice;
|
||||
|
||||
namespace apache { namespace thrift { namespace transport {
|
||||
|
||||
class TQIODeviceTransport : public apache::thrift::transport::TVirtualTransport<TQIODeviceTransport> {
|
||||
public:
|
||||
explicit TQIODeviceTransport(boost::shared_ptr<QIODevice> dev);
|
||||
~TQIODeviceTransport();
|
||||
/**
|
||||
* Transport that operates on a QIODevice (socket, file, etc).
|
||||
*/
|
||||
class TQIODeviceTransport : public apache::thrift::transport::TVirtualTransport<TQIODeviceTransport> {
|
||||
public:
|
||||
explicit TQIODeviceTransport(boost::shared_ptr<QIODevice> dev);
|
||||
virtual ~TQIODeviceTransport();
|
||||
|
||||
void open();
|
||||
void open();
|
||||
bool isOpen();
|
||||
bool peek();
|
||||
void close();
|
||||
|
||||
bool isOpen();
|
||||
|
||||
bool peek();
|
||||
|
||||
void close();
|
||||
|
||||
uint32_t readAll(uint8_t *buf, uint32_t len);
|
||||
uint32_t readAll(uint8_t *buf, uint32_t len);
|
||||
uint32_t read(uint8_t* buf, uint32_t len);
|
||||
|
||||
uint32_t read(uint8_t* buf, uint32_t len);
|
||||
void write(const uint8_t* buf, uint32_t len);
|
||||
uint32_t write_partial(const uint8_t* buf, uint32_t len);
|
||||
|
||||
void write(const uint8_t* buf, uint32_t len);
|
||||
void flush();
|
||||
|
||||
uint32_t write_partial(const uint8_t* buf, uint32_t len);
|
||||
uint8_t* borrow(uint8_t* buf, uint32_t* len);
|
||||
void consume(uint32_t len);
|
||||
|
||||
void flush();
|
||||
|
||||
uint8_t* borrow(uint8_t* buf, uint32_t* len);
|
||||
void consume(uint32_t len);
|
||||
|
||||
private:
|
||||
boost::shared_ptr<QIODevice> dev_;
|
||||
};
|
||||
private:
|
||||
TQIODeviceTransport(const TQIODeviceTransport&);
|
||||
TQIODeviceTransport& operator=(const TQIODeviceTransport&);
|
||||
|
||||
boost::shared_ptr<QIODevice> dev_;
|
||||
};
|
||||
}}} // apache::thrift::transport
|
||||
|
||||
#endif // #ifndef _THRIFT_ASYNC_TQIODEVICE_TRANSPORT_H_
|
||||
|
@ -1,11 +1,13 @@
|
||||
#include "TQTcpServer.h"
|
||||
|
||||
#include <protocol/TProtocol.h>
|
||||
#include <async/TAsyncProcessor.h>
|
||||
#include "TQTcpServer.h"
|
||||
#include "TQIODeviceTransport.h"
|
||||
|
||||
#include <QTcpSocket>
|
||||
|
||||
#include "TQIODeviceTransport.h"
|
||||
#include <tr1/functional>
|
||||
|
||||
#include <protocol/TProtocol.h>
|
||||
#include <async/TAsyncProcessor.h>
|
||||
|
||||
using boost::shared_ptr;
|
||||
using apache::thrift::protocol::TProtocol;
|
||||
@ -91,8 +93,7 @@ void TQTcpServer::beginDecode()
|
||||
QTcpSocket* connection(qobject_cast<QTcpSocket*>(sender()));
|
||||
Q_ASSERT(connection);
|
||||
|
||||
if (ctxMap_.find(connection) == ctxMap_.end())
|
||||
{
|
||||
if (ctxMap_.find(connection) == ctxMap_.end()) {
|
||||
qWarning("[TQTcpServer] Got data on an unknown QTcpSocket");
|
||||
return;
|
||||
}
|
||||
@ -119,8 +120,7 @@ void TQTcpServer::socketClosed()
|
||||
QTcpSocket* connection(qobject_cast<QTcpSocket*>(sender()));
|
||||
Q_ASSERT(connection);
|
||||
|
||||
if (ctxMap_.find(connection) == ctxMap_.end())
|
||||
{
|
||||
if (ctxMap_.find(connection) == ctxMap_.end()) {
|
||||
qWarning("[TQTcpServer] Unknown QTcpSocket closed");
|
||||
return;
|
||||
}
|
||||
@ -130,8 +130,7 @@ void TQTcpServer::socketClosed()
|
||||
|
||||
void TQTcpServer::finish(shared_ptr<ConnectionContext> ctx, bool healthy)
|
||||
{
|
||||
if (!healthy)
|
||||
{
|
||||
if (!healthy) {
|
||||
qWarning("[TQTcpServer] Processor failed to process data successfully");
|
||||
ctxMap_.erase(ctx->connection_.get());
|
||||
}
|
||||
|
@ -6,10 +6,7 @@
|
||||
|
||||
#include <boost/shared_ptr.hpp>
|
||||
|
||||
#include <tr1/functional>
|
||||
|
||||
namespace apache { namespace thrift { namespace protocol {
|
||||
class TProtocol;
|
||||
class TProtocolFactory;
|
||||
}}} // apache::thrift::protocol
|
||||
|
||||
@ -17,6 +14,11 @@ namespace apache { namespace thrift { namespace async {
|
||||
|
||||
class TAsyncProcessor;
|
||||
|
||||
/**
|
||||
* Server that uses Qt to listen for connections.
|
||||
* Simply give it a QTcpServer that is listening, along with an async
|
||||
* processor and a protocol factory, and then run the Qt event loop.
|
||||
*/
|
||||
class TQTcpServer : public QObject {
|
||||
Q_OBJECT
|
||||
public:
|
||||
@ -32,6 +34,9 @@ class TQTcpServer : public QObject {
|
||||
void socketClosed();
|
||||
|
||||
private:
|
||||
TQTcpServer(const TQTcpServer&);
|
||||
TQTcpServer& operator=(const TQTcpServer&);
|
||||
|
||||
class ConnectionContext;
|
||||
|
||||
void finish(boost::shared_ptr<ConnectionContext> ctx, bool healthy);
|
||||
|
Loading…
Reference in New Issue
Block a user