THRIFT-2819

Client Node
Patch: Chi Vinh Le

Adds websocket client to Node with tests
This commit is contained in:
Randy Abernethy 2014-11-15 23:05:22 -08:00
parent c118db2ce4
commit 2e091f681b
6 changed files with 438 additions and 26 deletions

View File

@ -31,6 +31,11 @@ exports.HttpConnection = httpConnection.HttpConnection;
exports.createHttpConnection = httpConnection.createHttpConnection;
exports.createHttpClient = httpConnection.createHttpClient;
var wsConnection = require('./ws_connection');
exports.WSConnection = wsConnection.WSConnection;
exports.createWSConnection = wsConnection.createWSConnection;
exports.createWSClient = wsConnection.createWSClient;
var server = require('./server');
exports.createServer = server.createServer;
exports.createMultiplexServer = server.createMultiplexServer;

View File

@ -27,7 +27,7 @@ var MultiplexedProcessor = require('./multiplexed_processor').MultiplexedProcess
var TTransport = require('./transport');
var TBufferedTransport = require('./transport').TBufferedTransport;
var TBinaryProtocol = require('./protocol').TBinaryProtocol;
var TJSONProtocol = require('./protocol').TJSONProtocol;
// WSFrame constructor and prototype
/////////////////////////////////////////////////////////////////////
@ -452,11 +452,12 @@ exports.createWebServer = function(options) {
///////////////////////////////////////////////////
function processWS(data, socket, svc, binEncoding) {
svc.transport.receiver(function(transportWithData) {
var binary = svc.protocol != TJSONProtocol;
var input = new svc.protocol(transportWithData);
var output = new svc.protocol(new svc.transport(undefined, function(buf) {
try {
var frame = wsFrame.encode(buf, null, binEncoding);
socket.write(frame);
socket.write(frame, null, binary);
} catch (err) {
//TODO: Add better error processing
}
@ -519,10 +520,11 @@ exports.createWebServer = function(options) {
"\r\n");
//Handle WebSocket traffic
var data = null;
var binary = svc.protocol != TJSONProtocol;
socket.on('data', function(frame) {
try {
while (frame) {
var result = wsFrame.decode(frame);
var result = wsFrame.decode(frame, null, binary);
//Prepend any existing decoded data
if (data) {
if (result.data) {

View File

@ -0,0 +1,296 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
var util = require('util');
var WebSocket = require('ws');
var EventEmitter = require("events").EventEmitter;
var thrift = require('./thrift');
var ttransport = require('./transport');
var tprotocol = require('./protocol');
/**
* @class
* @name WSConnectOptions
* @property {string} transport - The Thrift layered transport to use (TBufferedTransport, etc).
* @property {string} protocol - The Thrift serialization protocol to use (TJSONProtocol, etc.).
* @property {string} path - The URL path to connect to (e.g. "/", "/mySvc", "/thrift/quoteSvc", etc.).
* @property {object} headers - A standard Node.js header hash, an object hash containing key/value
* pairs where the key is the header name string and the value is the header value string.
* @property {boolean} secure - True causes the connection to use wss, otherwise ws is used.
* @property {object} wsOptions - Options passed on to WebSocket.
* @example
* //Use a secured websocket connection
* // uses the buffered transport layer, uses the JSON protocol and directs RPC traffic
* // to wss://thrift.example.com:9090/hello
* var thrift = require('thrift');
* var options = {
* transport: thrift.TBufferedTransport,
* protocol: thrift.TJSONProtocol,
* path: "/hello",
* secure: true
* };
* var con = thrift.createWSConnection("thrift.example.com", 9090, options);
* con.open()
* var client = thrift.createWSClient(myService, connection);
* client.myServiceFunction();
* con.close()
*/
/**
* Initializes a Thrift WSConnection instance (use createWSConnection() rather than
* instantiating directly).
* @constructor
* @param {string} host - The host name or IP to connect to.
* @param {number} port - The TCP port to connect to.
* @param {WSConnectOptions} options - The configuration options to use.
* @throws {error} Exceptions other than ttransport.InputBufferUnderrunError are rethrown
* @event {error} The "error" event is fired when a Node.js error event occurs during
* request or response processing, in which case the node error is passed on. An "error"
* event may also be fired when the connectison can not map a response back to the
* appropriate client (an internal error), generating a TApplicationException.
* @classdesc WSConnection objects provide Thrift end point transport
* semantics implemented using Websockets.
* @see {@link createWSConnection}
*/
var WSConnection = exports.WSConnection = function(host, port, options) {
//Initialize the emitter base object
EventEmitter.call(this);
//Set configuration
var self = this;
this.options = options || {};
this.host = host;
this.port = port;
this.secure = this.options.secure || false;
this.transport = this.options.transport || ttransport.TBufferedTransport;
this.protocol = this.options.protocol || tprotocol.TJSONProtocol;
this.path = this.options.path;
this.send_pending = [];
//The sequence map is used to map seqIDs back to the
// calling client in multiplexed scenarios
this.seqId2Service = {};
//Prepare WebSocket options
this.wsOptions = {
host: this.host,
port: this.port || 80,
path: this.options.path || '/',
headers: this.options.headers || {}
};
for (var attrname in this.options.wsOptions) {
this.wsOptions[attrname] = this.options.wsOptions[attrname];
}
};
util.inherits(WSConnection, EventEmitter);
WSConnection.prototype.__reset = function() {
this.socket = null; //The web socket
this.send_pending = []; //Buffers/Callback pairs waiting to be sent
};
WSConnection.prototype.__onOpen = function() {
var self = this;
this.emit("open");
if (this.send_pending.length > 0) {
//If the user made calls before the connection was fully
//open, send them now
this.send_pending.forEach(function(data) {
self.socket.send(data);
});
this.send_pending = [];
}
};
WSConnection.prototype.__onClose = function(evt) {
this.emit("close");
this.__reset();
};
WSConnection.prototype.__decodeCallback = function(transport_with_data) {
var proto = new this.protocol(transport_with_data);
try {
while (true) {
var header = proto.readMessageBegin();
var dummy_seqid = header.rseqid * -1;
var client = this.client;
//The Multiplexed Protocol stores a hash of seqid to service names
// in seqId2Service. If the SeqId is found in the hash we need to
// lookup the appropriate client for this call.
// The client var is a single client object when not multiplexing,
// when using multiplexing it is a service name keyed hash of client
// objects.
//NOTE: The 2 way interdependencies between protocols, transports,
// connections and clients in the Node.js implementation are irregular
// and make the implementation difficult to extend and maintain. We
// should bring this stuff inline with typical thrift I/O stack
// operation soon.
// --ra
var service_name = this.seqId2Service[header.rseqid];
if (service_name) {
client = this.client[service_name];
delete this.seqId2Service[header.rseqid];
}
/*jshint -W083 */
client._reqs[dummy_seqid] = function(err, success) {
transport_with_data.commitPosition();
var clientCallback = client._reqs[header.rseqid];
delete client._reqs[header.rseqid];
if (clientCallback) {
clientCallback(err, success);
}
};
/*jshint +W083 */
if (client['recv_' + header.fname]) {
client['recv_' + header.fname](proto, header.mtype, dummy_seqid);
} else {
delete client._reqs[dummy_seqid];
this.emit("error",
new thrift.TApplicationException(
thrift.TApplicationExceptionType.WRONG_METHOD_NAME,
"Received a response to an unknown RPC function"));
}
}
} catch (e) {
if (e instanceof ttransport.InputBufferUnderrunError) {
transport_with_data.rollbackPosition();
} else {
throw e;
}
}
};
WSConnection.prototype.__onData = function(data) {
if (Object.prototype.toString.call(data) == "[object ArrayBuffer]") {
data = new Uint8Array(data);
}
var buf = new Buffer(data);
this.transport.receiver(this.__decodeCallback.bind(this))(buf);
};
WSConnection.prototype.__onMessage = function(evt) {
this.__onData(evt.data);
};
WSConnection.prototype.__onError = function(evt) {
this.emit("error", evt);
this.socket.close();
};
/**
* Returns true if the transport is open
* @readonly
* @returns {boolean}
*/
WSConnection.prototype.isOpen = function() {
return this.socket && this.socket.readyState == this.socket.OPEN;
};
/**
* Opens the transport connection
*/
WSConnection.prototype.open = function() {
//If OPEN/CONNECTING/CLOSING ignore additional opens
if (this.socket && this.socket.readyState != this.socket.CLOSED) {
return;
}
//If there is no socket or the socket is closed:
this.socket = new WebSocket(this.uri(), "", this.wsOptions);
this.socket.binaryType = 'arraybuffer';
this.socket.onopen = this.__onOpen.bind(this);
this.socket.onmessage = this.__onMessage.bind(this);
this.socket.onerror = this.__onError.bind(this);
this.socket.onclose = this.__onClose.bind(this);
};
/**
* Closes the transport connection
*/
WSConnection.prototype.close = function() {
this.socket.close();
};
/**
* Return URI for the connection
* @returns {string} URI
*/
WSConnection.prototype.uri = function() {
var schema = this.secure ? 'wss' : 'ws';
var port = '';
var path = this.path || '/';
var host = this.host;
// avoid port if default for schema
if (this.port && (('wss' == schema && this.port != 443) ||
('ws' == schema && this.port != 80))) {
port = ':' + this.port;
}
return schema + '://' + host + port + path;
};
/**
* Writes Thrift message data to the connection
* @param {Buffer} data - A Node.js Buffer containing the data to write
* @returns {void} No return value.
* @event {error} the "error" event is raised upon request failure passing the
* Node.js error object to the listener.
*/
WSConnection.prototype.write = function(data) {
if (this.isOpen()) {
//Send data and register a callback to invoke the client callback
this.socket.send(data);
} else {
//Queue the send to go out __onOpen
this.send_pending.push(data);
}
};
/**
* Creates a new WSConnection object, used by Thrift clients to connect
* to Thrift HTTP based servers.
* @param {string} host - The host name or IP to connect to.
* @param {number} port - The TCP port to connect to.
* @param {WSConnectOptions} options - The configuration options to use.
* @returns {WSConnection} The connection object.
* @see {@link WSConnectOptions}
*/
exports.createWSConnection = function(host, port, options) {
return new WSConnection(host, port, options);
};
/**
* Creates a new client object for the specified Thrift service.
* @param {object} cls - The module containing the service client
* @param {WSConnection} wsConnection - The connection to use.
* @returns {object} The client object.
* @see {@link createWSConnection}
*/
exports.createWSClient = function(cls, wsConnection) {
if (cls.Client) {
cls = cls.Client;
}
wsConnection.client =
new cls(new wsConnection.transport(undefined, function(buf) {
wsConnection.write(buf);
}),
wsConnection.protocol);
return wsConnection.client;
};

View File

@ -2,38 +2,44 @@
"name": "thrift",
"description": "node.js bindings for the Apache Thrift RPC system",
"homepage": "http://thrift.apache.org/",
"repository":
{ "type" : "git",
"url" : "https://git-wip-us.apache.org/repos/asf/thrift.git"
},
"repository": {
"type": "git",
"url": "https://git-wip-us.apache.org/repos/asf/thrift.git"
},
"version": "1.0.0-dev",
"author":
{ "name": "Apache Thrift Developers",
"email": "dev@thrift.apache.org",
"url": "http://thrift.apache.org"
},
"licenses":
[ { "type": "Apache-2.0",
"url": "http://www.apache.org/licenses/LICENSE-2.0"
}
],
"bugs":
{ "mail": "dev@thrift.apache.org",
"url": "https://issues.apache.org/jira/browse/THRIFT"
},
"directories" : { "lib" : "./lib/thrift" },
"author": {
"name": "Apache Thrift Developers",
"email": "dev@thrift.apache.org",
"url": "http://thrift.apache.org"
},
"licenses": [
{
"type": "Apache-2.0",
"url": "http://www.apache.org/licenses/LICENSE-2.0"
}
],
"bugs": {
"mail": "dev@thrift.apache.org",
"url": "https://issues.apache.org/jira/browse/THRIFT"
},
"directories": {
"lib": "./lib/thrift"
},
"main": "./lib/thrift",
"engines": { "node": ">= 0.2.4" },
"engines": {
"node": ">= 0.2.4"
},
"dependencies": {
"node-int64": "~0.3.0",
"q": "1.0.x",
"nodeunit": "~0.8.0"
"q": "1.0.x",
"nodeunit": "~0.8.0",
"ws": "~0.4.32"
},
"devDependencies": {
"connect": "2.7.x",
"commander": "2.1.x"
},
"scripts": {
"test" : "test/testAll.sh"
"test": "test/testAll.sh"
}
}

View File

@ -59,6 +59,18 @@ testHttpClientServer()
return $RET
}
testWSClientServer()
{
echo " Testing WebSocket Client/Server with protocol $1 and transport $2 $3";
RET=0
node ${DIR}/http_server.js -p $1 -t $2 $3 &
SERVERPID=$!
sleep 1
node ${DIR}/ws_client.js -p $1 -t $2 $3 || RET=1
kill -9 $SERVERPID || RET=1
return $RET
}
TESTOK=0
@ -104,4 +116,14 @@ testHttpClientServer binary framed || TESTOK=1
testHttpClientServer json buffered --promise || TESTOK=1
testHttpClientServer binary framed --ssl || TESTOK=1
#WebSocket tests
testWSClientServer compact buffered || TESTOK=1
testWSClientServer compact framed || TESTOK=1
testWSClientServer json buffered || TESTOK=1
testWSClientServer json framed || TESTOK=1
testWSClientServer binary buffered || TESTOK=1
testWSClientServer binary framed || TESTOK=1
testWSClientServer json buffered --promise || TESTOK=1
testWSClientServer binary framed --ssl || TESTOK=1
exit $TESTOK

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
//This is the client side test for the standard Apache Thrift
//"ThriftTest" suite. This client will test any protocol/transport
//combination specified on the command line.
var fs = require('fs');
var assert = require('assert');
var thrift = require('thrift');
var ThriftTest = require('./gen-nodejs/ThriftTest');
var ThriftTestDriver = require('./thrift_test_driver').ThriftTestDriver;
var ThriftTestDriverPromise = require('./thrift_test_driver_promise').ThriftTestDriver;
var program = require('commander');
program
.option('-p, --protocol <protocol>', 'Set thrift protocol (binary|json) [protocol]')
.option('-t, --transport <transport>', 'Set thrift transport (buffered|framed) [transport]')
.option('--ssl', 'use wss instead of ws')
.option('--promise', 'test with promise style functions')
.parse(process.argv);
var protocol = thrift.TBinaryProtocol;
if (program.protocol === "json") {
protocol = thrift.TJSONProtocol;
}
var transport = thrift.TBufferedTransport;
if (program.transport === "framed") {
transport = thrift.TFramedTransport;
}
var options = {
transport: transport,
protocol: protocol,
path: "/test"
};
if (program.ssl) {
options.wsOptions = { rejectUnauthorized: false };
options.secure = true;
}
var connection = thrift.createWSConnection("localhost", 9090, options);
connection.open();
var client = thrift.createWSClient(ThriftTest, connection);
connection.on('error', function(err) {
assert(false, err);
});
var testDriver = ThriftTestDriver;
if (program.promise) {
console.log(" --Testing promise style client");
testDriver = ThriftTestDriverPromise;
}
testDriver(client, function (status) {
console.log(status);
process.exit(0);
});
// to make it also run on expresso
exports.expressoTest = function() {};