diff --git a/lib/cpp/Makefile.am b/lib/cpp/Makefile.am index 92ccdf103..b205e787c 100644 --- a/lib/cpp/Makefile.am +++ b/lib/cpp/Makefile.am @@ -13,6 +13,7 @@ libthrift_sources = src/Thrift.cpp \ src/concurrency/TimerManager.cpp \ src/protocol/TBinaryProtocol.cpp \ src/transport/TFileTransport.cpp \ + src/transport/THttpClient.cpp \ src/transport/TSocket.cpp \ src/transport/TServerSocket.cpp \ src/transport/TTransportUtils.cpp \ @@ -60,6 +61,7 @@ include_transport_HEADERS = \ src/transport/TFileTransport.h \ src/transport/TServerSocket.h \ src/transport/TServerTransport.h \ + src/transport/THttpClient.h \ src/transport/TSocket.h \ src/transport/TTransport.h \ src/transport/TTransportException.h \ diff --git a/lib/cpp/src/transport/THttpClient.cpp b/lib/cpp/src/transport/THttpClient.cpp new file mode 100644 index 000000000..7ee83fc32 --- /dev/null +++ b/lib/cpp/src/transport/THttpClient.cpp @@ -0,0 +1,318 @@ +#include "THttpClient.h" +#include "TSocket.h" + +namespace facebook { namespace thrift { namespace transport { + +using namespace std; + +/** + * Http client implementation. + * + * @author Mark Slee + */ + +// Yeah, yeah, hacky to put these here, I know. 
+static const char* CRLF = "\r\n"; +static const int CRLF_LEN = 2; + + THttpClient::THttpClient(boost::shared_ptr transport, string host, string path) : + transport_(transport), + host_(host), + path_(path), + readHeaders_(true), + chunked_(false), + chunkSize_(0), + contentLength_(0), + httpBuf_(NULL), + httpBufPos_(0), + httpBufSize_(1024) { + init(); +} + +THttpClient::THttpClient(string host, int port, string path) : + host_(host), + path_(path), + readHeaders_(true), + chunked_(false), + chunkSize_(0), + contentLength_(0), + httpBuf_(NULL), + httpBufPos_(0), + httpBufSize_(1024) { + transport_ = boost::shared_ptr(new TSocket(host, port)); + init(); +} + +void THttpClient::init() { + httpBuf_ = (char*)malloc(httpBufSize_+1); + if (httpBuf_ == NULL) { + throw TTransportException("Out of memory."); + } +} + +THttpClient::~THttpClient() { + if (httpBuf_ != NULL) { + free(httpBuf_); + } +} + +uint32_t THttpClient::read(uint8_t* buf, uint32_t len) { + if (readBuffer_.available() == 0) { + readBuffer_.resetBuffer(); + uint32_t got = readMoreData(); + if (got == 0) { + return 0; + } + } + return readBuffer_.read(buf, len); +} + +uint32_t THttpClient::readMoreData() { + // Get more data! 
+ refill(); + + if (readHeaders_) { + readHeaders(); + } + + if (chunked_) { + return readChunked(); + } else { + char* read; + read = readContent((char*)httpBuf_, contentLength_); + shift(read); + return contentLength_; + } +} + +uint32_t THttpClient::readChunked() { + uint32_t length = 0; + char* nextLine = (char*)httpBuf_; + while (true) { + char* line = nextLine; + nextLine = readLine(nextLine); + uint32_t chunkSize = parseChunkSize(line); + if (chunkSize == 0) { + break; + } + // Read data content + nextLine = readContent(nextLine, chunkSize); + length += chunkSize; + + // Read trailing CRLF after content + nextLine = readLine(nextLine); + } + + // Read footer lines until a blank one appears + while (true) { + char* line = nextLine; + nextLine = readLine(nextLine); + if (strlen(line) == 0) { + break; + } + } + + // Shift down whatever we have left in the buf + shift(nextLine); + + return length; +} + +uint32_t THttpClient::parseChunkSize(char* line) { + char* semi = strchr(line, ';'); + if (semi != NULL) { + *semi = '\0'; + } + int s; + int size; + s = sscanf(line, "%x", &size); + return (uint32_t)size; +} + +char* THttpClient::readContent(char* pos, uint32_t size) { + uint32_t need = size; + + while (need > 0) { + uint32_t avail = httpBufPos_ - (pos - httpBuf_); + if (avail == 0) { + refill(); + } + uint32_t give = avail; + if (need < give) { + give = need; + } + readBuffer_.write((uint8_t*)pos, give); + pos += give; + need -= give; + } + return pos; +} + +char* THttpClient::readLine(char* pos) { + while (true) { + char* eol = NULL; + + // Note, the data we read could have ended right on the CRLF pair + if (pos != NULL) { + eol = strstr(pos, CRLF); + } + + // No CRLF yet? 
+ if (eol == NULL) { + // Shift whatever we have now to front + pos = shift(pos); + // Refill the buffer + refill(); + } else { + // Return pointer to next line + *eol = '\0'; + return eol + CRLF_LEN; + } + } + +} + +char* THttpClient::shift(char* pos) { + if (pos != NULL && httpBufPos_ > (pos - httpBuf_)) { + // Shift down remaining data and read more + uint32_t length = httpBufPos_ - (pos - httpBuf_); + memmove(httpBuf_, pos, length); + httpBufPos_ = length; + } else { + httpBufPos_ = 0; + } + httpBuf_[httpBufPos_] = '\0'; + return httpBuf_; +} + +void THttpClient::refill() { + uint32_t avail = httpBufSize_ - httpBufPos_; + if (avail <= (httpBufSize_ / 4)) { + httpBufSize_ *= 2; + httpBuf_ = (char*)realloc(httpBuf_, httpBufSize_); + if (httpBuf_ == NULL) { + throw TTransportException("Out of memory."); + } + } + + // Read more data + uint32_t got = transport_->read((uint8_t*)(httpBuf_+httpBufPos_), httpBufSize_-httpBufPos_); + httpBufPos_ += got; + httpBuf_[httpBufPos_] = '\0'; + + if (got == 0) { + throw TTransportException("Could not finish reading HTTP headers"); + } +} + +void THttpClient::readHeaders() { + // Initialize headers state variables + contentLength_ = 0; + chunked_ = false; + chunkSize_ = 0; + + // Control state flow + bool statusLine = true; + bool finished = false; + + // Initialize local pos vars + char* nextLine = (char*)httpBuf_; + + // Loop until headers are finished + while (true) { + char* line = nextLine; + nextLine = readLine(nextLine); + + if (strlen(line) == 0) { + if (finished) { + readHeaders_ = false; + shift(nextLine); + return; + } else { + // Must have been an HTTP 100, keep going for another status line + statusLine = true; + } + } else { + if (statusLine) { + statusLine = false; + finished = parseStatusLine(line); + } else { + parseHeader(line); + } + } + } +} + +bool THttpClient::parseStatusLine(char* status) { + char* http = status; + + char* code = strchr(http, ' '); + *code = '\0'; + + while (*(code++) == ' '); + + char* 
msg = strchr(code, ' '); + *msg = '\0'; + + if (strcmp(code, "200") == 0) { + // HTTP 200 = OK, we got the response + return true; + } else if (strcmp(code, "100") == 0) { + // HTTP 100 = continue, just keep reading + return false; + } else { + throw TTransportException(status); + } +} + +void THttpClient::parseHeader(char* header) { + char* colon = strchr(header, ':'); + if (colon == NULL) { + return; + } + uint32_t sz = colon - header; + char* value = colon+1; + + if (strncmp(header, "Transfer-Encoding", sz) == 0) { + if (strstr(value, "chunked") != NULL) { + chunked_ = true; + } + } else if (strncmp(header, "Content-Length", sz) == 0) { + chunked_ = false; + contentLength_ = atoi(value); + } +} + +void THttpClient::write(const uint8_t* buf, uint32_t len) { + writeBuffer_.write(buf, len); +} + +void THttpClient::flush() { + // Fetch the contents of the write buffer + uint8_t* buf; + uint32_t len; + writeBuffer_.getBuffer(&buf, &len); + + // Construct the HTTP header + std::ostringstream h; + h << + "POST " << path_ << " HTTP/1.1" << CRLF << + "Host: " << host_ << CRLF << + "Content-Type: application/x-thrift" << CRLF << + "Content-Length: " << len << CRLF << + "Accept: application/x-thrift" << CRLF << + "User-Agent: C++/THttpClient" << CRLF << + CRLF; + string header = h.str(); + + // Write the header, then the data, then flush + transport_->write((const uint8_t*)header.c_str(), header.size()); + transport_->write(buf, len); + transport_->flush(); + + // Reset the buffer and header variables + writeBuffer_.resetBuffer(); + readHeaders_ = true; +} + +}}} // facebook::thrift::transport diff --git a/lib/cpp/src/transport/THttpClient.h b/lib/cpp/src/transport/THttpClient.h new file mode 100644 index 000000000..e7f88c9dd --- /dev/null +++ b/lib/cpp/src/transport/THttpClient.h @@ -0,0 +1,88 @@ +#ifndef _THRIFT_TRANSPORT_THTTPCLIENT_H_ +#define _THRIFT_TRANSPORT_THTTPCLIENT_H_ 1 + +#include + +namespace facebook { namespace thrift { namespace transport { + +/** + * HTTP 
client implementation of the thrift transport. This was irritating + * to write, but the alternatives in C++ land are daunting. Linking CURL + * requires 23 dynamic libraries last time I checked (WTF?!?). All we have + * here is a VERY basic HTTP/1.1 client which supports HTTP 100 Continue, + * chunked transfer encoding, keepalive, etc. Tested against Apache. + * + * @author Mark Slee + */ +class THttpClient : public TTransport { + public: + THttpClient(boost::shared_ptr transport, std::string host, std::string path=""); + + THttpClient(std::string host, int port, std::string path=""); + + virtual ~THttpClient(); + + void open() { + transport_->open(); + } + + bool isOpen() { + return transport_->isOpen(); + } + + bool peek() { + return transport_->peek(); + } + + void close() { + transport_->close(); + } + + uint32_t read(uint8_t* buf, uint32_t len); + + void write(const uint8_t* buf, uint32_t len); + + void flush(); + + private: + void init(); + + protected: + + boost::shared_ptr transport_; + + TMemoryBuffer writeBuffer_; + TMemoryBuffer readBuffer_; + + std::string host_; + std::string path_; + + bool readHeaders_; + bool chunked_; + uint32_t chunkSize_; + uint32_t contentLength_; + + char* httpBuf_; + uint32_t httpBufPos_; + uint32_t httpBufSize_; + + uint32_t readMoreData(); + char* readLine(char* line); + + void readHeaders(); + void parseHeader(char* header); + bool parseStatusLine(char* status); + + uint32_t readChunked(); + uint32_t parseChunkSize(char* line); + + char* readContent(char* pos, uint32_t size); + + void refill(); + char* shift(char* pos); + +}; + +}}} // facebook::thrift::transport + +#endif // #ifndef _THRIFT_TRANSPORT_THTTPCLIENT_H_ diff --git a/lib/cpp/src/transport/TSocket.h b/lib/cpp/src/transport/TSocket.h index 3decb2cf6..9fbdb20d8 100644 --- a/lib/cpp/src/transport/TSocket.h +++ b/lib/cpp/src/transport/TSocket.h @@ -40,11 +40,10 @@ class TSocket : public TTransport { */ TSocket(std::string host, int port); - /** * Destroyes the 
socket object, closing it if necessary.
    */
-  ~TSocket();
+  virtual ~TSocket();
 
   /**
    * Whether the socket is alive.
@@ -125,8 +124,7 @@ class TSocket : public TTransport {
    */
   void setSendTimeout(int ms);
 
-
- private:
+ protected:
   /**
    * Constructor to create socket from raw UNIX handle. Never called directly
    * but used by the TServerSocket class.
diff --git a/lib/cpp/src/transport/TTransportUtils.cpp b/lib/cpp/src/transport/TTransportUtils.cpp
index 54b1f3f8f..adcb01325 100644
--- a/lib/cpp/src/transport/TTransportUtils.cpp
+++ b/lib/cpp/src/transport/TTransportUtils.cpp
@@ -180,6 +180,9 @@ void TFramedTransport::flush() {
 uint32_t TMemoryBuffer::read(uint8_t* buf, uint32_t len) {
   // Check avaible data for reading
   uint32_t avail = wPos_ - rPos_;
+  if (avail == 0) {
+    return 0;
+  }
 
   // Device how much to give
   uint32_t give = len;
diff --git a/lib/cpp/src/transport/TTransportUtils.h b/lib/cpp/src/transport/TTransportUtils.h
index 06547c7b9..545a0a19b 100644
--- a/lib/cpp/src/transport/TTransportUtils.h
+++ b/lib/cpp/src/transport/TTransportUtils.h
@@ -312,6 +312,14 @@ class TMemoryBuffer : public TTransport {
 
   void write(const uint8_t* buf, uint32_t len);
 
+  /**
+   * Returns the number of bytes available for reading, i.e. the distance
+   * between the write position and the read position.
+   */
+  uint32_t available() const {
+    return wPos_ - rPos_;
+  }
+
  private:
   // Data buffer
   uint8_t* buffer_;