From e8f2568b5f4f61d5de48a8f41f9c8dfe6cd034cb Mon Sep 17 00:00:00 2001 From: Josh Allmann Date: Thu, 17 Oct 2024 09:17:30 +0000 Subject: [PATCH 1/2] realtime: Add MediaMTX handler to G, and O caps and signaling --- core/capabilities.go | 2 ++ go.mod | 2 ++ go.sum | 2 ++ server/ai_http.go | 29 +++++++++++++++++++++++++++++ server/ai_mediaserver.go | 25 +++++++++++++++++++++++++ server/ai_process.go | 32 ++++++++++++++++++++++++++++++++ 6 files changed, 92 insertions(+) diff --git a/core/capabilities.go b/core/capabilities.go index 2d8b00eede..44b954ccb7 100644 --- a/core/capabilities.go +++ b/core/capabilities.go @@ -81,6 +81,7 @@ const ( Capability_SegmentAnything2 Capability = 32 Capability_LLM Capability = 33 Capability_ImageToText Capability = 34 + Capability_LiveVideoToVideo Capability = 35 ) var CapabilityNameLookup = map[Capability]string{ @@ -120,6 +121,7 @@ var CapabilityNameLookup = map[Capability]string{ Capability_SegmentAnything2: "Segment anything 2", Capability_LLM: "Llm", Capability_ImageToText: "Image to text", + Capability_LiveVideoToVideo: "Live video to video", } var CapabilityTestLookup = map[Capability]CapabilityTest{ diff --git a/go.mod b/go.mod index 847513b404..c1ec7eec03 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,8 @@ module github.com/livepeer/go-livepeer go 1.23.2 +replace github.com/livepeer/ai-worker => github.com/j0sh/livepeer-ai-worker v0.0.0-20241017214425-b4f177ea52a7 + require ( contrib.go.opencensus.io/exporter/prometheus v0.4.2 github.com/Masterminds/semver/v3 v3.2.1 diff --git a/go.sum b/go.sum index a1973ed8a2..647e35ca24 100644 --- a/go.sum +++ b/go.sum @@ -509,6 +509,8 @@ github.com/iris-contrib/blackfriday v2.0.0+incompatible/go.mod h1:UzZ2bDEoaSGPbk github.com/iris-contrib/go.uuid v2.0.0+incompatible/go.mod h1:iz2lgM/1UnEf1kP0L/+fafWORmlnuysV2EMP8MW+qe0= github.com/iris-contrib/i18n v0.0.0-20171121225848-987a633949d0/go.mod h1:pMCz62A0xJL6I+umB2YTlFRwWXaDFA0jy+5HzGiJjqI= github.com/iris-contrib/schema v0.0.1/go.mod h1:urYA3uvUNG1TIIjOSCzHr9/LmbQo8LrOcOqfqxa4hXw= +github.com/j0sh/livepeer-ai-worker v0.0.0-20241017214425-b4f177ea52a7 h1:m9JLRnvNr9SOyy/DZthWVK8F8AoGtBxDlK4zORuV/bA= +github.com/j0sh/livepeer-ai-worker v0.0.0-20241017214425-b4f177ea52a7/go.mod h1:91lMzkzVuwR9kZ0EzXwf+7yVhLaNVmYAfmBtn7t3cQA= github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus= github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc= github.com/jaypipes/ghw v0.10.0 h1:UHu9UX08Py315iPojADFPOkmjTsNzHj4g4adsNKKteY= diff --git a/server/ai_http.go b/server/ai_http.go index a1ca272f4c..cd32f3fbfc 100644 --- a/server/ai_http.go +++ b/server/ai_http.go @@ -56,6 +56,7 @@ func startAIServer(lp lphttp) error { lp.transRPC.Handle("/llm", oapiReqValidator(lp.LLM())) lp.transRPC.Handle("/segment-anything-2", oapiReqValidator(lp.SegmentAnything2())) lp.transRPC.Handle("/image-to-text", oapiReqValidator(lp.ImageToText())) + lp.transRPC.Handle("/live-video-to-video", oapiReqValidator(lp.StartLiveVideoToVideo())) // Additionally, there is the '/aiResults' endpoint registered in server/rpc.go return nil @@ -236,6 +237,34 @@ func (h *lphttp) ImageToText() http.Handler { } handleAIRequest(ctx, w, r, orch, req) + + }) +} + +func (h *lphttp) StartLiveVideoToVideo() http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + + // skipping handleAIRequest for now until we have payments + + var ( + mid = string(core.RandomManifestID()) + pubUrl = "/ai/live-video/" + mid + subUrl = pubUrl + "/out" + ) + jsonData, err := json.Marshal(&worker.StartLiveVideoToVideoResponse{ + JSON200: &worker.ResponseStartVideoToVideo{ + PublishUrl: &pubUrl, + SubscribeUrl: &subUrl, + }, + }) + if err != nil { + respondWithError(w, err.Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write(jsonData) }) } diff --git a/server/ai_mediaserver.go b/server/ai_mediaserver.go index a5466fcd46..4429716e7a 100644 --- a/server/ai_mediaserver.go +++ b/server/ai_mediaserver.go @@ -74,6 +74,9 @@ func startAIMediaServer(ls *LivepeerServer) error { ls.HTTPMux.Handle("/segment-anything-2", oapiReqValidator(handle(ls, multipartDecoder[worker.GenSegmentAnything2MultipartRequestBody], processSegmentAnything2))) ls.HTTPMux.Handle("/image-to-text", oapiReqValidator(handle(ls, multipartDecoder[worker.GenImageToTextMultipartRequestBody], processImageToText))) + // This is called by the media server when the stream is ready + ls.HTTPMux.Handle("/live/video-to-video/start", ls.StartLiveVideo()) + return nil } @@ -361,3 +364,25 @@ func (ls *LivepeerServer) ImageToVideoResult() http.Handler { _ = json.NewEncoder(w).Encode(resp) }) } + +func (ls *LivepeerServer) StartLiveVideo() http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + streamName := r.FormValue("stream") + if streamName == "" { + http.Error(w, "Missing stream name", http.StatusBadRequest) + return + } + requestID := string(core.RandomManifestID()) + params := aiRequestParams{ + node: ls.LivepeerNode, + os: drivers.NodeStorage.NewSession(requestID), + sessManager: ls.AISessionManager, + } + ctx := clog.AddVal(r.Context(), "request_id", requestID) + req := worker.StartLiveVideoToVideoFormdataRequestBody{ + // TODO set model and initial parameters here if necessary (eg, prompt) + } + resp, err := processAIRequest(ctx, params, req) + clog.Infof(ctx, "Received live video AI request stream=%s resp=%v err=%v", streamName, resp, err) + }) +} diff --git a/server/ai_process.go b/server/ai_process.go index 113f53d290..02f29ca538 100644 --- a/server/ai_process.go +++ b/server/ai_process.go @@ -34,6 +34,7 @@ const defaultAudioToTextModelID = "openai/whisper-large-v3" const defaultLLMModelID = "meta-llama/llama-3.1-8B-Instruct" const defaultSegmentAnything2ModelID = "facebook/sam2-hiera-large" const defaultImageToTextModelID = "Salesforce/blip-image-captioning-large" +const defaultLiveVideoToVideoModelID = "cumulo-autumn/stream-diffusion" var errWrongFormat = fmt.Errorf("result not in correct format") @@ -865,6 +866,28 @@ func submitAudioToText(ctx context.Context, params aiRequestParams, sess *AISess return &res, nil } +func submitLiveVideoToVideo(ctx context.Context, params aiRequestParams, sess *AISession, req worker.StartLiveVideoToVideoFormdataRequestBody) (*worker.StartLiveVideoToVideoResponse, error) { + client, err := worker.NewClientWithResponses(sess.Transcoder(), worker.WithHTTPClient(httpClient)) + if err != nil { + if monitor.Enabled { + monitor.AIRequestError(err.Error(), "LiveVideoToVideo", *req.ModelId, sess.OrchestratorInfo) + } + return nil, err + } + resp, err := client.StartLiveVideoToVideoWithFormdataBodyWithResponse(ctx, req) + if err != nil { + return nil, err + } + if resp.JSON200 != nil { + } + if resp.JSON400 != nil { + } + if resp.JSON500 != nil { + } + // TODO check urls and add sess.Transcoder to the host if necessary + return resp, nil +} + func CalculateLLMLatencyScore(took time.Duration, tokensUsed int) float64 { if tokensUsed <= 0 { return 0 @@ -1204,6 +1227,15 @@ func processAIRequest(ctx context.Context, params aiRequestParams, req interface submitFn = func(ctx context.Context, params aiRequestParams, sess *AISession) (interface{}, error) { return submitImageToText(ctx, params, sess, v) } + case worker.StartLiveVideoToVideoFormdataRequestBody: + cap = core.Capability_LiveVideoToVideo + modelID = defaultLiveVideoToVideoModelID + if v.ModelId != nil { + modelID = *v.ModelId + } + submitFn = func(ctx context.Context, params aiRequestParams, sess *AISession) (interface{}, error) { + return submitLiveVideoToVideo(ctx, params, sess, v) + } default: return nil, fmt.Errorf("unsupported request type %T", req) } From 4733f05af26460dd8a587c8c1261e8c3f174c2f0 Mon Sep 17 00:00:00 2001 From: Josh Allmann Date: Fri, 25 Oct 2024 22:07:18 +0000 Subject: [PATCH 2/2] Remove dependency on ai-worker changes --- go.mod | 2 -- go.sum | 2 -- server/ai_http.go | 11 ++++++----- server/ai_mediaserver.go | 5 ++--- server/ai_process.go | 37 +++++++++++++++---------------------- 5 files changed, 23 insertions(+), 34 deletions(-) diff --git a/go.mod b/go.mod index c1ec7eec03..847513b404 100644 --- a/go.mod +++ b/go.mod @@ -2,8 +2,6 @@ module github.com/livepeer/go-livepeer go 1.23.2 -replace github.com/livepeer/ai-worker => github.com/j0sh/livepeer-ai-worker v0.0.0-20241017214425-b4f177ea52a7 - require ( contrib.go.opencensus.io/exporter/prometheus v0.4.2 github.com/Masterminds/semver/v3 v3.2.1 diff --git a/go.sum b/go.sum index 647e35ca24..a1973ed8a2 100644 --- a/go.sum +++ b/go.sum @@ -509,8 +509,6 @@ github.com/iris-contrib/blackfriday v2.0.0+incompatible/go.mod h1:UzZ2bDEoaSGPbk github.com/iris-contrib/go.uuid v2.0.0+incompatible/go.mod h1:iz2lgM/1UnEf1kP0L/+fafWORmlnuysV2EMP8MW+qe0= github.com/iris-contrib/i18n v0.0.0-20171121225848-987a633949d0/go.mod h1:pMCz62A0xJL6I+umB2YTlFRwWXaDFA0jy+5HzGiJjqI= github.com/iris-contrib/schema v0.0.1/go.mod h1:urYA3uvUNG1TIIjOSCzHr9/LmbQo8LrOcOqfqxa4hXw= -github.com/j0sh/livepeer-ai-worker v0.0.0-20241017214425-b4f177ea52a7 h1:m9JLRnvNr9SOyy/DZthWVK8F8AoGtBxDlK4zORuV/bA= -github.com/j0sh/livepeer-ai-worker v0.0.0-20241017214425-b4f177ea52a7/go.mod h1:91lMzkzVuwR9kZ0EzXwf+7yVhLaNVmYAfmBtn7t3cQA= github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus= github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc= github.com/jaypipes/ghw v0.10.0 h1:UHu9UX08Py315iPojADFPOkmjTsNzHj4g4adsNKKteY= diff --git a/server/ai_http.go b/server/ai_http.go index cd32f3fbfc..ce7d8b76b1 100644 --- a/server/ai_http.go +++ b/server/ai_http.go @@ -251,11 +251,12 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler { pubUrl = "/ai/live-video/" + mid subUrl = pubUrl + "/out" ) - jsonData, err := json.Marshal(&worker.StartLiveVideoToVideoResponse{ - JSON200: &worker.ResponseStartVideoToVideo{ - PublishUrl: &pubUrl, - SubscribeUrl: &subUrl, - }, + jsonData, err := json.Marshal(struct { + PublishUrl string + SubscribeUrl string + }{ + PublishUrl: pubUrl, + SubscribeUrl: subUrl, }) if err != nil { respondWithError(w, err.Error(), http.StatusInternalServerError) diff --git a/server/ai_mediaserver.go b/server/ai_mediaserver.go index 4429716e7a..b80f9af317 100644 --- a/server/ai_mediaserver.go +++ b/server/ai_mediaserver.go @@ -379,9 +379,8 @@ func (ls *LivepeerServer) StartLiveVideo() http.Handler { sessManager: ls.AISessionManager, } ctx := clog.AddVal(r.Context(), "request_id", requestID) - req := worker.StartLiveVideoToVideoFormdataRequestBody{ - // TODO set model and initial parameters here if necessary (eg, prompt) - } + // TODO set model and initial parameters here if necessary (eg, prompt) + req := struct{}{} resp, err := processAIRequest(ctx, params, req) clog.Infof(ctx, "Received live video AI request stream=%s resp=%v err=%v", streamName, resp, err) }) diff --git a/server/ai_process.go b/server/ai_process.go index 02f29ca538..6fd92fdf0a 100644 --- a/server/ai_process.go +++ b/server/ai_process.go @@ -866,26 +866,17 @@ func submitAudioToText(ctx context.Context, params aiRequestParams, sess *AISess return &res, nil } -func submitLiveVideoToVideo(ctx context.Context, params aiRequestParams, sess *AISession, req worker.StartLiveVideoToVideoFormdataRequestBody) (*worker.StartLiveVideoToVideoResponse, error) { - client, err := worker.NewClientWithResponses(sess.Transcoder(), worker.WithHTTPClient(httpClient)) +func submitLiveVideoToVideo(ctx context.Context, params aiRequestParams, sess *AISession, req struct{ ModelId *string }) (any, error) { + //client, err := worker.NewClientWithResponses(sess.Transcoder(), worker.WithHTTPClient(httpClient)) + var err error if err != nil { if monitor.Enabled { monitor.AIRequestError(err.Error(), "LiveVideoToVideo", *req.ModelId, sess.OrchestratorInfo) } return nil, err } - resp, err := client.StartLiveVideoToVideoWithFormdataBodyWithResponse(ctx, req) - if err != nil { - return nil, err - } - if resp.JSON200 != nil { - } - if resp.JSON400 != nil { - } - if resp.JSON500 != nil { - } // TODO check urls and add sess.Transcoder to the host if necessary - return resp, nil + return nil, nil } func CalculateLLMLatencyScore(took time.Duration, tokensUsed int) float64 { @@ -1227,15 +1218,17 @@ func processAIRequest(ctx context.Context, params aiRequestParams, req interface submitFn = func(ctx context.Context, params aiRequestParams, sess *AISession) (interface{}, error) { return submitImageToText(ctx, params, sess, v) } - case worker.StartLiveVideoToVideoFormdataRequestBody: - cap = core.Capability_LiveVideoToVideo - modelID = defaultLiveVideoToVideoModelID - if v.ModelId != nil { - modelID = *v.ModelId - } - submitFn = func(ctx context.Context, params aiRequestParams, sess *AISession) (interface{}, error) { - return submitLiveVideoToVideo(ctx, params, sess, v) - } + /* + case worker.StartLiveVideoToVideoFormdataRequestBody: + cap = core.Capability_LiveVideoToVideo + modelID = defaultLiveVideoToVideoModelID + if v.ModelId != nil { + modelID = *v.ModelId + } + submitFn = func(ctx context.Context, params aiRequestParams, sess *AISession) (interface{}, error) { + return submitLiveVideoToVideo(ctx, params, sess, v) + } + */ default: return nil, fmt.Errorf("unsupported request type %T", req) }