5
5
6
6
from typing import Any, Dict, List, Optional, Union
7
7
from comfystream .client import ComfyStreamClient
8
+ from utils import temporary_log_level
8
9
9
10
WARMUP_RUNS = 5
10
11
11
12
12
13
class Pipeline :
13
- def __init__ (self , ** kwargs ):
14
+ def __init__ (self , comfyui_inference_log_level : int = None , ** kwargs ):
15
+ """Initialize the pipeline with the given configuration.
16
+ Args:
17
+ comfyui_inference_log_level: The logging level for ComfyUI inference.
18
+ Defaults to None, using the global ComfyUI log level.
19
+ **kwargs: Additional arguments to pass to the ComfyStreamClient
20
+ """
14
21
self .client = ComfyStreamClient (** kwargs , max_workers = 5 ) # TODO: hardcoded max workers, should it be configurable?
15
22
16
23
self .video_incoming_frames = asyncio .Queue ()
17
24
self .audio_incoming_frames = asyncio .Queue ()
18
25
19
26
self .processed_audio_buffer = np .array ([], dtype = np .int16 )
20
27
28
+ self ._comfyui_inference_log_level = comfyui_inference_log_level
29
+
21
30
async def warm_video (self ):
22
31
dummy_frame = av .VideoFrame ()
23
32
dummy_frame .side_data .input = torch .randn (1 , 512 , 512 , 3 )
@@ -76,7 +85,8 @@ def audio_postprocess(self, output: Union[torch.Tensor, np.ndarray]) -> av.Audio
76
85
77
86
async def get_processed_video_frame (self ):
78
87
# TODO: make it generic to support purely generative video cases
79
- out_tensor = await self .client .get_video_output ()
88
+ async with temporary_log_level ("comfy" , self ._comfyui_inference_log_level ):
89
+ out_tensor = await self .client .get_video_output ()
80
90
frame = await self .video_incoming_frames .get ()
81
91
while frame .side_data .skipped :
82
92
frame = await self .video_incoming_frames .get ()
@@ -91,7 +101,8 @@ async def get_processed_audio_frame(self):
91
101
# TODO: make it generic to support purely generative audio cases and also add frame skipping
92
102
frame = await self .audio_incoming_frames .get ()
93
103
if frame .samples > len (self .processed_audio_buffer ):
94
- out_tensor = await self .client .get_audio_output ()
104
+ async with temporary_log_level ("comfy" , self ._comfyui_inference_log_level ):
105
+ out_tensor = await self .client .get_audio_output ()
95
106
self .processed_audio_buffer = np .concatenate ([self .processed_audio_buffer , out_tensor ])
96
107
out_data = self .processed_audio_buffer [:frame .samples ]
97
108
self .processed_audio_buffer = self .processed_audio_buffer [frame .samples :]
@@ -109,4 +120,4 @@ async def get_nodes_info(self) -> Dict[str, Any]:
109
120
return nodes_info
110
121
111
122
async def cleanup (self ):
112
- await self .client .cleanup ()
123
+ await self .client .cleanup ()
0 commit comments