Skip to content

Commit bb5ab2e

Browse files
Savonitar authored and rkhachatryan committed
[FLINK-37401] Add JobID to MDC in JobMaster, ResourceManager, DefaultSlotStatusSyncer, FineGrainedSlotManager
1 parent 282b792 commit bb5ab2e

File tree

5 files changed: +440 / -257 lines changed (440 additions, 257 deletions)

flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java

+60-42
Original file line number | Diff line number | Diff line change
@@ -470,23 +470,27 @@ protected void onStart() throws JobMasterException {
470470
/** Suspend the job and shutdown all other services including rpc. */
471471
@Override
472472
public CompletableFuture<Void> onStop() {
473-
log.info(
474-
"Stopping the JobMaster for job '{}' ({}).",
475-
executionPlan.getName(),
476-
executionPlan.getJobID());
477-
478-
// make sure there is a graceful exit
479-
return stopJobExecution(
480-
new FlinkException(
481-
String.format(
482-
"Stopping JobMaster for job '%s' (%s).",
483-
executionPlan.getName(), executionPlan.getJobID())))
484-
.exceptionally(
485-
exception -> {
486-
throw new CompletionException(
487-
new JobMasterException(
488-
"Could not properly stop the JobMaster.", exception));
489-
});
473+
try (MdcUtils.MdcCloseable ignored =
474+
MdcUtils.withContext(MdcUtils.asContextData(executionPlan.getJobID()))) {
475+
log.info(
476+
"Stopping the JobMaster for job '{}' ({}).",
477+
executionPlan.getName(),
478+
executionPlan.getJobID());
479+
480+
// make sure there is a graceful exit
481+
return stopJobExecution(
482+
new FlinkException(
483+
String.format(
484+
"Stopping JobMaster for job '%s' (%s).",
485+
executionPlan.getName(), executionPlan.getJobID())))
486+
.exceptionally(
487+
exception -> {
488+
throw new CompletionException(
489+
new JobMasterException(
490+
"Could not properly stop the JobMaster.",
491+
exception));
492+
});
493+
}
490494
}
491495

492496
// ----------------------------------------------------------------------------------------------
@@ -597,12 +601,14 @@ public void acknowledgeCheckpoint(
597601
final long checkpointId,
598602
final CheckpointMetrics checkpointMetrics,
599603
@Nullable final SerializedValue<TaskStateSnapshot> checkpointState) {
600-
schedulerNG.acknowledgeCheckpoint(
601-
jobID,
602-
executionAttemptID,
603-
checkpointId,
604-
checkpointMetrics,
605-
deserializeTaskStateSnapshot(checkpointState, getClass().getClassLoader()));
604+
try (MdcUtils.MdcCloseable ignored = MdcUtils.withContext(MdcUtils.asContextData(jobID))) {
605+
schedulerNG.acknowledgeCheckpoint(
606+
jobID,
607+
executionAttemptID,
608+
checkpointId,
609+
checkpointMetrics,
610+
deserializeTaskStateSnapshot(checkpointState, getClass().getClassLoader()));
611+
}
606612
}
607613

608614
@Override
@@ -1212,9 +1218,13 @@ private CompletableFuture<Void> stopJobExecution(final Exception cause) {
12121218
return FutureUtils.runAfterwardsAsync(
12131219
terminationFuture,
12141220
() -> {
1215-
shuffleMaster.unregisterJob(executionPlan.getJobID());
1216-
disconnectTaskManagerResourceManagerConnections(cause);
1217-
stopJobMasterServices();
1221+
try (MdcUtils.MdcCloseable ignored =
1222+
MdcUtils.withContext(
1223+
MdcUtils.asContextData(executionPlan.getJobID()))) {
1224+
shuffleMaster.unregisterJob(executionPlan.getJobID());
1225+
disconnectTaskManagerResourceManagerConnections(cause);
1226+
stopJobMasterServices();
1227+
}
12181228
},
12191229
getMainThreadExecutor());
12201230
}
@@ -1486,10 +1496,12 @@ private class ResourceManagerLeaderListener implements LeaderRetrievalListener {
14861496
@Override
14871497
public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
14881498
runAsync(
1489-
() ->
1490-
notifyOfNewResourceManagerLeader(
1491-
leaderAddress,
1492-
ResourceManagerId.fromUuidOrNull(leaderSessionID)));
1499+
MdcUtils.wrapRunnable(
1500+
MdcUtils.asContextData(executionPlan.getJobID()),
1501+
() ->
1502+
notifyOfNewResourceManagerLeader(
1503+
leaderAddress,
1504+
ResourceManagerId.fromUuidOrNull(leaderSessionID))));
14931505
}
14941506

14951507
@Override
@@ -1670,13 +1682,16 @@ private class ResourceManagerHeartbeatListener implements HeartbeatListener<Void
16701682

16711683
@Override
16721684
public void notifyHeartbeatTimeout(final ResourceID resourceId) {
1673-
final String message =
1674-
String.format(
1675-
"The heartbeat of ResourceManager with id %s timed out.",
1676-
resourceId.getStringWithMetadata());
1677-
log.info(message);
1685+
try (MdcUtils.MdcCloseable ignored =
1686+
MdcUtils.withContext(MdcUtils.asContextData(executionPlan.getJobID()))) {
1687+
final String message =
1688+
String.format(
1689+
"The heartbeat of ResourceManager with id %s timed out.",
1690+
resourceId.getStringWithMetadata());
1691+
log.info(message);
16781692

1679-
handleResourceManagerConnectionLoss(resourceId, new TimeoutException(message));
1693+
handleResourceManagerConnectionLoss(resourceId, new TimeoutException(message));
1694+
}
16801695
}
16811696

16821697
private void handleResourceManagerConnectionLoss(ResourceID resourceId, Exception cause) {
@@ -1691,13 +1706,16 @@ private void handleResourceManagerConnectionLoss(ResourceID resourceId, Exceptio
16911706

16921707
@Override
16931708
public void notifyTargetUnreachable(ResourceID resourceID) {
1694-
final String message =
1695-
String.format(
1696-
"ResourceManager with id %s is no longer reachable.",
1697-
resourceID.getStringWithMetadata());
1698-
log.info(message);
1709+
try (MdcUtils.MdcCloseable ignored =
1710+
MdcUtils.withContext(MdcUtils.asContextData(executionPlan.getJobID()))) {
1711+
final String message =
1712+
String.format(
1713+
"ResourceManager with id %s is no longer reachable.",
1714+
resourceID.getStringWithMetadata());
1715+
log.info(message);
16991716

1700-
handleResourceManagerConnectionLoss(resourceID, new JobMasterException(message));
1717+
handleResourceManagerConnectionLoss(resourceID, new JobMasterException(message));
1718+
}
17011719
}
17021720

17031721
@Override

0 commit comments

Comments
 (0)