Scaffolding for realtime-to-realtime #3210

Merged 2 commits on Oct 29, 2024
2 changes: 2 additions & 0 deletions core/capabilities.go
@@ -81,6 +81,7 @@ const (
Capability_SegmentAnything2 Capability = 32
Capability_LLM Capability = 33
Capability_ImageToText Capability = 34
Capability_LiveVideoToVideo Capability = 35
)

var CapabilityNameLookup = map[Capability]string{
@@ -120,6 +121,7 @@ var CapabilityNameLookup = map[Capability]string{
Capability_SegmentAnything2: "Segment anything 2",
Capability_LLM: "Llm",
Capability_ImageToText: "Image to text",
Capability_LiveVideoToVideo: "Live video to video",
}

var CapabilityTestLookup = map[Capability]CapabilityTest{
30 changes: 30 additions & 0 deletions server/ai_http.go
@@ -56,6 +56,7 @@
lp.transRPC.Handle("/llm", oapiReqValidator(lp.LLM()))
lp.transRPC.Handle("/segment-anything-2", oapiReqValidator(lp.SegmentAnything2()))
lp.transRPC.Handle("/image-to-text", oapiReqValidator(lp.ImageToText()))
lp.transRPC.Handle("/live-video-to-video", oapiReqValidator(lp.StartLiveVideoToVideo()))

Codecov warning: added line server/ai_http.go#L59 was not covered by tests.
// Additionally, there is the '/aiResults' endpoint registered in server/rpc.go

return nil
@@ -236,6 +237,35 @@
}

handleAIRequest(ctx, w, r, orch, req)

})
}

func (h *lphttp) StartLiveVideoToVideo() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

// skipping handleAIRequest for now until we have payments

var (
mid = string(core.RandomManifestID())
pubUrl = "/ai/live-video/" + mid
subUrl = pubUrl + "/out"
)
jsonData, err := json.Marshal(struct {
PublishUrl string
SubscribeUrl string
}{
PublishUrl: pubUrl,
SubscribeUrl: subUrl,
})
if err != nil {
respondWithError(w, err.Error(), http.StatusInternalServerError)
return
}

Codecov warning: added lines server/ai_http.go#L244-L264 were not covered by tests.

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(jsonData)

Codecov warning: added lines server/ai_http.go#L266-L268 were not covered by tests.
})
}
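
For reference, here is a rough gateway-side sketch (not part of the diff) of calling this endpoint and decoding the publish/subscribe URLs it returns. The orchestrator base URL is a placeholder, and the real call path would also need to pass request validation and, eventually, payment headers.

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

func main() {
	// Placeholder orchestrator address; the handler above responds with
	// relative publish/subscribe paths encoded as JSON.
	resp, err := http.Post("https://orchestrator:8935/live-video-to-video", "application/json", nil)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var urls struct {
		PublishUrl   string
		SubscribeUrl string
	}
	if err := json.NewDecoder(resp.Body).Decode(&urls); err != nil {
		panic(err)
	}
	fmt.Println("publish:", urls.PublishUrl, "subscribe:", urls.SubscribeUrl)
}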

24 changes: 24 additions & 0 deletions server/ai_mediaserver.go
@@ -74,6 +74,9 @@
ls.HTTPMux.Handle("/segment-anything-2", oapiReqValidator(handle(ls, multipartDecoder[worker.GenSegmentAnything2MultipartRequestBody], processSegmentAnything2)))
ls.HTTPMux.Handle("/image-to-text", oapiReqValidator(handle(ls, multipartDecoder[worker.GenImageToTextMultipartRequestBody], processImageToText)))

// This is called by the media server when the stream is ready
ls.HTTPMux.Handle("/live/video-to-video/start", ls.StartLiveVideo())

return nil
}

@@ -361,3 +364,24 @@
_ = json.NewEncoder(w).Encode(resp)
})
}

func (ls *LivepeerServer) StartLiveVideo() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
streamName := r.FormValue("stream")
if streamName == "" {
http.Error(w, "Missing stream name", http.StatusBadRequest)
return
}
requestID := string(core.RandomManifestID())
params := aiRequestParams{
node: ls.LivepeerNode,
os: drivers.NodeStorage.NewSession(requestID),
sessManager: ls.AISessionManager,
}
ctx := clog.AddVal(r.Context(), "request_id", requestID)
// TODO set model and initial parameters here if necessary (eg, prompt)
req := struct{}{}
resp, err := processAIRequest(ctx, params, req)
Contributor:

I don't think we should follow this AI flow here. I guess what we're trying to do is more similar to transcoding.

Collaborator (Author):

Why not? This selects an orchestrator and allows us to receive and process a response from it (so we know where to send and receive media from)

This method works for now - if you want to change it later as part of the selection work that is fine but this gets us going with the existing RPC mechanisms

Contributor @mjh1 (Oct 22, 2024):

I think I see what @leszko is saying: up until now we have just been sending in synchronous AI jobs, where the pipeline runs and returns the result. Whereas here the request is to start up the realtime-to-realtime stream, basically? That said, I don't think it's a huge issue when there's lots of common code that we're able to reuse by following the same flow.

Contributor:

Yes, I think we can "re-use" it for starting the AI runner. But then, the next question would be: when does the runner container stop? Or who stops it? We don't need to solve it in this PR, but it's something to consider.

Collaborator (Author):

Yep, as @mjh1 says, this basically makes a synchronous request using the normal AI inference path. This is pretty convenient, because it gives us an entry point to kick off the rest of the process - see #3211 for the beginnings of that.

I expect that we'd be building things like our job monitoring / re-selection, payments, etc somewhere near this 'main loop'

> But then, the next question would be: when does the runner container stop? Or who stops it? We don't need to solve it in this PR, but it's something to consider.

This would be a basic RPC request which can be done via whatever call-path makes sense. However, as far as job tear-down goes, I am not sure we can rely on signaling for that (anything can disappear without notice), so we should ensure things still work well in the absence of explicit tear-down.

We'll need to solve a similar issue with calls to the model control API. The gateway will need some handle to the orchestrator session, the orchestrator to the (remote) worker, and possibly the worker to the runner (if runners have persistent job state). @victorges for visibility since this is his area

Contributor:

> This would be a basic RPC request which can be done via whatever call-path makes sense. However, as far as job tear-down goes, I am not sure we can rely on signaling for that (anything can disappear without notice), so we should ensure things still work well in the absence of explicit tear-down.

Yeah, the best approach from the Distributed (and Decentralized) system perspective is what we do for Transcoding:

  1. The params are included in the stream (segments)
  2. The AI (Transcoding) pipeline is initialized when the first segment comes in
  3. The tear-down happens on some timeout (so when no segments have come in for x min, it tears down)

Any signaling of warm-up / tear-down is an "optimization" on top of that. This approach is good for reliability, because a lot may happen: the O may go down, we may swap the orchestrator, the G may go down (before signaling anything), etc.

CC: @victorges

Again, I think we can move this discussion outside this PR. The PR can be merged and we can discuss it separately.
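
A minimal sketch (not from this PR) of the idle-timeout tear-down described in the comment above; the segment channel, timeout value, and teardown callback are all hypothetical placeholders.

package liveutil

import (
	"context"
	"time"
)

// watchIdle tears the pipeline down once no segments have arrived for
// idleTimeout. segmentCh and teardown are placeholders, not PR code.
func watchIdle(ctx context.Context, segmentCh <-chan struct{}, idleTimeout time.Duration, teardown func()) {
	timer := time.NewTimer(idleTimeout)
	defer timer.Stop()
	for {
		select {
		case <-segmentCh:
			// A segment arrived: reset the idle timer.
			if !timer.Stop() {
				<-timer.C
			}
			timer.Reset(idleTimeout)
		case <-timer.C:
			// No segments for idleTimeout: tear everything down.
			teardown()
			return
		case <-ctx.Done():
			teardown()
			return
		}
	}
}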

clog.Infof(ctx, "Received live video AI request stream=%s resp=%v err=%v", streamName, resp, err)

Codecov warning: added lines server/ai_mediaserver.go#L370-L385 were not covered by tests.
})
}
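
For context, a hedged sketch (not part of the diff) of how the media server hook might hit this endpoint when a stream comes up. The gateway address and stream name are placeholders; the handler above only requires the "stream" form value.

package main

import (
	"fmt"
	"net/http"
	"net/url"
)

func main() {
	// Placeholder gateway address; the handler rejects requests that are
	// missing the "stream" form value.
	resp, err := http.PostForm("http://localhost:8935/live/video-to-video/start",
		url.Values{"stream": {"my-stream"}})
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}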
25 changes: 25 additions & 0 deletions server/ai_process.go
@@ -34,6 +34,7 @@
const defaultLLMModelID = "meta-llama/llama-3.1-8B-Instruct"
const defaultSegmentAnything2ModelID = "facebook/sam2-hiera-large"
const defaultImageToTextModelID = "Salesforce/blip-image-captioning-large"
const defaultLiveVideoToVideoModelID = "cumulo-autumn/stream-diffusion"

var errWrongFormat = fmt.Errorf("result not in correct format")

@@ -865,6 +866,19 @@
return &res, nil
}

func submitLiveVideoToVideo(ctx context.Context, params aiRequestParams, sess *AISession, req struct{ ModelId *string }) (any, error) {
//client, err := worker.NewClientWithResponses(sess.Transcoder(), worker.WithHTTPClient(httpClient))
var err error
if err != nil {
if monitor.Enabled {
monitor.AIRequestError(err.Error(), "LiveVideoToVideo", *req.ModelId, sess.OrchestratorInfo)
}
return nil, err

Codecov warning: added lines server/ai_process.go#L869-L876 were not covered by tests.
}
// TODO check urls and add sess.Transcoder to the host if necessary
return nil, nil

Codecov warning: added line server/ai_process.go#L879 was not covered by tests.
}
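
A minimal sketch (not from this PR) of the TODO above: resolving the relative publish/subscribe paths returned by the orchestrator against the orchestrator's transcoder host. The helper name is hypothetical and assumes Go 1.19+ for url.JoinPath.

package main

import (
	"fmt"
	"net/url"
)

// resolveMediaURL is a hypothetical helper: relative paths returned by the
// orchestrator (e.g. "/ai/live-video/<id>") get prefixed with the transcoder
// host, while absolute URLs pass through unchanged.
func resolveMediaURL(transcoderBase, u string) (string, error) {
	parsed, err := url.Parse(u)
	if err != nil {
		return "", err
	}
	if parsed.IsAbs() {
		return u, nil
	}
	return url.JoinPath(transcoderBase, u)
}

func main() {
	pub, _ := resolveMediaURL("https://orchestrator:8935", "/ai/live-video/abc123")
	fmt.Println(pub) // https://orchestrator:8935/ai/live-video/abc123
}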

func CalculateLLMLatencyScore(took time.Duration, tokensUsed int) float64 {
if tokensUsed <= 0 {
return 0
Expand Down Expand Up @@ -1204,6 +1218,17 @@
submitFn = func(ctx context.Context, params aiRequestParams, sess *AISession) (interface{}, error) {
return submitImageToText(ctx, params, sess, v)
}
/*
case worker.StartLiveVideoToVideoFormdataRequestBody:
cap = core.Capability_LiveVideoToVideo
modelID = defaultLiveVideoToVideoModelID
if v.ModelId != nil {
modelID = *v.ModelId
}
submitFn = func(ctx context.Context, params aiRequestParams, sess *AISession) (interface{}, error) {
return submitLiveVideoToVideo(ctx, params, sess, v)
}
*/
default:
return nil, fmt.Errorf("unsupported request type %T", req)
}