Commit c072968c authored by Jakub Adam's avatar Jakub Adam
Browse files

Implement WebRTC output

parent a9992298
......@@ -12,10 +12,13 @@ run_command('meson', 'wrap', 'promote', 'subprojects/gst-build/subprojects/gst-p
run_command('meson', 'wrap', 'promote', 'subprojects/gst-build/subprojects/gst-plugins-bad.wrap')
run_command('meson', 'wrap', 'promote', 'subprojects/gst-build/subprojects/gst-plugins-ugly.wrap')
run_command('meson', 'wrap', 'promote', 'subprojects/gst-build/subprojects/gst-libav.wrap')
run_command('meson', 'wrap', 'promote', 'subprojects/gst-build/subprojects/libnice.wrap')
subproject('gst-build')
soup_dep = dependency('libsoup-2.4')
json_glib_dep = dependency('json-glib-1.0')
gst_base_dep = subproject('gstreamer').get_variable('gst_base_dep')
gstwebrtc_dep = subproject('gst-plugins-bad').get_variable('gstwebrtc_dep')
subdir('src')
......
......@@ -3,6 +3,9 @@
#include <glib-unix.h>
#include <gst/gst.h>
#define GST_USE_UNSTABLE_API
#include <gst/webrtc/rtcsessiondescription.h>
#define DEFAULT_SRT_URI "srt://127.0.0.1:7001?mode=listener"
static gchar *srt_uri = NULL;
......@@ -14,6 +17,8 @@ static GOptionEntry options[] = {
{ NULL }
};
MssHttpServer *http_server;
static gboolean
sigint_handler (gpointer user_data)
{
......@@ -27,11 +32,173 @@ gst_bus_cb (GstBus * bus, GstMessage * message, gpointer data)
return TRUE;
}
static GstElement *
get_webrtcbin_for_client (GstBin *pipeline, MssClientId client_id)
{
gchar *name;
GstElement *webrtcbin;
name = g_strdup_printf ("webrtcbin_%p", client_id);
webrtcbin = gst_bin_get_by_name (pipeline, name);
g_free (name);
return webrtcbin;
}
static void
on_offer_created (GstPromise *promise, GstElement *webrtcbin)
{
GstWebRTCSessionDescription *offer = NULL;
gchar *sdp;
gst_structure_get (gst_promise_get_reply (promise),
"offer", GST_TYPE_WEBRTC_SESSION_DESCRIPTION, &offer, NULL);
gst_promise_unref (promise);
g_signal_emit_by_name (webrtcbin, "set-local-description", offer, NULL);
sdp = gst_sdp_message_as_text (offer->sdp);
mss_http_server_send_sdp_offer (http_server,
g_object_get_data (G_OBJECT (webrtcbin), "client_id"),
sdp);
g_free (sdp);
gst_webrtc_session_description_free (offer);
}
static void
webrtc_on_ice_candidate_cb (GstElement *webrtcbin, guint mlineindex,
gchar *candidate)
{
g_debug ("Local candidate: %s", candidate);
mss_http_server_send_candidate (http_server,
g_object_get_data (G_OBJECT (webrtcbin), "client_id"),
mlineindex, candidate);
}
static void
webrtc_client_connected_cb (MssHttpServer *server, MssClientId client_id,
GstBin *pipeline)
{
gchar *name;
GstElement *tee;
GstElement *webrtcbin;
GstPad *srcpad;
GstPad *sinkpad;
name = g_strdup_printf ("webrtcbin_%p", client_id);
webrtcbin = gst_element_factory_make ("webrtcbin", name);
g_object_set_data (G_OBJECT (webrtcbin), "client_id", client_id);
gst_bin_add (pipeline, webrtcbin);
gst_element_sync_state_with_parent (webrtcbin);
g_signal_connect (webrtcbin, "on-ice-candidate",
G_CALLBACK (webrtc_on_ice_candidate_cb), NULL);
tee = gst_bin_get_by_name (pipeline, "webrtctee");
srcpad = gst_element_get_request_pad (tee, "src_%u");
sinkpad = gst_element_get_request_pad (webrtcbin, "sink_%u");
gst_pad_link (srcpad, sinkpad);
gst_object_unref (srcpad);
gst_object_unref (sinkpad);
gst_object_unref (tee);
g_signal_emit_by_name (webrtcbin, "create-offer", NULL,
gst_promise_new_with_change_func (
(GstPromiseChangeFunc) on_offer_created, webrtcbin,NULL));
GST_DEBUG_BIN_TO_DOT_FILE (pipeline, GST_DEBUG_GRAPH_SHOW_ALL, "rtcbin");
g_free (name);
}
static void
webrtc_sdp_answer_cb (MssHttpServer *server, MssClientId client_id,
const gchar *sdp, GstBin *pipeline)
{
GstSDPMessage *sdp_msg = NULL;
GstWebRTCSessionDescription *desc = NULL;
gchar *sdp_tmp;
char **split;
// TODO needed?
split = g_strsplit(sdp, "a=recvonly", -1);
sdp_tmp = g_strjoinv("a=sendrecv", split);
g_strfreev(split);
g_debug ("ANSWER: %s", sdp_tmp);
if (gst_sdp_message_new_from_text (sdp_tmp, &sdp_msg) != GST_SDP_OK) {
g_debug ("Error parsing SDP description");
goto out;
}
desc = gst_webrtc_session_description_new (GST_WEBRTC_SDP_TYPE_ANSWER,
sdp_msg);
if (desc) {
GstElement *webrtcbin;
GstPromise *promise;
webrtcbin = get_webrtcbin_for_client (pipeline, client_id);
if (!webrtcbin) {
goto out;
}
promise = gst_promise_new();
g_signal_emit_by_name (webrtcbin, "set-remote-description", desc, promise);
gst_promise_wait (promise);
gst_promise_unref (promise);
gst_object_unref (webrtcbin);
} else {
gst_sdp_message_free (sdp_msg);
}
out:
g_free (sdp_tmp);
g_clear_pointer (&desc, gst_webrtc_session_description_free);
}
static void
webrtc_candidate_cb (MssHttpServer *server, MssClientId client_id,
guint mlineindex, const gchar *candidate, GstBin *pipeline)
{
if (strlen (candidate)) {
GstElement *webrtcbin;
webrtcbin = get_webrtcbin_for_client (pipeline, client_id);
if (webrtcbin) {
g_signal_emit_by_name (webrtcbin, "add-ice-candidate", mlineindex,
candidate);
gst_object_unref (webrtcbin);
}
}
g_debug ("Remote candidate: %s", candidate);
}
static void
webrtc_client_disconnected_cb (MssHttpServer *server, MssClientId client_id,
GstBin *pipeline)
{
GstElement *webrtcbin;
webrtcbin = get_webrtcbin_for_client (pipeline, client_id);
if (webrtcbin) {
gst_bin_remove (GST_BIN (GST_ELEMENT_PARENT (webrtcbin)), webrtcbin);
gst_element_set_state (webrtcbin, GST_STATE_NULL);
gst_object_unref (webrtcbin);
}
}
int main (int argc, char *argv[])
{
GOptionContext *option_context;
GMainLoop *loop;
MssHttpServer *http_server;
gchar *pipeline_str;
GstElement *pipeline;
GError *error = NULL;
......@@ -54,7 +221,9 @@ int main (int argc, char *argv[])
pipeline_str = g_strdup_printf ("srtsrc uri=%s ! tsparse ! tee name=t ! "
"queue ! decodebin ! videoconvert ! autovideosink "
"t. ! queue leaky=downstream ! tsdemux ! mpegtsmux ! hlssink location=%s/segment%%05d.ts playlist-location=%s/playlist.m3u8 "
"t. ! queue leaky=downstream max-size-buffers=400 ! rtpmp2tpay ! ristsink",
"t. ! queue leaky=downstream max-size-buffers=400 ! rtpmp2tpay ! ristsink "
"t. ! queue leaky=downstream ! tsdemux ! h264parse ! rtph264pay ! application/x-rtp,payload=96 ! tee name=webrtctee allow-not-linked=true"
,
srt_uri,
mss_http_server_get_hls_dir (http_server),
mss_http_server_get_hls_dir (http_server));
......@@ -69,6 +238,15 @@ int main (int argc, char *argv[])
gst_bus_add_watch (bus, gst_bus_cb, NULL);
gst_object_unref (bus);
g_signal_connect (http_server, "ws-client-connected",
G_CALLBACK (webrtc_client_connected_cb), pipeline);
g_signal_connect (http_server, "ws-client-disconnected",
G_CALLBACK (webrtc_client_disconnected_cb), pipeline);
g_signal_connect (http_server, "sdp-answer",
G_CALLBACK (webrtc_sdp_answer_cb), pipeline);
g_signal_connect (http_server, "candidate",
G_CALLBACK (webrtc_candidate_cb), pipeline);
loop = g_main_loop_new (NULL, FALSE);
g_unix_signal_add (SIGINT, sigint_handler, loop);
......
......@@ -7,5 +7,5 @@ sources = [
]
executable('multistream-server', sources,
dependencies: [soup_dep, gst_base_dep],
dependencies: [soup_dep, json_glib_dep, gst_base_dep, gstwebrtc_dep],
install: true)
......@@ -9,6 +9,8 @@
#include <glib/gstdio.h>
#include <json-glib/json-glib.h>
#include <libsoup/soup-message.h>
#include <libsoup/soup-server.h>
......@@ -17,11 +19,25 @@ struct _MssHttpServer
GObject parent;
SoupServer *soup_server;
gchar *hls_dir;
GSList *websocket_connections;
};
G_DEFINE_TYPE (MssHttpServer, mss_http_server, G_TYPE_OBJECT)
enum
{
SIGNAL_WS_CLIENT_CONNECTED,
SIGNAL_WS_CLIENT_DISCONNECTED,
SIGNAL_SDP_ANSWER,
SIGNAL_CANDIDATE,
N_SIGNALS
};
guint signals[N_SIGNALS];
MssHttpServer *
mss_http_server_new ()
{
......@@ -53,6 +69,11 @@ http_cb (SoupServer *server, SoupMessage *msg, const char *path,
soup_message_body_append_buffer (msg->response_body, buffer);
soup_buffer_free (buffer);
if (g_str_has_suffix(path, ".js")) {
soup_message_headers_append(msg->response_headers,
"Content-Type", "text/javascript");
}
soup_message_set_status (msg, SOUP_STATUS_OK);
} else {
soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND);
......@@ -93,6 +114,105 @@ hls_cb (SoupServer *server, SoupMessage *msg, const char *path,
}
}
static void
mss_http_server_handle_message (MssHttpServer *server,
SoupWebsocketConnection *connection, GBytes *message)
{
gsize length = 0;
const gchar *msg_data = g_bytes_get_data (message, &length);
JsonParser *parser = json_parser_new ();
GError *error = NULL;
if (json_parser_load_from_data (parser, msg_data, length, &error)) {
JsonObject *msg = json_node_get_object (json_parser_get_root (parser));
const gchar *msg_type;
if (!json_object_has_member (msg, "msg")) {
// Invalid message
goto out;
}
msg_type = json_object_get_string_member (msg, "msg");
if (g_str_equal(msg_type, "answer")) {
g_signal_emit (server, signals[SIGNAL_SDP_ANSWER], 0, connection,
json_object_get_string_member (msg, "sdp"));
} else if (g_str_equal(msg_type, "candidate")) {
JsonObject *candidate;
candidate = json_object_get_object_member (msg, "candidate");
g_signal_emit (server, signals[SIGNAL_CANDIDATE], 0, connection,
json_object_get_int_member (candidate, "sdpMLineIndex"),
json_object_get_string_member (candidate, "candidate"));
}
} else {
g_debug ("Error parsing message: %s", error->message);
g_clear_error (&error);
}
out:
g_object_unref (parser);
}
static void
message_cb (SoupWebsocketConnection *connection, gint type, GBytes *message,
gpointer user_data)
{
mss_http_server_handle_message (MSS_HTTP_SERVER (user_data),
connection, message);
}
static void
mss_http_server_remove_websocket_connection (MssHttpServer *server,
SoupWebsocketConnection *connection)
{
MssClientId client_id;
client_id = g_object_get_data (G_OBJECT (connection), "client_id");
server->websocket_connections = g_slist_remove (server->websocket_connections,
client_id);
g_signal_emit (server, signals [SIGNAL_WS_CLIENT_DISCONNECTED], 0, client_id);
}
static void
closed_cb (SoupWebsocketConnection *connection, gpointer user_data)
{
g_debug ("Connection closed");
mss_http_server_remove_websocket_connection (MSS_HTTP_SERVER (user_data),
connection);
}
static void
mss_http_server_add_websocket_connection (MssHttpServer *server,
SoupWebsocketConnection *connection)
{
g_signal_connect (connection, "message", (GCallback) message_cb, server);
g_signal_connect (connection, "closed", (GCallback) closed_cb, server);
g_object_ref (connection);
g_object_set_data (G_OBJECT (connection),
"client_id", connection);
server->websocket_connections =
g_slist_append (server->websocket_connections, connection);
g_signal_emit (server, signals [SIGNAL_WS_CLIENT_CONNECTED], 0, connection);
}
static void
websocket_cb (SoupServer *server, SoupWebsocketConnection *connection,
const char *path, SoupClientContext *client, gpointer user_data)
{
g_debug ("New connection from %s", soup_client_context_get_host (client));
mss_http_server_add_websocket_connection (MSS_HTTP_SERVER (user_data),
connection);
}
static void
mss_http_server_init (MssHttpServer *server)
{
......@@ -104,6 +224,9 @@ mss_http_server_init (MssHttpServer *server)
soup_server_add_handler (server->soup_server, NULL, http_cb, server, NULL);
soup_server_add_handler (server->soup_server, "/hls/", hls_cb, server, NULL);
soup_server_add_websocket_handler (server->soup_server, "/ws", NULL, NULL,
websocket_cb, server, NULL);
soup_server_listen_all (server->soup_server, 8080, 0, &error);
g_assert_no_error (error);
}
......@@ -114,6 +237,88 @@ mss_http_server_get_hls_dir (MssHttpServer *server)
return server->hls_dir;
}
static void
mss_http_server_send_to_websocket_client (MssHttpServer *server,
MssClientId client_id, JsonNode *msg)
{
SoupWebsocketConnection *connection = client_id;
SoupWebsocketState socket_state;
if (!g_slist_find (server->websocket_connections, connection)) {
g_warning ("Unknown websocket connection.");
return;
}
socket_state = soup_websocket_connection_get_state (connection);
if (socket_state == SOUP_WEBSOCKET_STATE_OPEN) {
gchar *msg_str = json_to_string (msg, TRUE);
soup_websocket_connection_send_text (connection, msg_str);
g_free (msg_str);
} else {
g_warning ("Trying to send message using websocket that isn't open.");
}
}
void
mss_http_server_send_sdp_offer (MssHttpServer *server, MssClientId client_id,
const gchar *sdp)
{
JsonBuilder *builder;
JsonNode *root;
g_debug ("Send offer: %s", sdp);
builder = json_builder_new ();
json_builder_begin_object (builder);
json_builder_set_member_name (builder, "msg");
json_builder_add_string_value(builder, "offer");
json_builder_set_member_name (builder, "sdp");
json_builder_add_string_value(builder, sdp);
json_builder_end_object (builder);
root = json_builder_get_root (builder);
mss_http_server_send_to_websocket_client (server, client_id, root);
json_node_unref (root);
g_object_unref (builder);
}
void
mss_http_server_send_candidate (MssHttpServer *server, MssClientId client_id,
guint mlineindex, const gchar *candidate)
{
JsonBuilder *builder;
JsonNode *root;
g_debug ("Send candidate: %u %s", mlineindex, candidate);
builder = json_builder_new ();
json_builder_begin_object (builder);
json_builder_set_member_name (builder, "msg");
json_builder_add_string_value(builder, "candidate");
json_builder_set_member_name (builder, "candidate");
json_builder_begin_object(builder);
json_builder_set_member_name (builder, "candidate");
json_builder_add_string_value(builder, candidate);
json_builder_set_member_name (builder, "sdpMLineIndex");
json_builder_add_int_value(builder, mlineindex);
json_builder_end_object(builder);
json_builder_end_object(builder);
root = json_builder_get_root (builder);
mss_http_server_send_to_websocket_client (server, client_id, root);
json_node_unref (root);
g_object_unref (builder);
}
static void
mss_http_server_dispose (GObject *object)
{
......@@ -146,4 +351,28 @@ mss_http_server_class_init (MssHttpServerClass *klass)
GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
gobject_class->dispose = mss_http_server_dispose;
signals[SIGNAL_WS_CLIENT_CONNECTED] =
g_signal_new ("ws-client-connected", G_OBJECT_CLASS_TYPE (klass),
G_SIGNAL_RUN_LAST,
0, NULL, NULL, NULL,
G_TYPE_NONE, 1, G_TYPE_POINTER);
signals[SIGNAL_WS_CLIENT_DISCONNECTED] =
g_signal_new ("ws-client-disconnected", G_OBJECT_CLASS_TYPE (klass),
G_SIGNAL_RUN_LAST,
0, NULL, NULL, NULL,
G_TYPE_NONE, 1, G_TYPE_POINTER);
signals[SIGNAL_SDP_ANSWER] =
g_signal_new ("sdp-answer", G_OBJECT_CLASS_TYPE (klass),
G_SIGNAL_RUN_LAST,
0, NULL, NULL, NULL,
G_TYPE_NONE, 2, G_TYPE_POINTER, G_TYPE_STRING);
signals[SIGNAL_CANDIDATE] =
g_signal_new ("candidate", G_OBJECT_CLASS_TYPE (klass),
G_SIGNAL_RUN_LAST,
0, NULL, NULL, NULL,
G_TYPE_NONE, 3, G_TYPE_POINTER, G_TYPE_UINT, G_TYPE_STRING);
}
......@@ -13,10 +13,20 @@
G_DECLARE_FINAL_TYPE (MssHttpServer, mss_http_server, MSS, HTTP_SERVER, GObject)
typedef gpointer MssClientId;
MssHttpServer *
mss_http_server_new ();
const gchar *
mss_http_server_get_hls_dir (MssHttpServer *server);
void
mss_http_server_send_sdp_offer (MssHttpServer *server, MssClientId client_id,
const gchar *msg);
void
mss_http_server_send_candidate (MssHttpServer *server, MssClientId client_id,
guint mlineindex, const gchar *candidate);
#endif /* __MSS_HTTP_SERVER_H__ */
......@@ -6,18 +6,54 @@
<body>
<script src="https://cdn.jsdelivr.net/npm/hls.js"></script>
<center>
<video id="video" autoplay></video>
HLS: <video id="hls_video" autoplay></video>
WebRTC: <video id="webrtc_video" autoplay></video>
</center>
<script>
<script type="module">
import * as mss from './mss.js'
if(Hls.isSupported()) {
var video = document.getElementById('video');
var hls_video = document.getElementById('hls_video');
var hls = new Hls({
liveDurationInfinity: true
});
hls.loadSource('hls/playlist.m3u8');
hls.attachMedia(video);
hls.attachMedia(hls_video);
}
var client = new mss.Client()
var webrtc = new RTCPeerConnection()
client.onoffer = async (msg) => {
try {
console.log(msg.sdp)
await webrtc.setRemoteDescription({ type: 'offer', sdp: msg.sdp })
var answer = await webrtc.createAnswer()
console.log(answer)
webrtc.setLocalDescription(answer)
client.answer(answer.sdp)
} catch (err) {
console.log(err.message)
}
}
client.oncandidate = (c) => {
console.log(c)
webrtc.addIceCandidate(c)
}
webrtc.onicecandidate = (c) => {
if (c.candidate) {
client.candidate(c.candidate)
}
}
webrtc.ontrack = (event) => {
console.log(event)
var webrtc_video = document.getElementById("webrtc_video")
webrtc_video.srcObject = event.streams[0];
}
client.connect()
</script>
</body>
</html>
"use strict";
export class Client {
constructor() {
this.__requestId = 0
this.onopen = undefined
this.onoffer = undefined
this.oncandidate = undefined
this.__reset();
}
connect() {
this.__ws = new WebSocket(`ws://${window.location.host}/ws`)
this.__ws.onopen = () => {
if (this.onopen) {
this.onopen()
}
}
this.__ws.onmessage = message => {
var msg = JSON.parse(message.data)
switch (msg.msg) {
case 'offer':
this.onoffer(msg)
break
case 'candidate':
this.oncandidate(msg.candidate)
break
}
}
}
hello(callback) {