Skip to content

Commit cd9e467

Browse files
authored
[core] Guard concurrent access to generator IDs with a mutex (#50845)
Cherry-pick #50740, fixes some serve test flakiness

Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
1 parent df8546c commit cd9e467

File tree

2 files changed

+38
-20
lines changed

2 files changed

+38
-20
lines changed

src/ray/core_worker/core_worker.cc

+18-11
Original file line number | Diff line number | Diff line change
@@ -964,9 +964,9 @@ CoreWorker::CoreWorker(CoreWorkerOptions options, const WorkerID &worker_id)
964964
"CoreWorker.RecordMetrics");
965965

966966
periodical_runner_->RunFnPeriodically(
967-
[this] { TryDeleteObjectRefStreams(); },
967+
[this] { TryDelPendingObjectRefStreams(); },
968968
RayConfig::instance().local_gc_min_interval_s() * 1000,
969-
"CoreWorker.GCStreamingGeneratorMetadata");
969+
"CoreWorker.TryDelPendingObjectRefStreams");
970970

971971
#ifndef _WIN32
972972
// Doing this last during CoreWorker initialization, so initialization logic like
@@ -3471,21 +3471,28 @@ void CoreWorker::AsyncDelObjectRefStream(const ObjectID &generator_id) {
34713471
if (task_manager_->TryDelObjectRefStream(generator_id)) {
34723472
return;
34733473
}
3474-
deleted_generator_ids_.insert(generator_id);
3474+
3475+
{
3476+
// TryDelObjectRefStream is thread safe so no need to hold the lock above.
3477+
absl::MutexLock lock(&generator_ids_pending_deletion_mutex_);
3478+
generator_ids_pending_deletion_.insert(generator_id);
3479+
}
34753480
}
34763481

3477-
void CoreWorker::TryDeleteObjectRefStreams() {
3478-
std::vector<ObjectID> out_of_scope_generator_ids;
3479-
for (auto it = deleted_generator_ids_.begin(); it != deleted_generator_ids_.end();
3480-
it++) {
3481-
const auto &generator_id = *it;
3482+
void CoreWorker::TryDelPendingObjectRefStreams() {
3483+
absl::MutexLock lock(&generator_ids_pending_deletion_mutex_);
3484+
3485+
std::vector<ObjectID> deleted;
3486+
for (const auto &generator_id : generator_ids_pending_deletion_) {
3487+
RAY_LOG(DEBUG).WithField(generator_id)
3488+
<< "TryDelObjectRefStream from generator_ids_pending_deletion_";
34823489
if (task_manager_->TryDelObjectRefStream(generator_id)) {
3483-
out_of_scope_generator_ids.push_back(generator_id);
3490+
deleted.push_back(generator_id);
34843491
}
34853492
}
34863493

3487-
for (const auto &generator_id : out_of_scope_generator_ids) {
3488-
deleted_generator_ids_.erase(generator_id);
3494+
for (const auto &generator_id : deleted) {
3495+
generator_ids_pending_deletion_.erase(generator_id);
34893496
}
34903497
}
34913498

src/ray/core_worker/core_worker.h

+20-9
Original file line number | Diff line number | Diff line change
@@ -121,18 +121,17 @@ class TaskCounter {
121121
private:
122122
mutable absl::Mutex mu_;
123123
// Tracks all tasks submitted to this worker by state, is_retry.
124-
CounterMap<std::tuple<std::string, TaskStatusType, bool>> counter_
125-
ABSL_GUARDED_BY(&mu_);
124+
CounterMap<std::tuple<std::string, TaskStatusType, bool>> counter_ ABSL_GUARDED_BY(mu_);
126125

127126
// Additionally tracks the sub-states of RUNNING_IN_RAY_GET/WAIT. The counters here
128127
// overlap with those of counter_.
129-
CounterMap<std::pair<std::string, bool>> running_in_get_counter_ ABSL_GUARDED_BY(&mu_);
130-
CounterMap<std::pair<std::string, bool>> running_in_wait_counter_ ABSL_GUARDED_BY(&mu_);
128+
CounterMap<std::pair<std::string, bool>> running_in_get_counter_ ABSL_GUARDED_BY(mu_);
129+
CounterMap<std::pair<std::string, bool>> running_in_wait_counter_ ABSL_GUARDED_BY(mu_);
131130

132-
std::string job_id_ ABSL_GUARDED_BY(&mu_);
131+
std::string job_id_ ABSL_GUARDED_BY(mu_);
133132
// Used for actor state tracking.
134-
std::string actor_name_ ABSL_GUARDED_BY(&mu_);
135-
int64_t num_tasks_running_ ABSL_GUARDED_BY(&mu_) = 0;
133+
std::string actor_name_ ABSL_GUARDED_BY(mu_);
134+
int64_t num_tasks_running_ ABSL_GUARDED_BY(mu_) = 0;
136135
};
137136

138137
struct TaskToRetry {
@@ -294,7 +293,10 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
294293
/// generator task.
295294
void AsyncDelObjectRefStream(const ObjectID &generator_id);
296295

297-
void TryDeleteObjectRefStreams();
296+
// Attempt to delete ObjectRefStreams that were unable to be deleted when
297+
// AsyncDelObjectRefStream was called (stored in generator_ids_pending_deletion_).
298+
// This function is called periodically on the io_service_.
299+
void TryDelPendingObjectRefStreams();
298300

299301
const PlacementGroupID &GetCurrentPlacementGroupId() const {
300302
return worker_context_.GetCurrentPlacementGroupId();
@@ -1903,7 +1905,16 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
19031905
/// Worker's PID
19041906
uint32_t pid_;
19051907

1906-
absl::flat_hash_set<ObjectID> deleted_generator_ids_;
1908+
// Guards generator_ids_pending_deletion_.
1909+
absl::Mutex generator_ids_pending_deletion_mutex_;
1910+
1911+
// A set of generator IDs that have gone out of scope but couldn't be deleted from
1912+
// the task manager yet (e.g., due to lineage references). We will periodically
1913+
// attempt to delete them in the background until it succeeds.
1914+
// This field is accessed on the destruction path of an ObjectRefGenerator as well as
1915+
// by a background thread attempting later deletion, so it must be guarded by a lock.
1916+
absl::flat_hash_set<ObjectID> generator_ids_pending_deletion_
1917+
ABSL_GUARDED_BY(generator_ids_pending_deletion_mutex_);
19071918

19081919
/// TODO(hjiang):
19091920
/// 1. Cached job runtime env info, it's not implemented at first place since

0 commit comments

Comments
 (0)