mirror of
https://github.com/valitydev/thrift.git
synced 2024-11-07 02:45:22 +00:00
THRIFT-2504: Add default processor to java multiplexed processor to handle older clients
Client: Java This closes #114 This closes #1195
This commit is contained in:
parent
736075d2ac
commit
400b346db2
@ -372,6 +372,7 @@
|
|||||||
<artifact:dependencies filesetId="test-dependency-jars" useScope="runtime">
|
<artifact:dependencies filesetId="test-dependency-jars" useScope="runtime">
|
||||||
<dependency groupId="org.slf4j" artifactId="slf4j-log4j12" version="${slf4j.version}"/>
|
<dependency groupId="org.slf4j" artifactId="slf4j-log4j12" version="${slf4j.version}"/>
|
||||||
<dependency groupId="junit" artifactId="junit" version="4.4"/>
|
<dependency groupId="junit" artifactId="junit" version="4.4"/>
|
||||||
|
<dependency groupId="org.mockito" artifactId="mockito-all" version="1.9.5"/>
|
||||||
</artifact:dependencies>
|
</artifact:dependencies>
|
||||||
|
|
||||||
<!-- Copy the test dependencies to the build/lib dir -->
|
<!-- Copy the test dependencies to the build/lib dir -->
|
||||||
|
@ -52,6 +52,7 @@ public class TMultiplexedProcessor implements TProcessor {
|
|||||||
|
|
||||||
private final Map<String,TProcessor> SERVICE_PROCESSOR_MAP
|
private final Map<String,TProcessor> SERVICE_PROCESSOR_MAP
|
||||||
= new HashMap<String,TProcessor>();
|
= new HashMap<String,TProcessor>();
|
||||||
|
private TProcessor defaultProcessor;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 'Register' a service with this <code>TMultiplexedProcessor</code>. This
|
* 'Register' a service with this <code>TMultiplexedProcessor</code>. This
|
||||||
@ -67,6 +68,14 @@ public class TMultiplexedProcessor implements TProcessor {
|
|||||||
SERVICE_PROCESSOR_MAP.put(serviceName, processor);
|
SERVICE_PROCESSOR_MAP.put(serviceName, processor);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Register a service to be called to process queries without service name
|
||||||
|
* @param processor
|
||||||
|
*/
|
||||||
|
public void registerDefault(TProcessor processor) {
|
||||||
|
defaultProcessor = processor;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This implementation of <code>process</code> performs the following steps:
|
* This implementation of <code>process</code> performs the following steps:
|
||||||
*
|
*
|
||||||
@ -77,7 +86,7 @@ public class TMultiplexedProcessor implements TProcessor {
|
|||||||
* <li>Dispatch to the processor, with a decorated instance of TProtocol
|
* <li>Dispatch to the processor, with a decorated instance of TProtocol
|
||||||
* that allows readMessageBegin() to return the original TMessage.</li>
|
* that allows readMessageBegin() to return the original TMessage.</li>
|
||||||
* </ol>
|
* </ol>
|
||||||
*
|
*
|
||||||
* @throws TException If the message type is not CALL or ONEWAY, if
|
* @throws TException If the message type is not CALL or ONEWAY, if
|
||||||
* the service name was not found in the message, or if the service
|
* the service name was not found in the message, or if the service
|
||||||
* name was not found in the service map. You called {@link #registerProcessor(String, TProcessor) registerProcessor}
|
* name was not found in the service map. You called {@link #registerProcessor(String, TProcessor) registerProcessor}
|
||||||
@ -92,14 +101,16 @@ public class TMultiplexedProcessor implements TProcessor {
|
|||||||
TMessage message = iprot.readMessageBegin();
|
TMessage message = iprot.readMessageBegin();
|
||||||
|
|
||||||
if (message.type != TMessageType.CALL && message.type != TMessageType.ONEWAY) {
|
if (message.type != TMessageType.CALL && message.type != TMessageType.ONEWAY) {
|
||||||
// TODO Apache Guys - Can the server ever get an EXCEPTION or REPLY?
|
|
||||||
// TODO Should we check for this here?
|
|
||||||
throw new TException("This should not have happened!?");
|
throw new TException("This should not have happened!?");
|
||||||
}
|
}
|
||||||
|
|
||||||
// Extract the service name
|
// Extract the service name
|
||||||
int index = message.name.indexOf(TMultiplexedProtocol.SEPARATOR);
|
int index = message.name.indexOf(TMultiplexedProtocol.SEPARATOR);
|
||||||
if (index < 0) {
|
if (index < 0) {
|
||||||
|
if (defaultProcessor != null) {
|
||||||
|
// Dispatch processing to the stored processor
|
||||||
|
return defaultProcessor.process(new StoredMessageProtocol(iprot, message), oprot);
|
||||||
|
}
|
||||||
throw new TException("Service name not found in message name: " + message.name + ". Did you " +
|
throw new TException("Service name not found in message name: " + message.name + ". Did you " +
|
||||||
"forget to use a TMultiplexProtocol in your client?");
|
"forget to use a TMultiplexProtocol in your client?");
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,86 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.thrift;
|
||||||
|
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import org.apache.thrift.protocol.TMessage;
|
||||||
|
import org.apache.thrift.protocol.TMessageType;
|
||||||
|
import org.apache.thrift.protocol.TProtocol;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestMultiplexedProcessor {
|
||||||
|
private TMultiplexedProcessor mp;
|
||||||
|
private TProtocol iprot;
|
||||||
|
private TProtocol oprot;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
mp = new TMultiplexedProcessor();
|
||||||
|
iprot = mock(TProtocol.class);
|
||||||
|
oprot = mock(TProtocol.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = TException.class)
|
||||||
|
public void testWrongMessageType() throws TException {
|
||||||
|
when (iprot.readMessageBegin()).thenReturn(new TMessage("service:func", TMessageType.REPLY, 42));
|
||||||
|
mp.process(iprot, oprot);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = TException.class)
|
||||||
|
public void testNoSuchService() throws TException {
|
||||||
|
when(iprot.readMessageBegin()).thenReturn(new TMessage("service:func", TMessageType.CALL, 42));
|
||||||
|
|
||||||
|
mp.process(iprot, oprot);
|
||||||
|
}
|
||||||
|
|
||||||
|
static class StubProcessor implements TProcessor {
|
||||||
|
@Override
|
||||||
|
public boolean process(TProtocol in, TProtocol out) throws TException {
|
||||||
|
TMessage msg = in.readMessageBegin();
|
||||||
|
if (!"func".equals(msg.name) || msg.type!=TMessageType.CALL || msg.seqid!=42) {
|
||||||
|
throw new TException("incorrect parameters");
|
||||||
|
}
|
||||||
|
out.writeMessageBegin(new TMessage("func", TMessageType.REPLY, 42));
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testExistingService() throws TException {
|
||||||
|
when(iprot.readMessageBegin()).thenReturn(new TMessage("service:func", TMessageType.CALL, 42));
|
||||||
|
mp.registerProcessor("service", new StubProcessor());
|
||||||
|
mp.process(iprot, oprot);
|
||||||
|
verify(oprot).writeMessageBegin(any(TMessage.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDefaultService() throws TException {
|
||||||
|
when(iprot.readMessageBegin()).thenReturn(new TMessage("func", TMessageType.CALL, 42));
|
||||||
|
mp.registerDefault(new StubProcessor());
|
||||||
|
mp.process(iprot, oprot);
|
||||||
|
verify(oprot).writeMessageBegin(any(TMessage.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user