 import os
 import json
 import logging
+import wave
+import numpy as np

 from twilio.rest import Client
 from aiohttp import web
@@ -31,6 +33,95 @@ def __init__(self, track: MediaStreamTrack, pipeline):
     async def recv(self):
         frame = await self.track.recv()
         return await self.pipeline(frame)
+
+class AudioStreamTrack(MediaStreamTrack):
+    """
+    This custom audio track wraps an incoming audio MediaStreamTrack.
+    It continuously records frames in 10-second chunks and saves each chunk
+    as a separate WAV file with an incrementing index.
+    """
+
+    kind = "audio"
+
+    def __init__(self, track: MediaStreamTrack):
+        super().__init__()
+        self.track = track
+        self.start_time = None
+        self.frames = []
+        self._recording_duration = 10.0  # in seconds
+        self._chunk_index = 0
+        self._saving = False
+        self._lock = asyncio.Lock()
+
+    async def recv(self):
+        frame = await self.track.recv()
+
+        # On the first frame, record the start time.
+        if self.start_time is None:
+            self.start_time = frame.time
+            logger.info(f"Audio recording started at time: {self.start_time:.3f}")
+
+        elapsed = frame.time - self.start_time
+        self.frames.append(frame)
+
+        logger.info(f"Received audio frame at time: {frame.time:.3f}, total frames: {len(self.frames)}")
+
+        # Check if we've hit 10 seconds and we're not currently saving.
+        if elapsed >= self._recording_duration and not self._saving:
+            logger.info(f"10 second chunk reached (elapsed: {elapsed:.3f}s). Preparing to save chunk {self._chunk_index}.")
+            self._saving = True
+            # Handle saving in a background task so we don't block the recv loop.
+            asyncio.create_task(self.save_audio())
+
+        return frame
+
+    async def save_audio(self):
+        logger.info(f"Starting to save audio chunk {self._chunk_index}...")
+        async with self._lock:
+            # Extract properties from the first frame
+            if not self.frames:
+                logger.warning("No frames to save, skipping.")
+                self._saving = False
+                return
+
+            sample_rate = self.frames[0].sample_rate
+            layout = self.frames[0].layout
+            channels = len(layout.channels)
+
+            logger.info(f"Audio chunk {self._chunk_index}: sample_rate={sample_rate}, channels={channels}, frames_count={len(self.frames)}")
+
+            # Convert all frames to ndarray and concatenate
+            data_arrays = [f.to_ndarray() for f in self.frames]
+            data = np.concatenate(data_arrays, axis=1)  # shape: (channels, total_samples)
+
+            # Interleave channels (if multiple) since WAV expects interleaved samples.
+            interleaved = data.T.flatten()
+
+            # If needed, convert float frames to int16
+            # interleaved = (interleaved * 32767).astype(np.int16)
+
+            filename = f"output_{self._chunk_index}.wav"
+            logger.info(f"Writing audio chunk {self._chunk_index} to file: {filename}")
+            with wave.open(filename, 'wb') as wf:
+                wf.setnchannels(channels)
+                wf.setsampwidth(2)  # 16-bit PCM
+                wf.setframerate(sample_rate)
+                wf.writeframes(interleaved.tobytes())
+
+            logger.info(f"Audio chunk {self._chunk_index} saved successfully as {filename}")
+
+            # Increment the chunk index for the next segment
+            self._chunk_index += 1
+
+            # Reset for the next recording chunk
+            self.frames.clear()
+            self.start_time = None
+            self._saving = False
+            logger.info(f"Ready to record next 10-second chunk. Current chunk index: {self._chunk_index}")


 def force_codec(pc, sender, forced_codec):
@@ -113,6 +204,10 @@ def on_track(track):

             codec = "video/H264"
             force_codec(pc, sender, codec)
+        elif track.kind == "audio":
+            audioTrack = AudioStreamTrack(track)
+            tracks["audio"] = audioTrack
+            pc.addTrack(audioTrack)

         @track.on("ended")
         async def on_ended():
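
The WAV writer above assumes the decoded frames already carry 16-bit integer samples, since `setsampwidth(2)` writes the frame bytes as-is. If the decoder hands back float samples (for example PyAV's planar `fltp` format, which is common after Opus decoding), they need to be scaled and converted first, as the commented-out line in `save_audio` hints. Below is a minimal sketch of that normalization, assuming PyAV-style `AudioFrame` objects like the ones collected in `self.frames`; the helper name is made up for illustration.

```python
# Reviewer sketch, not part of this commit. Assumes PyAV AudioFrame objects;
# the helper name frames_to_int16 is hypothetical.
import numpy as np

def frames_to_int16(frames):
    """Concatenate AudioFrames into one interleaved int16 sample array."""
    # to_ndarray() yields (channels, samples) for planar formats and
    # (1, samples * channels) for packed formats.
    data = np.concatenate([f.to_ndarray() for f in frames], axis=1)
    if data.dtype.kind == "f":
        # Float samples are nominally in [-1.0, 1.0]; scale to int16 range.
        data = (np.clip(data, -1.0, 1.0) * 32767.0).astype(np.int16)
    elif data.dtype != np.int16:
        data = data.astype(np.int16)
    # Transpose then flatten so channels end up interleaved, as WAV expects.
    return data.T.flatten()
```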
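As a quick sanity check on the recorder, a saved chunk can be read back with the standard `wave` module; `output_0.wav` here just follows the naming scheme used in `save_audio`.

```python
import wave

# Inspect the first saved chunk; the duration should be close to 10 seconds.
with wave.open("output_0.wav", "rb") as wf:
    duration = wf.getnframes() / wf.getframerate()
    print(f"channels={wf.getnchannels()} rate={wf.getframerate()} duration={duration:.2f}s")
```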