Ignore command with empty value (#23)

This commit is contained in:
struga 2023-01-09 16:59:16 +07:00 committed by GitHub
parent cb597576be
commit 1162083061
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 23 additions and 0 deletions

View File

@ -9,6 +9,7 @@ import dev.vality.wb.list.manager.repository.ListRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
@Slf4j
@Component
@ -23,6 +24,10 @@ public class CommandServiceImpl implements CommandService {
log.info("CommandService apply command: {}", command);
Row row = commandToRowConverter.convert(command);
log.info("CommandService apply row: {}", row);
if (row == null || !StringUtils.hasText(row.getValue())) {
log.warn("Ignore empty command row value: {}", row);
return null;
}
Event event = applyCommandAndGetEvent(command, row);
event.setRow(command.getRow());
event.setUserInfo(command.getUserInfo());

View File

@ -41,6 +41,7 @@ public class WbListStreamFactory {
.peek((s, changeCommand) -> log.info("Command stream check command: {}", changeCommand))
.mapValues(command ->
retryTemplate.execute(args -> commandService.apply(command)))
.filter((s, event) -> event != null)
.to(resultTopic, Produced.with(Serdes.String(), eventSerde));
return new KafkaStreams(builder.build(), streamsConfiguration);
} catch (Exception e) {

View File

@ -2,6 +2,8 @@ package dev.vality.wb.list.manager;
import dev.vality.damsel.wb_list.ChangeCommand;
import dev.vality.damsel.wb_list.Command;
import dev.vality.damsel.wb_list.ListType;
import dev.vality.damsel.wb_list.Row;
import dev.vality.testcontainers.annotations.KafkaSpringBootTest;
import dev.vality.testcontainers.annotations.kafka.KafkaTestcontainer;
import dev.vality.testcontainers.annotations.kafka.config.KafkaProducer;
@ -48,4 +50,19 @@ public class WbListSafetyApplicationTest {
verify(listRepository, timeout(2000L).times(3)).create(any());
}
@Test
void kafkaRowTestEmptyValue() throws Exception {
ChangeCommand changeCommand = TestObjectFactory.testCommand();
changeCommand.setCommand(Command.CREATE);
changeCommand.setRow(new Row()
.setListType(ListType.black)
.setShopId("test")
.setValue("")
.setListName("test"));
testThriftKafkaProducer.send(topic, changeCommand);
clearInvocations(listRepository);
verify(listRepository, timeout(2000L).times(0)).create(any());
}
}