THRIFT-5084: Multiplexed processor in Swift

Client: Swift
Patch: Alexander Edge

This closes #2002
This commit is contained in:
Alexander Edge 2020-02-05 17:03:53 +00:00 committed by Jens Geyer
parent f0c761e171
commit a89036c8c3
7 changed files with 440 additions and 14 deletions

View File

@ -1,3 +1,4 @@
// swift-tools-version:5.1
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@ -20,5 +21,12 @@
import PackageDescription
let package = Package(
name: "Thrift"
name: "Thrift",
products: [
.library(name: "Thrift", targets: ["Thrift"])
],
targets: [
.target(name: "Thrift", path: "Sources"),
.testTarget(name: "ThriftTests", dependencies: ["Thrift"])
]
)

View File

@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/**
`TMultiplexedProcessor` is a `TProcessor` allowing
a single `TServer` to provide multiple services.
To do so, you instantiate the processor and then register additional
processors with it, as shown in the following example:
let processor = MultiplexedProcessor()
processor.register(CalculatorProcessor(service: CalculatorService()), for: "Calculator")
processor.register(WeatherProcessor(service: CalculatorService()), for: "Weather")
let server = TPerfectServer(port: 9090, processor: processor, TCompactProtocol.self, TCompactProtocol.self)
try server.start()
*/
public class MultiplexedProcessor: TProcessor {
enum Error: Swift.Error {
case incompatibleMessageType(TMessageType)
case missingProcessor(String)
case missingDefaultProcessor
}
private var processors = [String: TProcessor]()
private var defaultProcessor: TProcessor?
public init(defaultProcessor: TProcessor? = nil) {
self.defaultProcessor = defaultProcessor
}
public func register(defaultProcessor processor: TProcessor) {
defaultProcessor = processor
}
public func register(processor: TProcessor, for service: String) {
processors[service] = processor
}
public func process(on inProtocol: TProtocol, outProtocol: TProtocol) throws {
let message = try inProtocol.readMessageBegin()
guard message.1 != .call && message.1 != .oneway else { throw Error.incompatibleMessageType(message.1) }
if let separatorIndex = message.0.firstIndex(of: Character(.multiplexSeparator)) {
let serviceName = String(message.0.prefix(upTo: separatorIndex))
let messageName = String(message.0.suffix(from: message.0.index(after: separatorIndex)))
guard let processor = processors[serviceName] else { throw Error.missingProcessor(serviceName)}
let storedMessage = StoredMessage(message: (messageName, message.1, message.2), proto: inProtocol)
try processor.process(on: storedMessage, outProtocol: outProtocol)
} else {
guard let processor = defaultProcessor else { throw Error.missingDefaultProcessor }
try processor.process(on: inProtocol, outProtocol: outProtocol)
}
}
}
private final class StoredMessage: TProtocolDecorator {
private let message: (String, TMessageType, Int32)
init(message: (String, TMessageType, Int32), proto: TProtocol) {
self.message = message
super.init(proto: proto)
}
required init(on transport: TTransport) {
fatalError("init(on:) has not been implemented")
}
override func readMessageBegin() throws -> (String, TMessageType, Int32) {
message
}
}

View File

@ -17,8 +17,19 @@
* under the License.
*/
extension String {
static let multiplexSeparator = ":"
}
/**
`TMultiplexedProtocol` is a protocol-independent concrete decorator
that allows a Thrift client to communicate with a multiplexing Thrift server,
by prepending the service name to the function name during function calls.
- Note: THIS IS NOT USED BY SERVERS. On the server, use `TMultiplexedProcessor` to handle request
from a multiplexing client.
*/
public class TMultiplexedProtocol<Protocol: TProtocol>: TWrappedProtocol<Protocol> {
public let separator = ":"
public var serviceName = ""
@ -33,7 +44,7 @@ public class TMultiplexedProtocol<Protocol: TProtocol>: TWrappedProtocol<Protoco
switch messageType {
case .call, .oneway:
var serviceFunction = serviceName
serviceFunction += serviceName == "" ? "" : separator
serviceFunction += serviceName == "" ? "" : .multiplexSeparator
serviceFunction += name
return try super.writeMessageBegin(name: serviceFunction,
type: messageType,

View File

@ -17,13 +17,6 @@
* under the License.
*/
public typealias TProcessorMessageHandler<T> = (Int, TProtocol, TProtocol, T) -> Void
public protocol TProcessor {
associatedtype Service
var service: Service { get set }
func process(on inProtocol: TProtocol, outProtocol: TProtocol) throws
init(service: Service)
}

View File

@ -0,0 +1,199 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import Foundation
class TProtocolDecorator: TProtocol {
private let proto: TProtocol
var transport: TTransport
init(proto: TProtocol) {
self.proto = proto
self.transport = proto.transport
}
required init(on transport: TTransport) {
fatalError("init(on:) has not been implemented")
}
func readMessageBegin() throws -> (String, TMessageType, Int32) {
try proto.readMessageBegin()
}
func readMessageEnd() throws {
try proto.readMessageEnd()
}
func readStructBegin() throws -> String {
try proto.readStructBegin()
}
func readStructEnd() throws {
try proto.readStructEnd()
}
func readFieldBegin() throws -> (String, TType, Int32) {
try proto.readFieldBegin()
}
func readFieldEnd() throws {
try proto.readFieldEnd()
}
func readMapBegin() throws -> (TType, TType, Int32) {
try proto.readMapBegin()
}
func readMapEnd() throws {
try proto.readMapEnd()
}
func readSetBegin() throws -> (TType, Int32) {
try proto.readSetBegin()
}
func readSetEnd() throws {
try proto.readSetEnd()
}
func readListBegin() throws -> (TType, Int32) {
try proto.readListBegin()
}
func readListEnd() throws {
try proto.readListEnd()
}
func read() throws -> String {
try proto.read()
}
func read() throws -> Bool {
try proto.read()
}
func read() throws -> UInt8 {
try proto.read()
}
func read() throws -> Int16 {
try proto.read()
}
func read() throws -> Int32 {
try proto.read()
}
func read() throws -> Int64 {
try proto.read()
}
func read() throws -> Double {
try proto.read()
}
func read() throws -> Data {
try proto.read()
}
func writeMessageBegin(name: String, type messageType: TMessageType, sequenceID: Int32) throws {
try proto.writeMessageBegin(name: name, type: messageType, sequenceID: sequenceID)
}
func writeMessageEnd() throws {
try proto.writeMessageEnd()
}
func writeStructBegin(name: String) throws {
try proto.writeStructBegin(name: name)
}
func writeStructEnd() throws {
try proto.writeStructEnd()
}
func writeFieldBegin(name: String, type fieldType: TType, fieldID: Int32) throws {
try proto.writeFieldBegin(name: name, type: fieldType, fieldID: fieldID)
}
func writeFieldStop() throws {
try proto.writeFieldStop()
}
func writeFieldEnd() throws {
try proto.writeFieldEnd()
}
func writeMapBegin(keyType: TType, valueType: TType, size: Int32) throws {
try proto.writeMapBegin(keyType: keyType, valueType: valueType, size: size)
}
func writeMapEnd() throws {
try proto.writeMapEnd()
}
func writeSetBegin(elementType: TType, size: Int32) throws {
try proto.writeSetBegin(elementType: elementType, size: size)
}
func writeSetEnd() throws {
try proto.writeSetEnd()
}
func writeListBegin(elementType: TType, size: Int32) throws {
try proto.writeListBegin(elementType: elementType, size: size)
}
func writeListEnd() throws {
try proto.writeListEnd()
}
func write(_ value: String) throws {
try proto.write(value)
}
func write(_ value: Bool) throws {
try proto.write(value)
}
func write(_ value: UInt8) throws {
try proto.write(value)
}
func write(_ value: Int16) throws {
try proto.write(value)
}
func write(_ value: Int32) throws {
try proto.write(value)
}
func write(_ value: Int64) throws {
try proto.write(value)
}
func write(_ value: Double) throws {
try proto.write(value)
}
func write(_ value: Data) throws {
try proto.write(value)
}
}

View File

@ -31,20 +31,22 @@ public let TSocketServerClientConnectionFinished = "TSocketServerClientConnectio
public let TSocketServerProcessorKey = "TSocketServerProcessor"
public let TSocketServerTransportKey = "TSocketServerTransport"
class TSocketServer<InProtocol: TProtocol, OutProtocol: TProtocol, Processor: TProcessor, Service> where Processor.Service == Service {
class TSocketServer<InProtocol: TProtocol, OutProtocol: TProtocol, Processor: TProcessor, Service> {
var socketFileHandle: FileHandle
var processingQueue = DispatchQueue(label: "TSocketServer.processing",
qos: .background,
attributes: .concurrent)
var serviceHandler: Service
let processor: Processor
public init(port: Int,
service: Service,
inProtocol: InProtocol.Type,
outProtocol: OutProtocol.Type,
processor: Processor.Type) throws {
processor: Processor) throws {
// set service handler
self.serviceHandler = service
self.processor = processor
// create a socket
var fd: Int32 = -1
@ -127,7 +129,6 @@ class TSocketServer<InProtocol: TProtocol, OutProtocol: TProtocol, Processor: TP
func handleClientConnection(_ clientSocket: FileHandle) {
let transport = TFileHandleTransport(fileHandle: clientSocket)
let processor = Processor(service: serviceHandler)
let inProtocol = InProtocol(on: transport)
let outProtocol = OutProtocol(on: transport)
@ -141,7 +142,7 @@ class TSocketServer<InProtocol: TProtocol, OutProtocol: TProtocol, Processor: TP
NotificationCenter.default
.post(name: Notification.Name(rawValue: TSocketServerClientConnectionFinished),
object: self,
userInfo: [TSocketServerProcessorKey: processor,
userInfo: [TSocketServerProcessorKey: self.processor,
TSocketServerTransportKey: transport])
}
}

View File

@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import XCTest
import Foundation
@testable import Thrift
private protocol CalculatorService { }
private class Calculator: CalculatorService { }
private class CalculatorProcessor: TProcessor {
private let service: CalculatorService
init(service: CalculatorService) {
self.service = service
}
var processCalled = false
func process(on inProtocol: TProtocol, outProtocol: TProtocol) throws {
processCalled = true
}
}
class TMultiplexedProcessorTests: XCTestCase {
let sut = MultiplexedProcessor()
var transport: TMemoryBufferTransport = TMemoryBufferTransport { $0.reset(readBuffer: $1) }
lazy var proto = TMultiplexedProtocol<TCompactProtocol>(on: transport)
override func setUp() {
super.setUp()
transport.reset()
}
override func tearDown() {
super.tearDown()
transport.reset()
}
func testCallMessageThrowsError() throws {
try proto.writeMessageBegin(name: "message", type: .call, sequenceID: 1)
try transport.flush()
XCTAssertThrowsError(try sut.process(on: proto, outProtocol: proto)) { error in
guard case MultiplexedProcessor.Error.incompatibleMessageType(let type) = error else {
XCTFail()
return
}
XCTAssertEqual(type, .call)
}
}
func testOneWayMessageThrowsError() throws {
try proto.writeMessageBegin(name: "message", type: .oneway, sequenceID: 1)
try transport.flush()
XCTAssertThrowsError(try sut.process(on: proto, outProtocol: proto)) { error in
guard case MultiplexedProcessor.Error.incompatibleMessageType(let type) = error else {
XCTFail()
return
}
XCTAssertEqual(type, .oneway)
}
}
func testMissingDefaultProcessorThrowsError() throws {
try proto.writeMessageBegin(name: "message", type: .reply, sequenceID: 1)
try transport.flush()
XCTAssertThrowsError(try sut.process(on: proto, outProtocol: proto)) { error in
guard case MultiplexedProcessor.Error.missingDefaultProcessor = error else {
XCTFail()
return
}
}
}
func testUsesDefaultProcessorForNonMultiplexedMessage() throws {
let calculator = Calculator()
let calculatorProcessor = CalculatorProcessor(service: calculator)
sut.register(defaultProcessor: calculatorProcessor)
try proto.writeMessageBegin(name: "message", type: .reply, sequenceID: 1)
try transport.flush()
try sut.process(on: proto, outProtocol: proto)
XCTAssertTrue(calculatorProcessor.processCalled)
}
func testUsesProcessorForMultiplexedMessage() throws {
let calculator = Calculator()
let calculatorProcessor = CalculatorProcessor(service: calculator)
sut.register(processor: calculatorProcessor, for: "Calculator")
try proto.writeMessageBegin(name: "Calculator:message", type: .reply, sequenceID: 1)
try transport.flush()
try sut.process(on: proto, outProtocol: proto)
XCTAssertTrue(calculatorProcessor.processCalled)
}
func testMissingProcessorForMultiplexedMessageThrowsError() throws {
try proto.writeMessageBegin(name: "Calculator:message", type: .reply, sequenceID: 1)
try transport.flush()
XCTAssertThrowsError(try sut.process(on: proto, outProtocol: proto)) { error in
guard case MultiplexedProcessor.Error.missingProcessor(let serviceName) = error else {
XCTFail()
return
}
XCTAssertEqual(serviceName, "Calculator")
}
}
}