Gateway publish to orchestrator. #3211
Conversation
j0sh
commented
Oct 18, 2024
- Requires Scaffolding for realtime-to-realtime #3210
- Sets up trickle HTTP endpoints on the orchestrator (requires golang 1.22 for the new routes; will send a separate patch to bump the go.mod and do any CI adjustments)
- Pulls an RTMP stream from MediaMTX on the gateway when a new stream comes in
- Converts the RTMP into MPEG-TS segments
- Publishes segments to the orchestrator via trickle HTTP
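For a concrete picture of that last step, here is a rough sketch of what publishing looks like from the gateway side. It is illustration only, not the PR's actual client: the URL layout (one POST per segment at publishURL + "/" + sequence number), the content type, and the channel feeding it are all assumptions.

```go
package main

import (
	"fmt"
	"io"
	"log"
	"net/http"
)

// publishSegments drains MPEG-TS segments from a channel and POSTs them to the
// orchestrator in order, one trickle HTTP request per segment. The path scheme
// and content type here are assumptions for illustration.
func publishSegments(publishURL string, segments <-chan io.Reader) error {
	seq := 0
	for seg := range segments {
		url := fmt.Sprintf("%s/%d", publishURL, seq)
		resp, err := http.Post(url, "video/mp2t", seg)
		if err != nil {
			return fmt.Errorf("publish seq %d: %w", seq, err)
		}
		io.Copy(io.Discard, resp.Body)
		resp.Body.Close()
		if resp.StatusCode >= 300 {
			return fmt.Errorf("publish seq %d: unexpected status %s", seq, resp.Status)
		}
		seq++
	}
	return nil
}

func main() {
	// In the PR, segments come from the RTMP pull + segmentation step
	// (media.RunSegmentation); here the channel is just closed immediately.
	segments := make(chan io.Reader)
	go close(segments)
	// The endpoint below is a placeholder, not a real orchestrator URL.
	if err := publishSegments("http://localhost:8935/trickle/stream-id", segments); err != nil {
		log.Fatal(err)
	}
}
```

Since each POST covers roughly one segment, a stalled or failed orchestrator shows up quickly as a failed request rather than a silently backed-up long-lived connection.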
Good work, I think this is a very important PR, because it describes the protocol between G<>O. I added 2 comments, because I think I still don't understand how we plan it all to work.
Other than that, @j0sh is it possible to actually run the code from this PR? If yes, could you describe how to run it locally? I think if I play with it, I can have some more comments and understand it better.
server/ai_mediaserver.go
Outdated
// Kick off the RTMP pull and segmentation as soon as possible
ssr := media.NewSwitchableSegmentReader()
go func() {
	media.RunSegmentation("rtmp://localhost/"+streamName, ssr.Read)
Yes, gateway initiates the RTMP pull, but the flow of media still goes from mediamtx -> gateway
I guess there could be a better distinction between "who initiates the pull" vis-a-vis the actual flow of media, but RTMP isn't really a request-response protocol in the same way the other HTTP flows are in this diagram. Open to suggestions.
Maybe you can add one more arrow with "initiate RTMP" 🙃 For me it is/was pretty confusing.
	u = sess.Transcoder() + u
	return url.Parse(u)
}
pub, err := appendHostname(resp.JSON200.PublishUrl)
Why do you need PublishUrl in Orchestrator? Isn't it Gateway Publishing to Orchestrator?
Yeah this was a bit of a spur of the moment addition when I was looking at the overall flow, where we are doing this request -> response call anyway. This gives the exact endpoint where a publish should happen. Likewise for the subscribe URL - it tells the gateway where to pull the results.
We could skip this entirely and hard-code the URLs via well-known paths, distinguish jobs via IDs in HTTP headers, etc., but this is an easy way for us to add a bit of topological flexibility without breaking the protocol later (eg, routing the stream to a different machine).
In fact from some of the conversations on Discord right now, there is probably another way to make this even more robust: to get a list of subscribe URLs (think multiple renditions of low latency video transcoding)
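To make that point concrete, here is a small sketch of how the gateway side might consume those URLs. The PublishUrl field and the appendHostname helper come from the diff above; the subscribe path and the function shape are assumptions for illustration.

```go
package gateway

import "net/url"

// transcoderSession stands in for the session type used in the PR; Transcoder()
// returns the base URL of the selected orchestrator.
type transcoderSession interface {
	Transcoder() string
}

// resolveTrickleURLs resolves the relative paths returned by the orchestrator
// against the session's transcoder host. PublishUrl exists in this PR; treating
// the subscribe path the same way (or as a list of paths for multiple
// renditions) is the extension discussed above.
func resolveTrickleURLs(sess transcoderSession, publishPath, subscribePath string) (pub, sub *url.URL, err error) {
	pub, err = url.Parse(sess.Transcoder() + publishPath)
	if err != nil {
		return nil, nil, err
	}
	sub, err = url.Parse(sess.Transcoder() + subscribePath)
	if err != nil {
		return nil, nil, err
	}
	return pub, sub, nil
}
```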
Force-pushed to fix merge conflicts from the ai-video rebase
Absolutely! The big thing is making sure you have MediaMTX running. This is pretty simple - it is a single executable + config file. Download a pre-built release and use this config file [1]. Stick the executable in the same directory as the config and run it. Then for the gateway:
For the orchestrator + worker:
Publish to
That's fair; as mentioned earlier I think that is partially a result of recent pressure to deliver things without enough time to fully design an end-to-end flow within go-livepeer.

BTW, I am afraid that this PR is probably not quite enough for you to base your work on just yet, if the plan is still to carry payments within the media stream. The trickle server on the orchestrator only behaves as a simple pipe between publisher and subscriber, and does not have any mechanisms (yet) to execute additional code based on incoming segments, eg for us to process PM tickets or record metrics for selection, and we need to adjust the publisher API to also include custom headers. I'll have those in within the next day or so.

NB: One elephant in the room here is "how do we integrate mediamtx into our infrastructure" and I will spin up a separate thread to discuss that.

[1] The only difference between this config file and the MediaMTX sample config is the addition of a runOnReady curl hook, and enabling STUN support for WebRTC (because my cloud dev box doesn't work without it; prod might vary). Diff below:

diff --git b/mediamtx.yml a/mediamtx.yml
index c3aed76..cf7c60c 100644
--- b/mediamtx.yml
+++ a/mediamtx.yml
@@ -376,8 +376,8 @@ webrtcAdditionalHosts: []
# ICE servers. Needed only when local listeners can't be reached by clients.
# STUN servers allows to obtain and share the public IP of the server.
# TURN/TURNS servers forces all traffic through them.
-webrtcICEServers2: []
- # - url: stun:stun.l.google.com:19302
+webrtcICEServers2:
+ - url: stun:stun.l.google.com:19302
# if user is "AUTH_SECRET", then authentication is secret based.
# the secret must be inserted into the password field.
# username: ''
@@ -643,7 +643,7 @@ pathDefaults:
# a regular expression.
# * MTX_SOURCE_TYPE: source type
# * MTX_SOURCE_ID: source ID
- runOnReady:
+ runOnReady: curl http://localhost:5936/live-video-start -F stream=$MTX_PATH
# Restart the command if it exits.
runOnReadyRestart: no
 # Command to run when the stream is not available anymore.
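For context on what that runOnReady hook reaches: when a stream goes live, MediaMTX curls the gateway with the stream path, and the gateway kicks off the RTMP pull shown earlier in server/ai_mediaserver.go. Below is a minimal, hypothetical sketch of such an endpoint; the standalone server and wiring are assumptions, not the PR's actual code.

```go
package main

import (
	"log"
	"net/http"
)

func main() {
	mux := http.NewServeMux()
	// MediaMTX's runOnReady hook (see the config diff above) curls this
	// endpoint with -F stream=$MTX_PATH when a stream becomes available.
	mux.HandleFunc("/live-video-start", func(w http.ResponseWriter, r *http.Request) {
		streamName := r.FormValue("stream")
		if streamName == "" {
			http.Error(w, "missing stream", http.StatusBadRequest)
			return
		}
		log.Printf("stream ready: %s", streamName)
		// The real gateway would now start the RTMP pull and segmentation,
		// roughly: media.RunSegmentation("rtmp://localhost/"+streamName, ...)
		w.WriteHeader(http.StatusOK)
	})
	// 5936 matches the port in the curl hook above; the rest is illustrative.
	log.Fatal(http.ListenAndServe(":5936", mux))
}
```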
Codecov Report
Attention: Patch coverage is
Additional details and impacted files

@@ Coverage Diff @@
##            ai-video      #3211        +/-   ##
===================================================
- Coverage   35.93147%   34.98009%   -0.95138%
===================================================
  Files            126         135          +9
  Lines          34961       35909        +948
===================================================
- Hits           12562       12561          -1
- Misses         21693       22642        +949
  Partials         706         706

... and 1 file with indirect coverage changes. Continue to review the full report in Codecov by Sentry.
Marking as ready for review so we can get this in and start building other things on top of it. It is also causing merge conflicts with new pipelines, so best to get those out of the way.
server/ai_http.go
Outdated
@@ -49,6 +50,8 @@ func startAIServer(lp lphttp) error {

	openapi3filter.RegisterBodyDecoder("image/png", openapi3filter.FileBodyDecoder)

	trickle.ConfigureServerWithMux(lp.transRPC)
Do we maybe want to have a feature flag for this?
For context, we'll merge ai-video into master most probably tomorrow. Then, this change may go to our transcoding broadcaster.
This would only be triggered on orchestrators with AI enabled, and in order to do anything, they would also need a live pipeline loaded - otherwise, selection should fail on the gateway side.
We can still add a (temporary?) flag if there is something else that it would protect against.
Yeah, maybe you're right. We can go with this part without a feature flag.
trickle/trickle_server.go
Outdated
var FirstByteTimeout = errors.New("pending read timeout")

func ConfigureServerWithMux(mux *http.ServeMux) {
	/* TODO we probably want to configure the below
Why not configure it right in this PR?
Just hadn't gotten around to it yet. Updated the PR with some of this.
trickle/trickle_server.go
Outdated
	mux.HandleFunc("DELETE "+BaseServerPath+"{streamName}", streamManager.handleDelete)
}

func (sm *StreamManager) getStream(streamName string) (*Stream, bool) {
Nit: Don't you want to reorder the functions to follow the stepdown rule? I guess it makes the code simpler to read.
Old habits from C, where you have to define (or at least declare) functions before using them. I am somewhat used to looking up for a given definition rather than down, but I suppose it does not really matter in golang. If the convention is stepdown style, we can do that, but there will always be some subjectivity about what is "important" enough to come first (declaration-first at least gives something of a rule to follow).
trickle/trickle_server.go
Outdated
	stream, exists := sm.streams[streamName]
	if !exists {
		stream = &Stream{
			segments: make([]*Segment, 5),
Suggested change:
-			segments: make([]*Segment, 5),
+			segments: make([]*Segment, maxSegmentsPerStream),
Shouldn't you use maxSegmentsPerStream instead of 5 here?
Yep, fixed in cccdde8
trickle/trickle_server.go
Outdated
	return stream
}

func (sm *StreamManager) clearAllStreams() {
Where is it (or will it be) used?
It was originally for shutdown, but I think it is not used at the moment.
trickle/trickle_server.go
Outdated
	return
}

// TODO properly clear sessions once we have a good solution
What is the proper solution for clearing the stream?
Clearing the stream is not the problem, but session reuse was (eg, trying to start a new session with the same name). We can work around the issue for now by loosening up some of the constraints around sequence numbering. In practice I think this will be less of an issue as long as the O hands out a fresh ID for each session which it does here.
trickle/trickle_server.go
Outdated
	s.segments = make([]*Segment, maxSegmentsPerStream)
}

func (sm *StreamManager) handleDelete(w http.ResponseWriter, r *http.Request) {
IIUC Gateway will send a request to delete the stream. All good, but what happens if Gateway never sends that request? Shouldn't we have some automatic delete/cleanup timeout?
Yeah we will need to sweep the server periodically, but that can come later
// After reaching the last read byte, start buffering up the bytes of Segment Y
// as they come in. Process requests normally.

// Handle post requests for a given index
I'm trying to understand the logic of this function. Could you maybe describe it in the docs above?
My understanding is that it's an HTTP POST request which is kept forever open and the body is streamed all the time. That is why you have this timeoutReader. Is that correct?
Thought I answered this earlier - but I added some explanation in the trickle readme that should hopefully clear things up. (Also removed the long comment right above this.)
To sum up, POST requests are not meant to be kept open forever. For video, they are a GOP / segment in length.
Clients will pre-connect the next POST in the sequence to minimize set-up time, and we use the timeoutReader to send down periodic 100 Continue keepalives on that preconnect while waiting for content to come in. As soon as content comes in (eg, the first byte), we stop sending keepalives and proceed as normal. Added some comments to clarify that.
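To make the first-byte/keepalive mechanics easier to picture, here is a minimal sketch of the idea, not the PR's actual implementation: a reader wrapper that times out until the first byte of a preconnected POST arrives, with the handler emitting 100 Continue keepalives on each timeout. All names other than the "pending read timeout" message are made up for illustration.

```go
package trickle

import (
	"errors"
	"io"
	"net/http"
	"time"
)

// errFirstByteTimeout plays the role of FirstByteTimeout from the diff above.
var errFirstByteTimeout = errors.New("pending read timeout")

// timeoutReader wraps a request body: until the first byte arrives, reads time
// out periodically so the handler gets a chance to emit a keepalive.
type timeoutReader struct {
	body          io.Reader
	timeout       time.Duration
	firstByteRead bool
	pending       chan readResult // in-flight first-byte read, if any
}

type readResult struct {
	buf []byte
	err error
}

func (tr *timeoutReader) Read(p []byte) (int, error) {
	if tr.firstByteRead {
		return tr.body.Read(p) // normal path once the segment has started
	}
	if tr.pending == nil {
		ch := make(chan readResult, 1)
		tr.pending = ch
		go func() {
			var b [1]byte
			n, err := tr.body.Read(b[:])
			ch <- readResult{b[:n], err}
		}()
	}
	select {
	case res := <-tr.pending:
		tr.pending = nil
		tr.firstByteRead = len(res.buf) > 0
		return copy(p, res.buf), res.err
	case <-time.After(tr.timeout):
		return 0, errFirstByteTimeout
	}
}

// handlePublish sketches the server side: on each pre-first-byte timeout, send
// a 100 Continue so the client's preconnected POST stays warm (net/http allows
// interim 1xx responses via WriteHeader since Go 1.19).
func handlePublish(w http.ResponseWriter, r *http.Request) {
	tr := &timeoutReader{body: r.Body, timeout: 10 * time.Second}
	buf := make([]byte, 32*1024)
	for {
		n, err := tr.Read(buf)
		if errors.Is(err, errFirstByteTimeout) {
			w.WriteHeader(http.StatusContinue)
			continue
		}
		if n > 0 {
			_ = buf[:n] // the real server hands these bytes to subscribers
		}
		if err == io.EOF {
			w.WriteHeader(http.StatusOK)
			return
		}
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
	}
}
```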
@j0sh I added some comments, but in general, it's hard for me to understand the details of what happens in the Trickle/Transport protocol. I think I understand how it works with G and O, but I don't understand the details of the protocol itself.
Some things that could help:
- Add a README or comments in trickle_server.go explaining how it works from the overall perspective (my understanding is that the publisher sends a forever-lasting HTTP POST req and the subscriber sends a forever-lasting GET req, but I'm not sure about it 🙃)
- Add better comments to the parts that are not obvious, some examples:
  - Why do we have firstByteRead? Why is there a difference between failing to read the 1st byte and the 3rd byte?
  - Why do we have timeoutReader?
  - What is idx in the stream? If it's the index of a segment in the stream and it's set as a POST param, then I guess my understanding that we have one long-lasting HTTP POST connection is wrong
  - ...
- Add unit tests

Another option, if we don't have time to make it right ☝️, is to just merge it. I just don't feel comfortable if we go live and you're the only person understanding it. But we could merge it and improve on that later if it helps with the Realtime Video work distribution and unblocks some other work. Then, I'm ok with merging it behind some feature flag.
@leszko Thanks for the review! Added a README with some details, updated the code to address some of the TODOs and PR feedback, plus bug fixes.
	seq int
}

func NewLocalSubscriber(sm *Server, channelName string) *TrickleLocalSubscriber {
Why do we need it if it's not used?
It is used in #3232
@@ -0,0 +1,57 @@
# Trickle Protocol
Thanks for the README. Super helpful!
Could you also add info about what's changefeed?
trickle/trickle_server.go
Outdated
	if idx == -1 {
		idx = s.latestWrite
		// TODO figure out how to better handle restarts while maintaining ordering
		/* } else if idx > s.latestWrite { */
Leftover?
Removed in 5760116
@j0sh Thanks for adding the README.
I think it's ok to merge it. We'll need to work on some better documentation and unit tests, but we can do that in a separate PR. I can say I understand 50% of how trickle works internally, but I guess it's good enough for now 🙃
On second thought, I think we don't need any feature flag.