
feat: dynamic frame skipping #29

Closed · wants to merge 3 commits

Conversation

@eliteprox (Collaborator) commented Jan 30, 2025

This change adds a queue in ComfyStreamClient to dynamically skip inference on frames in order to achieve maximum FPS.

  • Queue size: 50 frames
  • When the queue is full, the last processed frame is returned, and one frame is removed from the queue to ensure the next frame is processed without delay.

The client dynamically uses as many FPS as possible; however, because the last frame is returned when the queue is full, the output FPS will mirror the source.
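The skipping policy described above can be sketched as a small standalone class. This is a minimal illustration of the described behavior, not the PR's actual code; the `FrameSkipper` name and `submit()` method are hypothetical.

```python
from collections import deque

class FrameSkipper:
    """Sketch of the dynamic frame-skipping policy described above.

    Hypothetical names; illustrates the 50-frame bounded queue where a
    full queue drops one pending frame and reuses the last output.
    """

    def __init__(self, max_queue_size: int = 50):
        # Bounded buffer of pending frames, mirroring the 50-frame queue.
        self.input_queue = deque(maxlen=max_queue_size)
        self.last_output = None

    def submit(self, frame):
        """Return the frame to emit immediately, or None to await inference.

        When the buffer is full, drop the oldest pending frame and return
        the last processed output so the stream never stalls.
        """
        if len(self.input_queue) == self.input_queue.maxlen:
            self.input_queue.popleft()   # skip one frame to make room
            self.input_queue.append(frame)
            return self.last_output      # reuse previous result, no wait
        self.input_queue.append(frame)
        return None                      # caller awaits real inference
```

Because the previous output is substituted whenever the buffer is full, every source frame produces an output frame, which is why the output FPS mirrors the source.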

@yondonfu (Owner) left a comment

I agree that this architecture is preferable to the previous one - just left some comments.


```python
class ComfyStreamClient:
    def __init__(self, **kwargs):
        config = Configuration(**kwargs)
        self.comfy_client = EmbeddedComfyClient(config)
        self.prompt = None
        self._lock = asyncio.Lock()
        self.input_queue = deque(maxlen=max_queue_size)
        self.output_queue = asyncio.Queue()
```
@yondonfu (Owner)

I'm not sure that you need two queues here - I think you can just use a single asyncio.Queue to manage the buffer of input frames since you're not consuming anything from the output queue right now.

If the queue is full, the last frame is returned thereby also dropping the current input frame. Otherwise, the input frame is added to the queue and the processor task can handle it.
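The single-queue variant suggested here can be sketched with one bounded `asyncio.Queue`: a full queue means the current input frame is dropped and the last output is returned instead. Names are hypothetical; this is not the actual ComfyStreamClient code.

```python
import asyncio

async def demo_single_queue():
    """Sketch of the single-queue suggestion: one bounded asyncio.Queue
    buffers input frames; when it is full, the incoming frame is dropped
    and the last output is returned instead (hypothetical names)."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=2)
    last_output = "previous-frame"
    emitted = []

    for frame in ["f1", "f2", "f3"]:
        try:
            queue.put_nowait(frame)       # enqueue for the processor task
        except asyncio.QueueFull:
            emitted.append(last_output)   # skip: return last frame instead
    return emitted, queue.qsize()

emitted, pending = asyncio.run(demo_single_queue())
```

A separate processor task would consume `queue` and resolve each frame's result, so no second queue is needed on the output side.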

@eliteprox (Collaborator, Author)

@yondonfu See my comment below about two queues #29 (comment). Should we proceed with merge?

```python
        if self.processor_task is None:
            self.processor_task = asyncio.create_task(self._process_queue())

    async def _process_queue(self):
```
@yondonfu (Owner)

This runs an infinite loop but we probably want some way for it to exit cleanly i.e. when the event loop closes. I think you can catch an asyncio.CancelledError and just re-raise the error. For testing, worth adding a print statement for debugging to see if the loop is actually exiting cleanly.
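The clean-exit pattern described in this comment (catch `asyncio.CancelledError`, log for visibility, then re-raise) can be shown with a toy worker loop. The function names here are illustrative, not the PR's actual code.

```python
import asyncio

async def process_queue(queue: asyncio.Queue, results: list):
    """Worker loop sketch: catches asyncio.CancelledError and re-raises
    it so the task actually stops when it is cancelled at shutdown."""
    try:
        while True:
            item = await queue.get()
            results.append(item)          # stand-in for running inference
            queue.task_done()
    except asyncio.CancelledError:
        print("frame processor loop exiting cleanly")  # debug visibility
        raise                             # re-raise so cancellation propagates

async def main():
    queue: asyncio.Queue = asyncio.Queue()
    results: list = []
    task = asyncio.create_task(process_queue(queue, results))
    await queue.put("frame-1")
    await queue.join()                    # wait until the frame is handled
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass                              # expected: the task re-raised
    return results

out = asyncio.run(main())
```

Swallowing the `CancelledError` instead of re-raising would leave the task reporting itself as completed rather than cancelled, which is why the re-raise matters.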

@eliteprox (Collaborator, Author) commented Feb 3, 2025

Added handling for CancelledError, which breaks the loop while allowing processing to continue on other errors.

```python
            except Exception as e:
                logger.error(f"Error processing queue item: {str(e)}")
                if output_fut and not output_fut.done():
                    output_fut.set_exception(e)
                self.output_queue.task_done()
        except asyncio.CancelledError:
            logger.info("Stopped frame processor loop")
            raise
```

```python
        output_fut = asyncio.Future()
        tensor_cache.outputs.append(output_fut)
        self.input_queue.append(output_fut)
```
@yondonfu (Owner)

Reviewing this makes me realize there is probably a problem with how locking works right now. I'm not that familiar with the details at the moment, or whether the lock usage is truly needed, but AFAICT given the current structure there can only be a single queue_prompt() call at a time, meaning that appending to the input queue will not happen until the previous queue_prompt() call completes. I don't think that is the desired behavior: you probably want to keep appending to the queue (until it's full) even if the previous queue_prompt() is still awaiting the output of an input frame.

If you just want to enable frame skipping then I suggest addressing that in a separate PR.
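The serialization concern raised above can be demonstrated with a toy: if a slow awaited call runs while holding the same `asyncio.Lock` that guards enqueueing, concurrent submissions execute one after another instead of in parallel. This is an illustrative example with hypothetical names, not the PR's code.

```python
import asyncio
import time

async def demo_lock_serialization():
    """Shows that holding an asyncio.Lock across a slow await serializes
    concurrent callers: two 0.05s calls take ~0.10s total, so nothing can
    be enqueued while the previous queue_prompt()-style call is pending."""
    lock = asyncio.Lock()

    async def locked_submit(delay: float):
        async with lock:                  # lock held across the slow await
            await asyncio.sleep(delay)    # stand-in for awaiting inference

    start = time.monotonic()
    await asyncio.gather(locked_submit(0.05), locked_submit(0.05))
    return time.monotonic() - start       # roughly the sum of both delays

elapsed = asyncio.run(demo_lock_serialization())
```

Releasing the lock before the slow await (or not locking the append at all) would let new frames keep entering the buffer while inference is in flight.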

@eliteprox (Collaborator, Author) commented Feb 4, 2025

@yondonfu That's correct, and I believe this is why the solution only works with two queues:

  1. Incoming frames are appended to the input queue, but we do not wait for it to return. This allows for a buffer of incoming frames that get dropped when the queue fills up:
     self.input_queue.append(output_fut)
  2. Then we must await the output queue for the result, since we need to return a frame from _process_queue:
     await self.output_queue.put((input, output_fut))

If we used a single queue, I believe we would have to await both the input and the output.

If we want to address this differently, I think it would affect how apps are implementing ComfyStreamClient, which could be a breaking change. Are you fine with addressing the duplicate queues in a separate PR after we implement release tagging?
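The two-queue split described above can be sketched end to end: the input deque accepts frames without blocking the caller, while a separate `asyncio.Queue` carries `(input, future)` pairs that only the processor task awaits. This is a hypothetical reduction of the structure, not the PR's exact code.

```python
import asyncio
from collections import deque

async def demo_two_queues():
    """Sketch of the two-queue rationale: appending to the input deque
    returns immediately, and the caller awaits only its own future,
    which the processor task resolves (hypothetical names)."""
    input_queue: deque = deque(maxlen=50)     # non-blocking frame buffer
    output_queue: asyncio.Queue = asyncio.Queue()

    async def processor():
        frame, fut = await output_queue.get() # awaited by the worker only
        fut.set_result(f"processed-{frame}")  # stand-in for inference
        output_queue.task_done()

    fut: asyncio.Future = asyncio.get_running_loop().create_future()
    input_queue.append(fut)                   # append returns immediately
    await output_queue.put(("frame-1", fut))  # hand work to the processor
    asyncio.create_task(processor())
    return await fut                          # caller waits only on its result

result = asyncio.run(demo_two_queues())
```

Because the append never blocks, the input buffer keeps filling (and skipping) even while an earlier frame's future is still pending.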

@eliteprox eliteprox changed the title Frame skipping io (feat) dynamic frame skipping Feb 4, 2025
@eliteprox eliteprox marked this pull request as ready for review February 4, 2025 21:35
@eliteprox eliteprox changed the title (feat) dynamic frame skipping feat: dynamic frame skipping Feb 6, 2025
@eliteprox (Collaborator, Author)

Closing in favor of #10

@eliteprox eliteprox closed this Feb 17, 2025