Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to break parking lots, stop locks from stalling #3081

Merged
merged 23 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
3bd67a5
initial implementation of lot breaking
jakkdl Sep 6, 2024
4dfa1ad
fix import cycle
jakkdl Sep 6, 2024
543a087
fix re-export for verifytypes visibility
jakkdl Sep 6, 2024
c36cdad
update docstrings
jakkdl Sep 6, 2024
127c5fc
fixes after review by TeamSpen210
jakkdl Sep 6, 2024
1f75d44
add tests
jakkdl Sep 6, 2024
6835e87
add lock handover test
jakkdl Sep 6, 2024
3b86e80
clean up breaker dict
jakkdl Sep 6, 2024
94ff9a2
clean up GLOBAL_PARKING_LOT_BREAKER when task releases or exits
jakkdl Sep 6, 2024
eb7a451
add newsfragments, add StalledLockError, reraise BrokenResourceError …
jakkdl Sep 10, 2024
e7d7205
add test for default argument of break_lot
jakkdl Sep 10, 2024
c89fb2a
Merge branch 'main' into break_the_lot
jakkdl Sep 10, 2024
277c7da
various fixes after review
jakkdl Sep 18, 2024
21cf0d6
Merge remote-tracking branch 'origin/main' into break_the_lot
jakkdl Sep 18, 2024
45f78f4
break lots before other checks, minor phrasing improvement in docstring
jakkdl Sep 19, 2024
ec48863
docstring updates after A5rocks review
jakkdl Sep 27, 2024
c742a52
Merge branch 'main' into break_the_lot
jakkdl Sep 27, 2024
7a1ce5b
raise brokenresourceerror if registering an already exited task. fix …
jakkdl Oct 2, 2024
cc97cca
remove warning on task exit
jakkdl Oct 7, 2024
b81e297
Merge remote-tracking branch 'origin/main' into break_the_lot
jakkdl Oct 7, 2024
1d7ece3
make broken_by attribute a list, clean up tests
jakkdl Oct 8, 2024
b826210
Merge branch 'main' into break_the_lot
jakkdl Oct 8, 2024
92f9799
fix test. polish comments and tests
jakkdl Oct 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/source/reference-lowlevel.rst
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,10 @@ Wait queue abstraction
.. autoclass:: ParkingLotStatistics
:members:

.. autofunction:: add_parking_lot_breaker

.. autofunction:: remove_parking_lot_breaker

Low-level checkpoint functions
------------------------------

Expand Down
1 change: 1 addition & 0 deletions newsfragments/3035.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
:class:`trio.Lock` and :class:`trio.StrictFIFOLock` will now raise :exc:`trio.BrokenResourceError` when :meth:`trio.Lock.acquire` would previously stall due to the owner of the lock exiting without releasing the lock.
1 change: 1 addition & 0 deletions newsfragments/3081.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added :func:`trio.lowlevel.add_parking_lot_breaker` and :func:`trio.lowlevel.remove_parking_lot_breaker` to allow creating custom lock/semaphore implementations that will break their underlying parking lot if a task exits unexpectedly. :meth:`trio.lowlevel.ParkingLot.break_lot` is also added, to allow breaking a parking lot intentionally.
7 changes: 6 additions & 1 deletion src/trio/_core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@
from ._ki import currently_ki_protected, disable_ki_protection, enable_ki_protection
from ._local import RunVar, RunVarToken
from ._mock_clock import MockClock
from ._parking_lot import ParkingLot, ParkingLotStatistics
from ._parking_lot import (
ParkingLot,
ParkingLotStatistics,
add_parking_lot_breaker,
remove_parking_lot_breaker,
)

# Imports that always exist
from ._run import (
Expand Down
71 changes: 71 additions & 0 deletions src/trio/_core/_parking_lot.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,13 @@
# See: https://github.com/python-trio/trio/issues/53
from __future__ import annotations

import inspect
import math
from collections import OrderedDict
from typing import TYPE_CHECKING

import attrs
import outcome

from .. import _core
from .._util import final
Expand All @@ -86,6 +88,37 @@
from ._run import Task


# Registry of lot breakers: maps a task to the parking lots it is registered
# as a breaker for. A lot is appended once per registration, so the same lot
# can appear several times in a task's list. Entries are added by
# add_parking_lot_breaker and removed by remove_parking_lot_breaker; a task's
# key is dropped once its list becomes empty.
GLOBAL_PARKING_LOT_BREAKER: dict[Task, list[ParkingLot]] = {}


def add_parking_lot_breaker(task: Task, lot: ParkingLot) -> None:
    """Register ``task`` as a breaker for ``lot``. See :meth:`ParkingLot.break_lot`.

    The same task may be registered for the same lot several times; every
    registration is recorded as a separate entry.

    Raises:
      trio.BrokenResourceError: if the task has already exited.
    """
    coro_state = inspect.getcoroutinestate(task.coro)
    if coro_state == inspect.CORO_CLOSED:
        raise _core._exceptions.BrokenResourceError(
            "Attempted to add already exited task as lot breaker.",
        )
    # First registration creates the task's list; later ones extend it.
    GLOBAL_PARKING_LOT_BREAKER.setdefault(task, []).append(lot)


def remove_parking_lot_breaker(task: Task, lot: ParkingLot) -> None:
    """Deregister ``task`` as a breaker for ``lot``. See :meth:`ParkingLot.break_lot`.

    Only one registration is removed per call, so a task registered twice for
    the same lot has to be removed twice.

    Raises:
      RuntimeError: if ``task`` is not registered as a breaker for ``lot``.
    """
    try:
        registered_lots = GLOBAL_PARKING_LOT_BREAKER[task]
        registered_lots.remove(lot)
    except (KeyError, ValueError):
        # KeyError: task has no registrations at all.
        # ValueError: task is registered, but not for this lot.
        raise RuntimeError(
            "Attempted to remove task as breaker for a lot it is not registered for",
        ) from None
    else:
        # Drop the task's entry entirely once it has no lots left.
        if len(registered_lots) == 0:
            del GLOBAL_PARKING_LOT_BREAKER[task]


@attrs.frozen
class ParkingLotStatistics:
"""An object containing debugging information for a ParkingLot.
Expand Down Expand Up @@ -118,6 +151,7 @@ class ParkingLot:
# {task: None}, we just want a deque where we can quickly delete random
# items
_parked: OrderedDict[Task, None] = attrs.field(factory=OrderedDict, init=False)
broken_by: list[Task] = attrs.field(factory=list, init=False)

def __len__(self) -> int:
"""Returns the number of parked tasks."""
Expand All @@ -136,7 +170,15 @@ async def park(self) -> None:
"""Park the current task until woken by a call to :meth:`unpark` or
:meth:`unpark_all`.

Raises:
BrokenResourceError: if attempting to park in a broken lot, or the lot
breaks before we get to unpark.

"""
if self.broken_by:
raise _core.BrokenResourceError(
f"Attempted to park in parking lot broken by {self.broken_by}",
)
task = _core.current_task()
self._parked[task] = None
task.custom_sleep_data = self
Expand Down Expand Up @@ -234,6 +276,35 @@ def repark_all(self, new_lot: ParkingLot) -> None:
"""
return self.repark(new_lot, count=len(self))

def break_lot(self, task: Task | None = None) -> None:
    """Break this lot, recording ``task`` as the task that broke it.

    Every task currently parked here is rescheduled with a
    ``BrokenResourceError`` whose message names ``task``, and the lot is
    emptied. Any task that later attempts to park in the lot gets an error
    as well.

    ``task`` defaults to the current task. It is appended to the
    ``broken_by`` attribute of the lot; breaking an already broken lot only
    records the additional breaker and does nothing else.
    """
    breaker = _core.current_task() if task is None else task

    # Remember the state *before* recording this breaker: only the first
    # break has parked tasks to wake.
    was_already_broken = bool(self.broken_by)
    self.broken_by.append(breaker)
    if was_already_broken:
        return

    message = f"Parking lot broken by {breaker}"
    for sleeper in self._parked:
        # Each sleeper gets its own exception instance.
        error = outcome.Error(_core.BrokenResourceError(message))
        _core.reschedule(sleeper, error)
    self._parked.clear()

def statistics(self) -> ParkingLotStatistics:
"""Return an object containing debugging information.

Expand Down
7 changes: 7 additions & 0 deletions src/trio/_core/_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from ._exceptions import Cancelled, RunFinishedError, TrioInternalError
from ._instrumentation import Instruments
from ._ki import LOCALS_KEY_KI_PROTECTION_ENABLED, KIManager, enable_ki_protection
from ._parking_lot import GLOBAL_PARKING_LOT_BREAKER
from ._thread_cache import start_thread_soon
from ._traps import (
Abort,
Expand Down Expand Up @@ -1896,6 +1897,12 @@ async def python_wrapper(orig_coro: Awaitable[RetT]) -> RetT:
return task

def task_exited(self, task: Task, outcome: Outcome[Any]) -> None:
# break parking lots associated with the exiting task
if task in GLOBAL_PARKING_LOT_BREAKER:
for lot in GLOBAL_PARKING_LOT_BREAKER[task]:
lot.break_lot(task)
del GLOBAL_PARKING_LOT_BREAKER[task]

if (
task._cancel_status is not None
and task._cancel_status.abandoned_by_misnesting
Expand Down
167 changes: 167 additions & 0 deletions src/trio/_core/_tests/test_parking_lot.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
from __future__ import annotations

import re
from typing import TypeVar

import pytest

import trio
from trio.lowlevel import (
add_parking_lot_breaker,
current_task,
remove_parking_lot_breaker,
)
from trio.testing import Matcher, RaisesGroup

from ... import _core
from ...testing import wait_all_tasks_blocked
from .._parking_lot import ParkingLot
Expand Down Expand Up @@ -215,3 +224,161 @@ async def test_parking_lot_repark_with_count() -> None:
"wake 2",
]
lot1.unpark_all()


async def dummy_task(
    task_status: _core.TaskStatus[_core.Task] = trio.TASK_STATUS_IGNORED,
) -> None:
    """Hand this task's own ``Task`` object to the starter, then sleep forever."""
    this_task = _core.current_task()
    task_status.started(this_task)
    await trio.sleep_forever()


async def test_parking_lot_breaker_basic() -> None:
    """Breaking a lot records the breaker and makes parking in it fail."""
    me = current_task()
    lot = ParkingLot()

    # with no argument the calling task is recorded as the breaker
    lot.break_lot()
    assert lot.broken_by == [me]

    # a second break by the same task adds a duplicate entry to `broken_by`
    lot.break_lot()
    assert lot.broken_by == [me, me]

    # parking in the broken lot raises, and the message lists `broken_by`
    broken_by_str = re.escape(str([me, me]))
    expected = f"^Attempted to park in parking lot broken by {broken_by_str}$"
    with pytest.raises(_core.BrokenResourceError, match=expected):
        await lot.park()


async def test_parking_lot_break_parking_tasks() -> None:
    """A task parked in a lot gets an error when the lot's registered breaker exits."""
    lot = ParkingLot()
    cs = _core.CancelScope()

    async def bad_parker(lot: ParkingLot, scope: _core.CancelScope) -> None:
        # registers itself as breaker, then exits as soon as `scope` is cancelled
        add_parking_lot_breaker(current_task(), lot)
        with scope:
            await trio.sleep_forever()

    # the parked task's error surfaces through the nursery
    expected_error = Matcher(_core.BrokenResourceError, match="^Parking lot broken by")
    with RaisesGroup(expected_error):
        async with _core.open_nursery() as nursery:
            # let the breaker register itself first
            nursery.start_soon(bad_parker, lot, cs)
            await wait_all_tasks_blocked()

            # then get a second task parked in the lot
            nursery.start_soon(lot.park)
            await wait_all_tasks_blocked()

            # finally make the breaker exit
            cs.cancel()


async def test_parking_lot_breaker_registration() -> None:
    """Exercise adding and removing breaker registrations, including on broken lots."""
    not_registered = (
        "Attempted to remove task as breaker for a lot it is not registered for"
    )
    task = current_task()
    lot = ParkingLot()

    # removing a registration that was never made is an error
    with pytest.raises(RuntimeError, match=not_registered):
        remove_parking_lot_breaker(task, lot)

    # a task can be registered as breaker for the same lot multiple times,
    # and needs to be removed the same number of times
    for _ in range(2):
        add_parking_lot_breaker(task, lot)
    for _ in range(2):
        remove_parking_lot_breaker(task, lot)

    # all registrations are gone now, so one more removal errors again
    with pytest.raises(RuntimeError, match=not_registered):
        remove_parking_lot_breaker(task, lot)

    # registering a task as breaker on an already broken lot is fine
    lot.break_lot()
    child_task = None
    async with trio.open_nursery() as nursery:
        child_task = await nursery.start(dummy_task)
        add_parking_lot_breaker(child_task, lot)
        nursery.cancel_scope.cancel()
    assert lot.broken_by == [task, child_task]

    # manually breaking a lot with an already exited task is fine
    fresh_lot = ParkingLot()
    fresh_lot.break_lot(child_task)
    assert fresh_lot.broken_by == [child_task]


async def test_parking_lot_breaker_rebreak() -> None:
    """Breaking an already broken lot with a different task records both breakers."""
    first_breaker = current_task()
    lot = ParkingLot()
    lot.break_lot()

    # The nursery exists only to give us a second task to pass to break_lot.
    async with trio.open_nursery() as nursery:
        second_breaker = await nursery.start(dummy_task)
        lot.break_lot(second_breaker)
        nursery.cancel_scope.cancel()

    assert lot.broken_by == [first_breaker, second_breaker]


async def test_parking_lot_multiple_breakers_exit() -> None:
    """Register several tasks as breakers for one lot, then have them all exit."""
    lot = ParkingLot()
    children: list[_core.Task] = []
    async with trio.open_nursery() as nursery:
        # start all three children before registering any of them
        for _ in range(3):
            children.append(await nursery.start(dummy_task))
        for child in children:
            add_parking_lot_breaker(child, lot)
        nursery.cancel_scope.cancel()

    # compared as sets so the test does not depend on the order the children exit in
    assert set(lot.broken_by) == set(children)


async def test_parking_lot_breaker_register_exited_task() -> None:
    """Registering a task that has already exited as a lot breaker raises."""
    lot = ParkingLot()
    exited_task = None
    async with trio.open_nursery() as nursery:
        exited_task = await nursery.start(dummy_task)
        nursery.cancel_scope.cancel()

    # the nursery has closed, so the child has exited by now
    expected = "^Attempted to add already exited task as lot breaker.$"
    with pytest.raises(trio.BrokenResourceError, match=expected):
        add_parking_lot_breaker(exited_task, lot)


async def test_parking_lot_break_itself() -> None:
    """Break a parking lot while the task named as breaker is itself parked in it.

    Doing this is weird, but should probably be supported.
    """

    async def return_me_and_park(
        lot: ParkingLot,
        *,
        task_status: _core.TaskStatus[_core.Task] = trio.TASK_STATUS_IGNORED,
    ) -> None:
        # report our own Task to the starter, then park in the lot
        me = _core.current_task()
        task_status.started(me)
        await lot.park()

    lot = ParkingLot()
    expected_error = Matcher(_core.BrokenResourceError, match="^Parking lot broken by")
    with RaisesGroup(expected_error):
        async with _core.open_nursery() as nursery:
            parked_child = await nursery.start(return_me_and_park, lot)
            lot.break_lot(parked_child)
Loading
Loading