Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replay data channel recordings #2468

Merged
merged 11 commits into from
Dec 7, 2020
12 changes: 11 additions & 1 deletion html/recordplaytest.js
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ $(document).ready(function() {
recordplay.createAnswer(
{
jsep: jsep,
media: { audioSend: false, videoSend: false }, // We want recvonly audio/video
media: { audioSend: false, videoSend: false, data: true }, // We want recvonly audio/video
success: function(jsep) {
Janus.debug("Got SDP!", jsep);
var body = { request: "start" };
Expand Down Expand Up @@ -314,6 +314,16 @@ $(document).ready(function() {
$('#thevideo').removeClass('hide').show();
}
},
ondataopen: function(data) {
Janus.log("The DataChannel is available!");
$('#videobox').append(
'<input class="form-control" type="text" id="datarecv" disabled></input>'
);
},
ondata: function(data) {
Janus.debug("We got data from the DataChannel!", data);
$('#datarecv').val(data);
},
oncleanup: function() {
Janus.log(" ::: Got a cleanup notification :::");
// FIXME Reset status
Expand Down
177 changes: 164 additions & 13 deletions plugins/janus_recordplay.c
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,8 @@ typedef struct janus_recordplay_recording {
janus_videocodec vcodec; /* Codec used for video, if available */
char *vfmtp; /* Video fmtp, if any */
int video_pt; /* Payload types to use for audio when playing recordings */
char *drc_file; /* Data file name */
gboolean textdata; /* Whether data format is text */
char *offer; /* The SDP offer that will be sent to watchers */
gboolean e2ee; /* Whether media in the recording is encrypted, e.g., using Insertable Streams */
GList *viewers; /* List of users watching this recording */
Expand All @@ -429,6 +431,8 @@ typedef struct janus_recordplay_session {
janus_mutex rec_mutex; /* Mutex to protect the recorders from race conditions */
janus_recordplay_frame_packet *aframes; /* Audio frames (for playout) */
janus_recordplay_frame_packet *vframes; /* Video frames (for playout) */
janus_recordplay_frame_packet *dframes; /* Data packets (for playout) */
gboolean textdata; /* Whether data format is text */
guint video_remb_startup;
gint64 video_remb_last;
guint32 video_bitrate;
Expand Down Expand Up @@ -587,11 +591,13 @@ static const char *janus_recordplay_parse_codec(const char *dir, const char *fil
return NULL;
}
const char *t = json_string_value(type);
gboolean video = FALSE;
gboolean video = FALSE, data = FALSE;
if(!strcasecmp(t, "v")) {
video = TRUE;
} else if(!strcasecmp(t, "a")) {
video = FALSE;
} else if(!strcasecmp(t, "d")) {
data = TRUE;
} else {
JANUS_LOG(LOG_WARN, "Unsupported recording type '%s' in info header...\n", t);
json_decref(info);
Expand All @@ -615,6 +621,13 @@ static const char *janus_recordplay_parse_codec(const char *dir, const char *fil
return NULL;
}
const char *c = json_string_value(codec);
if (data) {
/* Found! */
c = !strcasecmp(c, "text") ? "text" : "binary";
json_decref(info);
fclose(file);
return c;
}
const char *mcodec = janus_sdp_match_preferred_codec(video ? JANUS_SDP_VIDEO : JANUS_SDP_AUDIO, (char *)c);
if(mcodec != NULL) {
/* Found! */
Expand Down Expand Up @@ -643,7 +656,8 @@ static int janus_recordplay_generate_offer(janus_recordplay_recording *rec) {
return -1;
/* Prepare an SDP offer we'll send to playout viewers */
gboolean offer_audio = (rec->arc_file != NULL && rec->acodec != JANUS_AUDIOCODEC_NONE),
offer_video = (rec->vrc_file != NULL && rec->vcodec != JANUS_VIDEOCODEC_NONE);
offer_video = (rec->vrc_file != NULL && rec->vcodec != JANUS_VIDEOCODEC_NONE),
offer_data = rec->drc_file != NULL;
char s_name[100];
g_snprintf(s_name, sizeof(s_name), "Recording %"SCNu64, rec->id);
janus_sdp *offer = janus_sdp_generate_offer(
Expand All @@ -660,7 +674,7 @@ static int janus_recordplay_generate_offer(janus_recordplay_recording *rec) {
JANUS_SDP_OA_VIDEO_PT, rec->video_pt,
JANUS_SDP_OA_VIDEO_DIRECTION, JANUS_SDP_SENDONLY,
JANUS_SDP_OA_VIDEO_EXTENSION, JANUS_RTP_EXTMAP_MID, 1,
JANUS_SDP_OA_DATA, FALSE,
JANUS_SDP_OA_DATA, offer_data,
JANUS_SDP_OA_DONE);
g_free(rec->offer);
rec->offer = janus_sdp_write(offer);
Expand Down Expand Up @@ -1801,6 +1815,13 @@ static void *janus_recordplay_handler(void *data) {
warning = "Broken video file, playing audio only";
}
}
if(rec->drc_file) {
session->dframes = janus_recordplay_get_frames(recordings_path, rec->drc_file);
if(session->dframes == NULL) {
JANUS_LOG(LOG_WARN, "Error opening data recording, trying to go on anyway\n");
warning = "Broken data file, playing audio/video only";
}
}
if(session->aframes == NULL && session->vframes == NULL) {
error_code = JANUS_RECORDPLAY_ERROR_INVALID_RECORDING;
g_snprintf(error_cause, 512, "Error opening recording files");
Expand All @@ -1827,6 +1848,7 @@ static void *janus_recordplay_handler(void *data) {
json_object_set_new(info, "id", json_integer(id_value));
json_object_set_new(info, "audio", session->aframes ? json_true() : json_false());
json_object_set_new(info, "video", session->vframes ? json_true() : json_false());
json_object_set_new(info, "data", session->dframes ? json_true() : json_false());
gateway->notify_event(&janus_recordplay_plugin, session->handle, info);
}
} else if(!strcasecmp(request_text, "start")) {
Expand Down Expand Up @@ -1994,6 +2016,7 @@ void janus_recordplay_update_recordings_list(void) {
janus_config_item *date = janus_config_get(nfo, cat, janus_config_type_item, "date");
janus_config_item *audio = janus_config_get(nfo, cat, janus_config_type_item, "audio");
janus_config_item *video = janus_config_get(nfo, cat, janus_config_type_item, "video");
janus_config_item *data = janus_config_get(nfo, cat, janus_config_type_item, "data");
if(!name || !name->value || strlen(name->value) == 0 || !date || !date->value || strlen(date->value) == 0) {
JANUS_LOG(LOG_WARN, "Invalid info for recording %"SCNu64", skipping...\n", id);
g_list_free(cl);
Expand Down Expand Up @@ -2041,6 +2064,16 @@ void janus_recordplay_update_recordings_list(void) {
if(e2ee)
rec->e2ee = TRUE;
}
if(data && data->value) {
rec->drc_file = g_strdup(data->value);
char *ext = strstr(rec->drc_file, ".mjr");
if(ext != NULL)
*ext = '\0';
const char *textcodec = janus_recordplay_parse_codec(recordings_path,
rec->drc_file, NULL, sizeof(NULL), NULL);
if (textcodec)
rec->textdata = !strcasecmp("text", textcodec);
}
rec->audio_pt = AUDIO_PT;
if(rec->acodec != JANUS_AUDIOCODEC_NONE) {
/* Some audio codecs have a fixed payload type that we can't mess with */
Expand Down Expand Up @@ -2186,12 +2219,13 @@ janus_recordplay_frame_packet *janus_recordplay_get_frames(const char *dir, cons
return NULL;
}
const char *t = json_string_value(type);
int video = 0;
int video = 0, audio = 0;
gint64 c_time = 0, w_time = 0;
if(!strcasecmp(t, "v")) {
video = 1;
} else if(!strcasecmp(t, "a")) {
video = 0;
audio = 1;
} else if(!strcasecmp(t, "d")) {
} else {
JANUS_LOG(LOG_WARN, "Unsupported recording type '%s' in info header...\n", t);
json_decref(info);
Expand Down Expand Up @@ -2226,7 +2260,7 @@ janus_recordplay_frame_packet *janus_recordplay_get_frames(const char *dir, cons
}
w_time = json_integer_value(created);
/* Summary */
JANUS_LOG(LOG_VERB, "This is %s recording:\n", video ? "a video" : "an audio");
JANUS_LOG(LOG_VERB, "This is %s recording:\n", video ? "a video" : (audio ? "an audio" : "a data"));
JANUS_LOG(LOG_VERB, " -- Codec: %s\n", c);
JANUS_LOG(LOG_VERB, " -- Created: %"SCNi64"\n", c_time);
JANUS_LOG(LOG_VERB, " -- Written: %"SCNi64"\n", w_time);
Expand Down Expand Up @@ -2396,8 +2430,10 @@ janus_recordplay_frame_packet *janus_recordplay_get_frames(const char *dir, cons
return list;
}

static void *janus_recordplay_playout_thread(void *data) {
janus_recordplay_session *session = (janus_recordplay_session *)data;
#define ntohll(x) ((1==ntohl(1)) ? (x) : ((gint64)ntohl((x) & 0xFFFFFFFF) << 32) | ntohl((x) >> 32))

static void *janus_recordplay_playout_thread(void *sessiondata) {
janus_recordplay_session *session = (janus_recordplay_session *)sessiondata;
if(!session) {
JANUS_LOG(LOG_ERR, "Invalid session, can't start playout thread...\n");
g_thread_unref(g_thread_self());
Expand Down Expand Up @@ -2427,7 +2463,7 @@ static void *janus_recordplay_playout_thread(void *data) {
}
JANUS_LOG(LOG_INFO, "Joining playout thread\n");
/* Open the files */
FILE *afile = NULL, *vfile = NULL;
FILE *afile = NULL, *vfile = NULL, *dfile = NULL;
if(session->aframes) {
char source[1024];
if(strstr(rec->arc_file, ".mjr"))
Expand Down Expand Up @@ -2462,15 +2498,37 @@ static void *janus_recordplay_playout_thread(void *data) {
}
}

if(session->dframes) {
char source[1024];
if(strstr(rec->drc_file, ".mjr"))
g_snprintf(source, 1024, "%s/%s", recordings_path, rec->drc_file);
else
g_snprintf(source, 1024, "%s/%s.mjr", recordings_path, rec->drc_file);
dfile = fopen(source, "rb");
if(dfile == NULL) {
janus_refcount_decrease(&rec->ref);
janus_refcount_decrease(&session->ref);
JANUS_LOG(LOG_ERR, "Could not open data file %s, can't start playout thread...\n", source);
if(afile)
fclose(afile);
afile = NULL;
if(vfile)
fclose(vfile);
vfile = NULL;
g_thread_unref(g_thread_self());
return NULL;
}
}
/* Timer */
gboolean asent = FALSE, vsent = FALSE;
struct timeval now, abefore, vbefore;
gboolean asent = FALSE, vsent = FALSE, dsent = FALSE;
struct timeval now, abefore, vbefore, dbefore;
time_t d_s, d_us;
gettimeofday(&now, NULL);
gettimeofday(&abefore, NULL);
gettimeofday(&vbefore, NULL);
gettimeofday(&dbefore, NULL);

janus_recordplay_frame_packet *audio = session->aframes, *video = session->vframes;
janus_recordplay_frame_packet *audio = session->aframes, *video = session->vframes, *data = session->dframes;
char *buffer = g_malloc0(1500);
int bytes = 0;
int64_t ts_diff = 0, passed = 0;
Expand All @@ -2485,12 +2543,13 @@ static void *janus_recordplay_playout_thread(void *data) {

while(!g_atomic_int_get(&session->destroyed) && session->active
&& !g_atomic_int_get(&rec->destroyed) && (audio || video)) {
if(!asent && !vsent) {
if(!asent && !vsent && !dsent) {
/* We skipped the last round, so sleep a bit (5ms) */
g_usleep(5000);
}
asent = FALSE;
vsent = FALSE;
dsent = FALSE;
if(audio) {
if(audio == session->aframes) {
/* First packet, send now */
Expand Down Expand Up @@ -2618,6 +2677,98 @@ static void *janus_recordplay_playout_thread(void *data) {
}
}
}
if(data) {
if(data == session->dframes) {
/* First packet, send now */
/* Data recording stores recording timestamp in first 8 bytes - it follows the frame ts monotonically.
invariant: when = data->ts + when(initial packet) */
gint64 when = 0;
int len = data->len;
int offset = data->offset;
fseek(dfile, data->offset, SEEK_SET);
bytes = fread(&when, sizeof(gint64), 1, dfile);
when = ntohll(when); // NOTE: not currently used - playback is interested in actual data packets.
offset += sizeof(gint64);
len -= sizeof(gint64);

/* Read data packet */
fseek(dfile, offset, SEEK_SET);
bytes = fread(buffer, sizeof(char), len, dfile);
JANUS_LOG(LOG_INFO, "Sending data packet at rtp_timestamp = %lu, timestamp = %lu, delta = %lu\n", (data->ts), when, when - data->ts);
fseek(dfile, offset, SEEK_SET);
bytes = fread(buffer, sizeof(char), len, dfile);
if(bytes != data->len)
JANUS_LOG(LOG_WARN, "Didn't manage to read all the bytes we needed (%d < %d)...\n", bytes, data->len);
/* Update payload type */
janus_plugin_data datapacket = {
.label = NULL,
.protocol = NULL,
.binary = rec->textdata? FALSE : TRUE,
.buffer = (char *)buffer,
.length = bytes
};
gateway->relay_data(session->handle, &datapacket);

gettimeofday(&now, NULL);
dbefore.tv_sec = now.tv_sec;
dbefore.tv_usec = now.tv_usec;
dsent = TRUE;
data = data->next;
} else {
/* What's the timestamp skip from the previous packet? */
ts_diff = data->ts - data->prev->ts;
/* Check if it's time to send */
gettimeofday(&now, NULL);
d_s = now.tv_sec - dbefore.tv_sec;
d_us = now.tv_usec - dbefore.tv_usec;
if(d_us < 0) {
d_us += 1000000;
--d_s;
}
passed = d_s*1000000 + d_us;
if(passed < (ts_diff-5000)) {
dsent = FALSE;
} else {
/* Update the reference time */
dbefore.tv_usec += ts_diff%1000000;
if(dbefore.tv_usec > 1000000) {
dbefore.tv_sec++;
dbefore.tv_usec -= 1000000;
}
if(ts_diff/1000000 > 0) {
dbefore.tv_sec += ts_diff/1000000;
dbefore.tv_usec -= ts_diff/1000000;
}
/* Send now */
gint64 when = 0;
int len = data->len;
int offset = data->offset;
fseek(dfile, data->offset, SEEK_SET);
bytes = fread(&when, sizeof(gint64), 1, dfile);
when = ntohll(when);
offset += sizeof(gint64);
len -= sizeof(gint64);

/* Read data packet */
fseek(dfile, offset, SEEK_SET);
bytes = fread(buffer, sizeof(char), len, dfile);
JANUS_LOG(LOG_VERB, "Sending data packet at timestamp = %lu, recorded timestamp = %lu\n", (data->ts), when);
if(bytes != len)
JANUS_LOG(LOG_WARN, "Didn't manage to read all the bytes we needed (%d < %d)...\n", bytes, data->len);
/* Update payload type */
janus_plugin_data datapacket = {
.label = NULL,
.protocol = NULL,
.binary = rec->textdata ? FALSE : TRUE,
.buffer = (char *)buffer,
.length = bytes
};
gateway->relay_data(session->handle, &datapacket);
dsent = TRUE;
data = data->next;
}
}
}
}

g_free(buffer);
Expand Down