THRIFT-563 Support for Multiplexing Services on any Transport, Protocol and Server

Patch: Rob Slifka
This commit is contained in:
Jens Geyer 2013-03-24 11:53:31 +02:00
parent 3c434ab87d
commit 3661867feb
3 changed files with 388 additions and 0 deletions

View File

@ -0,0 +1,124 @@
package org.apache.thrift;
import org.apache.thrift.protocol.*;
import java.util.Map;
import java.util.HashMap;
/**
* <code>TMultiplexedProcessor</code> is a <code>TProcessor</code> allowing
* a single <code>TServer</code> to provide multiple services.
*
* <p>To do so, you instantiate the processor and then register additional
* processors with it, as shown in the following example:</p>
*
* <blockquote><code>
* TMultiplexedProcessor processor = new TMultiplexedProcessor();
*
* processor.registerProcessor(
* "Calculator",
* new Calculator.Processor(new CalculatorHandler()));
*
* processor.registerProcessor(
* "WeatherReport",
* new WeatherReport.Processor(new WeatherReportHandler()));
*
* TServerTransport t = new TServerSocket(9090);
* TSimpleServer server = new TSimpleServer(processor, t);
*
* server.serve();
* </code></blockquote>
*/
public class TMultiplexedProcessor implements TProcessor {
private final Map<String,TProcessor> SERVICE_PROCESSOR_MAP
= new HashMap<String,TProcessor>();
/**
* 'Register' a service with this <code>TMultiplexedProcessor</code>. This
* allows us to broker requests to individual services by using the service
* name to select them at request time.
*
* @param serviceName Name of a service, has to be identical to the name
* declared in the Thrift IDL, e.g. "WeatherReport".
* @param processor Implementation of a service, ususally referred to
* as "handlers", e.g. WeatherReportHandler implementing WeatherReport.Iface.
*/
public void registerProcessor(String serviceName, TProcessor processor) {
SERVICE_PROCESSOR_MAP.put(serviceName, processor);
}
/**
* This implementation of <code>process</code> performs the following steps:
*
* <ol>
* <li>Read the beginning of the message.</li>
* <li>Extract the service name from the message.</li>
* <li>Using the service name to locate the appropriate processor.</li>
* <li>Dispatch to the processor, with a decorated instance of TProtocol
* that allows readMessageBegin() to return the original TMessage.</li>
* </ol>
*
* @throws TException If the message type is not CALL or ONEWAY, if
* the service name was not found in the message, or if the service
* name was not found in the service map. You called {@link #registerProcessor(String, TProcessor) registerProcessor}
* during initialization, right? :)
*/
public boolean process(TProtocol iprot, TProtocol oprot) throws TException {
/*
Use the actual underlying protocol (e.g. TBinaryProtocol) to read the
message header. This pulls the message "off the wire", which we'll
deal with at the end of this method.
*/
TMessage message = iprot.readMessageBegin();
if (message.type != TMessageType.CALL && message.type != TMessageType.ONEWAY) {
// TODO Apache Guys - Can the server ever get an EXCEPTION or REPLY?
// TODO Should we check for this here?
throw new TException("This should not have happened!?");
}
// Extract the service name
int index = message.name.indexOf(TMultiplexedProtocol.SEPARATOR);
if (index < 0) {
throw new TException("Service name not found in message name: " + message.name + ". Did you " +
"forget to use a TMultiplexProtocol in your client?");
}
// Create a new TMessage, something that can be consumed by any TProtocol
String serviceName = message.name.substring(0, index);
TProcessor actualProcessor = SERVICE_PROCESSOR_MAP.get(serviceName);
if (actualProcessor == null) {
throw new TException("Service name not found: " + serviceName + ". Did you forget " +
"to call registerProcessor()?");
}
// Create a new TMessage, removing the service name
TMessage standardMessage = new TMessage(
message.name.substring(serviceName.length()+TMultiplexedProtocol.SEPARATOR.length()),
message.type,
message.seqid
);
// Dispatch processing to the stored processor
return actualProcessor.process(new StoredMessageProtocol(iprot, standardMessage), oprot);
}
/**
* Our goal was to work with any protocol. In order to do that, we needed
* to allow them to call readMessageBegin() and get a TMessage in exactly
* the standard format, without the service name prepended to TMessage.name.
*/
private class StoredMessageProtocol extends TProtocolDecorator {
TMessage messageBegin;
public StoredMessageProtocol(TProtocol protocol, TMessage messageBegin) {
super(protocol);
this.messageBegin = messageBegin;
}
@Override
public TMessage readMessageBegin() throws TException {
return messageBegin;
}
}
}

View File

@ -0,0 +1,72 @@
package org.apache.thrift.protocol;
import org.apache.thrift.TException;
/**
* <code>TMultiplexedProtocol</code> is a protocol-independent concrete decorator
* that allows a Thrift client to communicate with a multiplexing Thrift server,
* by prepending the service name to the function name during function calls.
*
* <p>NOTE: THIS IS NOT USED BY SERVERS. On the server, use {@link org.apache.thrift.TMultiplexedProcessor TMultiplexedProcessor} to handle requests
* from a multiplexing client.
*
* <p>This example uses a single socket transport to invoke two services:
*
* <blockquote><code>
* TSocket transport = new TSocket("localhost", 9090);<br/>
* transport.open();<br/>
*<br/>
* TBinaryProtocol protocol = new TBinaryProtocol(transport);<br/>
*<br/>
* TMultiplexedProtocol mp = new TMultiplexedProtocol(protocol, "Calculator");<br/>
* Calculator.Client service = new Calculator.Client(mp);<br/>
*<br/>
* TMultiplexedProtocol mp2 = new TMultiplexedProtocol(protocol, "WeatherReport");<br/>
* WeatherReport.Client service2 = new WeatherReport.Client(mp2);<br/>
*<br/>
* System.out.println(service.add(2,2));<br/>
* System.out.println(service2.getTemperature());<br/>
* </code></blockquote>
*
* @see org.apache.thrift.protocol.TProtocolDecorator
*/
public class TMultiplexedProtocol extends TProtocolDecorator {
/** Used to delimit the service name from the function name */
public static final String SEPARATOR = ":";
private final String SERVICE_NAME;
/**
* Wrap the specified protocol, allowing it to be used to communicate with a
* multiplexing server. The <code>serviceName</code> is required as it is
* prepended to the message header so that the multiplexing server can broker
* the function call to the proper service.
*
* @param protocol Your communication protocol of choice, e.g. <code>TBinaryProtocol</code>.
* @param serviceName The service name of the service communicating via this protocol.
*/
public TMultiplexedProtocol(TProtocol protocol, String serviceName) {
super(protocol);
SERVICE_NAME = serviceName;
}
/**
* Prepends the service name to the function name, separated by TMultiplexedProtocol.SEPARATOR.
*
* @param tMessage The original message.
* @throws TException Passed through from wrapped <code>TProtocol</code> instance.
*/
@Override
public void writeMessageBegin(TMessage tMessage) throws TException {
if (tMessage.type == TMessageType.CALL || tMessage.type == TMessageType.ONEWAY) {
super.writeMessageBegin(new TMessage(
SERVICE_NAME + SEPARATOR + tMessage.name,
tMessage.type,
tMessage.seqid
));
} else {
super.writeMessageBegin(tMessage);
}
}
}

View File

@ -0,0 +1,192 @@
package org.apache.thrift.protocol;
import org.apache.thrift.TException;
/**
* <code>TProtocolDecorator</code> forwards all requests to an enclosed
* <code>TProtocol</code> instance, providing a way to author concise
* concrete decorator subclasses. While it has no abstract methods, it
* is marked abstract as a reminder that by itself, it does not modify
* the behaviour of the enclosed <code>TProtocol</code>.
*
* <p>See p.175 of Design Patterns (by Gamma et al.)</p>
*
* @see org.apache.thrift.protocol.TMultiplexedProtocol
*/
public abstract class TProtocolDecorator extends TProtocol {
private final TProtocol concreteProtocol;
/**
* Encloses the specified protocol.
* @param protocol All operations will be forward to this protocol. Must be non-null.
*/
public TProtocolDecorator(TProtocol protocol) {
super(protocol.getTransport());
concreteProtocol = protocol;
}
public void writeMessageBegin(TMessage tMessage) throws TException {
concreteProtocol.writeMessageBegin(tMessage);
}
public void writeMessageEnd() throws TException {
concreteProtocol.writeMessageEnd();
}
public void writeStructBegin(TStruct tStruct) throws TException {
concreteProtocol.writeStructBegin(tStruct);
}
public void writeStructEnd() throws TException {
concreteProtocol.writeStructEnd();
}
public void writeFieldBegin(TField tField) throws TException {
concreteProtocol.writeFieldBegin(tField);
}
public void writeFieldEnd() throws TException {
concreteProtocol.writeFieldEnd();
}
public void writeFieldStop() throws TException {
concreteProtocol.writeFieldStop();
}
public void writeMapBegin(TMap tMap) throws TException {
concreteProtocol.writeMapBegin(tMap);
}
public void writeMapEnd() throws TException {
concreteProtocol.writeMapEnd();
}
public void writeListBegin(TList tList) throws TException {
concreteProtocol.writeListBegin(tList);
}
public void writeListEnd() throws TException {
concreteProtocol.writeListEnd();
}
public void writeSetBegin(TSet tSet) throws TException {
concreteProtocol.writeSetBegin(tSet);
}
public void writeSetEnd() throws TException {
concreteProtocol.writeSetEnd();
}
public void writeBool(boolean b) throws TException {
concreteProtocol.writeBool(b);
}
public void writeByte(byte b) throws TException {
concreteProtocol.writeByte(b);
}
public void writeI16(short i) throws TException {
concreteProtocol.writeI16(i);
}
public void writeI32(int i) throws TException {
concreteProtocol.writeI32(i);
}
public void writeI64(long l) throws TException {
concreteProtocol.writeI64(l);
}
public void writeDouble(double v) throws TException {
concreteProtocol.writeDouble(v);
}
public void writeString(String s) throws TException {
concreteProtocol.writeString(s);
}
public void writeBinary(byte[] bytes) throws TException {
concreteProtocol.writeBinary(bytes);
}
public TMessage readMessageBegin() throws TException {
return concreteProtocol.readMessageBegin();
}
public void readMessageEnd() throws TException {
concreteProtocol.readMessageEnd();
}
public TStruct readStructBegin() throws TException {
return concreteProtocol.readStructBegin();
}
public void readStructEnd() throws TException {
concreteProtocol.readStructEnd();
}
public TField readFieldBegin() throws TException {
return concreteProtocol.readFieldBegin();
}
public void readFieldEnd() throws TException {
concreteProtocol.readFieldEnd();
}
public TMap readMapBegin() throws TException {
return concreteProtocol.readMapBegin();
}
public void readMapEnd() throws TException {
concreteProtocol.readMapEnd();
}
public TList readListBegin() throws TException {
return concreteProtocol.readListBegin();
}
public void readListEnd() throws TException {
concreteProtocol.readListEnd();
}
public TSet readSetBegin() throws TException {
return concreteProtocol.readSetBegin();
}
public void readSetEnd() throws TException {
concreteProtocol.readSetEnd();
}
public boolean readBool() throws TException {
return concreteProtocol.readBool();
}
public byte readByte() throws TException {
return concreteProtocol.readByte();
}
public short readI16() throws TException {
return concreteProtocol.readI16();
}
public int readI32() throws TException {
return concreteProtocol.readI32();
}
public long readI64() throws TException {
return concreteProtocol.readI64();
}
public double readDouble() throws TException {
return concreteProtocol.readDouble();
}
public String readString() throws TException {
return concreteProtocol.readString();
}
public byte[] readBinary() throws TException {
return concreteProtocol.readBinary();
}
}