main.c 11.7 KB
Newer Older
1 2 3 4 5
#include "mss-http-server.h"

#include <glib-unix.h>
#include <gst/gst.h>

Jakub Adam's avatar
Jakub Adam committed
6 7 8
#define GST_USE_UNSTABLE_API
#include <gst/webrtc/rtcsessiondescription.h>

9
#define DEFAULT_SRT_URI "srt://:7001"
10
#define DEFAULT_RIST_ADDRESSES "224.0.0.1:5004"
Jakub Adam's avatar
Jakub Adam committed
11

12
#ifdef __aarch64__
13
#define DEFAULT_VIDEOSINK " queue max-size-bytes=0 ! kmssink bus-id=a0070000.v_mix ts-offset=-300000000"
14 15 16 17
#else
#define DEFAULT_VIDEOSINK " videoconvert ! autovideosink "
#endif

Jakub Adam's avatar
Jakub Adam committed
18
static gchar *srt_uri = NULL;
19
static gchar *rist_addresses = NULL;
Jakub Adam's avatar
Jakub Adam committed
20 21 22

static GOptionEntry options[] = {
    { "srt-uri", 'u', 0, G_OPTION_ARG_STRING, &srt_uri,
Jakub Adam's avatar
Jakub Adam committed
23
      "SRT stream URI. Default: " DEFAULT_SRT_URI, "srt://address:port" },
24
    { "rist-addresses", 'r', 0, G_OPTION_ARG_STRING, &rist_addresses,
Jakub Adam's avatar
Jakub Adam committed
25
      "Comma-separated list of addresses to send RIST packets to. Default: " DEFAULT_RIST_ADDRESSES, "address:port,address:port" },
Jakub Adam's avatar
Jakub Adam committed
26 27 28
    { NULL }
};

Jakub Adam's avatar
Jakub Adam committed
29 30
/* Server object used by the WebRTC callbacks in this file to send SDP offers
 * and ICE candidates to clients; created and released in main(). */
MssHttpServer *http_server;

31 32 33 34 35 36 37
/* SIGINT source callback: quits the GMainLoop passed as user_data.
 * Returns G_SOURCE_REMOVE so the source is removed after its first run. */
static gboolean
sigint_handler (gpointer user_data)
{
  GMainLoop *main_loop = user_data;

  g_main_loop_quit (main_loop);

  return G_SOURCE_REMOVE;
}

Jakub Adam's avatar
Jakub Adam committed
38 39 40
/* Bus watch for the pipeline passed as `data`.
 *
 * - ERROR:   dumps the pipeline graph to a dot file and reports the error
 *            through g_error().
 * - WARNING: dumps the pipeline graph and logs the warning.
 * - EOS:     reported through g_error().
 *
 * Always returns TRUE so the watch stays installed. */
static gboolean
gst_bus_cb (GstBus * bus, GstMessage * message, gpointer data)
{
  GstBin *bin = GST_BIN (data);
  const GstMessageType msg_type = GST_MESSAGE_TYPE (message);

  if (msg_type == GST_MESSAGE_ERROR) {
    GError *err;
    gchar *details;

    gst_message_parse_error (message, &err, &details);
    GST_DEBUG_BIN_TO_DOT_FILE (bin, GST_DEBUG_GRAPH_SHOW_ALL, "mss-pipeline-ERROR");
    /* g_error() logs at the fatal level, which normally aborts the process. */
    g_error ("Error: %s (%s)", err->message, details);
    g_error_free (err);
    g_free (details);
  } else if (msg_type == GST_MESSAGE_WARNING) {
    GError *err;
    gchar *details;

    gst_message_parse_warning (message, &err, &details);
    GST_DEBUG_BIN_TO_DOT_FILE (bin, GST_DEBUG_GRAPH_SHOW_ALL, "mss-pipeline-WARNING");
    g_warning ("Warning: %s (%s)", err->message, details);
    g_error_free (err);
    g_free (details);
  } else if (msg_type == GST_MESSAGE_EOS) {
    g_error ("Got EOS!!");
  }

  return TRUE;
}

Jakub Adam's avatar
Jakub Adam committed
77 78 79 80 81 82 83 84 85 86 87 88 89
/* Looks up the element named "webrtcbin_<client_id>" inside `pipeline`.
 *
 * Returns whatever gst_bin_get_by_name() returns; callers in this file
 * treat NULL as "no such client" and unref a non-NULL result when done. */
static GstElement *
get_webrtcbin_for_client (GstBin *pipeline, MssClientId client_id)
{
  gchar *element_name = g_strdup_printf ("webrtcbin_%p", client_id);
  GstElement *found = gst_bin_get_by_name (pipeline, element_name);

  g_free (element_name);

  return found;
}

Olivier Crête's avatar
Olivier Crête committed
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
/* Links a new request pad of the "webrtctee" element to the "sink_0"
 * request pad of `webrtcbin`.
 *
 * Does nothing when `webrtcbin` has no parent.  A failed pad link trips
 * the assertion. */
static void
connect_webrtc_to_tee (GstElement *webrtcbin)
{
  GstElement *parent_bin = GST_ELEMENT (gst_element_get_parent (webrtcbin));
  GstElement *rtp_tee;
  GstPad *tee_src;
  GstPad *rtc_sink;
  GstPadLinkReturn link_result;

  if (!parent_bin)
    return;

  rtp_tee = gst_bin_get_by_name (GST_BIN (parent_bin), "webrtctee");
  tee_src = gst_element_get_request_pad (rtp_tee, "src_%u");
  rtc_sink = gst_element_get_request_pad (webrtcbin, "sink_0");

  link_result = gst_pad_link (tee_src, rtc_sink);
  g_assert (link_result == GST_PAD_LINK_OK);

  /* Drop the references acquired above. */
  gst_object_unref (tee_src);
  gst_object_unref (rtc_sink);
  gst_object_unref (rtp_tee);
  gst_object_unref (parent_bin);
}

Jakub Adam's avatar
Jakub Adam committed
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
/* Promise change callback for webrtcbin's "create-offer".
 *
 * Extracts the offer from the promise reply, applies it as the local
 * description, sends its SDP text to the client stored under the
 * "client_id" object data key, and finally links `webrtcbin` to the tee.
 *
 * The promise is always unreffed here.  If the promise did not end in a
 * reply or the reply carries no "offer" field, a warning is logged and
 * nothing else is done (previously this dereferenced a NULL offer). */
static void
on_offer_created (GstPromise *promise, GstElement *webrtcbin)
{
  GstWebRTCSessionDescription *offer = NULL;
  gchar *sdp;

  /* The promise has already changed state when this callback runs, so the
   * wait only fetches the final result. */
  if (gst_promise_wait (promise) == GST_PROMISE_RESULT_REPLIED) {
    const GstStructure *reply = gst_promise_get_reply (promise);

    if (reply != NULL) {
      gst_structure_get (reply,
          "offer", GST_TYPE_WEBRTC_SESSION_DESCRIPTION, &offer, NULL);
    }
  }
  gst_promise_unref (promise);

  if (offer == NULL) {
    g_warning ("Could not obtain an SDP offer from webrtcbin");
    return;
  }

  g_signal_emit_by_name (webrtcbin, "set-local-description", offer, NULL);

  sdp = gst_sdp_message_as_text (offer->sdp);
  mss_http_server_send_sdp_offer (http_server,
      g_object_get_data (G_OBJECT (webrtcbin), "client_id"),
      sdp);
  g_free (sdp);

  gst_webrtc_session_description_free (offer);

  connect_webrtc_to_tee (webrtcbin);
}

/* "on-ice-candidate" handler: forwards a locally gathered candidate to the
 * client whose id is stored on `webrtcbin` under the "client_id" key. */
static void
webrtc_on_ice_candidate_cb (GstElement *webrtcbin, guint mlineindex,
  gchar *candidate)
{
  gpointer client = g_object_get_data (G_OBJECT (webrtcbin), "client_id");

  mss_http_server_send_candidate (http_server, client, mlineindex, candidate);
}

/* "ws-client-connected" handler: creates a webrtcbin for the new client.
 *
 * The element is named "webrtcbin_<client_id>", tagged with the client id
 * under the "client_id" object data key, added to `pipeline`, set to
 * PLAYING, given a send-only H.264 transceiver, and asked to create an
 * offer (handled in on_offer_created()).
 *
 * If the webrtcbin element cannot be created, a warning is logged and the
 * client is ignored. */
static void
webrtc_client_connected_cb (MssHttpServer *server, MssClientId client_id,
    GstBin *pipeline)
{
  gchar *name;
  GstElement *webrtcbin;
  GstWebRTCRTPTransceiver *transceiver = NULL;
  GstCaps *caps;
  GstStateChangeReturn ret;

  name = g_strdup_printf ("webrtcbin_%p", client_id);
  webrtcbin = gst_element_factory_make ("webrtcbin", name);
  g_free (name);

  if (webrtcbin == NULL) {
    g_warning ("Could not create webrtcbin element for client %p", client_id);
    return;
  }

  g_object_set_data (G_OBJECT (webrtcbin), "client_id", client_id);
  gst_bin_add (pipeline, webrtcbin);
  ret = gst_element_set_state (webrtcbin, GST_STATE_PLAYING);
  g_assert (ret != GST_STATE_CHANGE_FAILURE);

  g_signal_connect (webrtcbin, "on-ice-candidate",
      G_CALLBACK (webrtc_on_ice_candidate_cb), NULL);

  caps = gst_caps_from_string ("application/x-rtp, payload=96,encoding-name=H264,clock-rate=90000,media=video,packetization-mode=(string)1,profile-level-id=(string)42e01f");
  /* "add-transceiver" returns the new transceiver, so g_signal_emit_by_name()
   * needs a return location as its last argument; omitting it makes GLib read
   * a garbage pointer from the varargs.  The returned reference is ours. */
  g_signal_emit_by_name (webrtcbin, "add-transceiver",
      GST_WEBRTC_RTP_TRANSCEIVER_DIRECTION_SENDONLY,
      caps, &transceiver);
  gst_caps_unref (caps);
  if (transceiver != NULL)
    gst_object_unref (transceiver);

  g_signal_emit_by_name (webrtcbin, "create-offer", NULL,
      gst_promise_new_with_change_func (
          (GstPromiseChangeFunc) on_offer_created, webrtcbin, NULL));

  GST_DEBUG_BIN_TO_DOT_FILE (pipeline, GST_DEBUG_GRAPH_SHOW_ALL, "rtcbin");
}

/* "sdp-answer" handler: parses the SDP text received from a client and
 * applies it as the remote description of that client's webrtcbin.
 *
 * Blocks until webrtcbin has processed the description.  Unparsable SDP
 * and unknown clients are ignored. */
static void
webrtc_sdp_answer_cb (MssHttpServer *server, MssClientId client_id,
    const gchar *sdp, GstBin *pipeline)
{
  GstSDPMessage *sdp_msg = NULL;
  GstWebRTCSessionDescription *desc;
  GstElement *webrtcbin;

  if (gst_sdp_message_new_from_text (sdp, &sdp_msg) != GST_SDP_OK) {
    g_debug ("Error parsing SDP description");
    /* The message may have been allocated before parsing failed; free it
     * so malformed answers do not leak memory. */
    if (sdp_msg != NULL)
      gst_sdp_message_free (sdp_msg);
    return;
  }

  /* On success the description owns sdp_msg. */
  desc = gst_webrtc_session_description_new (GST_WEBRTC_SDP_TYPE_ANSWER,
      sdp_msg);
  if (desc == NULL) {
    gst_sdp_message_free (sdp_msg);
    return;
  }

  webrtcbin = get_webrtcbin_for_client (pipeline, client_id);
  if (webrtcbin != NULL) {
    GstPromise *promise = gst_promise_new ();

    g_signal_emit_by_name (webrtcbin, "set-remote-description", desc, promise);

    gst_promise_wait (promise);
    gst_promise_unref (promise);

    gst_object_unref (webrtcbin);
  }

  gst_webrtc_session_description_free (desc);
}

/* "candidate" handler: passes a non-empty remote ICE candidate to the
 * client's webrtcbin (if one exists) and logs the candidate string. */
static void
webrtc_candidate_cb (MssHttpServer *server, MssClientId client_id,
    guint mlineindex, const gchar *candidate, GstBin *pipeline)
{
  const gboolean is_empty = (strlen (candidate) == 0);

  if (!is_empty) {
    GstElement *target = get_webrtcbin_for_client (pipeline, client_id);

    if (target != NULL) {
      g_signal_emit_by_name (target, "add-ice-candidate", mlineindex,
          candidate);
      gst_object_unref (target);
    }
  }

  g_debug ("Remote candidate: %s", candidate);
}

/* "ws-client-disconnected" handler: removes the client's webrtcbin from its
 * parent bin and shuts it down.  Unknown clients are ignored. */
static void
webrtc_client_disconnected_cb (MssHttpServer *server, MssClientId client_id,
    GstBin *pipeline)
{
  GstElement *client_bin = get_webrtcbin_for_client (pipeline, client_id);

  if (client_bin == NULL)
    return;

  /* The reference from the lookup keeps the element valid after removal. */
  gst_bin_remove (GST_BIN (GST_ELEMENT_PARENT (client_bin)), client_bin);
  gst_element_set_state (client_bin, GST_STATE_NULL);
  gst_object_unref (client_bin);
}

Olivier Crête's avatar
Olivier Crête committed
251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268
/* State handed from src_event_cb() to the restart_source() idle callback. */
struct RestartData {
  /* Source element to restart; its reference is released by free_restart_data(). */
  GstElement *src;
  /* Pipeline the source is re-added to; stored without taking a reference. */
  GstElement *pipeline;
};

/* Destroy notify for the restart idle source: drops the source element
 * reference held in the RestartData and frees the structure itself. */
static void
free_restart_data (gpointer user_data)
{
  struct RestartData *restart = user_data;

  gst_object_unref (restart->src);
  g_free (restart);
}

/* Idle callback that puts the removed source element back into the
 * pipeline: resets it to NULL, re-adds it, links it to the "srtqueue"
 * element and sets it to PLAYING again.
 *
 * The state is locked while the element is re-added and linked.  A failed
 * link or state change trips an assertion.  Runs once (G_SOURCE_REMOVE). */
static gboolean
restart_source (gpointer user_data)
{
  struct RestartData *restart = user_data;
  GstElement *queue;
  GstStateChangeReturn state_ret;

  gst_element_set_state (restart->src, GST_STATE_NULL);
  gst_element_set_locked_state (restart->src, TRUE);

  queue = gst_bin_get_by_name (GST_BIN (restart->pipeline), "srtqueue");
  gst_bin_add (GST_BIN (restart->pipeline), restart->src);
  if (!gst_element_link (restart->src, queue)) {
    g_assert_not_reached ();
  }

  gst_element_set_locked_state (restart->src, FALSE);
  state_ret = gst_element_set_state (restart->src, GST_STATE_PLAYING);
  g_assert (state_ret != GST_STATE_CHANGE_FAILURE);

  gst_object_unref (queue);

  g_debug ("Restarted source after EOS");

  return G_SOURCE_REMOVE;
}

/* Pad probe on the source pad.  Events other than EOS pass through.
 *
 * On EOS the event is dropped, the pad's parent element is removed from the
 * pipeline (passed as user_data), and restart_source() is scheduled as a
 * high-priority idle callback to bring the source back. */
static GstPadProbeReturn
src_event_cb (GstPad *pad, GstPadProbeInfo *info, gpointer user_data)
{
  GstElement *pipeline = user_data;
  GstEvent *event = GST_PAD_PROBE_INFO_EVENT (info);
  GstElement *source;
  struct RestartData *restart;

  if (GST_EVENT_TYPE (event) != GST_EVENT_EOS)
    return GST_PAD_PROBE_PASS;

  /* This reference is handed over to the RestartData below. */
  source = gst_pad_get_parent_element (pad);
  gst_bin_remove (GST_BIN (pipeline), source);

  restart = g_new (struct RestartData, 1);
  restart->pipeline = pipeline;
  restart->src = source;

  g_idle_add_full (G_PRIORITY_HIGH_IDLE, restart_source, restart,
      free_restart_data);

  return GST_PAD_PROBE_DROP;
}

/* Periodic callback that reads the "stats" property of the element passed
 * as user_data and serializes it to a string.  The actual log output is
 * currently disabled, so the string is built and freed again.
 *
 * Returns G_SOURCE_CONTINUE to keep the timer running. */
static gboolean
print_stats (gpointer user_data)
{
  GstElement *element = user_data;
  GstStructure *stats;
  char *text;

  g_object_get (element, "stats", &stats, NULL);
  text = gst_structure_to_string (stats);

  /* Logging disabled: g_debug ("%s", text); */

  g_free (text);
  gst_structure_free (stats);

  return G_SOURCE_CONTINUE;
}

326 327
int main (int argc, char *argv[])
{
Jakub Adam's avatar
Jakub Adam committed
328
  GOptionContext *option_context;
329 330 331 332
  GMainLoop *loop;
  gchar *pipeline_str;
  GstElement *pipeline;
  GError *error = NULL;
Jakub Adam's avatar
Jakub Adam committed
333
  GstBus *bus;
Olivier Crête's avatar
Olivier Crête committed
334 335
  GstElement *src;
  GstPad *srcpad;
336
  GstStateChangeReturn ret;
Jakub Adam's avatar
Jakub Adam committed
337 338 339 340 341 342 343 344 345 346 347 348

  option_context = g_option_context_new (NULL);
  g_option_context_add_main_entries (option_context, options, NULL);

  if (!g_option_context_parse (option_context, &argc, &argv, &error)) {
    g_print ("option parsing failed: %s\n", error->message);
    exit (1);
  }

  if (!srt_uri) {
    srt_uri = g_strdup(DEFAULT_SRT_URI);
  }
349 350 351
  if (!rist_addresses) {
    rist_addresses = g_strdup (DEFAULT_RIST_ADDRESSES);
  }
352 353 354

  http_server = mss_http_server_new ();

355 356 357
  pipeline_str = g_strdup_printf (
      "srtsrc uri=%s?mode=listener name=src latency=50 ! queue name=srtqueue ! tsparse smoothing-latency=50000 set-timestamps=1 ! tee name=t "
      "t. ! queue ! rtpmp2tpay ! ristsink bonding-addresses=%s async=0 "
Nicolas Dufresne's avatar
Nicolas Dufresne committed
358
      "t. ! queue leaky=downstream max-size-buffers=400 ! srtsink uri=srt://:7002?mode=listener async=0 "
359
      "t. ! queue ! tsdemux latency=50 ! tee name=h264_t "
360
        "h264_t. ! queue ! decodebin ! videoconvert ! " DEFAULT_VIDEOSINK " async=0 "
Nicolas Dufresne's avatar
Nicolas Dufresne committed
361 362
        "h264_t. ! queue ! h264parse ! video/x-h264, alignment=au ! "
          "hlssink2 location=%s/segment%%05d.ts playlist-location=%s/playlist.m3u8 send-keyframe-requests=0 target-duration=1 playlist-length=5 async=0 "
363
        "h264_t. ! queue ! h264parse ! rtph264pay config-interval=1 ! application/x-rtp,payload=96 ! tee name=webrtctee allow-not-linked=true ",
Jakub Adam's avatar
Jakub Adam committed
364
      srt_uri,
365
      rist_addresses,
366
      mss_http_server_get_hls_dir (http_server),
367
      mss_http_server_get_hls_dir (http_server));
368 369 370 371 372 373

  gst_init (&argc, &argv);
  pipeline = gst_parse_launch (pipeline_str, &error);
  g_assert_no_error (error);
  g_free (pipeline_str);

Jakub Adam's avatar
Jakub Adam committed
374
  bus = gst_element_get_bus (pipeline);
375
  gst_bus_add_watch (bus, gst_bus_cb, pipeline);
Jakub Adam's avatar
Jakub Adam committed
376 377
  gst_object_unref (bus);

Jakub Adam's avatar
Jakub Adam committed
378 379 380 381 382 383 384
  g_signal_connect (http_server, "ws-client-disconnected",
      G_CALLBACK (webrtc_client_disconnected_cb), pipeline);
  g_signal_connect (http_server, "sdp-answer",
      G_CALLBACK (webrtc_sdp_answer_cb), pipeline);
  g_signal_connect (http_server, "candidate",
      G_CALLBACK (webrtc_candidate_cb), pipeline);

385 386 387
  loop = g_main_loop_new (NULL, FALSE);
  g_unix_signal_add (SIGINT, sigint_handler, loop);

388
  g_print ("Input SRT URI is %s\n"
389
      "\nOutput streams:\n"
390
      "\tHLS & WebRTC web player: http://localhost:8080\n"
Jakub Adam's avatar
Jakub Adam committed
391
      "\tRIST: %s\n"
Olivier Crête's avatar
Olivier Crête committed
392
      "\tSRT: srt://127.0.0.1:7002\n",
393 394
      srt_uri,
      rist_addresses);
395

Olivier Crête's avatar
Olivier Crête committed
396 397 398 399 400 401 402 403
  src = gst_bin_get_by_name (GST_BIN (pipeline), "src");
  srcpad = gst_element_get_static_pad (src, "src");
  gst_pad_add_probe (srcpad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, src_event_cb,
      pipeline, NULL);
  g_timeout_add (1000, print_stats, src);
  gst_object_unref (srcpad);
  gst_object_unref (src);

404 405
  ret = gst_element_set_state (pipeline, GST_STATE_PLAYING);
  g_assert (ret != GST_STATE_CHANGE_FAILURE);
406 407 408 409

  g_signal_connect (http_server, "ws-client-connected",
      G_CALLBACK (webrtc_client_connected_cb), pipeline);

410 411 412
  g_main_loop_run (loop);
  g_main_loop_unref (loop);

413 414
  gst_element_set_state (pipeline, GST_STATE_NULL);

415 416
  gst_clear_object (&pipeline);
  g_clear_object (&http_server);
417 418
  g_clear_pointer (&srt_uri, g_free);
  g_clear_pointer (&rist_addresses, g_free);
419
}