Merge branch 'ft/fix_error_handler' of github.com:rbkmoney/newway into ft/fix_error_handler

Inal Arsanukaev 2019-07-01 14:04:34 +03:00
commit b47cac3be3
13 changed files with 97 additions and 221 deletions

View File

@@ -206,6 +206,12 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>com.rbkmoney</groupId>
<artifactId>easyway</artifactId>
<version>0.1.0</version>
<scope>test</scope>
</dependency>
</dependencies>

View File

@@ -0,0 +1,22 @@
package com.rbkmoney.newway.config;
import com.rbkmoney.damsel.payment_processing.EventPayload;
import com.rbkmoney.newway.poller.listener.InvoicingKafkaListener;
import com.rbkmoney.newway.service.InvoicingService;
import com.rbkmoney.sink.common.parser.impl.MachineEventParser;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
@Configuration
@EnableKafka
public class KafkaConsumerBeanEnableConfig {
@Bean
@ConditionalOnProperty(value = "kafka.topics.invoice.enabled", havingValue = "true")
public InvoicingKafkaListener paymentEventsKafkaListener(InvoicingService invoicingService,
MachineEventParser<EventPayload> parser) {
return new InvoicingKafkaListener(invoicingService, parser);
}
}
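(Not part of this commit.) As a hedged illustration of the new kafka.topics.invoice.enabled switch, the conditional bean above can be exercised with Spring Boot's ApplicationContextRunner, assuming a Spring Boot 2.x test classpath; the property key comes from this diff, while the test class itself is hypothetical:

import com.rbkmoney.newway.config.KafkaConsumerBeanEnableConfig;
import com.rbkmoney.newway.poller.listener.InvoicingKafkaListener;
import org.junit.Test;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;

import static org.assertj.core.api.Assertions.assertThat;

public class KafkaConsumerBeanEnableConfigSketchTest {

    // With the flag off, @ConditionalOnProperty skips the @Bean method, so no listener is registered.
    @Test
    public void listenerIsAbsentWhenInvoiceConsumerDisabled() {
        new ApplicationContextRunner()
                .withUserConfiguration(KafkaConsumerBeanEnableConfig.class)
                .withPropertyValues("kafka.topics.invoice.enabled=false")
                .run(context -> assertThat(context).doesNotHaveBean(InvoicingKafkaListener.class));
    }
}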

View File

@@ -1,18 +1,14 @@
package com.rbkmoney.newway.poller.listener;
import com.rbkmoney.damsel.payment_processing.EventPayload;
import com.rbkmoney.machinegun.eventsink.MachineEvent;
import com.rbkmoney.machinegun.eventsink.SinkEvent;
import com.rbkmoney.newway.service.InvoicingService;
import com.rbkmoney.sink.common.parser.Parser;
import com.rbkmoney.sink.common.parser.impl.MachineEventParser;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
@Component
@Slf4j
@RequiredArgsConstructor
public class InvoicingKafkaListener {
@@ -20,7 +16,7 @@ public class InvoicingKafkaListener {
private final InvoicingService invoicingService;
private final MachineEventParser<EventPayload> parser;
@KafkaListener(topics = "${kafka.topics.invoicing}", containerFactory = "kafkaListenerContainerFactory")
@KafkaListener(topics = "${kafka.topics.invoice.id}", containerFactory = "kafkaListenerContainerFactory")
public void handle(SinkEvent sinkEvent, Acknowledgment ack) {
log.debug("Reading sinkEvent, sourceId:{}, eventId:{}", sinkEvent.getEvent().getSourceId(), sinkEvent.getEvent().getEventId());
EventPayload payload = parser.parse(sinkEvent.getEvent());

View File

@@ -43,7 +43,9 @@ kafka:
auto-offset-reset: earliest
max-poll-records: 20
topics:
invoicing: mg-invoice-100-2
invoice:
id: mg-invoice-100-2
enabled: false
bm:
partyManagement:

View File

@@ -1,50 +0,0 @@
package com.rbkmoney.newway;
import com.rbkmoney.geck.common.util.TypeUtil;
import org.testcontainers.shaded.org.apache.commons.io.IOUtils;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import static io.github.benas.randombeans.api.EnhancedRandom.random;
import static java.time.LocalDateTime.now;
import static java.time.ZoneId.systemDefault;
public abstract class AbstractTestUtils {
protected LocalDateTime fromTime = LocalDateTime.now().minusHours(3);
protected LocalDateTime toTime = LocalDateTime.now().minusHours(1);
protected LocalDateTime inFromToPeriodTime = LocalDateTime.now().minusHours(2);
protected static String generateDate() {
return TypeUtil.temporalToString(LocalDateTime.now());
}
protected static Long generateLong() {
return random(Long.class);
}
protected static Integer generateInt() {
return random(Integer.class);
}
protected static String generateString() {
return random(String.class);
}
protected static Instant generateCurrentTimePlusDay() {
return now().plusDays(1).toInstant(getZoneOffset());
}
protected static ZoneOffset getZoneOffset() {
return systemDefault().getRules().getOffset(now());
}
protected static String getContent(InputStream content) throws IOException {
return IOUtils.toString(content, StandardCharsets.UTF_8);
}
}
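(Not part of this commit.) The removed helpers above were thin wrappers around EnhancedRandom and TypeUtil; the test bases below switch to com.rbkmoney.easyway.AbstractTestUtils (see the import change in the last file of this diff). For reference, equivalent fixtures can still be generated directly; the class name here is illustrative:

import com.rbkmoney.geck.common.util.TypeUtil;

import java.time.LocalDateTime;

import static io.github.benas.randombeans.api.EnhancedRandom.random;

public class FixtureSketch {
    public static void main(String[] args) {
        // Same results as the removed generateLong()/generateString()/generateDate() helpers.
        Long id = random(Long.class);
        String name = random(String.class);
        String createdAt = TypeUtil.temporalToString(LocalDateTime.now());
        System.out.printf("id=%s, name=%s, createdAt=%s%n", id, name, createdAt);
    }
}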

View File

@@ -1,36 +0,0 @@
package com.rbkmoney.newway;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.PostgreSQLContainer;
import java.util.Optional;
@NoArgsConstructor
@Setter
public class TestContainers {
private Boolean dockerContainersEnable;
private PostgreSQLContainer postgresSQLTestContainer;
public Optional<PostgreSQLContainer> getPostgresSQLTestContainer() {
return Optional.ofNullable(postgresSQLTestContainer);
}
public Boolean isDockerContainersEnable() {
return dockerContainersEnable;
}
public void startTestContainers() {
if (!isDockerContainersEnable()) {
getPostgresSQLTestContainer().ifPresent(GenericContainer::start);
}
}
public void stopTestContainers() {
if (!isDockerContainersEnable()) {
getPostgresSQLTestContainer().ifPresent(GenericContainer::stop);
}
}
}

View File

@@ -1,45 +0,0 @@
package com.rbkmoney.newway;
import org.testcontainers.containers.PostgreSQLContainer;
import java.time.Duration;
public class TestContainersBuilder {
private boolean dockerContainersEnable;
private boolean postgreSQLTestContainerEnable;
private TestContainersBuilder(boolean dockerContainersEnable) {
this.dockerContainersEnable = dockerContainersEnable;
}
public static TestContainersBuilder builder(boolean dockerContainersEnable) {
return new TestContainersBuilder(dockerContainersEnable);
}
public TestContainersBuilder addPostgreSQLTestContainer() {
postgreSQLTestContainerEnable = true;
return this;
}
public TestContainers build() {
TestContainers testContainers = new TestContainers();
if (!dockerContainersEnable) {
addTestContainers(testContainers);
} else {
testContainers.setDockerContainersEnable(true);
}
return testContainers;
}
private void addTestContainers(TestContainers testContainers) {
if (postgreSQLTestContainerEnable) {
testContainers.setPostgresSQLTestContainer(
new PostgreSQLContainer<>("postgres:9.6")
.withStartupTimeout(Duration.ofMinutes(5))
);
}
testContainers.setDockerContainersEnable(false);
}
}
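(Not part of this commit.) The three deleted helpers (AbstractTestUtils, TestContainers, TestContainersBuilder) are replaced by their easyway 0.1.0 counterparts, which the updated test bases below wire up piece by piece. Condensed into one hedged sketch, the new pattern looks roughly as follows; only calls that appear elsewhere in this diff are used, the class name is illustrative, and stopTestContainers() is assumed to mirror the removed helper:

import com.rbkmoney.easyway.*;
import org.junit.AfterClass;
import org.junit.BeforeClass;

public abstract class EasywayContainersSketch {

    // Builder-driven setup replacing the hand-rolled TestContainers/TestContainersBuilder pair.
    private static TestContainers testContainers = TestContainersBuilder
            .builderWithTestContainers(() -> {
                TestContainersParameters parameters = new TestContainersParameters();
                // Fallback JDBC URL applied when real containers are not started.
                parameters.setPostgresqlJdbcUrl("jdbc:postgresql://localhost:5432/newway");
                return parameters;
            })
            .addPostgresqlTestContainer()
            .addKafkaTestContainer()
            .build();

    @BeforeClass
    public static void beforeClass() {
        testContainers.startTestContainers();
    }

    @AfterClass
    public static void afterClass() {
        testContainers.stopTestContainers();
    }
}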

View File

@@ -1,33 +1,31 @@
package com.rbkmoney.newway.dao;
import com.rbkmoney.newway.AbstractTestUtils;
import com.rbkmoney.easyway.*;
import com.rbkmoney.newway.NewwayApplication;
import com.rbkmoney.newway.TestContainers;
import com.rbkmoney.newway.TestContainersBuilder;
import com.rbkmoney.newway.utils.NewwayTestPropertyValuesBuilder;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.util.TestPropertyValues;
import org.springframework.context.ApplicationContextInitializer;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.function.Consumer;
import java.util.function.Supplier;
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT;
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = RANDOM_PORT)
@ContextConfiguration(classes = NewwayApplication.class, initializers = AbstractAppDaoTests.Initializer.class)
@TestPropertySource(properties =
"spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration")
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
public abstract class AbstractAppDaoTests extends AbstractTestUtils {
private static TestContainers testContainers = TestContainersBuilder.builder(false)
.addPostgreSQLTestContainer()
private static TestContainers testContainers = TestContainersBuilder.builderWithTestContainers(getTestContainersParametersSupplier())
.addPostgresqlTestContainer()
.build();
@BeforeClass
@@ -44,9 +42,27 @@ public abstract class AbstractAppDaoTests extends AbstractTestUtils {
@Override
public void initialize(ConfigurableApplicationContext configurableApplicationContext) {
NewwayTestPropertyValuesBuilder
.build(testContainers)
TestPropertyValues.of(
testContainers.getEnvironmentProperties(getEnvironmentPropertiesConsumer())
)
.applyTo(configurableApplicationContext);
}
}
private static Supplier<TestContainersParameters> getTestContainersParametersSupplier() {
return () -> {
TestContainersParameters testContainersParameters = new TestContainersParameters();
testContainersParameters.setPostgresqlJdbcUrl("jdbc:postgresql://localhost:5432/newway");
return testContainersParameters;
};
}
private static Consumer<EnvironmentProperties> getEnvironmentPropertiesConsumer() {
return environmentProperties -> {
environmentProperties.put("kafka.topics.invoice.enabled", "false");
environmentProperties.put("bm.pollingEnabled", "false");
environmentProperties.put("dmt.polling.enable", "false");
};
}
}

View File

@@ -1,21 +1,20 @@
package com.rbkmoney.newway.kafka;
import com.rbkmoney.newway.AbstractTestUtils;
import com.rbkmoney.easyway.*;
import com.rbkmoney.newway.NewwayApplication;
import com.rbkmoney.newway.TestContainers;
import com.rbkmoney.newway.TestContainersBuilder;
import com.rbkmoney.newway.utils.NewwayTestPropertyValuesBuilder;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.util.TestPropertyValues;
import org.springframework.context.ApplicationContextInitializer;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import org.testcontainers.containers.KafkaContainer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT;
@@ -25,18 +24,11 @@ import static org.springframework.boot.test.context.SpringBootTest.WebEnvironmen
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
public abstract class AbstractKafkaTest extends AbstractTestUtils {
private static TestContainers testContainers = TestContainersBuilder.builder(false)
.addPostgreSQLTestContainer()
private static TestContainers testContainers = TestContainersBuilder.builderWithTestContainers(getTestContainersParametersSupplier())
.addKafkaTestContainer()
.addPostgresqlTestContainer()
.build();
public static final String SOURCE_ID = "source_id";
public static final String SOURCE_NS = "source_ns";
private static final String CONFLUENT_PLATFORM_VERSION = "5.0.1";
@ClassRule
public static KafkaContainer kafka = new KafkaContainer(CONFLUENT_PLATFORM_VERSION).withEmbeddedZookeeper();
@BeforeClass
public static void beforeClass() {
testContainers.startTestContainers();
@@ -51,17 +43,27 @@ public abstract class AbstractKafkaTest extends AbstractTestUtils {
@Override
public void initialize(ConfigurableApplicationContext configurableApplicationContext) {
NewwayTestPropertyValuesBuilder
.build(testContainers)
.and("kafka.bootstrap-servers=" + kafka.getBootstrapServers(),
"kafka.ssl.enabled=false",
"kafka.consumer.group-id=TestListener",
"kafka.consumer.enable-auto-commit=false",
"kafka.consumer.auto-offset-reset=earliest",
"kafka.consumer.client-id=test",
"kafka.client-id=test",
"kafka.topics.invoicing=test-topic")
TestPropertyValues.of(
testContainers.getEnvironmentProperties(getEnvironmentPropertiesConsumer())
)
.applyTo(configurableApplicationContext);
}
}
private static Supplier<TestContainersParameters> getTestContainersParametersSupplier() {
return () -> {
TestContainersParameters testContainersParameters = new TestContainersParameters();
testContainersParameters.setPostgresqlJdbcUrl("jdbc:postgresql://localhost:5432/newway");
return testContainersParameters;
};
}
private static Consumer<EnvironmentProperties> getEnvironmentPropertiesConsumer() {
return environmentProperties -> {
environmentProperties.put("kafka.topics.invoice.enabled", "true");
environmentProperties.put("bm.pollingEnabled", "false");
environmentProperties.put("dmt.polling.enable", "false");
};
}
}

View File

@@ -31,10 +31,12 @@ import static org.mockito.ArgumentMatchers.any;
@ContextConfiguration(classes = {KafkaAutoConfiguration.class, InvoicingKafkaListener.class})
public class InvoicingKafkaListenerTest extends AbstractKafkaTest {
@org.springframework.beans.factory.annotation.Value("${kafka.topics.invoicing}")
@org.springframework.beans.factory.annotation.Value("${kafka.topics.invoice.id}")
public String topic;
@org.springframework.beans.factory.annotation.Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@MockBean
InvoicingService invoicingService;
@@ -78,15 +80,15 @@ public class InvoicingKafkaListenerTest extends AbstractKafkaTest {
data.setBin(new byte[0]);
message.setCreatedAt(LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME));
message.setEventId(1L);
message.setSourceNs(SOURCE_NS);
message.setSourceId(SOURCE_ID);
message.setSourceNs("sad");
message.setSourceId("sda");
message.setData(data);
return message;
}
public static Producer<String, SinkEvent> createProducer() {
private Producer<String, SinkEvent> createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "client_id");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, new ThriftSerializer<SinkEvent>().getClass());
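(Not part of this commit.) The test snippet above is truncated; a hedged sketch of how the producer might publish a SinkEvent to the injected topic follows. Only standard kafka-clients calls are used (ProducerRecord comes from org.apache.kafka.clients.producer); createProducer() and topic are visible in this diff, while createMessage() stands in for the message-building helper whose name is not shown, and verification of the @MockBean InvoicingService is omitted because its method names do not appear here:

    @Test
    public void publishesSinkEventToInvoiceTopic() throws Exception {
        Producer<String, SinkEvent> producer = createProducer();
        try {
            SinkEvent sinkEvent = new SinkEvent();
            sinkEvent.setEvent(createMessage()); // wraps the MachineEvent built in the snippet above
            // Key by sourceId, which the listener logs on the consuming side.
            producer.send(new ProducerRecord<>(topic, sinkEvent.getEvent().getSourceId(), sinkEvent)).get();
        } finally {
            producer.close();
        }
    }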

View File

@@ -1,39 +0,0 @@
package com.rbkmoney.newway.utils;
import com.rbkmoney.newway.TestContainers;
import org.springframework.boot.test.util.TestPropertyValues;
import java.util.ArrayList;
import java.util.List;
public class NewwayTestPropertyValuesBuilder {
public static TestPropertyValues build(TestContainers testContainers) {
List<String> strings = new ArrayList<>();
if (!testContainers.isDockerContainersEnable()) {
withUsingTestContainers(testContainers, strings);
} else {
withoutUsingTestContainers(strings);
}
strings.add("bm.pollingEnabled=false");
strings.add("dmt.polling.enable=false");
return TestPropertyValues.of(strings);
}
private static void withUsingTestContainers(TestContainers testContainers, List<String> strings) {
testContainers.getPostgresSQLTestContainer().ifPresent(
c -> {
strings.add("spring.datasource.url=" + c.getJdbcUrl());
strings.add("spring.datasource.username=" + c.getUsername());
strings.add("spring.datasource.password=" + c.getPassword());
strings.add("flyway.url=" + c.getJdbcUrl());
strings.add("flyway.user=" + c.getUsername());
strings.add("flyway.password=" + c.getPassword());
}
);
}
private static void withoutUsingTestContainers(List<String> strings) {
}
}

View File

@@ -1,7 +1,7 @@
package com.rbkmoney.newway.utils;
import com.rbkmoney.easyway.AbstractTestUtils;
import com.rbkmoney.geck.common.util.TypeUtil;
import com.rbkmoney.newway.AbstractTestUtils;
import com.rbkmoney.xrates.base.TimestampInterval;
import com.rbkmoney.xrates.rate.*;