Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gateway publish to orchestrator. #3211

Merged
merged 8 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ require (
go.opencensus.io v0.24.0
go.uber.org/goleak v1.3.0
golang.org/x/net v0.28.0
golang.org/x/sys v0.26.0
google.golang.org/grpc v1.65.0
google.golang.org/protobuf v1.34.1
pgregory.net/rapid v1.1.0
Expand Down Expand Up @@ -237,7 +238,6 @@ require (
golang.org/x/mod v0.20.0 // indirect
golang.org/x/oauth2 v0.20.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/text v0.17.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.24.0 // indirect
Expand Down
275 changes: 275 additions & 0 deletions media/rtmp2segment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,275 @@
package media

import (
"bufio"
"encoding/base32"
"fmt"
"io"
"log/slog"
"math/rand"
"os"
"path/filepath"
"strings"
"sync"
"syscall"
"time"

"github.com/livepeer/lpms/ffmpeg"
"golang.org/x/sys/unix"
)

var waitTimeout = 20 * time.Second

type MediaSegmenter struct {
Workdir string
}

func (ms *MediaSegmenter) RunSegmentation(in string, segmentHandler SegmentHandler) {

outFilePattern := filepath.Join(ms.Workdir, randomString()+"-%d.ts")
completionSignal := make(chan bool, 1)
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
processSegments(segmentHandler, outFilePattern, completionSignal)
}()
ffmpeg.FfmpegSetLogLevel(ffmpeg.FFLogWarning)
ffmpeg.Transcode3(&ffmpeg.TranscodeOptionsIn{
Fname: in,
}, []ffmpeg.TranscodeOptions{{
Oname: outFilePattern,
AudioEncoder: ffmpeg.ComponentOptions{Name: "copy"},
VideoEncoder: ffmpeg.ComponentOptions{Name: "copy"},
Muxer: ffmpeg.ComponentOptions{Name: "segment"},
}})
completionSignal <- true
slog.Info("sent completion signal, now waiting")
wg.Wait()

Check warning on line 48 in media/rtmp2segment.go

View check run for this annotation

Codecov / codecov/patch

media/rtmp2segment.go#L27-L48

Added lines #L27 - L48 were not covered by tests
}

func createNamedPipe(pipeName string) {
err := syscall.Mkfifo(pipeName, 0666)
if err != nil && !os.IsExist(err) {
slog.Error("Failed to create named pipe", "pipeName", pipeName, "err", err)
}

Check warning on line 55 in media/rtmp2segment.go

View check run for this annotation

Codecov / codecov/patch

media/rtmp2segment.go#L51-L55

Added lines #L51 - L55 were not covered by tests
}

func cleanUpPipe(pipeName string) {
err := os.Remove(pipeName)
if err != nil {
slog.Error("Failed to remove pipe", "pipeName", pipeName, "err", err)
}

Check warning on line 62 in media/rtmp2segment.go

View check run for this annotation

Codecov / codecov/patch

media/rtmp2segment.go#L58-L62

Added lines #L58 - L62 were not covered by tests
}

func openNonBlockingWithRetry(name string, timeout time.Duration, completed <-chan bool) (*os.File, error) {
// Pipes block if there is no writer available

// Attempt to open the named pipe in non-blocking mode once
fd, err := syscall.Open(name, syscall.O_RDONLY|syscall.O_NONBLOCK, 0666)
if err != nil {
return nil, fmt.Errorf("error opening file in non-blocking mode: %w", err)
}

Check warning on line 72 in media/rtmp2segment.go

View check run for this annotation

Codecov / codecov/patch

media/rtmp2segment.go#L65-L72

Added lines #L65 - L72 were not covered by tests

deadline := time.Now().Add(timeout)

// setFd sets the given file descriptor in the fdSet
setFd := func(fd int, fdSet *syscall.FdSet) {
fdSet.Bits[fd/64] |= 1 << (uint(fd) % 64)
}

Check warning on line 79 in media/rtmp2segment.go

View check run for this annotation

Codecov / codecov/patch

media/rtmp2segment.go#L74-L79

Added lines #L74 - L79 were not covered by tests

// isFdSet checks if the given file descriptor is set in the fdSet
isFdSet := func(fd int, fdSet *syscall.FdSet) bool {
return fdSet.Bits[fd/64]&(1<<(uint(fd)%64)) != 0
}

Check warning on line 84 in media/rtmp2segment.go

View check run for this annotation

Codecov / codecov/patch

media/rtmp2segment.go#L82-L84

Added lines #L82 - L84 were not covered by tests

for {
// Check if completed
select {
case <-completed:
syscall.Close(fd)
return nil, fmt.Errorf("Completed")
default:

Check warning on line 92 in media/rtmp2segment.go

View check run for this annotation

Codecov / codecov/patch

media/rtmp2segment.go#L86-L92

Added lines #L86 - L92 were not covered by tests
// continue
}
// Calculate the remaining time until the deadline
timeLeft := time.Until(deadline)
if timeLeft <= 0 {
syscall.Close(fd)
return nil, fmt.Errorf("timeout waiting for file to be ready: %s", name)
}

Check warning on line 100 in media/rtmp2segment.go

View check run for this annotation

Codecov / codecov/patch

media/rtmp2segment.go#L96-L100

Added lines #L96 - L100 were not covered by tests

// Convert timeLeft to a syscall.Timeval for the select call
tv := syscall.NsecToTimeval((100 * time.Millisecond).Nanoseconds())

// Set up the read file descriptor set for select
readFds := &syscall.FdSet{}
setFd(fd, readFds)

// Wait using select until the pipe is ready for reading
n, err := crossPlatformSelect(fd+1, readFds, nil, nil, &tv)
if err != nil {
if err == syscall.EINTR {
continue // Retry if interrupted by a signal

Check warning on line 113 in media/rtmp2segment.go

View check run for this annotation

Codecov / codecov/patch

media/rtmp2segment.go#L103-L113

Added lines #L103 - L113 were not covered by tests
}
syscall.Close(fd)
return nil, fmt.Errorf("select error: %v", err)

Check warning on line 116 in media/rtmp2segment.go

View check run for this annotation

Codecov / codecov/patch

media/rtmp2segment.go#L115-L116

Added lines #L115 - L116 were not covered by tests
}

// Check if the file descriptor is ready
if n > 0 && isFdSet(fd, readFds) {
// Modify the file descriptor to blocking mode using fcntl
flags, err := unix.FcntlInt(uintptr(fd), syscall.F_GETFL, 0)
if err != nil {
syscall.Close(fd)
return nil, fmt.Errorf("error getting file flags: %w", err)
}

Check warning on line 126 in media/rtmp2segment.go

View check run for this annotation

Codecov / codecov/patch

media/rtmp2segment.go#L120-L126

Added lines #L120 - L126 were not covered by tests

// Clear the non-blocking flag
flags &^= syscall.O_NONBLOCK
if _, err := unix.FcntlInt(uintptr(fd), syscall.F_SETFL, flags); err != nil {
syscall.Close(fd)
return nil, fmt.Errorf("error setting file to blocking mode: %w", err)
}

Check warning on line 133 in media/rtmp2segment.go

View check run for this annotation

Codecov / codecov/patch

media/rtmp2segment.go#L129-L133

Added lines #L129 - L133 were not covered by tests

// Convert the file descriptor to an *os.File to return
return os.NewFile(uintptr(fd), name), nil

Check warning on line 136 in media/rtmp2segment.go

View check run for this annotation

Codecov / codecov/patch

media/rtmp2segment.go#L136

Added line #L136 was not covered by tests
}
}
}

func processSegments(segmentHandler SegmentHandler, outFilePattern string, completionSignal <-chan bool) {

// things protected by the mutex mu
mu := &sync.Mutex{}
isComplete := false
var currentSegment *os.File = nil
pipeCompletion := make(chan bool, 1)

// Start a goroutine to wait for the completion signal
go func() {
<-completionSignal
mu.Lock()
defer mu.Unlock()
if currentSegment != nil {
// Trigger EOF on the current segment by closing the file
slog.Info("Completion signal received. Closing current segment to trigger EOF.")
currentSegment.Close()
}
isComplete = true
pipeCompletion <- true
slog.Info("Got completion signal")

Check warning on line 161 in media/rtmp2segment.go

View check run for this annotation

Codecov / codecov/patch

media/rtmp2segment.go#L141-L161

Added lines #L141 - L161 were not covered by tests
}()

pipeNum := 0
createNamedPipe(fmt.Sprintf(outFilePattern, pipeNum))

for {
pipeName := fmt.Sprintf(outFilePattern, pipeNum)
nextPipeName := fmt.Sprintf(outFilePattern, pipeNum+1)

// Create the next pipe ahead of time
createNamedPipe(nextPipeName)

// Open the current pipe for reading
// Blocks if no writer is available so do some tricks to it
file, err := openNonBlockingWithRetry(pipeName, waitTimeout, pipeCompletion)
if err != nil {
slog.Error("Error opening pipe", "pipeName", pipeName, "err", err)
cleanUpPipe(pipeName)
cleanUpPipe(nextPipeName)
break

Check warning on line 181 in media/rtmp2segment.go

View check run for this annotation

Codecov / codecov/patch

media/rtmp2segment.go#L164-L181

Added lines #L164 - L181 were not covered by tests
}

mu.Lock()
currentSegment = file
mu.Unlock()

// Handle the reading process
readSegment(segmentHandler, file, pipeName)

// Increment to the next pipe
pipeNum++

// Clean up the current pipe after reading
cleanUpPipe(pipeName)

mu.Lock()
if isComplete {
cleanUpPipe(pipeName)
cleanUpPipe(nextPipeName)
mu.Unlock()
break

Check warning on line 202 in media/rtmp2segment.go

View check run for this annotation

Codecov / codecov/patch

media/rtmp2segment.go#L184-L202

Added lines #L184 - L202 were not covered by tests
}
mu.Unlock()

Check warning on line 204 in media/rtmp2segment.go

View check run for this annotation

Codecov / codecov/patch

media/rtmp2segment.go#L204

Added line #L204 was not covered by tests

}
}

func readSegment(segmentHandler SegmentHandler, file *os.File, pipeName string) {
defer file.Close()

reader := bufio.NewReader(file)
firstByteRead := false
totalBytesRead := int64(0)

buf := make([]byte, 32*1024)

// TODO should be explicitly buffered for better management
interfaceReader, interfaceWriter := io.Pipe()
defer interfaceWriter.Close()
segmentHandler(interfaceReader)

for {
n, err := reader.Read(buf)
if n > 0 {
if !firstByteRead {
slog.Debug("First byte read", "pipeName", pipeName)
firstByteRead = true

}
totalBytesRead += int64(n)
if _, err := interfaceWriter.Write(buf[:n]); err != nil {
if err != io.EOF {
slog.Error("Error writing", "pipeName", pipeName, "err", err)
}

Check warning on line 235 in media/rtmp2segment.go

View check run for this annotation

Codecov / codecov/patch

media/rtmp2segment.go#L209-L235

Added lines #L209 - L235 were not covered by tests
}
}
if n == len(buf) && n < 1024*1024 {
newLen := int(float64(len(buf)) * 1.5)
slog.Info("Max buf hit, increasing", "oldSize", humanBytes(int64(len(buf))), "newSize", humanBytes(int64(newLen)))
buf = make([]byte, newLen)
}

Check warning on line 242 in media/rtmp2segment.go

View check run for this annotation

Codecov / codecov/patch

media/rtmp2segment.go#L238-L242

Added lines #L238 - L242 were not covered by tests

if err != nil {
if err.Error() == "EOF" {
slog.Debug("Last byte read", "pipeName", pipeName, "totalRead", humanBytes(totalBytesRead))
} else {
slog.Error("Error reading", "pipeName", pipeName, "err", err)
}
break

Check warning on line 250 in media/rtmp2segment.go

View check run for this annotation

Codecov / codecov/patch

media/rtmp2segment.go#L244-L250

Added lines #L244 - L250 were not covered by tests
}
}
}

func randomString() string {
// Create a random 4-byte string encoded as base32, trimming padding
b := make([]byte, 4)
for i := range b {
b[i] = byte(rand.Intn(256))
}
return strings.TrimRight(base32.StdEncoding.EncodeToString(b), "=")

Check warning on line 261 in media/rtmp2segment.go

View check run for this annotation

Codecov / codecov/patch

media/rtmp2segment.go#L255-L261

Added lines #L255 - L261 were not covered by tests
}

func humanBytes(bytes int64) string {
var unit int64 = 1024
if bytes < unit {
return fmt.Sprintf("%d B", bytes)
}
div, exp := unit, 0
for n := bytes / unit; n >= unit; n /= unit {
div *= unit
exp++
}
return fmt.Sprintf("%.1f %cB", float64(bytes)/float64(div), "KMGTPE"[exp])

Check warning on line 274 in media/rtmp2segment.go

View check run for this annotation

Codecov / codecov/patch

media/rtmp2segment.go#L264-L274

Added lines #L264 - L274 were not covered by tests
}
49 changes: 49 additions & 0 deletions media/segment_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package media

import (
"io"
"sync"
)

type SegmentHandler func(reader io.Reader)

func NoopReader(reader io.Reader) {
go func() {
io.Copy(io.Discard, reader)
}()

Check warning on line 13 in media/segment_reader.go

View check run for this annotation

Codecov / codecov/patch

media/segment_reader.go#L10-L13

Added lines #L10 - L13 were not covered by tests
}

type EOSReader struct{}

func (r EOSReader) Read(p []byte) (n int, err error) {
return 0, io.EOF

Check warning on line 19 in media/segment_reader.go

View check run for this annotation

Codecov / codecov/patch

media/segment_reader.go#L18-L19

Added lines #L18 - L19 were not covered by tests
}

type SwitchableSegmentReader struct {
mu sync.RWMutex
reader SegmentHandler
}

func NewSwitchableSegmentReader() *SwitchableSegmentReader {
return &SwitchableSegmentReader{
reader: NoopReader,
}

Check warning on line 30 in media/segment_reader.go

View check run for this annotation

Codecov / codecov/patch

media/segment_reader.go#L27-L30

Added lines #L27 - L30 were not covered by tests
}

func (sr *SwitchableSegmentReader) SwitchReader(newReader SegmentHandler) {
sr.mu.Lock()
defer sr.mu.Unlock()
sr.reader = newReader

Check warning on line 36 in media/segment_reader.go

View check run for this annotation

Codecov / codecov/patch

media/segment_reader.go#L33-L36

Added lines #L33 - L36 were not covered by tests
}

func (sr *SwitchableSegmentReader) Read(reader io.Reader) {
sr.mu.RLock()
defer sr.mu.RUnlock()
sr.reader(reader)

Check warning on line 42 in media/segment_reader.go

View check run for this annotation

Codecov / codecov/patch

media/segment_reader.go#L39-L42

Added lines #L39 - L42 were not covered by tests
}

func (sr *SwitchableSegmentReader) Close() {
sr.mu.RLock()
defer sr.mu.RUnlock()
sr.reader(&EOSReader{})

Check warning on line 48 in media/segment_reader.go

View check run for this annotation

Codecov / codecov/patch

media/segment_reader.go#L45-L48

Added lines #L45 - L48 were not covered by tests
}
42 changes: 42 additions & 0 deletions media/select_darwin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
//go:build darwin

package media

import "syscall"

func crossPlatformSelect(nfd int, r, w, e *syscall.FdSet, timeout *syscall.Timeval) (int, error) {
// On macOS, syscall.Select only returns an error
err := syscall.Select(nfd, r, w, e, timeout)
if err != nil {
return -1, err // Return -1 in case of an error
}
// We need to manually count the number of ready descriptors in FdSets
n := 0
if r != nil {
n += countReadyDescriptors(r, nfd)
}
if w != nil {
n += countReadyDescriptors(w, nfd)
}
if e != nil {
n += countReadyDescriptors(e, nfd)
}
return n, nil

}

// countReadyDescriptors manually counts the number of ready file descriptors in an FdSet
func countReadyDescriptors(set *syscall.FdSet, nfd int) int {
count := 0
for fd := 0; fd < nfd; fd++ {
if isSet(fd, set) {
count++
}
}
return count
}

// isSet checks if a file descriptor is set in an FdSet
func isSet(fd int, set *syscall.FdSet) bool {
return set.Bits[fd/64]&(1<<(uint(fd)%64)) != 0
}
9 changes: 9 additions & 0 deletions media/select_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
//go:build linux

package media

import "syscall"

func crossPlatformSelect(nfd int, r, w, e *syscall.FdSet, timeout *syscall.Timeval) (int, error) {
return syscall.Select(nfd, r, w, e, timeout)

Check warning on line 8 in media/select_linux.go

View check run for this annotation

Codecov / codecov/patch

media/select_linux.go#L7-L8

Added lines #L7 - L8 were not covered by tests
}
Loading
Loading