Add stream for delete invalid message (#25)

This commit is contained in:
struga 2023-01-26 16:25:26 +07:00 committed by GitHub
parent 1162083061
commit 9c8ac08b05
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 164 additions and 28 deletions

View File

@ -0,0 +1,20 @@
package dev.vality.wb.list.manager.config.properties;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.ConstructorBinding;
import org.springframework.context.annotation.Configuration;
@Getter
@Setter
@ConstructorBinding
@Configuration
@ConfigurationProperties(prefix = "kafka.correction-stream")
public class WbListCorrectionStreamProperties {
private Boolean enabled;
private String applicationId;
private String clientId;
}

View File

@ -0,0 +1,11 @@
package dev.vality.wb.list.manager.exception;
public class KafkaStreamInitialException extends RuntimeException {
public KafkaStreamInitialException() {
}
public KafkaStreamInitialException(Throwable cause) {
super(cause);
}
}

View File

@ -1,5 +1,7 @@
package dev.vality.wb.list.manager.listener; package dev.vality.wb.list.manager.listener;
import dev.vality.wb.list.manager.config.properties.WbListCorrectionStreamProperties;
import dev.vality.wb.list.manager.stream.WbListErrorRowsCorrectionStreamFactory;
import dev.vality.wb.list.manager.stream.WbListStreamFactory; import dev.vality.wb.list.manager.stream.WbListStreamFactory;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -20,18 +22,34 @@ public class StartupListener implements ApplicationListener<ContextRefreshedEven
private final WbListStreamFactory wbListStreamFactory; private final WbListStreamFactory wbListStreamFactory;
private final Properties wbListStreamProperties; private final Properties wbListStreamProperties;
private final KafkaListenerEndpointRegistry registry; private final KafkaListenerEndpointRegistry registry;
private final WbListErrorRowsCorrectionStreamFactory wbListErrorRowsCorrectionStreamFactory;
private final WbListCorrectionStreamProperties wbListCorrectionStreamProperties;
private KafkaStreams kafkaStreams; private KafkaStreams kafkaStreams;
private KafkaStreams kafkaStreamsWbListErrorRowsCorrection;
@Override @Override
public void onApplicationEvent(ContextRefreshedEvent event) { public void onApplicationEvent(ContextRefreshedEvent event) {
kafkaStreams = wbListStreamFactory.create(wbListStreamProperties); kafkaStreams = wbListStreamFactory.create(wbListStreamProperties);
kafkaStreams.start(); kafkaStreams.start();
log.info("StartupListener start stream kafkaStreams: {}", kafkaStreams.allMetadata()); log.info("StartupListener start stream kafkaStreams: {}", kafkaStreams.metadataForAllStreamsClients());
if (wbListCorrectionStreamProperties.getEnabled()) {
wbListStreamProperties.put("application.id", wbListCorrectionStreamProperties.getApplicationId());
wbListStreamProperties.put("client.id", wbListCorrectionStreamProperties.getClientId());
kafkaStreamsWbListErrorRowsCorrection =
wbListErrorRowsCorrectionStreamFactory.create(wbListStreamProperties);
kafkaStreamsWbListErrorRowsCorrection.start();
log.info("StartupListener start stream kafkaStreamsWbListErrorRowsCorrection: {}",
kafkaStreamsWbListErrorRowsCorrection.metadataForAllStreamsClients());
}
} }
public void stop() { public void stop() {
kafkaStreams.close(Duration.ofSeconds(1L)); kafkaStreams.close(Duration.ofSeconds(1L));
if (wbListCorrectionStreamProperties.getEnabled()) {
kafkaStreamsWbListErrorRowsCorrection.close(Duration.ofSeconds(1L));
}
registry.stop(); registry.stop();
} }

View File

@ -33,7 +33,7 @@ public class ListRepository implements CrudRepository<Row, String> {
@Override @Override
public void create(Row row) { public void create(Row row) {
try { try {
log.debug("ListRepository create in bucket: {} row: {}", bucket, row); log.info("ListRepository create in bucket: {} row: {}", bucket, row);
RiakObject quoteObject = new RiakObject() RiakObject quoteObject = new RiakObject()
.setContentType(TEXT_PLAIN) .setContentType(TEXT_PLAIN)
.setValue(BinaryValue.create(row.getValue())); .setValue(BinaryValue.create(row.getValue()));
@ -56,9 +56,11 @@ public class ListRepository implements CrudRepository<Row, String> {
@Override @Override
public void remove(Row row) { public void remove(Row row) {
try { try {
log.debug("ListRepository remove from bucket: {} row: {}", bucket, row); log.info("ListRepository remove from bucket: {} row: {}", bucket, row);
Location quoteObjectLocation = createLocation(bucket, row.getKey()); Location quoteObjectLocation = createLocation(bucket, row.getKey());
DeleteValue delete = new DeleteValue.Builder(quoteObjectLocation).build(); DeleteValue delete = new DeleteValue.Builder(quoteObjectLocation)
.withOption(DeleteValue.Option.W, Quorum.oneQuorum())
.build();
client.execute(delete); client.execute(delete);
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.error("InterruptedException in ListRepository when remove e: ", e); log.error("InterruptedException in ListRepository when remove e: ", e);

View File

@ -0,0 +1,64 @@
package dev.vality.wb.list.manager.stream;
import dev.vality.damsel.wb_list.ChangeCommand;
import dev.vality.damsel.wb_list.Command;
import dev.vality.damsel.wb_list.Event;
import dev.vality.damsel.wb_list.EventType;
import dev.vality.wb.list.manager.exception.KafkaStreamInitialException;
import dev.vality.wb.list.manager.serializer.CommandSerde;
import dev.vality.wb.list.manager.serializer.EventSerde;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.Properties;
@Slf4j
@Component
@RequiredArgsConstructor
public class WbListErrorRowsCorrectionStreamFactory {
public static final String EMPTY_STRING = "";
private final CommandSerde commandSerde = new CommandSerde();
private final EventSerde eventSerde = new EventSerde();
@Value("${kafka.wblist.topic.command}")
private String readTopic;
@Value("${kafka.wblist.topic.event.sink}")
private String resultTopic;
public KafkaStreams create(final Properties streamsConfiguration) {
try {
StreamsBuilder builder = new StreamsBuilder();
builder.stream(resultTopic, Consumed.with(Serdes.String(), eventSerde))
.filter((s, event) -> isCreateCommandWithEmptyListName(event))
.peek((s, changeCommand) -> log.info("Command with empty list_name: {}", changeCommand))
.mapValues(this::invertCommandToDelete)
.to(readTopic, Produced.with(Serdes.String(), commandSerde));
return new KafkaStreams(builder.build(), streamsConfiguration);
} catch (Exception e) {
log.error("WbListStreamFactory error when create stream e: ", e);
throw new KafkaStreamInitialException(e);
}
}
private ChangeCommand invertCommandToDelete(Event event) {
ChangeCommand command = new ChangeCommand();
command.setCommand(Command.DELETE);
command.setRow(event.getRow());
return command;
}
private boolean isCreateCommandWithEmptyListName(Event event) {
return event.getRow() != null
&& EMPTY_STRING.equals(event.getRow().list_name)
&& event.getEventType() == EventType.CREATED;
}
}

View File

@ -51,3 +51,7 @@ kafka:
topic: topic:
command: "wb-list-command" command: "wb-list-command"
event.sink: "wb-list-event-sink" event.sink: "wb-list-event-sink"
correction-stream:
enabled: true
application-id: wb-list-correction-empty-list-name
client-id: wb-list-correction-empty-list-name-client

View File

@ -3,13 +3,12 @@ package dev.vality.wb.list.manager;
import dev.vality.damsel.wb_list.*; import dev.vality.damsel.wb_list.*;
import dev.vality.wb.list.manager.utils.ChangeCommandWrapper; import dev.vality.wb.list.manager.utils.ChangeCommandWrapper;
import java.time.Instant;
import java.util.UUID; import java.util.UUID;
public abstract class TestObjectFactory { public abstract class TestObjectFactory {
public static dev.vality.damsel.wb_list.Row testRow() { public static Row testRow() {
dev.vality.damsel.wb_list.Row row = new dev.vality.damsel.wb_list.Row(); Row row = new Row();
row.setId(IdInfo.payment_id(new PaymentId() row.setId(IdInfo.payment_id(new PaymentId()
.setShopId(randomString()) .setShopId(randomString())
.setPartyId(randomString()) .setPartyId(randomString())
@ -22,6 +21,20 @@ public abstract class TestObjectFactory {
return row; return row;
} }
public static Row testRowWithEmptyListName() {
Row row = new Row();
row.setId(IdInfo.payment_id(new PaymentId()
.setShopId(randomString())
.setPartyId(randomString())
));
row.setListType(ListType.black);
row.setListName("");
row.setValue(randomString());
row.setPartyId(row.getId().getPaymentId().getPartyId());
row.setShopId(row.getId().getPaymentId().getShopId());
return row;
}
public static String randomString() { public static String randomString() {
return UUID.randomUUID().toString(); return UUID.randomUUID().toString();
} }
@ -34,14 +47,4 @@ public abstract class TestObjectFactory {
return changeCommand; return changeCommand;
} }
public static Row testRowWithCountInfo(String startTimeCount) {
Row row = testRow();
row.setRowInfo(RowInfo.count_info(
new CountInfo()
.setCount(5L)
.setStartCountTime(startTimeCount)
.setTimeToLive(Instant.now().plusSeconds(6000L).toString()))
);
return row;
}
} }

View File

@ -26,6 +26,7 @@ import org.springframework.test.context.ContextConfiguration;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -90,7 +91,6 @@ public class WbListManagerApplicationTest {
assertFalse(exist); assertFalse(exist);
List<Event> eventList = new ArrayList<>(); List<Event> eventList = new ArrayList<>();
testCommandKafkaConsumer.read(topicEventSink, data -> eventList.add(data.value())); testCommandKafkaConsumer.read(topicEventSink, data -> eventList.add(data.value()));
Unreliables.retryUntilTrue(TIMEOUT, TimeUnit.SECONDS, () -> eventList.size() == 2); Unreliables.retryUntilTrue(TIMEOUT, TimeUnit.SECONDS, () -> eventList.size() == 2);
@ -100,6 +100,20 @@ public class WbListManagerApplicationTest {
.anyMatch(row -> row.getPartyId().equals(testRow.getPartyId()))); .anyMatch(row -> row.getPartyId().equals(testRow.getPartyId())));
} }
@Test
void kafkaWbListCorrectionStreamsTest() throws Exception {
Row testRow = TestObjectFactory.testRowWithEmptyListName();
ChangeCommand changeCommand = produceCreateRow(testRow);
handler.isExist(changeCommand.getRow());
Awaitility.await()
.atMost(Duration.ofSeconds(60))
.pollDelay(2L, TimeUnit.SECONDS)
.until(() -> !handler.isExist(changeCommand.getRow()));
assertFalse(handler.isExist(changeCommand.getRow()));
}
@Test @Test
void kafkaRowTest() throws Exception { // TODO refactoring void kafkaRowTest() throws Exception { // TODO refactoring
Row row = createRowOld(); Row row = createRowOld();

View File

@ -5,16 +5,16 @@
<root level="info"> <root level="info">
<appender-ref ref="CONSOLE"/> <appender-ref ref="CONSOLE"/>
</root> </root>
<logger name="dev.vality.wb.list.manager"> <logger name="dev.vality.wb.list.manager.stream">
<level value="info"/> <level value="info"/>
</logger> </logger>
<logger name="org.apache.kafka"> <!-- <logger name="org.apache.kafka">-->
<level value="warn"/> <!-- <level value="warn"/>-->
</logger> <!-- </logger>-->
<logger name="org.springframework.boot.test.context"> <!-- <logger name="org.springframework.boot.test.context">-->
<level value="warn"/> <!-- <level value="warn"/>-->
</logger> <!-- </logger>-->
<logger name="org.springframework.test.context"> <!-- <logger name="org.springframework.test.context">-->
<level value="warn"/> <!-- <level value="warn"/>-->
</logger> <!-- </logger>-->
</configuration> </configuration>