mirror of
https://github.com/valitydev/thrift.git
synced 2024-11-07 02:45:22 +00:00
THRIFT-1914 Python: Support for Multiplexing Services on any
Transport, Protocol and Server Patch: smallfish & djwatson & haijunz & Roger Meier This closes #103 and #82 From 7aaea7ef4e6f44097b02543fa2e62597eae9d61e Mon Sep 17 00:00:00 2001 From: smallfish <smallfish.xy@gmail.com> Date: Tue, 22 Apr 2014 11:26:52 +0800 Subject: [PATCH] THRIFT-1914 Python: Support for Multiplexing Services on any Transport
This commit is contained in:
parent
6d1a83aa48
commit
879cab2cea
58
lib/py/src/TMultiplexedProcessor.py
Normal file
58
lib/py/src/TMultiplexedProcessor.py
Normal file
@ -0,0 +1,58 @@
|
|||||||
|
#
|
||||||
|
# Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
# or more contributor license agreements. See the NOTICE file
|
||||||
|
# distributed with this work for additional information
|
||||||
|
# regarding copyright ownership. The ASF licenses this file
|
||||||
|
# to you under the Apache License, Version 2.0 (the
|
||||||
|
# "License"); you may not use this file except in compliance
|
||||||
|
# with the License. You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing,
|
||||||
|
# software distributed under the License is distributed on an
|
||||||
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
# KIND, either express or implied. See the License for the
|
||||||
|
# specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
#
|
||||||
|
|
||||||
|
from thrift.Thrift import TProcessor, TMessageType, TException
|
||||||
|
from thrift.protocol import TProtocolDecorator, TMultiplexedProtocol
|
||||||
|
|
||||||
|
class TMultiplexedProcessor(TProcessor):
|
||||||
|
def __init__(self):
|
||||||
|
self.services = {}
|
||||||
|
|
||||||
|
def registerProcessor(self, serviceName, processor):
|
||||||
|
self.services[serviceName] = processor
|
||||||
|
|
||||||
|
def process(self, iprot, oprot):
|
||||||
|
(name, type, seqid) = iprot.readMessageBegin();
|
||||||
|
if type != TMessageType.CALL & type != TMessageType.ONEWAY:
|
||||||
|
raise TException("TMultiplex protocol only supports CALL & ONEWAY")
|
||||||
|
|
||||||
|
index = name.find(TMultiplexedProtocol.SEPARATOR)
|
||||||
|
if index < 0:
|
||||||
|
raise TException("Service name not found in message name: " + name + ". Did you forget to use TMultiplexProtocol in your client?")
|
||||||
|
|
||||||
|
serviceName = name[0:index]
|
||||||
|
call = name[index+len(TMultiplexedProtocol.SEPARATOR):]
|
||||||
|
if not serviceName in self.services:
|
||||||
|
raise TException("Service name not found: " + serviceName + ". Did you forget to call registerProcessor()?")
|
||||||
|
|
||||||
|
standardMessage = (
|
||||||
|
call,
|
||||||
|
type,
|
||||||
|
seqid
|
||||||
|
)
|
||||||
|
return self.services[serviceName].process(StoredMessageProtocol(iprot, standardMessage), oprot)
|
||||||
|
|
||||||
|
|
||||||
|
class StoredMessageProtocol(TProtocolDecorator.TProtocolDecorator):
|
||||||
|
def __init__(self, protocol, messageBegin):
|
||||||
|
TProtocolDecorator.TProtocolDecorator.__init__(self, protocol)
|
||||||
|
self.messageBegin = messageBegin
|
||||||
|
|
||||||
|
def readMessageBegin(self):
|
||||||
|
return self.messageBegin
|
39
lib/py/src/protocol/TMultiplexedProtocol.py
Normal file
39
lib/py/src/protocol/TMultiplexedProtocol.py
Normal file
@ -0,0 +1,39 @@
|
|||||||
|
#
|
||||||
|
# Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
# or more contributor license agreements. See the NOTICE file
|
||||||
|
# distributed with this work for additional information
|
||||||
|
# regarding copyright ownership. The ASF licenses this file
|
||||||
|
# to you under the Apache License, Version 2.0 (the
|
||||||
|
# "License"); you may not use this file except in compliance
|
||||||
|
# with the License. You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing,
|
||||||
|
# software distributed under the License is distributed on an
|
||||||
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
# KIND, either express or implied. See the License for the
|
||||||
|
# specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
#
|
||||||
|
|
||||||
|
from thrift.Thrift import TMessageType
|
||||||
|
from thrift.protocol import TProtocolDecorator
|
||||||
|
|
||||||
|
SEPARATOR = ":"
|
||||||
|
|
||||||
|
class TMultiplexedProtocol(TProtocolDecorator.TProtocolDecorator):
|
||||||
|
def __init__(self, protocol, serviceName):
|
||||||
|
TProtocolDecorator.TProtocolDecorator.__init__(self, protocol)
|
||||||
|
self.serviceName = serviceName
|
||||||
|
|
||||||
|
def writeMessageBegin(self, name, type, seqid):
|
||||||
|
if (type == TMessageType.CALL or
|
||||||
|
type == TMessageType.ONEWAY):
|
||||||
|
self.protocol.writeMessageBegin(
|
||||||
|
self.serviceName + SEPARATOR + name,
|
||||||
|
type,
|
||||||
|
seqid
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
self.protocol.writeMessageBegin(name, type, seqid)
|
42
lib/py/src/protocol/TProtocolDecorator.py
Normal file
42
lib/py/src/protocol/TProtocolDecorator.py
Normal file
@ -0,0 +1,42 @@
|
|||||||
|
#
|
||||||
|
# Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
# or more contributor license agreements. See the NOTICE file
|
||||||
|
# distributed with this work for additional information
|
||||||
|
# regarding copyright ownership. The ASF licenses this file
|
||||||
|
# to you under the Apache License, Version 2.0 (the
|
||||||
|
# "License"); you may not use this file except in compliance
|
||||||
|
# with the License. You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing,
|
||||||
|
# software distributed under the License is distributed on an
|
||||||
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
# KIND, either express or implied. See the License for the
|
||||||
|
# specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
#
|
||||||
|
|
||||||
|
from thrift.protocol.TProtocol import TProtocolBase
|
||||||
|
from types import *
|
||||||
|
|
||||||
|
class TProtocolDecorator():
|
||||||
|
def __init__(self, protocol):
|
||||||
|
TProtocolBase(protocol)
|
||||||
|
self.protocol = protocol
|
||||||
|
|
||||||
|
def __getattr__(self, name):
|
||||||
|
if hasattr(self.protocol, name):
|
||||||
|
member = getattr(self.protocol, name)
|
||||||
|
if type(member) in [MethodType, UnboundMethodType, FunctionType, LambdaType, BuiltinFunctionType, BuiltinMethodType]:
|
||||||
|
return lambda *args, **kwargs: self._wrap(member, args, kwargs)
|
||||||
|
else:
|
||||||
|
return member
|
||||||
|
raise AttributeError(name)
|
||||||
|
|
||||||
|
def _wrap(self, func, args, kwargs):
|
||||||
|
if type(func) == MethodType:
|
||||||
|
result = func(*args, **kwargs)
|
||||||
|
else:
|
||||||
|
result = func(self.protocol, *args, **kwargs)
|
||||||
|
return result
|
@ -38,6 +38,8 @@ parser.add_option("--zlib", action="store_true", dest="zlib",
|
|||||||
help="use zlib wrapper for compressed transport")
|
help="use zlib wrapper for compressed transport")
|
||||||
parser.add_option("--ssl", action="store_true", dest="ssl",
|
parser.add_option("--ssl", action="store_true", dest="ssl",
|
||||||
help="use SSL for encrypted transport")
|
help="use SSL for encrypted transport")
|
||||||
|
parser.add_option("--multiple", action="store_true", dest="multiple",
|
||||||
|
help="use Multiple service")
|
||||||
parser.add_option("--framed", action="store_true", dest="framed",
|
parser.add_option("--framed", action="store_true", dest="framed",
|
||||||
help="use framed transport")
|
help="use framed transport")
|
||||||
parser.add_option("--http", dest="http_path",
|
parser.add_option("--http", dest="http_path",
|
||||||
@ -55,8 +57,9 @@ options, args = parser.parse_args()
|
|||||||
|
|
||||||
sys.path.insert(0, options.genpydir)
|
sys.path.insert(0, options.genpydir)
|
||||||
|
|
||||||
from ThriftTest import ThriftTest
|
from ThriftTest import ThriftTest, SecondService
|
||||||
from ThriftTest.ttypes import *
|
from ThriftTest.ttypes import *
|
||||||
|
from thrift.protocol import TMultiplexedProtocol
|
||||||
from thrift.transport import TTransport
|
from thrift.transport import TTransport
|
||||||
from thrift.transport import TSocket
|
from thrift.transport import TSocket
|
||||||
from thrift.transport import THttpClient
|
from thrift.transport import THttpClient
|
||||||
@ -84,7 +87,14 @@ class AbstractTest(unittest.TestCase):
|
|||||||
self.transport = TZlibTransport.TZlibTransport(self.transport, 9)
|
self.transport = TZlibTransport.TZlibTransport(self.transport, 9)
|
||||||
self.transport.open()
|
self.transport.open()
|
||||||
protocol = self.protocol_factory.getProtocol(self.transport)
|
protocol = self.protocol_factory.getProtocol(self.transport)
|
||||||
self.client = ThriftTest.Client(protocol)
|
if options.multiple:
|
||||||
|
p = TMultiplexedProtocol.TMultiplexedProtocol(protocol, "ThriftTest")
|
||||||
|
self.client = ThriftTest.Client(p)
|
||||||
|
p = TMultiplexedProtocol.TMultiplexedProtocol(protocol, "SecondService")
|
||||||
|
self.client2 = SecondService.Client(p)
|
||||||
|
else:
|
||||||
|
self.client = ThriftTest.Client(protocol)
|
||||||
|
self.client2 = None
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
# Close!
|
# Close!
|
||||||
@ -203,6 +213,10 @@ class AbstractTest(unittest.TestCase):
|
|||||||
self.client.testOneway(1) # type is int, not float
|
self.client.testOneway(1) # type is int, not float
|
||||||
self.assertEqual(self.client.testString('Python'), 'Python')
|
self.assertEqual(self.client.testString('Python'), 'Python')
|
||||||
|
|
||||||
|
def testblahBlah(self):
|
||||||
|
if self.client2:
|
||||||
|
self.assertEqual(self.client2.blahBlah(), None)
|
||||||
|
|
||||||
class NormalBinaryTest(AbstractTest):
|
class NormalBinaryTest(AbstractTest):
|
||||||
protocol_factory = TBinaryProtocol.TBinaryProtocolFactory()
|
protocol_factory = TBinaryProtocol.TBinaryProtocolFactory()
|
||||||
|
|
||||||
|
@ -33,6 +33,8 @@ parser.add_option("--zlib", action="store_true", dest="zlib",
|
|||||||
help="use zlib wrapper for compressed transport")
|
help="use zlib wrapper for compressed transport")
|
||||||
parser.add_option("--ssl", action="store_true", dest="ssl",
|
parser.add_option("--ssl", action="store_true", dest="ssl",
|
||||||
help="use SSL for encrypted transport")
|
help="use SSL for encrypted transport")
|
||||||
|
parser.add_option("--multiple", action="store_true", dest="multiple",
|
||||||
|
help="use multiple service")
|
||||||
parser.add_option('-v', '--verbose', action="store_const",
|
parser.add_option('-v', '--verbose', action="store_const",
|
||||||
dest="verbose", const=2,
|
dest="verbose", const=2,
|
||||||
help="verbose output")
|
help="verbose output")
|
||||||
@ -46,9 +48,10 @@ options, args = parser.parse_args()
|
|||||||
|
|
||||||
sys.path.insert(0, options.genpydir)
|
sys.path.insert(0, options.genpydir)
|
||||||
|
|
||||||
from ThriftTest import ThriftTest
|
from ThriftTest import ThriftTest, SecondService
|
||||||
from ThriftTest.ttypes import *
|
from ThriftTest.ttypes import *
|
||||||
from thrift.Thrift import TException
|
from thrift.Thrift import TException
|
||||||
|
from thrift import TMultiplexedProcessor
|
||||||
from thrift.transport import TTransport
|
from thrift.transport import TTransport
|
||||||
from thrift.transport import TSocket
|
from thrift.transport import TSocket
|
||||||
from thrift.transport import TZlibTransport
|
from thrift.transport import TZlibTransport
|
||||||
@ -62,6 +65,12 @@ PROT_FACTORIES = {'binary': TBinaryProtocol.TBinaryProtocolFactory,
|
|||||||
'compact': TCompactProtocol.TCompactProtocolFactory,
|
'compact': TCompactProtocol.TCompactProtocolFactory,
|
||||||
'json': TJSONProtocol.TJSONProtocolFactory}
|
'json': TJSONProtocol.TJSONProtocolFactory}
|
||||||
|
|
||||||
|
class SecondHandler:
|
||||||
|
|
||||||
|
def blahBlah(self):
|
||||||
|
if options.verbose > 1:
|
||||||
|
print 'blahBlah()'
|
||||||
|
|
||||||
class TestHandler:
|
class TestHandler:
|
||||||
|
|
||||||
def testVoid(self):
|
def testVoid(self):
|
||||||
@ -188,8 +197,15 @@ if len(args) > 1:
|
|||||||
server_type = args[0]
|
server_type = args[0]
|
||||||
|
|
||||||
# Set up the handler and processor objects
|
# Set up the handler and processor objects
|
||||||
handler = TestHandler()
|
if not options.multiple:
|
||||||
processor = ThriftTest.Processor(handler)
|
handler = TestHandler()
|
||||||
|
processor = ThriftTest.Processor(handler)
|
||||||
|
else:
|
||||||
|
processor = TMultiplexedProcessor.TMultiplexedProcessor()
|
||||||
|
handler = TestHandler()
|
||||||
|
processor.registerProcessor("ThriftTest", ThriftTest.Processor(handler))
|
||||||
|
handler = SecondHandler()
|
||||||
|
processor.registerProcessor("SecondService", SecondService.Processor(handler))
|
||||||
|
|
||||||
# Handle THttpServer as a special case
|
# Handle THttpServer as a special case
|
||||||
if server_type == 'THttpServer':
|
if server_type == 'THttpServer':
|
||||||
|
Loading…
Reference in New Issue
Block a user