Ft/mspf 197/change http client conf (#20)

* MSPF-117: IdGenerator changed, context aware execution refactored

* MSPF-117: Error mapping refactoring

* MSPF-117: Test fixes, bug fixes

* MSPF-117: Regenerated thrift models

* MSPF-117: Bugfixes

* MSPF-120: Added woody metadata extensions, Bugfixes

* MSPF-197: Changed HttpClient usage, bumped thrift-lib version

* MSPF-197: Moved thrift plugin to separate profile
This commit is contained in:
Vladimir Pankrashkin 2017-02-16 16:52:51 +03:00 committed by GitHub
parent 59dae5ccb2
commit 9e5efb39f7
11 changed files with 146 additions and 68 deletions

View File

@ -12,7 +12,7 @@
<packaging>pom</packaging>
<groupId>com.rbkmoney.woody</groupId>
<artifactId>woody</artifactId>
<version>1.1.0</version>
<version>1.1.1</version>
<description>Java implementation for Woody spec</description>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>woody</artifactId>
<groupId>com.rbkmoney.woody</groupId>
<version>1.1.0</version>
<version>1.1.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -16,34 +16,30 @@ import java.util.function.Supplier;
public class CPool2TargetProvider<T> implements InvocationTargetProvider<T> {
private final ObjectPool<T> pool;
private final Class<T> targetType;
private final Supplier<T> supplier;
public static <T> CPool2TargetProvider<T> newInstance(Class<T> targetType, Supplier<T> supplier, GenericObjectPoolConfig config, AbandonedConfig abandonedConfig) {
public static <T> CPool2TargetProvider<T> newInstance(Class<T> targetType, Supplier<TargetObjectFactory<T>> objFactoryFunc, GenericObjectPoolConfig config, AbandonedConfig abandonedConfig) {
if (config == null) {
return new CPool2TargetProvider<>(targetType, supplier);
return new CPool2TargetProvider<>(targetType, objFactoryFunc);
} else if (abandonedConfig == null) {
return new CPool2TargetProvider<>(targetType, supplier, config);
return new CPool2TargetProvider<>(targetType, objFactoryFunc, config);
} else {
return new CPool2TargetProvider<>(targetType, supplier, config, abandonedConfig);
return new CPool2TargetProvider<>(targetType, objFactoryFunc, config, abandonedConfig);
}
}
public CPool2TargetProvider(Class<T> targetType, Supplier<T> supplier) {
this.supplier = supplier;
public CPool2TargetProvider(Class<T> targetType, Supplier<TargetObjectFactory<T>> objFactoryFunc) {
this.targetType = targetType;
this.pool = new GenericObjectPool<>(new TargetObjectFactory<>(this::createTarget));
this.pool = new GenericObjectPool<>(objFactoryFunc.get());
}
public CPool2TargetProvider(Class<T> targetType, Supplier<T> supplier, GenericObjectPoolConfig config) {
this.supplier = supplier;
public CPool2TargetProvider(Class<T> targetType, Supplier<TargetObjectFactory<T>> objFactoryFunc, GenericObjectPoolConfig config) {
this.targetType = targetType;
this.pool = new GenericObjectPool<>(new TargetObjectFactory<>(this::createTarget), config);
this.pool = new GenericObjectPool<>(objFactoryFunc.get(), config);
}
public CPool2TargetProvider(Class<T> targetType, Supplier<T> supplier, GenericObjectPoolConfig config, AbandonedConfig abandonedConfig) {
this.supplier = supplier;
public CPool2TargetProvider(Class<T> targetType, Supplier<TargetObjectFactory<T>> objFactoryFunc, GenericObjectPoolConfig config, AbandonedConfig abandonedConfig) {
this.targetType = targetType;
this.pool = new GenericObjectPool<>(new TargetObjectFactory<>(this::createTarget), config, abandonedConfig);
this.pool = new GenericObjectPool<>(objFactoryFunc.get(), config, abandonedConfig);
}
@Override
@ -84,11 +80,7 @@ public class CPool2TargetProvider<T> implements InvocationTargetProvider<T> {
return false;
}
protected T createTarget() {
return supplier.get();
}
private static class TargetObjectFactory<T> extends BasePooledObjectFactory<T> {
public static class TargetObjectFactory<T> extends BasePooledObjectFactory<T> {
private final Supplier<T> targetSupplier;
public TargetObjectFactory(Supplier<T> targetSupplier) {

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>woody</artifactId>
<groupId>com.rbkmoney.woody</groupId>
<version>1.1.0</version>
<version>1.1.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>
@ -20,7 +20,7 @@
<dependency>
<groupId>com.rbkmoney.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>0.9.3-3</version>
<version>0.9.3-5</version>
</dependency>
<!--Thirdparty libs-->
<dependency>
@ -61,7 +61,7 @@
</dependencies>
<profiles>
<profile>
<id>dev_thrift_gen</id>
<id>gen_thrift_classes</id>
<build>
<plugins>
<plugin>

View File

@ -22,7 +22,10 @@ import com.rbkmoney.woody.thrift.impl.http.interceptor.THMessageInterceptor;
import com.rbkmoney.woody.thrift.impl.http.interceptor.THTransportInterceptor;
import com.rbkmoney.woody.thrift.impl.http.interceptor.ext.MetadataExtensionBundle;
import org.apache.http.client.HttpClient;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.BasicHttpClientConnectionManager;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
@ -30,6 +33,7 @@ import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.THttpClient;
import org.apache.thrift.transport.TTransport;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
@ -49,7 +53,6 @@ public class THClientBuilder extends AbstractClientBuilder {
private List<MetadataExtensionKit> metadataExtensionKits;
public THClientBuilder() {
this.httpClient = createHttpClient();
super.withIdGenerator(WFlow.createDefaultIdGenerator());
}
@ -83,8 +86,17 @@ public class THClientBuilder extends AbstractClientBuilder {
return (THClientBuilder) super.withIdGenerator(generator);
}
public boolean isCustomHttpClient() {
return httpClient != null;
}
public HttpClient getHttpClient() {
return httpClient;
if (isCustomHttpClient()) {
return httpClient;
} else {
return createHttpClient();
}
}
@Override
@ -133,6 +145,23 @@ public class THClientBuilder extends AbstractClientBuilder {
}
}
protected void destroyProviderClient(Object client, boolean customClient) {
if (!customClient && client instanceof TServiceClient) {
TTransport tTransport = ((TServiceClient) client).getInputProtocol().getTransport();
if (tTransport instanceof THttpClient) {
HttpClient httpClient = ((THttpClient)tTransport).getHttpClient();
if (httpClient instanceof CloseableHttpClient) {
try {
((CloseableHttpClient) httpClient).close();
} catch (IOException e) {
throw new RuntimeException("Failed to release HttpClient", e);
}
}
}
}
}
protected TProtocolFactory createTransferProtocolFactory() {
return new TBinaryProtocol.Factory();
}
@ -142,7 +171,7 @@ public class THClientBuilder extends AbstractClientBuilder {
}
protected HttpClient createHttpClient() {
return HttpClientBuilder.create().build();
return HttpClients.createMinimal(new BasicHttpClientConnectionManager());
}
protected CommonInterceptor createMessageInterceptor() {
@ -187,4 +216,7 @@ public class THClientBuilder extends AbstractClientBuilder {
private Runnable createEventRunnable(ClientEventListener eventListener) {
return () -> eventListener.notifyEvent(new THClientEvent(TraceContext.getCurrentTraceData()));
}
public void destroy() {
}
}

View File

@ -4,9 +4,10 @@ import com.rbkmoney.woody.api.WoodyInstantiationException;
import com.rbkmoney.woody.api.event.ClientEventListener;
import com.rbkmoney.woody.api.flow.error.WErrorMapper;
import com.rbkmoney.woody.api.generator.IdGenerator;
import com.rbkmoney.woody.api.proxy.InvocationTargetProvider;
import com.rbkmoney.woody.api.proxy.CPool2TargetProvider;
import com.rbkmoney.woody.api.proxy.InvocationTargetProvider;
import com.rbkmoney.woody.api.trace.context.metadata.MetadataExtensionKit;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.AbandonedConfig;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.http.client.HttpClient;
@ -16,15 +17,16 @@ import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
/**
* Created by vpankrashkin on 09.06.16.
*
* <p>
* This builder provides the ability to build thread-safe clients around not thread-safe Thrift clients.
* It uses apache commons-pool2 for internal Thrift client pooling.
*/
public class THPooledClientBuilder extends THClientBuilder {
private volatile boolean httpClientSet = false;
private volatile boolean collectInstance = false;
private volatile GenericObjectPoolConfig poolConfig;
private volatile AbandonedConfig poolAbandonedConfig;
@ -40,7 +42,6 @@ public class THPooledClientBuilder extends THClientBuilder {
@Override
public THPooledClientBuilder withHttpClient(HttpClient httpClient) {
httpClientSet = true;
return (THPooledClientBuilder) super.withHttpClient(httpClient);
}
@ -51,24 +52,15 @@ public class THPooledClientBuilder extends THClientBuilder {
/**
* If you're using pooling config which spawns any threads, you need to set {@link THPooledClientBuilder#collectInstance} field to collect all built instances and shutdown them later with {@link THPooledClientBuilder#destroy()} method.
*
* @param collectInstance true - if you want to collect all created instances for future destroy, false - otherwise.
* @return current builder instance.
* */
*/
public THPooledClientBuilder withCollectInstance(boolean collectInstance) {
this.collectInstance = collectInstance;
return this;
}
@Override
public HttpClient getHttpClient() {
if (httpClientSet) {
return super.getHttpClient();
} else {
return createHttpClient();
}
}
@Override
public THPooledClientBuilder withAddress(URI address) {
return (THPooledClientBuilder) super.withAddress(address);
@ -86,8 +78,9 @@ public class THPooledClientBuilder extends THClientBuilder {
/**
* Use this method if you want to customize pooling configuration. Built in pool configuration will be used By default.
*
* @param poolConfig pooling config to use instead default one.
* */
*/
public THPooledClientBuilder withPoolConfig(GenericObjectPoolConfig poolConfig) {
this.poolConfig = poolConfig;
return this;
@ -95,8 +88,9 @@ public class THPooledClientBuilder extends THClientBuilder {
/**
* Use this method if you want to customize abandoned pooling configuration. Built in pool configuration will be used By default.
*
* @param poolAbandonedConfig abandoned pooling config to use instead default one.
* */
*/
public THPooledClientBuilder withPoolAbandonedConfig(AbandonedConfig poolAbandonedConfig) {
this.poolAbandonedConfig = poolAbandonedConfig;
return this;
@ -119,7 +113,13 @@ public class THPooledClientBuilder extends THClientBuilder {
if (destroyed) {
throw new IllegalStateException("Builder is already destroyed");
}
CPool2TargetProvider<T> targetProvider = CPool2TargetProvider.newInstance(iface, () -> createProviderClient(iface), poolConfig, poolAbandonedConfig);
CPool2TargetProvider<T> targetProvider = CPool2TargetProvider.newInstance(
iface,
() -> new THTargetObjectFactory<>(
() -> createProviderClient(iface),
this::destroyProviderClient,
isCustomHttpClient()),
poolConfig, poolAbandonedConfig);
if (collectInstance) {
collectedProviders.add(targetProvider);
}
@ -132,7 +132,7 @@ public class THPooledClientBuilder extends THClientBuilder {
/**
* If eviction policy was set in referred pooling config, we need to control pool eviction threads lifecycle for all built instances. This method initiates shutdown for all collected pools.
* {@link THPooledClientBuilder#collectInstance} flag must be set to make it work properly.
* */
*/
public void destroy() {
rwLock.writeLock().lock();
try {
@ -140,10 +140,27 @@ public class THPooledClientBuilder extends THClientBuilder {
} finally {
rwLock.writeLock().unlock();
}
for (CPool2TargetProvider provider: collectedProviders) {
for (CPool2TargetProvider provider : collectedProviders) {
provider.close();
}
}
private static class THTargetObjectFactory<T> extends CPool2TargetProvider.TargetObjectFactory<T> {
private final boolean customClient;
private final BiConsumer<Object, Boolean> destroyTargetConsumer;
public THTargetObjectFactory(Supplier<T> targetSupplier, BiConsumer<Object, Boolean> destroyTargetConsumer, boolean customClient) {
super(targetSupplier);
this.customClient = customClient;
this.destroyTargetConsumer = destroyTargetConsumer;
}
@Override
public void destroyObject(PooledObject<T> p) throws Exception {
destroyTargetConsumer.accept(p.getObject(), customClient);
super.destroyObject(p);
}
}
}

View File

@ -13,6 +13,7 @@ import org.apache.thrift.protocol.TProtocolDecorator;
public class THSProtocolWrapper extends TProtocolDecorator {
private final boolean isCLient;
private final CommonInterceptor interceptor;
private final TProtocol protocol;
/**
* Encloses the specified protocol.
@ -21,10 +22,15 @@ public class THSProtocolWrapper extends TProtocolDecorator {
*/
public THSProtocolWrapper(TProtocol protocol, CommonInterceptor interceptor, boolean isCLient) {
super(protocol);
this.protocol = protocol;
this.interceptor = interceptor;
this.isCLient = isCLient;
}
public TProtocol getProtocol() {
return protocol;
}
@Override
public TMessage readMessageBegin() throws TException {
TMessage tMessage = super.readMessageBegin();

View File

@ -8,18 +8,25 @@ import com.rbkmoney.woody.api.proxy.InvocationTargetProvider;
import com.rbkmoney.woody.api.proxy.SpawnTargetProvider;
import com.rbkmoney.woody.api.trace.context.metadata.MetadataExtensionKit;
import org.apache.http.client.HttpClient;
import org.apache.http.impl.client.HttpClients;
import java.net.URI;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
/**
* Created by vpankrashkin on 09.06.16.
*
* <p>
* This builder provides the ability to build thread-safe clients around not thread-safe Thrift clients.
* It creates new Thrift client instance for every call which is dropped after call finishes.
*/
public class THSpawnClientBuilder extends THClientBuilder {
public THSpawnClientBuilder() {
withHttpClient(createHttpClient());
}
@Override
public THSpawnClientBuilder withErrorMapper(WErrorMapper errorMapper) {
return (THSpawnClientBuilder) super.withErrorMapper(errorMapper);
@ -61,9 +68,36 @@ public class THSpawnClientBuilder extends THClientBuilder {
}
}
@Override
protected HttpClient createHttpClient() {
return HttpClients.createMinimal();
}
private <T> InvocationTargetProvider<T> createTargetProvider(Class<T> iface) {
SpawnTargetProvider<T> targetProvider = new SpawnTargetProvider<>(iface, () -> createProviderClient(iface));
return targetProvider;
SpawnTargetProvider<T> targetProvider = new THSpawnTargetProvider<>(
iface,
() -> createProviderClient(iface),
this::destroyProviderClient,
isCustomHttpClient());
return targetProvider;
}
private static class THSpawnTargetProvider<T> extends SpawnTargetProvider<T> {
private final boolean customClient;
private final BiConsumer<Object, Boolean> releaseConsumer;
public THSpawnTargetProvider(Class<T> targetType, Supplier<T> supplier, BiConsumer<Object, Boolean> releaseConsumer, boolean customClient) {
super(targetType, supplier);
this.customClient = customClient;
this.releaseConsumer = releaseConsumer;
}
@Override
public void releaseTarget(T target) {
releaseConsumer.accept(target, customClient);
super.releaseTarget(target);
}
}
}

View File

@ -57,25 +57,24 @@ public abstract class AbstractConcurrentClientTest extends AbstractTest {
ExecutorService executor = Executors.newFixedThreadPool(nThreads);
Collection<Callable> callableCollection = Collections.nCopies(nThreads, new Callable<Object>() {
@Override
public Object call() {
while (!Thread.currentThread().isInterrupted()) {
try {
client.getOwner(0);
clientCalls.incrementAndGet();
client.setOwner(new Owner(0, ""));
clientCalls.incrementAndGet();
//Thread.sleep(100);
Collection<Callable> callableCollection = Collections.nCopies(nThreads, () -> {
while (!Thread.currentThread().isInterrupted()) {
try {
client.getOwner(0);
clientCalls.incrementAndGet();
client.setOwner(new Owner(0, ""));
clientCalls.incrementAndGet();
//Thread.sleep(100);
} catch (Exception e) {
if (!(e instanceof InterruptedException))
e.printStackTrace();
} finally {
} catch (Exception e) {
if (!(e instanceof InterruptedException))
e.printStackTrace();
}
}
return null;
}
return null;
});
callableCollection.stream().forEach((callable -> executor.submit(callable)));

View File

@ -16,7 +16,6 @@ public class TestTHPooledClientBuilder extends AbstractConcurrentClientTest {
try {
THPooledClientBuilder clientBuilder = new THPooledClientBuilder();
clientBuilder.withAddress(new URI(url));
clientBuilder.withHttpClient(HttpClientBuilder.create().build());
clientBuilder.withIdGenerator(idGenerator);
clientBuilder.withEventListener(clientEventStub);
return clientBuilder.build(iface);

View File

@ -16,7 +16,6 @@ public class TestTHSpawnClientBuilder extends AbstractConcurrentClientTest {
try {
THSpawnClientBuilder clientBuilder = new THSpawnClientBuilder();
clientBuilder.withAddress(new URI(url));
clientBuilder.withHttpClient(HttpClientBuilder.create().build());
clientBuilder.withIdGenerator(idGenerator);
clientBuilder.withEventListener(clientEventStub);
return clientBuilder.build(iface);