mirror of
https://github.com/valitydev/party-shop.git
synced 2024-11-06 09:45:23 +00:00
Change revidion to HEAD
This commit is contained in:
parent
a7797df37b
commit
8ef5590daa
@ -99,7 +99,7 @@ public class KafkaConfig {
|
||||
private <T> void initFactory(ConsumerFactory<String, T> consumerFactory, int threadsNumber, ConcurrentKafkaListenerContainerFactory<String, T> factory) {
|
||||
factory.setConsumerFactory(consumerFactory);
|
||||
factory.setBatchListener(true);
|
||||
factory.getContainerProperties().setAckOnError(false);
|
||||
factory.getContainerProperties().setAckOnError(false);
|
||||
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
|
||||
factory.setBatchErrorHandler(kafkaErrorHandler());
|
||||
factory.setConcurrency(threadsNumber);
|
||||
|
@ -1,12 +0,0 @@
|
||||
package com.rbkmoney.partyshop.domain;
|
||||
|
||||
import com.rbkmoney.damsel.payment_processing.ClaimStatus;
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
public class ClaimStatusWrapper {
|
||||
|
||||
private long revision;
|
||||
private ClaimStatus claimStatus;
|
||||
|
||||
}
|
@ -2,9 +2,11 @@ package com.rbkmoney.partyshop.handler;
|
||||
|
||||
import com.rbkmoney.damsel.domain.Category;
|
||||
import com.rbkmoney.damsel.domain.Shop;
|
||||
import com.rbkmoney.damsel.payment_processing.*;
|
||||
import com.rbkmoney.damsel.payment_processing.ClaimEffect;
|
||||
import com.rbkmoney.damsel.payment_processing.ClaimStatus;
|
||||
import com.rbkmoney.damsel.payment_processing.PartyChange;
|
||||
import com.rbkmoney.damsel.payment_processing.ShopEffect;
|
||||
import com.rbkmoney.machinegun.eventsink.MachineEvent;
|
||||
import com.rbkmoney.partyshop.domain.ClaimStatusWrapper;
|
||||
import com.rbkmoney.partyshop.entity.PartyShopPK;
|
||||
import com.rbkmoney.partyshop.entity.PartyShopReference;
|
||||
import com.rbkmoney.partyshop.exception.UnknownClaimStatusException;
|
||||
@ -27,19 +29,19 @@ public class PartyChangeHandler {
|
||||
|
||||
public void handle(MachineEvent machineEvent, PartyChange partyChange) {
|
||||
log.info("handle machineEvent: {} partyChange: {}", machineEvent, partyChange);
|
||||
ClaimStatusWrapper claimStatusWrapper = getClaimStatus(partyChange);
|
||||
claimStatusWrapper.getClaimStatus().getAccepted().getEffects()
|
||||
ClaimStatus claimStatus = getClaimStatus(partyChange);
|
||||
claimStatus.getAccepted().getEffects()
|
||||
.stream()
|
||||
.filter(ClaimEffect::isSetShopEffect)
|
||||
.forEach(claimEffect -> checkAndSaveShopReferences(machineEvent, claimEffect, claimStatusWrapper.getRevision()));
|
||||
.forEach(claimEffect -> checkAndSaveShopReferences(machineEvent, claimEffect));
|
||||
}
|
||||
|
||||
private void checkAndSaveShopReferences(MachineEvent machineEvent, ClaimEffect claimEffect, long revision) {
|
||||
private void checkAndSaveShopReferences(MachineEvent machineEvent, ClaimEffect claimEffect) {
|
||||
PartyShopReference partyShopReference;
|
||||
ShopEffect shopEffect = claimEffect.getShopEffect().getEffect();
|
||||
if (shopEffect.isSetCreated()) {
|
||||
Shop created = shopEffect.getCreated();
|
||||
Category category = domainRepositoryAdapter.getCategory(created.getCategory(), revision);
|
||||
Category category = domainRepositoryAdapter.getCategory(created.getCategory());
|
||||
partyShopReference = partyShopReferenceRepository.save(PartyShopReference.builder()
|
||||
.partyShopPK(PartyShopPK.builder()
|
||||
.shopId(claimEffect.getShopEffect().getShopId())
|
||||
@ -59,7 +61,7 @@ public class PartyChangeHandler {
|
||||
throw new UnknownReferenceException(String.format("Can't find reference for shopId: %s!",
|
||||
claimEffect.getShopEffect().getShopId()));
|
||||
}
|
||||
Category category = domainRepositoryAdapter.getCategory(shopEffect.getCategoryChanged(), revision);
|
||||
Category category = domainRepositoryAdapter.getCategory(shopEffect.getCategoryChanged());
|
||||
partyShopReference = shopReference.get();
|
||||
partyShopReference.setCategoryType(category.getType().name());
|
||||
partyShopReferenceRepository.save(partyShopReference);
|
||||
@ -67,19 +69,13 @@ public class PartyChangeHandler {
|
||||
}
|
||||
}
|
||||
|
||||
protected ClaimStatusWrapper getClaimStatus(PartyChange change) {
|
||||
ClaimStatusWrapper claimStatusWrapper = new ClaimStatusWrapper();
|
||||
protected ClaimStatus getClaimStatus(PartyChange change) {
|
||||
if (change.isSetClaimCreated()) {
|
||||
Claim claimCreated = change.getClaimCreated();
|
||||
claimStatusWrapper.setClaimStatus(claimCreated.getStatus());
|
||||
claimStatusWrapper.setRevision(claimCreated.getRevision());
|
||||
return change.getClaimCreated().getStatus();
|
||||
} else if (change.isSetClaimStatusChanged()) {
|
||||
ClaimStatusChanged claimStatusChanged = change.getClaimStatusChanged();
|
||||
claimStatusWrapper.setClaimStatus(claimStatusChanged.getStatus());
|
||||
claimStatusWrapper.setRevision(claimStatusChanged.getRevision());
|
||||
return change.getClaimStatusChanged().getStatus();
|
||||
} else {
|
||||
throw new UnknownClaimStatusException(String.format("Unknown claim status for %s!", change.toString()));
|
||||
}
|
||||
return claimStatusWrapper;
|
||||
}
|
||||
}
|
||||
|
@ -2,6 +2,7 @@ package com.rbkmoney.partyshop.service;
|
||||
|
||||
import com.rbkmoney.damsel.domain.Category;
|
||||
import com.rbkmoney.damsel.domain.CategoryRef;
|
||||
import com.rbkmoney.damsel.domain_config.Head;
|
||||
import com.rbkmoney.damsel.domain_config.Reference;
|
||||
import com.rbkmoney.damsel.domain_config.RepositoryClientSrv;
|
||||
import com.rbkmoney.damsel.domain_config.VersionedObject;
|
||||
@ -19,12 +20,12 @@ public class DomainRepositoryAdapterImpl {
|
||||
|
||||
@SneakyThrows
|
||||
@Cacheable("categories")
|
||||
public Category getCategory(CategoryRef categoryRef, long revision) {
|
||||
VersionedObject versionedObject = repositoryClient.checkoutObject(Reference.version(revision), com.rbkmoney.damsel.domain.Reference.category(categoryRef));
|
||||
public Category getCategory(CategoryRef categoryRef) {
|
||||
VersionedObject versionedObject = repositoryClient.checkoutObject(Reference.head(new Head()), com.rbkmoney.damsel.domain.Reference.category(categoryRef));
|
||||
if (!versionedObject.isSetObject() || !versionedObject.getObject().isSetCategory() || !versionedObject.getObject().getCategory().isSetData()) {
|
||||
throw new UnknownCategoryRevisionException(String.format("Unknown category: %s with revision: %x", categoryRef.id, revision));
|
||||
throw new UnknownCategoryRevisionException(String.format("Unknown category: %s", categoryRef.id));
|
||||
}
|
||||
return versionedObject.getObject().getCategory().getData();
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -46,7 +46,7 @@ public class IntegrationTest extends PostgresAbstractTest {
|
||||
|
||||
@Test
|
||||
public void test() throws InterruptedException {
|
||||
Mockito.when(domainRepositoryAdapter.getCategory(any(), anyLong())).thenReturn(new Category()
|
||||
Mockito.when(domainRepositoryAdapter.getCategory(any())).thenReturn(new Category()
|
||||
.setName("test")
|
||||
.setType(CategoryType.live)
|
||||
);
|
||||
@ -58,4 +58,4 @@ public class IntegrationTest extends PostgresAbstractTest {
|
||||
log.info("all: {}", all);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -50,7 +50,7 @@ public class PartyShopApplicationTest extends AbstractKafkaIntegrationTest {
|
||||
|
||||
@Test
|
||||
public void contextLoads() throws ExecutionException, InterruptedException, TException {
|
||||
Mockito.when(domainRepositoryAdapter.getCategory(any(), anyLong())).thenReturn(new Category()
|
||||
Mockito.when(domainRepositoryAdapter.getCategory(any())).thenReturn(new Category()
|
||||
.setName("test")
|
||||
.setType(CategoryType.live)
|
||||
);
|
||||
|
Loading…
Reference in New Issue
Block a user