HOOKER: minor fixes (#137)

* Independent processing of queue

* Fixed millis

* Fix after test in dev mirror kafka

* Disable schedulling in test

* Minor fixes

Co-authored-by: Inal Arsanukaev <inalarsanukaev@MacBook-Pro-Inal.local>
This commit is contained in:
Inal Arsanukaev 2020-12-02 11:57:46 +03:00 committed by GitHub
parent 80662fae75
commit 5d74ab20c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 17 additions and 6 deletions

View File

@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>hooker</artifactId>
<version>2.0.58-SNAPSHOT</version>
<version>2.0.59-SNAPSHOT</version>
<packaging>jar</packaging>
<name>hooker</name>

View File

@ -39,7 +39,7 @@ public class FaultDetectorServiceImpl implements FaultDetectorService {
public double getRate(long hookId) {
try {
List<ServiceStatistics> statistics = faultDetector.getStatistics(List.of(buildServiceId(hookId)));
return statistics.get(0).getFailureRate();
return statistics.isEmpty() ? 0 : statistics.get(0).getFailureRate();
} catch (Exception e) {
log.error("Error in FaultDetectorService when getStatistics", e);
return 0;
@ -73,7 +73,7 @@ public class FaultDetectorServiceImpl implements FaultDetectorService {
}
private String getNow() {
return TypeUtil.temporalToString(LocalDateTime.now(), ZoneOffset.UTC);
return TypeUtil.temporalToString(LocalDateTime.now(ZoneOffset.UTC));
}
private String buildServiceId(long id) {

View File

@ -23,6 +23,8 @@ import java.util.stream.Collectors;
@Slf4j
@RequiredArgsConstructor
public class MessageProcessor<M extends Message, Q extends Queue> implements Runnable {
private static final double UPDATE_PROBABILITY = 0.25;
private final HookDao hookDao;
private final TaskDao taskDao;
private final QueueDao<Q> queueDao;
@ -73,7 +75,12 @@ public class MessageProcessor<M extends Message, Q extends Queue> implements Run
if (queue.getRetryPolicyRecord().isFailed()) {
RetryPolicyRecord record = queue.getRetryPolicyRecord();
record.reset();
updatePolicy(queue, record);
updatePolicy(record);
updateAvailability(queue);
} else {
if (Math.random() < UPDATE_PROBABILITY) {
updateAvailability(queue);
}
}
}
@ -82,7 +89,8 @@ public class MessageProcessor<M extends Message, Q extends Queue> implements Run
RetryPolicy retryPolicy = retryPoliciesService.getRetryPolicyByType(queue.getHook().getRetryPolicyType());
RetryPolicyRecord retryPolicyRecord = queue.getRetryPolicyRecord();
retryPolicy.updateFailed(retryPolicyRecord);
updatePolicy(queue, retryPolicyRecord);
updatePolicy(retryPolicyRecord);
updateAvailability(queue);
if (retryPolicy.shouldDisable(retryPolicyRecord)) {
queueDao.disable(queue.getId());
taskDao.removeAll(queue.getId());
@ -90,9 +98,12 @@ public class MessageProcessor<M extends Message, Q extends Queue> implements Run
}
}
private void updatePolicy(Queue queue, RetryPolicyRecord record) {
private void updatePolicy(RetryPolicyRecord record) {
retryPoliciesService.update(record);
log.info("Queue retry policy has been updated {}", record);
}
private void updateAvailability(Queue queue) {
double rate = faultDetector.getRate(queue.getHook().getId());
hookDao.updateAvailability(queue.getHook().getId(), rate);
log.info("Hook {} availability has been updated to {}", queue.getHook().getId(), rate);