Commit 44df170

fix: frame skipping
1 parent c3009c6 commit 44df170

4 files changed: +22 -13


nodes/tensor_utils/load_tensor.py (+4 -2)
@@ -1,3 +1,4 @@
+import time
 from comfystream import tensor_cache
 
 
@@ -15,5 +16,6 @@ def IS_CHANGED():
         return float("nan")
 
     def execute(self):
-        input = tensor_cache.image_inputs.get(block=True)
-        return (input,)
+        frame = tensor_cache.image_inputs.get(block=True)
+        frame.side_data.skipped = False
+        return (frame.side_data.input,)
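With this change the input queue no longer carries bare tensors: each entry is a whole video frame whose side_data holds both the preprocessed tensor and a skipped flag. A minimal sketch of that contract, using a plain namespace in place of av.VideoFrame (attribute names come from this diff; everything else is assumed, not the repo's code):

    from types import SimpleNamespace
    import torch

    # Stand-in for the frame objects that now flow through tensor_cache.image_inputs.
    frame = SimpleNamespace(side_data=SimpleNamespace())
    frame.side_data.input = torch.randn(1, 512, 512, 3)  # preprocessed tensor
    frame.side_data.skipped = True                        # assumed dropped until a run consumes it

    # What execute() now does: mark the frame as consumed and unwrap its tensor.
    frame.side_data.skipped = False
    output = (frame.side_data.input,)  # ComfyUI nodes return their outputs as a tuple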

server/pipeline.py (+12 -8)
@@ -19,10 +19,11 @@ def __init__(self, **kwargs):
         self.processed_audio_buffer = np.array([], dtype=np.int16)
 
     async def warm_video(self):
-        dummy_video_inp = torch.randn(1, 512, 512, 3)
+        dummy_frame = av.VideoFrame()
+        dummy_frame.side_data.input = torch.randn(1, 512, 512, 3)
 
         for _ in range(WARMUP_RUNS):
-            self.client.put_video_input(dummy_video_inp)
+            self.client.put_video_input(dummy_frame)
             await self.client.get_video_output()
 
     async def warm_audio(self):
@@ -45,9 +46,10 @@ async def update_prompts(self, prompts: Union[Dict[Any, Any], List[Dict[Any, Any
         await self.client.update_prompts([prompts])
 
     async def put_video_frame(self, frame: av.VideoFrame):
-        inp_tensor = self.video_preprocess(frame)
-        self.client.put_video_input(inp_tensor)
-        await self.video_incoming_frames.put((frame.pts, frame.time_base))
+        frame.side_data.input = self.video_preprocess(frame)
+        frame.side_data.skipped = True
+        self.client.put_video_input(frame)
+        await self.video_incoming_frames.put(frame)
 
     async def put_audio_frame(self, frame: av.AudioFrame):
         inp_np = self.audio_preprocess(frame)
@@ -71,12 +73,14 @@ def audio_postprocess(self, output: Union[torch.Tensor, np.ndarray]) -> av.Audio
 
     async def get_processed_video_frame(self):
         # TODO: make it generic to support purely generative video cases
-        pts, time_base = await self.video_incoming_frames.get()
         out_tensor = await self.client.get_video_output()
+        frame = await self.video_incoming_frames.get()
+        while frame.side_data.skipped:
+            frame = await self.video_incoming_frames.get()
 
         processed_frame = self.video_postprocess(out_tensor)
-        processed_frame.pts = pts
-        processed_frame.time_base = time_base
+        processed_frame.pts = frame.pts
+        processed_frame.time_base = frame.time_base
 
         return processed_frame
 
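Taken together with the LoadTensor change above, the skipped flag forms a small handshake: every incoming frame is assumed skipped, the node flips the flag on the frame it actually pulls from the queue, and get_processed_video_frame discards the timestamps of frames that never reached the graph, so pts/time_base stay aligned with the tensor that comes back. A minimal sketch of that flow with plain Python stand-ins (names and queue sizes mirror this commit; the rest is assumed, not the repo's code):

    from queue import Queue
    from types import SimpleNamespace

    image_inputs: Queue = Queue(maxsize=1)   # stand-in for tensor_cache.image_inputs
    incoming: Queue = Queue()                # stand-in for video_incoming_frames

    def put_video_frame(tensor):
        frame = SimpleNamespace(side_data=SimpleNamespace(input=tensor, skipped=True))
        if image_inputs.full():
            image_inputs.get()               # evict the stale frame; it stays marked skipped
        image_inputs.put(frame)
        incoming.put(frame)

    def node_execute():
        frame = image_inputs.get()
        frame.side_data.skipped = False      # this frame actually went through the graph
        return frame.side_data.input

    def next_matching_frame():
        frame = incoming.get()
        while frame.side_data.skipped:       # skip timestamps of frames that were evicted
            frame = incoming.get()
        return frame

    put_video_frame("t0")
    put_video_frame("t1")                    # t0 is evicted before it is ever processed
    node_execute()                           # consumes t1
    assert next_matching_frame().side_data.input == "t1"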

src/comfystream/client.py (+4 -2)
@@ -43,8 +43,10 @@ async def run_prompt(self, prompt_index: int):
             logger.error(f"Error type: {type(e)}")
             raise
 
-    def put_video_input(self, inp_tensor):
-        tensor_cache.image_inputs.put(inp_tensor)
+    def put_video_input(self, frame):
+        if tensor_cache.image_inputs.full():
+            tensor_cache.image_inputs.get(block=True)
+        tensor_cache.image_inputs.put(frame)
 
     def put_audio_input(self, inp_tensor):
         tensor_cache.audio_inputs.put(inp_tensor)
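On the client side, a plain put() into the new single-slot queue would block the producer whenever ComfyUI falls behind, so put_video_input now drains the stale entry first and the newest frame always wins. A small sketch of that drop-oldest behavior in isolation (assuming a single producer, so the get() after full() returns immediately):

    from queue import Queue

    q: Queue = Queue(maxsize=1)

    def put_latest(item) -> None:
        # Drop the oldest entry instead of blocking the caller when the queue is full.
        if q.full():
            q.get(block=True)
        q.put(item)

    put_latest("frame-0")
    put_latest("frame-1")    # "frame-0" is evicted
    assert q.get() == "frame-1"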

src/comfystream/tensor_cache.py (+2 -1)
@@ -6,7 +6,8 @@
 
 from typing import Union
 
-image_inputs: Queue[Union[torch.Tensor, np.ndarray]] = Queue()
+# TODO: improve eviction policy fifo might not be the best, skip alternate frames instead
+image_inputs: Queue[Union[torch.Tensor, np.ndarray]] = Queue(maxsize=1)
 image_outputs: AsyncQueue[Union[torch.Tensor, np.ndarray]] = AsyncQueue()
 
 audio_inputs: Queue[Union[torch.Tensor, np.ndarray]] = Queue()
