Skip to content

Commit

Permalink
New strategy for fuzz scheduling: Reactive or precise scheduling.
Browse files Browse the repository at this point in the history
It looks like the scheduling dips we have are a result of not reacting
fast enough to dips in fuzzing, due to the slowness of the batch API.
But we need to call this API to avoid growing the queue to infinite levels.
So use two approaches:
1. If we are far from the limit/quotas of CPUs, don't bother using the API.
2. If we are close, use the API.

I think this should almost guarantee that we can get back to 95%
of target usage within 10 minutes of decline, while usually hitting
the target, instead of having enormous dips.
  • Loading branch information
jonathanmetzman committed Feb 26, 2025
1 parent b0e0891 commit 0c16f8c
Showing 1 changed file with 39 additions and 28 deletions.
67 changes: 39 additions & 28 deletions src/clusterfuzz/_internal/cron/schedule_fuzz.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -76,7 +77,7 @@ def count_unacked(creds, project_id, subscription_id):
return 0


def get_cpu_limit_for_regions(creds, project: str, region: str) -> int:
def get_cpu_usage(creds, project: str, region: str) -> int:
"""Returns the number of available CPUs in the current GCE region."""

quotas = _get_quotas(creds, project, region)
Expand Down Expand Up @@ -110,8 +111,8 @@ def get_cpu_limit_for_regions(creds, project: str, region: str) -> int:
# We need this because us-central1 and us-east4 have different numbers of
# cores alloted to us in their quota. Treat them the same to simplify things.
limit = quota['limit']
limit -= quota['usage']
return min(limit, 100_000)
limit = min(limit, 100_000)
return limit, quota['usage']


class BaseFuzzTaskScheduler:
Expand Down Expand Up @@ -250,33 +251,43 @@ def get_available_cpus(project: str, regions: List[str]) -> int:
# tasks (nor preemptible and non-preemptible CPUs). Fix this.
# Get total scheduled and queued.
creds = credentials.get_default()[0]
count_args = ((project, region) for region in regions)
with multiprocessing.Pool(2) as pool:
# These calls are extremely slow (about 1 minute total).
result = pool.starmap_async( # pylint: disable=no-member
batch.count_queued_or_scheduled_tasks, count_args)
waiting_tasks = count_unacked(creds, project, 'preprocess')
waiting_tasks += count_unacked(creds, project, 'utask_main')
region_counts = zip(*result.get()) # Group all queued and all scheduled.

# Add up all queued and scheduled.
region_counts = [sum(tup) for tup in region_counts]
logs.info(f'Region counts: {region_counts}')
if region_counts[0] > 50_000:
# Check queued tasks.
logs.info('Too many jobs queued, not scheduling more fuzzing.')
return 0
waiting_tasks += sum(region_counts) # Add up queued and scheduled.
soon_occupied_cpus = waiting_tasks * CPUS_PER_FUZZ_JOB
logs.info(f'Soon occupied CPUs: {soon_occupied_cpus}')
cpu_limit = int(
sum(
get_cpu_limit_for_regions(creds, project, region)
for region in regions) * CPU_BUFFER_MULTIPLIER)

target = 0
usage = 0
for region in regions:
region_target, region_usage = get_cpu_usage(creds, project, region)
target += region_target
usage += region_usage
waiting_tasks = (
count_unacked(creds, project, 'preprocess') + count_unacked(
creds, project, 'utask_main'))
if usage + waiting_tasks * CPUS_PER_FUZZ_JOB > .95 * target:
# Only worry about queueing build up if we are above 95% utilization.
count_args = ((project, region) for region in regions)
with multiprocessing.Pool(2) as pool:
target *= CPU_BUFFER_MULTIPLIER
# These calls are extremely slow (about 30 minutes total).
result = pool.starmap_async( # pylint: disable=no-member
batch.count_queued_or_scheduled_tasks, count_args)

region_counts = zip(*result.get()) # Group all queued and all scheduled.
# Add up all queued and scheduled.
region_counts = [sum(tup) for tup in region_counts]
logs.info(f'QUEUED/SCHEDULED tasks per region: {region_counts}')
if region_counts[0] > 10_000:
# Check queued tasks.
logs.info('Too many jobs queued, not scheduling more fuzzing.')
return 0
waiting_tasks += sum(region_counts) # Add up queued and scheduled.
else:
logs.info('Skipping getting tasks.')

occupied_cpus = waiting_tasks * CPUS_PER_FUZZ_JOB + usage
logs.info(f'Soon or currently occupied CPUs: {occupied_cpus}')

logs.info('Actually free CPUs (before subtracting soon '
f'occupied): {cpu_limit}')
available_cpus = max(cpu_limit - soon_occupied_cpus, 0)
f'occupied): {target}')
available_cpus = max(target - occupied_cpus, 0)

# Don't schedule more than 50K tasks at once. So we don't overload batch.
# This number is arbitrary, but we aren't at full capacity at lower numbers.
Expand Down

0 comments on commit 0c16f8c

Please sign in to comment.