From 879cab2cea799625635d861f5beb89039da5f38c Mon Sep 17 00:00:00 2001 From: Roger Meier Date: Sat, 3 May 2014 17:51:21 +0200 Subject: [PATCH] THRIFT-1914 Python: Support for Multiplexing Services on any Transport, Protocol and Server Patch: smallfish & djwatson & haijunz & Roger Meier This closes #103 and #82 From 7aaea7ef4e6f44097b02543fa2e62597eae9d61e Mon Sep 17 00:00:00 2001 From: smallfish Date: Tue, 22 Apr 2014 11:26:52 +0800 Subject: [PATCH] THRIFT-1914 Python: Support for Multiplexing Services on any Transport --- lib/py/src/TMultiplexedProcessor.py | 58 +++++++++++++++++++++ lib/py/src/protocol/TMultiplexedProtocol.py | 39 ++++++++++++++ lib/py/src/protocol/TProtocolDecorator.py | 42 +++++++++++++++ test/py/TestClient.py | 18 ++++++- test/py/TestServer.py | 22 ++++++-- 5 files changed, 174 insertions(+), 5 deletions(-) create mode 100644 lib/py/src/TMultiplexedProcessor.py create mode 100644 lib/py/src/protocol/TMultiplexedProtocol.py create mode 100644 lib/py/src/protocol/TProtocolDecorator.py diff --git a/lib/py/src/TMultiplexedProcessor.py b/lib/py/src/TMultiplexedProcessor.py new file mode 100644 index 000000000..a8d5565c3 --- /dev/null +++ b/lib/py/src/TMultiplexedProcessor.py @@ -0,0 +1,58 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from thrift.Thrift import TProcessor, TMessageType, TException +from thrift.protocol import TProtocolDecorator, TMultiplexedProtocol + +class TMultiplexedProcessor(TProcessor): + def __init__(self): + self.services = {} + + def registerProcessor(self, serviceName, processor): + self.services[serviceName] = processor + + def process(self, iprot, oprot): + (name, type, seqid) = iprot.readMessageBegin(); + if type != TMessageType.CALL & type != TMessageType.ONEWAY: + raise TException("TMultiplex protocol only supports CALL & ONEWAY") + + index = name.find(TMultiplexedProtocol.SEPARATOR) + if index < 0: + raise TException("Service name not found in message name: " + name + ". Did you forget to use TMultiplexProtocol in your client?") + + serviceName = name[0:index] + call = name[index+len(TMultiplexedProtocol.SEPARATOR):] + if not serviceName in self.services: + raise TException("Service name not found: " + serviceName + ". Did you forget to call registerProcessor()?") + + standardMessage = ( + call, + type, + seqid + ) + return self.services[serviceName].process(StoredMessageProtocol(iprot, standardMessage), oprot) + + +class StoredMessageProtocol(TProtocolDecorator.TProtocolDecorator): + def __init__(self, protocol, messageBegin): + TProtocolDecorator.TProtocolDecorator.__init__(self, protocol) + self.messageBegin = messageBegin + + def readMessageBegin(self): + return self.messageBegin diff --git a/lib/py/src/protocol/TMultiplexedProtocol.py b/lib/py/src/protocol/TMultiplexedProtocol.py new file mode 100644 index 000000000..d25f367b5 --- /dev/null +++ b/lib/py/src/protocol/TMultiplexedProtocol.py @@ -0,0 +1,39 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from thrift.Thrift import TMessageType +from thrift.protocol import TProtocolDecorator + +SEPARATOR = ":" + +class TMultiplexedProtocol(TProtocolDecorator.TProtocolDecorator): + def __init__(self, protocol, serviceName): + TProtocolDecorator.TProtocolDecorator.__init__(self, protocol) + self.serviceName = serviceName + + def writeMessageBegin(self, name, type, seqid): + if (type == TMessageType.CALL or + type == TMessageType.ONEWAY): + self.protocol.writeMessageBegin( + self.serviceName + SEPARATOR + name, + type, + seqid + ) + else: + self.protocol.writeMessageBegin(name, type, seqid) diff --git a/lib/py/src/protocol/TProtocolDecorator.py b/lib/py/src/protocol/TProtocolDecorator.py new file mode 100644 index 000000000..3e9e5004e --- /dev/null +++ b/lib/py/src/protocol/TProtocolDecorator.py @@ -0,0 +1,42 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from thrift.protocol.TProtocol import TProtocolBase +from types import * + +class TProtocolDecorator(): + def __init__(self, protocol): + TProtocolBase(protocol) + self.protocol = protocol + + def __getattr__(self, name): + if hasattr(self.protocol, name): + member = getattr(self.protocol, name) + if type(member) in [MethodType, UnboundMethodType, FunctionType, LambdaType, BuiltinFunctionType, BuiltinMethodType]: + return lambda *args, **kwargs: self._wrap(member, args, kwargs) + else: + return member + raise AttributeError(name) + + def _wrap(self, func, args, kwargs): + if type(func) == MethodType: + result = func(*args, **kwargs) + else: + result = func(self.protocol, *args, **kwargs) + return result diff --git a/test/py/TestClient.py b/test/py/TestClient.py index 471e030d2..18aea8664 100755 --- a/test/py/TestClient.py +++ b/test/py/TestClient.py @@ -38,6 +38,8 @@ parser.add_option("--zlib", action="store_true", dest="zlib", help="use zlib wrapper for compressed transport") parser.add_option("--ssl", action="store_true", dest="ssl", help="use SSL for encrypted transport") +parser.add_option("--multiple", action="store_true", dest="multiple", + help="use Multiple service") parser.add_option("--framed", action="store_true", dest="framed", help="use framed transport") parser.add_option("--http", dest="http_path", @@ -55,8 +57,9 @@ options, args = parser.parse_args() sys.path.insert(0, options.genpydir) -from ThriftTest import ThriftTest +from ThriftTest import ThriftTest, SecondService from ThriftTest.ttypes import * +from thrift.protocol import TMultiplexedProtocol from thrift.transport import TTransport from thrift.transport import TSocket from thrift.transport import THttpClient @@ -84,7 +87,14 @@ class AbstractTest(unittest.TestCase): self.transport = TZlibTransport.TZlibTransport(self.transport, 9) self.transport.open() protocol = self.protocol_factory.getProtocol(self.transport) - self.client = ThriftTest.Client(protocol) + if options.multiple: + p = TMultiplexedProtocol.TMultiplexedProtocol(protocol, "ThriftTest") + self.client = ThriftTest.Client(p) + p = TMultiplexedProtocol.TMultiplexedProtocol(protocol, "SecondService") + self.client2 = SecondService.Client(p) + else: + self.client = ThriftTest.Client(protocol) + self.client2 = None def tearDown(self): # Close! @@ -203,6 +213,10 @@ class AbstractTest(unittest.TestCase): self.client.testOneway(1) # type is int, not float self.assertEqual(self.client.testString('Python'), 'Python') + def testblahBlah(self): + if self.client2: + self.assertEqual(self.client2.blahBlah(), None) + class NormalBinaryTest(AbstractTest): protocol_factory = TBinaryProtocol.TBinaryProtocolFactory() diff --git a/test/py/TestServer.py b/test/py/TestServer.py index 28241ccec..802234139 100755 --- a/test/py/TestServer.py +++ b/test/py/TestServer.py @@ -33,6 +33,8 @@ parser.add_option("--zlib", action="store_true", dest="zlib", help="use zlib wrapper for compressed transport") parser.add_option("--ssl", action="store_true", dest="ssl", help="use SSL for encrypted transport") +parser.add_option("--multiple", action="store_true", dest="multiple", + help="use multiple service") parser.add_option('-v', '--verbose', action="store_const", dest="verbose", const=2, help="verbose output") @@ -46,9 +48,10 @@ options, args = parser.parse_args() sys.path.insert(0, options.genpydir) -from ThriftTest import ThriftTest +from ThriftTest import ThriftTest, SecondService from ThriftTest.ttypes import * from thrift.Thrift import TException +from thrift import TMultiplexedProcessor from thrift.transport import TTransport from thrift.transport import TSocket from thrift.transport import TZlibTransport @@ -62,6 +65,12 @@ PROT_FACTORIES = {'binary': TBinaryProtocol.TBinaryProtocolFactory, 'compact': TCompactProtocol.TCompactProtocolFactory, 'json': TJSONProtocol.TJSONProtocolFactory} +class SecondHandler: + + def blahBlah(self): + if options.verbose > 1: + print 'blahBlah()' + class TestHandler: def testVoid(self): @@ -188,8 +197,15 @@ if len(args) > 1: server_type = args[0] # Set up the handler and processor objects -handler = TestHandler() -processor = ThriftTest.Processor(handler) +if not options.multiple: + handler = TestHandler() + processor = ThriftTest.Processor(handler) +else: + processor = TMultiplexedProcessor.TMultiplexedProcessor() + handler = TestHandler() + processor.registerProcessor("ThriftTest", ThriftTest.Processor(handler)) + handler = SecondHandler() + processor.registerProcessor("SecondService", SecondService.Processor(handler)) # Handle THttpServer as a special case if server_type == 'THttpServer':