22
22
23
23
import logging
24
24
import os
25
- import threading
26
25
import urllib
27
26
from abc import ABC , abstractmethod
28
27
from types import TracebackType
56
55
run_in_background ,
57
56
)
58
57
from synapse .util import Clock
58
+ from synapse .util .async_helpers import DeferredEvent
59
59
from synapse .util .stringutils import is_ascii
60
60
61
61
if TYPE_CHECKING :
@@ -620,10 +620,13 @@ class ThreadedFileSender:
620
620
A producer that sends the contents of a file to a consumer, reading from the
621
621
file on a thread.
622
622
623
- This works by spawning a loop in a threadpool that repeatedly reads from the
624
- file and sends it to the consumer. The main thread communicates with the
625
- loop via two `threading.Event`, which controls when to start/pause reading
626
- and when to terminate.
623
+ This works by having a loop in a threadpool repeatedly reading from the
624
+ file, until the consumer pauses the producer. There is then a loop in the
625
+ main thread that waits until the consumer resumes the producer and then
626
+ starts reading in the threadpool again.
627
+
628
+ This is done to ensure that we're never waiting in the threadpool, as
629
+ otherwise it's easy to starve it of threads.
627
630
"""
628
631
629
632
# How much data to read in one go.
@@ -643,12 +646,11 @@ def __init__(self, hs: "HomeServer") -> None:
643
646
644
647
# Signals if the thread should keep reading/sending data. Set means
645
648
# continue, clear means pause.
646
- self .wakeup_event = threading . Event ( )
649
+ self .wakeup_event = DeferredEvent ( self . reactor )
647
650
648
651
# Signals if the thread should terminate, e.g. because the consumer has
649
- # gone away. Both this and `wakeup_event` should be set to terminate the
650
- # loop (otherwise the thread will block on `wakeup_event`).
651
- self .stop_event = threading .Event ()
652
+ # gone away.
653
+ self .stop_writing = False
652
654
653
655
def beginFileTransfer (
654
656
self , file : BinaryIO , consumer : interfaces .IConsumer
@@ -663,12 +665,7 @@ def beginFileTransfer(
663
665
664
666
# We set the wakeup signal as we should start producing immediately.
665
667
self .wakeup_event .set ()
666
- run_in_background (
667
- defer_to_threadpool ,
668
- self .reactor ,
669
- self .thread_pool ,
670
- self ._on_thread_read_loop ,
671
- )
668
+ run_in_background (self .start_read_loop )
672
669
673
670
return make_deferred_yieldable (self .deferred )
674
671
@@ -686,50 +683,60 @@ def stopProducing(self) -> None:
686
683
# Unregister the consumer so we don't try to interact with it again.
687
684
self .consumer = None
688
685
689
- # Terminate the thread loop.
686
+ # Terminate the loop.
687
+ self .stop_writing = True
690
688
self .wakeup_event .set ()
691
- self .stop_event .set ()
692
689
693
690
if not self .deferred .called :
694
691
self .deferred .errback (Exception ("Consumer asked us to stop producing" ))
695
692
696
- def _on_thread_read_loop (self ) -> None :
697
- """This is the loop that happens on a thread."""
698
-
693
+ async def start_read_loop (self ) -> None :
694
+ """This is the loop that drives reading/writing"""
699
695
try :
700
- while not self .stop_event .is_set ():
701
- # We wait for the producer to signal that the consumer wants
702
- # more data (or we should abort)
696
+ while not self .stop_writing :
697
+ # Start the loop in the threadpool to read data.
698
+ more_data = await defer_to_threadpool (
699
+ self .reactor , self .thread_pool , self ._on_thread_read_loop
700
+ )
701
+ if not more_data :
702
+ # Reached EOF, we can just return.
703
+ return
704
+
703
705
if not self .wakeup_event .is_set ():
704
- ret = self .wakeup_event .wait (self .TIMEOUT_SECONDS )
706
+ ret = await self .wakeup_event .wait (self .TIMEOUT_SECONDS )
705
707
if not ret :
706
708
raise Exception ("Timed out waiting to resume" )
709
+ except Exception :
710
+ self ._error (Failure ())
711
+ finally :
712
+ self ._finish ()
707
713
708
- # Check if we were woken up so that we abort the download
709
- if self .stop_event .is_set ():
710
- return
714
+ def _on_thread_read_loop (self ) -> bool :
715
+ """This is the loop that happens on a thread.
711
716
712
- # The file should always have been set before we get here.
713
- assert self .file is not None
717
+ Returns:
718
+ Whether there is more data to send.
719
+ """
714
720
715
- chunk = self .file . read ( self . CHUNK_SIZE )
716
- if not chunk :
717
- return
721
+ while not self . stop_writing and self .wakeup_event . is_set ():
722
+ # The file should always have been set before we get here.
723
+ assert self . file is not None
718
724
719
- self .reactor .callFromThread (self ._write , chunk )
725
+ chunk = self .file .read (self .CHUNK_SIZE )
726
+ if not chunk :
727
+ return False
720
728
721
- except Exception :
722
- self .reactor .callFromThread (self ._error , Failure ())
723
- finally :
724
- self .reactor .callFromThread (self ._finish )
729
+ self .reactor .callFromThread (self ._write , chunk )
730
+
731
+ return True
725
732
726
733
def _write (self , chunk : bytes ) -> None :
727
734
"""Called from the thread to write a chunk of data"""
728
735
if self .consumer :
729
736
self .consumer .write (chunk )
730
737
731
738
def _error (self , failure : Failure ) -> None :
732
- """Called from the thread when there was a fatal error"""
739
+ """Called when there was a fatal error"""
733
740
if self .consumer :
734
741
self .consumer .unregisterProducer ()
735
742
self .consumer = None
@@ -738,7 +745,7 @@ def _error(self, failure: Failure) -> None:
738
745
self .deferred .errback (failure )
739
746
740
747
def _finish (self ) -> None :
741
- """Called from the thread when it finishes (either on success or
748
+ """Called when we have finished writing (either on success or
742
749
failure)."""
743
750
if self .file :
744
751
self .file .close ()
0 commit comments