Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add tracking of heartbeat results and executions with stale heartbeat #432

Merged
merged 7 commits into from
Mar 11, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Improve heartbeat events. PoC for heartbeat-monitoring
kagkarlsson committed Dec 6, 2023
commit 5faf59c037af5ec9fb03d6fd142e6df6e4366cc6
1 change: 0 additions & 1 deletion db-scheduler/pom.xml
Original file line number Diff line number Diff line change
@@ -22,7 +22,6 @@
<java8-matchers.version>1.6</java8-matchers.version>
<equals-verifier.version>3.14.2</equals-verifier.version>
<bytebuddy.version>1.14.5</bytebuddy.version>
<micro-jdbc.version>0.6.0</micro-jdbc.version>
<zonky-pg-embedded.version>2.0.4</zonky-pg-embedded.version>
<postgresql.version>42.5.1</postgresql.version>
<slf4j.version>1.7.36</slf4j.version>
Original file line number Diff line number Diff line change
@@ -70,18 +70,19 @@ public ExecutePicked(
@Override
public void run() {
// FIXLATER: need to cleanup all the references back to scheduler fields
final UUID executionId =
executor.addCurrentlyProcessing(
new CurrentlyExecuting(pickedExecution, clock, heartbeatConfig));
CurrentlyExecuting currentlyExecuting =
new CurrentlyExecuting(pickedExecution, clock, heartbeatConfig);
final UUID executionId = executor.addCurrentlyProcessing(currentlyExecuting);

try {
statsRegistry.register(StatsRegistry.CandidateStatsEvent.EXECUTED);
executePickedExecution(pickedExecution);
executePickedExecution(pickedExecution, currentlyExecuting);
} finally {
executor.removeCurrentlyProcessing(executionId);
}
}

private void executePickedExecution(Execution execution) {
private void executePickedExecution(Execution execution, CurrentlyExecuting currentlyExecuting) {
final Optional<Task> task = taskResolver.resolve(execution.taskInstance.getTaskName());
if (!task.isPresent()) {
LOG.error(
@@ -98,7 +99,8 @@ private void executePickedExecution(Execution execution) {
task.get()
.execute(
execution.taskInstance,
new ExecutionContext(schedulerState, execution, schedulerClient));
new ExecutionContext(
schedulerState, execution, schedulerClient, currentlyExecuting));
LOG.debug("Execution done: " + execution);

complete(completion, execution, executionStarted);
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
/*
* Copyright (C) Gustav Karlsson
*
* <p>Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* <p>http://www.apache.org/licenses/LICENSE-2.0
*
* <p>Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.kagkarlsson.scheduler;

import java.time.Duration;
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
/*
* Copyright (C) Gustav Karlsson
*
* <p>Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* <p>http://www.apache.org/licenses/LICENSE-2.0
*
* <p>Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.kagkarlsson.scheduler;

import java.time.Duration;
@@ -46,4 +59,19 @@ public void heartbeat(boolean successful, Instant lastHeartbeatAttempt) {
heartbeatFailuresSinceLastSuccess++;
}
}

/**
 * Renders the heartbeat counters, the last success/failure values and the configured
 * missed-heartbeats limit as a single-line string intended for log output.
 *
 * @return a string of the form {@code HeartbeatState{key=value, ...}}
 */
public String describe() {
  final String counters =
      "successesSinceLastFailure="
          + heartbeatSuccessesSinceLastFailure
          + ", failuresSinceLastSuccess="
          + heartbeatFailuresSinceLastSuccess;
  final String lastOutcomes =
      "lastSuccess=" + heartbeatLastSuccess + ", lastFailure=" + heartbeatLastFailure;
  final String limit = "missedHeartbeatsLimit=" + heartbeatConfig.missedHeartbeatsLimit;

  return "HeartbeatState{" + counters + ", " + lastOutcomes + ", " + limit + '}';
}
}
Original file line number Diff line number Diff line change
@@ -401,18 +401,26 @@ protected void updateHeartbeatForExecution(Instant now, CurrentlyExecuting curre
LOG.trace("Updating heartbeat for execution: " + e);

try {
// update CurrentExecution if it failed, calc time remaining before problems

boolean successfulHeartbeat = schedulerTaskRepository.updateHeartbeatWithRetry(e, now, 3);
currentlyExecuting.heartbeat(successfulHeartbeat, now);

if (!successfulHeartbeat) {
statsRegistry.register(SchedulerStatsEvent.UNEXPECTED_ERROR);
statsRegistry.register(SchedulerStatsEvent.FAILED_HEARTBEAT);
}

HeartbeatState heartbeatState = currentlyExecuting.getHeartbeatState();
if (heartbeatState.getFailedHeartbeats() > 1) {
LOG.warn(
"Execution has more than 1 failed heartbeats. Should not happen. Risk of being"
+ " considered dead. See heartbeat-state. Heartbeat-state={}, Execution={}",
heartbeatState.describe(),
e);
statsRegistry.register(SchedulerStatsEvent.FAILED_MULTIPLE_HEARTBEATS);
}

} catch (Throwable ex) { // just-in-case
LOG.error("Unexpteced failure while while updating heartbeat for execution {}.", e, ex);
statsRegistry.register(SchedulerStatsEvent.UNEXPECTED_ERROR);
statsRegistry.register(SchedulerStatsEvent.FAILED_HEARTBEAT);
}
}

Original file line number Diff line number Diff line change
@@ -30,7 +30,10 @@
import com.github.kagkarlsson.scheduler.exceptions.ExecutionException;
import com.github.kagkarlsson.scheduler.exceptions.TaskInstanceException;
import com.github.kagkarlsson.scheduler.serializer.Serializer;
import com.github.kagkarlsson.scheduler.task.*;
import com.github.kagkarlsson.scheduler.task.Execution;
import com.github.kagkarlsson.scheduler.task.SchedulableInstance;
import com.github.kagkarlsson.scheduler.task.Task;
import com.github.kagkarlsson.scheduler.task.TaskInstance;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -567,7 +570,7 @@ public boolean updateHeartbeatWithRetry(Execution execution, Instant newHeartbea
LOG.warn("Failed to update heartbeat. No more retries.", e);
return false;
} else {
LOG.info("Failed to update heartbeat. Will try again.", e);
LOG.info("Failed to update heartbeat. Remaining retries={}.", tries - 1, e);
return updateHeartbeatWithRetry(execution, newHeartbeat, tries - 1);
}
}
@@ -594,15 +597,18 @@ public boolean updateHeartbeat(Execution e, Instant newHeartbeat) {
if (updated == 0) {
// There is a race-condition: Executions are not removed from currently-executing until after
// the execution has been updated in the database, so this might happen.
LOG.trace(
"Did not update heartbeat. Execution must have been removed or rescheduled. "
LOG.warn(
"Did not update heartbeat. Execution must have been removed or rescheduled"
+ "(i.e. CompletionHandler ran and finished just before heartbeat-update). "
+ "This is a race-condition that may occur, but is very unlikely. "
+ "task-instance={}",
e.taskInstance);
return false;
} else {
if (updated > 1) {
throw new IllegalStateException(
"Updated multiple rows updating heartbeat for execution. Should never happen since name and id is primary key. Execution: "
"Updated multiple rows updating heartbeat for execution. Should never happen since "
+ "name and id is primary key. Execution: "
+ e);
}
LOG.debug("Updated heartbeat for execution: " + e);
Original file line number Diff line number Diff line change
@@ -19,12 +19,14 @@ public interface StatsRegistry {

/**
 * Events on the scheduler level that are reported to a {@code StatsRegistry}.
 *
 * <p>NOTE(review): only the constants whose registration sites are visible alongside this enum are
 * described below; the remaining ones are left undocumented rather than guessed at.
 */
enum SchedulerStatsEvent {
UNEXPECTED_ERROR,
// Registered by the heartbeat-update code when updating the heartbeat for an execution did not
// succeed (after retries), or when that update failed unexpectedly.
FAILED_HEARTBEAT,
COMPLETIONHANDLER_ERROR,
FAILUREHANDLER_ERROR,
DEAD_EXECUTION,
RAN_UPDATE_HEARTBEATS,
RAN_DETECT_DEAD,
RAN_EXECUTE_DUE,
// Registered when the heartbeat state of an execution reports more than one failed heartbeat,
// i.e. the execution risks being considered dead.
FAILED_MULTIPLE_HEARTBEATS,
UNRESOLVED_TASK
}

Original file line number Diff line number Diff line change
@@ -13,6 +13,7 @@
*/
package com.github.kagkarlsson.scheduler.task;

import com.github.kagkarlsson.scheduler.CurrentlyExecuting;
import com.github.kagkarlsson.scheduler.SchedulerClient;
import com.github.kagkarlsson.scheduler.SchedulerState;

@@ -21,12 +22,17 @@ public class ExecutionContext {
private final SchedulerState schedulerState;
private final Execution execution;
private final SchedulerClient schedulerClient;
private CurrentlyExecuting currentlyExecuting;

public ExecutionContext(
SchedulerState schedulerState, Execution execution, SchedulerClient schedulerClient) {
SchedulerState schedulerState,
Execution execution,
SchedulerClient schedulerClient,
CurrentlyExecuting currentlyExecuting) {
this.schedulerState = schedulerState;
this.execution = execution;
this.schedulerClient = schedulerClient;
this.currentlyExecuting = currentlyExecuting;
}

public SchedulerState getSchedulerState() {
@@ -44,4 +50,8 @@ public SchedulerClient getSchedulerClient() {
/**
 * @return the {@link Execution} this context was constructed with
 */
public Execution getExecution() {
  return this.execution;
}

/**
 * @return the {@link CurrentlyExecuting} this context was constructed with; it gives the running
 *     task access to, among other things, its heartbeat state
 */
public CurrentlyExecuting getCurrentlyExecuting() {
  return this.currentlyExecuting;
}
}
Original file line number Diff line number Diff line change
@@ -205,11 +205,20 @@ public void execute(TaskInstance<T> taskInstance, ExecutionContext executionCont

public static class SimpleStatsRegistry extends StatsRegistry.DefaultStatsRegistry {
public final AtomicInteger unexpectedErrors = new AtomicInteger(0);
public final AtomicInteger failedHeartbeats = new AtomicInteger(0);
public final AtomicInteger failedMultipleHeartbeats = new AtomicInteger(0);
public final AtomicInteger deadExecutions = new AtomicInteger(0);

/**
 * Bumps the counter tracked for the given event, if there is one, and then delegates to the
 * default registry implementation.
 *
 * @param e the scheduler event being reported
 */
@Override
public void register(SchedulerStatsEvent e) {
  // Select the counter first; events without a dedicated counter leave it null.
  final AtomicInteger counter;
  if (e == SchedulerStatsEvent.UNEXPECTED_ERROR) {
    counter = unexpectedErrors;
  } else if (e == SchedulerStatsEvent.FAILED_HEARTBEAT) {
    counter = failedHeartbeats;
  } else if (e == SchedulerStatsEvent.FAILED_MULTIPLE_HEARTBEATS) {
    counter = failedMultipleHeartbeats;
  } else if (e == SchedulerStatsEvent.DEAD_EXECUTION) {
    counter = deadExecutions;
  } else {
    counter = null;
  }

  if (counter != null) {
    counter.incrementAndGet();
  }
  super.register(e);
}
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;

import com.github.kagkarlsson.scheduler.Scheduler;
import com.github.kagkarlsson.scheduler.SchedulerBuilder;
@@ -79,7 +80,9 @@ static void testConcurrencyForPollingStrategy(
assertThat(completed.failed.size(), is(0));
assertThat(completed.ok.size(), is(ids.size()));
assertThat("Should contain no duplicates", new HashSet<>(completed.ok).size(), is(ids.size()));
assertThat(stats.unexpectedErrors.get(), is(0));
assertThat("No unexpected errors", stats.unexpectedErrors.get(), is(0));
assertThat("No dead executions", stats.deadExecutions.get(), is(0));
assertThat("No multiple-heartbeat-failures", stats.failedMultipleHeartbeats.get(), is(0));
assertThat(scheduler1.getCurrentlyExecuting(), hasSize(0));
assertThat(scheduler2.getCurrentlyExecuting(), hasSize(0));
}
@@ -114,7 +117,9 @@ static void testRecurring(StopSchedulerExtension stopScheduler, DataSource datas

scheduler1.stop();
scheduler2.stop();
assertThat(stats.unexpectedErrors.get(), is(0));
assertThat("No unexpected errors", stats.unexpectedErrors.get(), is(0));
assertThat("No dead executions", stats.deadExecutions.get(), is(0));
assertThat("No multiple-heartbeat-failures", stats.failedMultipleHeartbeats.get(), is(0));
assertThat(scheduler1.getCurrentlyExecuting(), hasSize(0));
assertThat(scheduler2.getCurrentlyExecuting(), hasSize(0));
}
@@ -130,7 +135,7 @@ private static Scheduler createScheduler(
.schedulerName(new Fixed(name))
.threads(NUMBER_OF_THREADS)
.pollingInterval(Duration.ofMillis(50)) // also runs fine with 5s
.heartbeatInterval(Duration.ofMillis(2_000))
.heartbeatInterval(Duration.ofMillis(500))
.statsRegistry(stats);
schedulerCustomization.accept(builder);
return builder.build();
@@ -143,7 +148,7 @@ private static Scheduler createSchedulerRecurring(
.schedulerName(new Fixed(name))
.threads(NUMBER_OF_THREADS)
.pollingInterval(Duration.ofMillis(50))
.heartbeatInterval(Duration.ofMillis(2_000))
.heartbeatInterval(Duration.ofMillis(500))
.statsRegistry(stats)
.build();
}
5 changes: 5 additions & 0 deletions examples/features/pom.xml
Original file line number Diff line number Diff line change
@@ -24,6 +24,11 @@
<artifactId>db-scheduler</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.github.kagkarlsson</groupId>
<artifactId>micro-jdbc</artifactId>
<version>${micro-jdbc.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright (C) Gustav Karlsson
*
* <p>Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* <p>http://www.apache.org/licenses/LICENSE-2.0
*
* <p>Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.kagkarlsson.examples;

import static com.github.kagkarlsson.jdbc.PreparedStatementSetter.NOOP;

import com.github.kagkarlsson.examples.helpers.Example;
import com.github.kagkarlsson.jdbc.JdbcRunner;
import com.github.kagkarlsson.scheduler.HeartbeatState;
import com.github.kagkarlsson.scheduler.Scheduler;
import com.github.kagkarlsson.scheduler.task.helper.OneTimeTask;
import com.github.kagkarlsson.scheduler.task.helper.Tasks;
import java.time.Duration;
import java.time.Instant;
import javax.sql.DataSource;

/**
 * Example showing how a running task can observe its own heartbeat state.
 *
 * <p>A one-time task is scheduled that loops, printing its heartbeat state, until the state
 * reports a dead-fraction of at least {@code 0.7}. Meanwhile the main thread bumps {@code version}
 * on the rows in {@code scheduled_tasks} to simulate something that makes heartbeat updates fail.
 */
public class HeartbeatMonitoringMain extends Example {

  private static final String TASK_NAME = "wait-for-stale-heartbeat-task";
  private static final String INSTANCE_ID = "1045";

  /** The task keeps looping for as long as the reported dead-fraction is below this value. */
  private static final double FRACTION_DEAD_LIMIT = 0.7;

  public static void main(String[] args) {
    final HeartbeatMonitoringMain example = new HeartbeatMonitoringMain();
    example.runWithDatasource();
  }

  /**
   * Starts a scheduler with the heartbeat-watching task, schedules one instance of it, and after
   * two seconds modifies the stored executions so that heartbeat updates start failing.
   *
   * @param dataSource the database the scheduler and the fake update run against
   */
  @Override
  public void run(DataSource dataSource) {
    final OneTimeTask<Void> heartbeatWatcher =
        Tasks.oneTime(TASK_NAME, Void.class)
            .execute(
                (taskInstance, executionContext) -> {
                  System.out.println("Running!");
                  while (true) {
                    final boolean belowLimit =
                        executionContext
                                .getCurrentlyExecuting()
                                .getHeartbeatState()
                                .getFractionDead()
                            < FRACTION_DEAD_LIMIT;
                    if (!belowLimit) {
                      break;
                    }
                    sleep(100);
                    printHeartbeat(executionContext.getCurrentlyExecuting().getHeartbeatState());
                  }
                  // One last report, showing the state that ended the loop.
                  printHeartbeat(executionContext.getCurrentlyExecuting().getHeartbeatState());
                  System.out.println("Done!");
                });

    final Scheduler scheduler =
        Scheduler.create(dataSource, heartbeatWatcher)
            .threads(5)
            .heartbeatInterval(Duration.ofSeconds(1))
            .pollingInterval(Duration.ofSeconds(1))
            .build();
    scheduler.start();
    scheduler.schedule(heartbeatWatcher.instance(INSTANCE_ID), Instant.now());

    // Let the task run for a while before interfering with it.
    sleep(2000);
    final JdbcRunner jdbc = new JdbcRunner(dataSource);

    // simulate something that will cause heartbeating to fail
    System.out.println("Fake update on execution to cause heartbeat-update to fail.");
    jdbc.execute("update scheduled_tasks set version = version + 1", NOOP);
  }

  /** Prints the failed-heartbeat count, dead-fraction and stale flag of the given state. */
  private void printHeartbeat(HeartbeatState heartbeatState) {
    final String template =
        "Will keep running until heartbeat-failure detected. Current state: failed-heartbeats=%s, fraction-dead=%s, stale=%s\n";
    System.out.printf(
        template,
        heartbeatState.getFailedHeartbeats(),
        heartbeatState.getFractionDead(),
        heartbeatState.hasStaleHeartbeat());
  }
}
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -40,6 +40,7 @@
<dependency-plugin.failOnWarning>true</dependency-plugin.failOnWarning>

<!-- Dependency versions -->
<micro-jdbc.version>0.6.0</micro-jdbc.version>
<slf4j.version>1.7.36</slf4j.version>
<logback.version>1.2.12</logback.version>
<spring-boot.version>2.7.11</spring-boot.version>