gstrtpbin.c 85.6 KB
Newer Older
1
/* GStreamer
2
 * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

/**
21
22
 * SECTION:element-gstrtpbin
 * @see_also: gstrtpjitterbuffer, gstrtpsession, gstrtpptdemux, gstrtpssrcdemux
23
 *
24
 * RTP bin combines the functions of #GstRtpSession, #GstRtpSsrcDemux,
25
26
 * #GstRtpJitterBuffer and #GstRtpPtDemux in one element. It allows for multiple
 * RTP sessions that will be synchronized together using RTCP SR packets.
Wim Taymans's avatar
Wim Taymans committed
27
 *
28
29
 * #GstRtpBin is configured with a number of request pads that define the
 * functionality that is activated, similar to the #GstRtpSession element.
Wim Taymans's avatar
Wim Taymans committed
30
 *
Stefan Kost's avatar
Stefan Kost committed
31
 * To use #GstRtpBin as an RTP receiver, request a recv_rtp_sink_\%d pad. The session
Wim Taymans's avatar
Wim Taymans committed
32
 * number must be specified in the pad name.
Stefan Kost's avatar
Stefan Kost committed
33
 * Data received on the recv_rtp_sink_\%d pad will be processed in the #GstRtpSession
34
 * manager and after being validated forwarded on #GstRtpSsrcDemux element. Each
35
 * RTP stream is demuxed based on the SSRC and send to a #GstRtpJitterBuffer. After
36
 * the packets are released from the jitterbuffer, they will be forwarded to a
37
 * #GstRtpSsrcDemux element. The #GstRtpSsrcDemux element will demux the packets based
Stefan Kost's avatar
Stefan Kost committed
38
 * on the payload type and will create a unique pad recv_rtp_src_\%d_\%d_\%d on
39
 * gstrtpbin with the session number, SSRC and payload type respectively as the pad
Wim Taymans's avatar
Wim Taymans committed
40
 * name.
Wim Taymans's avatar
Wim Taymans committed
41
 *
Stefan Kost's avatar
Stefan Kost committed
42
 * To also use #GstRtpBin as an RTCP receiver, request a recv_rtcp_sink_\%d pad. The
Wim Taymans's avatar
Wim Taymans committed
43
 * session number must be specified in the pad name.
Wim Taymans's avatar
Wim Taymans committed
44
 *
Wim Taymans's avatar
Wim Taymans committed
45
 * If you want the session manager to generate and send RTCP packets, request
Stefan Kost's avatar
Stefan Kost committed
46
 * the send_rtcp_src_\%d pad with the session number in the pad name. Packet pushed
Wim Taymans's avatar
Wim Taymans committed
47
48
 * on this pad contain SR/RR RTCP reports that should be sent to all participants
 * in the session.
Wim Taymans's avatar
Wim Taymans committed
49
 *
Stefan Kost's avatar
Stefan Kost committed
50
51
 * To use #GstRtpBin as a sender, request a send_rtp_sink_\%d pad, which will
 * automatically create a send_rtp_src_\%d pad. If the session number is not provided,
52
 * the pad from the lowest available session will be returned. The session manager will modify the
Wim Taymans's avatar
Wim Taymans committed
53
 * SSRC in the RTP packets to its own SSRC and wil forward the packets on the
Stefan Kost's avatar
Stefan Kost committed
54
 * send_rtp_src_\%d pad after updating its internal state.
Wim Taymans's avatar
Wim Taymans committed
55
 *
Wim Taymans's avatar
Wim Taymans committed
56
 * The session manager needs the clock-rate of the payload types it is handling
57
58
 * and will signal the #GstRtpSession::request-pt-map signal when it needs such a
 * mapping. One can clear the cached values with the #GstRtpSession::clear-pt-map
Wim Taymans's avatar
Wim Taymans committed
59
 * signal.
Wim Taymans's avatar
Wim Taymans committed
60
 *
61
 * <refsect2>
62
 * <title>Example pipelines</title>
63
 * |[
Wim Taymans's avatar
Wim Taymans committed
64
 * gst-launch udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink_0 \
65
 *     gstrtpbin ! rtptheoradepay ! theoradec ! xvimagesink
66
67
 * ]| Receive RTP data from port 5000 and send to the session 0 in gstrtpbin.
 * |[
68
69
 * gst-launch gstrtpbin name=rtpbin \
 *         v4l2src ! ffmpegcolorspace ! ffenc_h263 ! rtph263ppay ! rtpbin.send_rtp_sink_0 \
70
71
72
73
74
75
 *                   rtpbin.send_rtp_src_0 ! udpsink port=5000                            \
 *                   rtpbin.send_rtcp_src_0 ! udpsink port=5001 sync=false async=false    \
 *                   udpsrc port=5005 ! rtpbin.recv_rtcp_sink_0                           \
 *         audiotestsrc ! amrnbenc ! rtpamrpay ! rtpbin.send_rtp_sink_1                   \
 *                   rtpbin.send_rtp_src_1 ! udpsink port=5002                            \
 *                   rtpbin.send_rtcp_src_1 ! udpsink port=5003 sync=false async=false    \
76
 *                   udpsrc port=5007 ! rtpbin.recv_rtcp_sink_1
77
 * ]| Encode and payload H263 video captured from a v4l2src. Encode and payload AMR
78
79
80
81
82
83
 * audio generated from audiotestsrc. The video is sent to session 0 in rtpbin
 * and the audio is sent to session 1. Video packets are sent on UDP port 5000
 * and audio packets on port 5002. The video RTCP packets for session 0 are sent
 * on port 5001 and the audio RTCP packets for session 0 are sent on port 5003.
 * RTCP packets for session 0 are received on port 5005 and RTCP for session 1
 * is received on port 5007. Since RTCP packets from the sender should be sent
Wim Taymans's avatar
Wim Taymans committed
84
 * as soon as possible and do not participate in preroll, sync=false and
85
 * async=false is configured on udpsink
86
87
 * |[
 * gst-launch -v gstrtpbin name=rtpbin                                          \
88
 *     udpsrc caps="application/x-rtp,media=(string)video,clock-rate=(int)90000,encoding-name=(string)H263-1998" \
89
90
91
92
93
94
95
96
97
 *             port=5000 ! rtpbin.recv_rtp_sink_0                                \
 *         rtpbin. ! rtph263pdepay ! ffdec_h263 ! xvimagesink                    \
 *      udpsrc port=5001 ! rtpbin.recv_rtcp_sink_0                               \
 *      rtpbin.send_rtcp_src_0 ! udpsink port=5005 sync=false async=false        \
 *     udpsrc caps="application/x-rtp,media=(string)audio,clock-rate=(int)8000,encoding-name=(string)AMR,encoding-params=(string)1,octet-align=(string)1" \
 *             port=5002 ! rtpbin.recv_rtp_sink_1                                \
 *         rtpbin. ! rtpamrdepay ! amrnbdec ! alsasink                           \
 *      udpsrc port=5003 ! rtpbin.recv_rtcp_sink_1                               \
 *      rtpbin.send_rtcp_src_1 ! udpsink port=5007 sync=false async=false
98
 * ]| Receive H263 on port 5000, send it through rtpbin in session 0, depayload,
99
100
101
102
103
104
105
106
 * decode and display the video.
 * Receive AMR on port 5002, send it through rtpbin in session 1, depayload,
 * decode and play the audio.
 * Receive server RTCP packets for session 0 on port 5001 and RTCP packets for
 * session 1 on port 5003. These packets will be used for session management and
 * synchronisation.
 * Send RTCP reports for session 0 on port 5005 and RTCP reports for session 1
 * on port 5007.
107
108
 * </refsect2>
 *
109
 * Last reviewed on 2007-08-30 (0.10.6)
110
111
112
113
114
 */

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
115
#include <stdio.h>
116
117
#include <string.h>

118
119
120
#include <gst/rtp/gstrtpbuffer.h>
#include <gst/rtp/gstrtcpbuffer.h>

121
#include "gstrtpbin-marshal.h"
122
#include "gstrtpbin.h"
123
#include "rtpsession.h"
124
#include "gstrtpsession.h"
125
#include "gstrtpjitterbuffer.h"
126

127
128
129
GST_DEBUG_CATEGORY_STATIC (gst_rtp_bin_debug);
#define GST_CAT_DEFAULT gst_rtp_bin_debug

130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
/* sink pads */
static GstStaticPadTemplate rtpbin_recv_rtp_sink_template =
GST_STATIC_PAD_TEMPLATE ("recv_rtp_sink_%d",
    GST_PAD_SINK,
    GST_PAD_REQUEST,
    GST_STATIC_CAPS ("application/x-rtp")
    );

static GstStaticPadTemplate rtpbin_recv_rtcp_sink_template =
GST_STATIC_PAD_TEMPLATE ("recv_rtcp_sink_%d",
    GST_PAD_SINK,
    GST_PAD_REQUEST,
    GST_STATIC_CAPS ("application/x-rtcp")
    );

static GstStaticPadTemplate rtpbin_send_rtp_sink_template =
GST_STATIC_PAD_TEMPLATE ("send_rtp_sink_%d",
    GST_PAD_SINK,
    GST_PAD_REQUEST,
    GST_STATIC_CAPS ("application/x-rtp")
    );

/* src pads */
static GstStaticPadTemplate rtpbin_recv_rtp_src_template =
GST_STATIC_PAD_TEMPLATE ("recv_rtp_src_%d_%d_%d",
    GST_PAD_SRC,
    GST_PAD_SOMETIMES,
    GST_STATIC_CAPS ("application/x-rtp")
    );

160
161
static GstStaticPadTemplate rtpbin_send_rtcp_src_template =
GST_STATIC_PAD_TEMPLATE ("send_rtcp_src_%d",
162
163
164
165
166
167
168
169
170
171
172
173
174
    GST_PAD_SRC,
    GST_PAD_REQUEST,
    GST_STATIC_CAPS ("application/x-rtcp")
    );

static GstStaticPadTemplate rtpbin_send_rtp_src_template =
GST_STATIC_PAD_TEMPLATE ("send_rtp_src_%d",
    GST_PAD_SRC,
    GST_PAD_SOMETIMES,
    GST_STATIC_CAPS ("application/x-rtp")
    );

#define GST_RTP_BIN_GET_PRIVATE(obj)  \
175
   (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTP_BIN, GstRtpBinPrivate))
176

177
178
179
#define GST_RTP_BIN_LOCK(bin)   g_mutex_lock ((bin)->priv->bin_lock)
#define GST_RTP_BIN_UNLOCK(bin) g_mutex_unlock ((bin)->priv->bin_lock)

180
181
182
183
/* lock to protect dynamic callbacks, like pad-added and new ssrc. */
#define GST_RTP_BIN_DYN_LOCK(bin)    g_mutex_lock ((bin)->priv->dyn_lock)
#define GST_RTP_BIN_DYN_UNLOCK(bin)  g_mutex_unlock ((bin)->priv->dyn_lock)

184
185
186
187
188
/* lock for shutdown */
#define GST_RTP_BIN_SHUTDOWN_LOCK(bin,label)     \
G_STMT_START {                                   \
  if (g_atomic_int_get (&bin->priv->shutdown))   \
    goto label;                                  \
189
  GST_RTP_BIN_DYN_LOCK (bin);                    \
190
  if (g_atomic_int_get (&bin->priv->shutdown)) { \
191
    GST_RTP_BIN_DYN_UNLOCK (bin);                \
192
193
194
195
196
197
    goto label;                                  \
  }                                              \
} G_STMT_END

/* unlock for shutdown */
#define GST_RTP_BIN_SHUTDOWN_UNLOCK(bin)         \
198
  GST_RTP_BIN_DYN_UNLOCK (bin);                  \
199

200
struct _GstRtpBinPrivate
201
{
202
  GMutex *bin_lock;
203

204
205
206
  /* lock protecting dynamic adding/removing */
  GMutex *dyn_lock;

207
208
  /* if we are shutting down or not */
  gint shutdown;
209
210

  gboolean autoremove;
211
212
213
214
215
};

/* signals and args */
enum
{
216
  SIGNAL_REQUEST_PT_MAP,
217
  SIGNAL_PAYLOAD_TYPE_CHANGE,
218
  SIGNAL_CLEAR_PT_MAP,
219
  SIGNAL_RESET_SYNC,
220
  SIGNAL_GET_INTERNAL_SESSION,
221
222
223
224

  SIGNAL_ON_NEW_SSRC,
  SIGNAL_ON_SSRC_COLLISION,
  SIGNAL_ON_SSRC_VALIDATED,
225
  SIGNAL_ON_SSRC_ACTIVE,
226
  SIGNAL_ON_SSRC_SDES,
227
228
229
  SIGNAL_ON_BYE_SSRC,
  SIGNAL_ON_BYE_TIMEOUT,
  SIGNAL_ON_TIMEOUT,
230
  SIGNAL_ON_SENDER_TIMEOUT,
Wim Taymans's avatar
Wim Taymans committed
231
  SIGNAL_ON_NPT_STOP,
232
233
234
  LAST_SIGNAL
};

235
#define DEFAULT_LATENCY_MS	     200
Wim Taymans's avatar
Wim Taymans committed
236
#define DEFAULT_SDES                 NULL
237
#define DEFAULT_DO_LOST              FALSE
Marc Leeman's avatar
Marc Leeman committed
238
#define DEFAULT_IGNORE_PT            FALSE
239
#define DEFAULT_AUTOREMOVE           FALSE
240
#define DEFAULT_BUFFER_MODE          RTP_JITTER_BUFFER_MODE_SLAVE
241

242
243
enum
{
244
  PROP_0,
245
  PROP_LATENCY,
Wim Taymans's avatar
Wim Taymans committed
246
  PROP_SDES,
247
  PROP_DO_LOST,
Marc Leeman's avatar
Marc Leeman committed
248
  PROP_IGNORE_PT,
249
  PROP_AUTOREMOVE,
250
  PROP_BUFFER_MODE,
251
  PROP_LAST
252
253
};

254
/* helper objects */
255
256
257
typedef struct _GstRtpBinSession GstRtpBinSession;
typedef struct _GstRtpBinStream GstRtpBinStream;
typedef struct _GstRtpBinClient GstRtpBinClient;
258

259
260
static guint gst_rtp_bin_signals[LAST_SIGNAL] = { 0 };

261
static GstCaps *pt_map_requested (GstElement * element, guint pt,
262
    GstRtpBinSession * session);
263
264
static void payload_type_change (GstElement * element, guint pt,
    GstRtpBinSession * session);
265
static void free_client (GstRtpBinClient * client, GstRtpBin * bin);
266
267
static void free_stream (GstRtpBinStream * stream);

268
269
270
271
/* Manages the RTP stream for one SSRC.
 *
 * We pipe the stream (comming from the SSRC demuxer) into a jitterbuffer.
 * If we see an SDES RTCP packet that links multiple SSRCs together based on a
272
 * common CNAME, we create a GstRtpBinClient structure to group the SSRCs
273
274
 * together (see below).
 */
275
struct _GstRtpBinStream
276
277
278
{
  /* the SSRC of this stream */
  guint32 ssrc;
279

280
  /* parent bin */
281
  GstRtpBin *bin;
282

283
  /* the session this SSRC belongs to */
284
  GstRtpBinSession *session;
285

286
287
  /* the jitterbuffer of the SSRC */
  GstElement *buffer;
288
289
290
  gulong buffer_handlesync_sig;
  gulong buffer_ptreq_sig;
  gulong buffer_ntpstop_sig;
Wim Taymans's avatar
Wim Taymans committed
291
  gint percent;
292

293
294
295
  /* the PT demuxer of the SSRC */
  GstElement *demux;
  gulong demux_newpad_sig;
296
  gulong demux_padremoved_sig;
297
  gulong demux_ptreq_sig;
298
  gulong demux_ptchange_sig;
299

300
  /* if we have calculated a valid unix_delta for this stream */
301
302
303
  gboolean have_sync;
  /* mapping to local RTP and NTP time */
  gint64 unix_delta;
304
305
};

306
307
308
#define GST_RTP_SESSION_LOCK(sess)   g_mutex_lock ((sess)->lock)
#define GST_RTP_SESSION_UNLOCK(sess) g_mutex_unlock ((sess)->lock)

309
310
311
312
313
314
/* Manages the receiving end of the packets.
 *
 * There is one such structure for each RTP session (audio/video/...).
 * We get the RTP/RTCP packets and stuff them into the session manager. From
 * there they are pushed into an SSRC demuxer that splits the stream based on
 * SSRC. Each of the SSRC streams go into their own jitterbuffer (managed with
315
 * the GstRtpBinStream above).
316
 */
317
struct _GstRtpBinSession
318
319
320
{
  /* session id */
  gint id;
321
  /* the parent bin */
322
  GstRtpBin *bin;
323
324
325
  /* the session element */
  GstElement *session;
  /* the SSRC demuxer */
326
327
  GstElement *demux;
  gulong demux_newpad_sig;
328
  gulong demux_padremoved_sig;
329

330
331
  GMutex *lock;

332
  /* list of GstRtpBinStream */
333
  GSList *streams;
334

335
336
337
  /* mapping of payload type to caps */
  GHashTable *ptmap;

338
339
  /* the pads of the session */
  GstPad *recv_rtp_sink;
340
  GstPad *recv_rtp_sink_ghost;
341
  GstPad *recv_rtp_src;
342
  GstPad *recv_rtcp_sink;
343
  GstPad *recv_rtcp_sink_ghost;
344
  GstPad *sync_src;
345
  GstPad *send_rtp_sink;
346
  GstPad *send_rtp_sink_ghost;
347
  GstPad *send_rtp_src;
348
  GstPad *send_rtp_src_ghost;
349
  GstPad *send_rtcp_src;
350
  GstPad *send_rtcp_src_ghost;
351
};
352

353
354
355
356
357
358
359
360
361
362
363
364
365
366
/* Manages the RTP streams that come from one client and should therefore be
 * synchronized.
 */
struct _GstRtpBinClient
{
  /* the common CNAME for the streams */
  gchar *cname;
  guint cname_len;

  /* the streams */
  guint nstreams;
  GSList *streams;
};

367
/* find a session with the given id. Must be called with RTP_BIN_LOCK */
368
369
static GstRtpBinSession *
find_session_by_id (GstRtpBin * rtpbin, gint id)
370
{
371
  GSList *walk;
372

373
  for (walk = rtpbin->sessions; walk; walk = g_slist_next (walk)) {
374
    GstRtpBinSession *sess = (GstRtpBinSession *) walk->data;
375
376
377
378
379
380
381

    if (sess->id == id)
      return sess;
  }
  return NULL;
}

382
383
384
385
386
387
388
389
390
/* find a session with the given request pad. Must be called with RTP_BIN_LOCK */
static GstRtpBinSession *
find_session_by_pad (GstRtpBin * rtpbin, GstPad * pad)
{
  GSList *walk;

  for (walk = rtpbin->sessions; walk; walk = g_slist_next (walk)) {
    GstRtpBinSession *sess = (GstRtpBinSession *) walk->data;

391
392
393
394
    if ((sess->recv_rtp_sink_ghost == pad) ||
        (sess->recv_rtcp_sink_ghost == pad) ||
        (sess->send_rtp_sink_ghost == pad)
        || (sess->send_rtcp_src_ghost == pad))
395
396
397
398
399
      return sess;
  }
  return NULL;
}

400
static void
401
on_new_ssrc (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
402
403
404
405
406
407
{
  g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_NEW_SSRC], 0,
      sess->id, ssrc);
}

static void
408
on_ssrc_collision (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
409
410
411
412
413
414
{
  g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_COLLISION], 0,
      sess->id, ssrc);
}

static void
415
on_ssrc_validated (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
416
417
418
419
420
{
  g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
      sess->id, ssrc);
}

421
422
423
424
425
426
427
static void
on_ssrc_active (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
{
  g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_ACTIVE], 0,
      sess->id, ssrc);
}

428
429
430
431
432
433
434
static void
on_ssrc_sdes (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
{
  g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_SDES], 0,
      sess->id, ssrc);
}

435
static void
436
on_bye_ssrc (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
437
438
439
440
441
442
{
  g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_BYE_SSRC], 0,
      sess->id, ssrc);
}

static void
443
on_bye_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
444
445
446
{
  g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_BYE_TIMEOUT], 0,
      sess->id, ssrc);
447
448
449

  if (sess->bin->priv->autoremove)
    g_signal_emit_by_name (sess->demux, "clear-ssrc", ssrc, NULL);
450
451
452
}

static void
453
on_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
454
455
456
{
  g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_TIMEOUT], 0,
      sess->id, ssrc);
457
458
459

  if (sess->bin->priv->autoremove)
    g_signal_emit_by_name (sess->demux, "clear-ssrc", ssrc, NULL);
460
461
}

462
463
464
465
466
467
468
static void
on_sender_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
{
  g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SENDER_TIMEOUT], 0,
      sess->id, ssrc);
}

Wim Taymans's avatar
Wim Taymans committed
469
470
471
472
473
474
475
static void
on_npt_stop (GstElement * jbuf, GstRtpBinStream * stream)
{
  g_signal_emit (stream->bin, gst_rtp_bin_signals[SIGNAL_ON_NPT_STOP], 0,
      stream->session->id, stream->ssrc);
}

476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
/* must be called with the SESSION lock */
static GstRtpBinStream *
find_stream_by_ssrc (GstRtpBinSession * session, guint32 ssrc)
{
  GSList *walk;

  for (walk = session->streams; walk; walk = g_slist_next (walk)) {
    GstRtpBinStream *stream = (GstRtpBinStream *) walk->data;

    if (stream->ssrc == ssrc)
      return stream;
  }
  return NULL;
}

static void
492
ssrc_demux_pad_removed (GstElement * element, guint ssrc, GstPad * pad,
493
494
495
496
497
    GstRtpBinSession * session)
{
  GstRtpBinStream *stream = NULL;

  GST_RTP_SESSION_LOCK (session);
498
  if ((stream = find_stream_by_ssrc (session, ssrc)))
499
500
    session->streams = g_slist_remove (session->streams, stream);
  GST_RTP_SESSION_UNLOCK (session);
501
502
503

  if (stream)
    free_stream (stream);
504
505
}

506
/* create a session with the given id.  Must be called with RTP_BIN_LOCK */
507
508
static GstRtpBinSession *
create_session (GstRtpBin * rtpbin, gint id)
509
{
510
  GstRtpBinSession *sess;
511
  GstElement *session, *demux;
512
  GstState target;
513

514
  if (!(session = gst_element_factory_make ("gstrtpsession", NULL)))
515
516
    goto no_session;

517
  if (!(demux = gst_element_factory_make ("gstrtpssrcdemux", NULL)))
518
519
    goto no_demux;

520
  sess = g_new0 (GstRtpBinSession, 1);
521
  sess->lock = g_mutex_new ();
522
  sess->id = id;
523
  sess->bin = rtpbin;
524
  sess->session = session;
525
  sess->demux = demux;
526
527
  sess->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
      (GDestroyNotify) gst_caps_unref);
528
529
  rtpbin->sessions = g_slist_prepend (rtpbin->sessions, sess);

530
531
  /* configure SDES items */
  GST_OBJECT_LOCK (rtpbin);
Wim Taymans's avatar
Wim Taymans committed
532
  g_object_set (session, "sdes", rtpbin->sdes, NULL);
533
  GST_OBJECT_UNLOCK (rtpbin);
534

535
536
537
538
  /* provide clock_rate to the session manager when needed */
  g_signal_connect (session, "request-pt-map",
      (GCallback) pt_map_requested, sess);

539
540
541
542
543
544
  g_signal_connect (sess->session, "on-new-ssrc",
      (GCallback) on_new_ssrc, sess);
  g_signal_connect (sess->session, "on-ssrc-collision",
      (GCallback) on_ssrc_collision, sess);
  g_signal_connect (sess->session, "on-ssrc-validated",
      (GCallback) on_ssrc_validated, sess);
545
546
  g_signal_connect (sess->session, "on-ssrc-active",
      (GCallback) on_ssrc_active, sess);
547
548
  g_signal_connect (sess->session, "on-ssrc-sdes",
      (GCallback) on_ssrc_sdes, sess);
549
550
551
552
553
  g_signal_connect (sess->session, "on-bye-ssrc",
      (GCallback) on_bye_ssrc, sess);
  g_signal_connect (sess->session, "on-bye-timeout",
      (GCallback) on_bye_timeout, sess);
  g_signal_connect (sess->session, "on-timeout", (GCallback) on_timeout, sess);
554
555
  g_signal_connect (sess->session, "on-sender-timeout",
      (GCallback) on_sender_timeout, sess);
556

557
  gst_bin_add (GST_BIN_CAST (rtpbin), session);
558
  gst_bin_add (GST_BIN_CAST (rtpbin), demux);
559
560
561
562
563
564
565
566

  GST_OBJECT_LOCK (rtpbin);
  target = GST_STATE_TARGET (rtpbin);
  GST_OBJECT_UNLOCK (rtpbin);

  /* change state only to what's needed */
  gst_element_set_state (demux, target);
  gst_element_set_state (session, target);
567
568
569
570
571
572

  return sess;

  /* ERRORS */
no_session:
  {
573
    g_warning ("gstrtpbin: could not create gstrtpsession element");
574
575
    return NULL;
  }
576
577
no_demux:
  {
578
    gst_object_unref (session);
579
    g_warning ("gstrtpbin: could not create gstrtpssrcdemux element");
580
581
582
583
    return NULL;
  }
}

584
static void
585
free_session (GstRtpBinSession * sess, GstRtpBin * bin)
586
{
587
588
  GSList *client_walk;

589
590
  GST_DEBUG_OBJECT (bin, "freeing session %p", sess);

591
592
593
  gst_element_set_locked_state (sess->demux, TRUE);
  gst_element_set_locked_state (sess->session, TRUE);

594
  gst_element_set_state (sess->demux, GST_STATE_NULL);
595
  gst_element_set_state (sess->session, GST_STATE_NULL);
596

597
  if (sess->recv_rtp_sink != NULL) {
598
    gst_element_release_request_pad (sess->session, sess->recv_rtp_sink);
599
600
    gst_object_unref (sess->recv_rtp_sink);
  }
601
602
  if (sess->recv_rtp_src != NULL)
    gst_object_unref (sess->recv_rtp_src);
603
  if (sess->recv_rtcp_sink != NULL) {
604
    gst_element_release_request_pad (sess->session, sess->recv_rtcp_sink);
605
606
    gst_object_unref (sess->recv_rtcp_sink);
  }
607
608
  if (sess->sync_src != NULL)
    gst_object_unref (sess->sync_src);
609
  if (sess->send_rtp_sink != NULL) {
610
    gst_element_release_request_pad (sess->session, sess->send_rtp_sink);
611
612
    gst_object_unref (sess->send_rtp_sink);
  }
613
614
  if (sess->send_rtp_src != NULL)
    gst_object_unref (sess->send_rtp_src);
615
  if (sess->send_rtcp_src != NULL) {
616
    gst_element_release_request_pad (sess->session, sess->send_rtcp_src);
617
618
    gst_object_unref (sess->send_rtcp_src);
  }
619

620
621
622
  gst_bin_remove (GST_BIN_CAST (bin), sess->session);
  gst_bin_remove (GST_BIN_CAST (bin), sess->demux);

623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
  /* remove any references in bin->clients to the streams in sess->streams */
  client_walk = bin->clients;
  while (client_walk) {
    GSList *client_node = client_walk;
    GstRtpBinClient *client = (GstRtpBinClient *) client_node->data;
    GSList *stream_walk = client->streams;

    while (stream_walk) {
      GSList *stream_node = stream_walk;
      GstRtpBinStream *stream = (GstRtpBinStream *) stream_node->data;
      GSList *inner_walk;

      stream_walk = g_slist_next (stream_walk);

      for (inner_walk = sess->streams; inner_walk;
          inner_walk = g_slist_next (inner_walk)) {
        if ((GstRtpBinStream *) inner_walk->data == stream) {
          client->streams = g_slist_delete_link (client->streams, stream_node);
          --client->nstreams;
          break;
        }
      }
    }
    client_walk = g_slist_next (client_walk);

    g_assert ((client->streams && client->nstreams > 0) || (!client->streams
            && client->streams == 0));
    if (client->nstreams == 0) {
      free_client (client, bin);
      bin->clients = g_slist_delete_link (bin->clients, client_node);
    }
  }

656
657
658
659
660
661
662
663
664
  g_slist_foreach (sess->streams, (GFunc) free_stream, NULL);
  g_slist_free (sess->streams);

  g_mutex_free (sess->lock);
  g_hash_table_destroy (sess->ptmap);

  g_free (sess);
}

665
666
/* get the payload type caps for the specific payload @pt in @session */
static GstCaps *
667
get_pt_map (GstRtpBinSession * session, guint pt)
668
669
{
  GstCaps *caps = NULL;
670
  GstRtpBin *bin;
671
672
  GValue ret = { 0 };
  GValue args[3] = { {0}, {0}, {0} };
673

674
  GST_DEBUG ("searching pt %d in cache", pt);
675

676
677
  GST_RTP_SESSION_LOCK (session);

678
679
  /* first look in the cache */
  caps = g_hash_table_lookup (session->ptmap, GINT_TO_POINTER (pt));
680
681
  if (caps) {
    gst_caps_ref (caps);
682
    goto done;
683
  }
684
685
686
687
688
689

  bin = session->bin;

  GST_DEBUG ("emiting signal for pt %d in session %d", pt, session->id);

  /* not in cache, send signal to request caps */
690
691
692
693
694
695
696
697
698
699
  g_value_init (&args[0], GST_TYPE_ELEMENT);
  g_value_set_object (&args[0], bin);
  g_value_init (&args[1], G_TYPE_UINT);
  g_value_set_uint (&args[1], session->id);
  g_value_init (&args[2], G_TYPE_UINT);
  g_value_set_uint (&args[2], pt);

  g_value_init (&ret, GST_TYPE_CAPS);
  g_value_set_boxed (&ret, NULL);

700
701
  GST_RTP_SESSION_UNLOCK (session);

702
703
  g_signal_emitv (args, gst_rtp_bin_signals[SIGNAL_REQUEST_PT_MAP], 0, &ret);

704
705
  GST_RTP_SESSION_LOCK (session);

706
707
708
  g_value_unset (&args[0]);
  g_value_unset (&args[1]);
  g_value_unset (&args[2]);
709
710
711
712
713
714
715
716
717

  /* look in the cache again because we let the lock go */
  caps = g_hash_table_lookup (session->ptmap, GINT_TO_POINTER (pt));
  if (caps) {
    gst_caps_ref (caps);
    g_value_unset (&ret);
    goto done;
  }

718
719
  caps = (GstCaps *) g_value_dup_boxed (&ret);
  g_value_unset (&ret);
720
721
722
  if (!caps)
    goto no_caps;

723
724
  GST_DEBUG ("caching pt %d as %" GST_PTR_FORMAT, pt, caps);

725
726
727
  /* store in cache, take additional ref */
  g_hash_table_insert (session->ptmap, GINT_TO_POINTER (pt),
      gst_caps_ref (caps));
728
729

done:
730
731
  GST_RTP_SESSION_UNLOCK (session);

732
733
734
735
736
  return caps;

  /* ERRORS */
no_caps:
  {
737
    GST_RTP_SESSION_UNLOCK (session);
738
739
740
741
742
    GST_DEBUG ("no pt map could be obtained");
    return NULL;
  }
}

743
744
745
746
747
748
static gboolean
return_true (gpointer key, gpointer value, gpointer user_data)
{
  return TRUE;
}

749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
static void
gst_rtp_bin_reset_sync (GstRtpBin * rtpbin)
{
  GSList *clients, *streams;

  GST_DEBUG_OBJECT (rtpbin, "Reset sync on all clients");

  GST_RTP_BIN_LOCK (rtpbin);
  for (clients = rtpbin->clients; clients; clients = g_slist_next (clients)) {
    GstRtpBinClient *client = (GstRtpBinClient *) clients->data;

    /* reset sync on all streams for this client */
    for (streams = client->streams; streams; streams = g_slist_next (streams)) {
      GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;

      /* make use require a new SR packet for this stream before we attempt new
       * lip-sync */
      stream->have_sync = FALSE;
      stream->unix_delta = 0;
    }
  }
  GST_RTP_BIN_UNLOCK (rtpbin);
}

773
static void
774
gst_rtp_bin_clear_pt_map (GstRtpBin * bin)
775
{
776
  GSList *sessions, *streams;
777
778

  GST_RTP_BIN_LOCK (bin);
779
  GST_DEBUG_OBJECT (bin, "clearing pt map");
780
781
782
783
784
  for (sessions = bin->sessions; sessions; sessions = g_slist_next (sessions)) {
    GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;

    GST_DEBUG_OBJECT (bin, "clearing session %p", session);
    g_signal_emit_by_name (session->session, "clear-pt-map", NULL);
785
786

    GST_RTP_SESSION_LOCK (session);
787
    g_hash_table_foreach_remove (session->ptmap, return_true, NULL);
788
789
790
791
792
793

    for (streams = session->streams; streams; streams = g_slist_next (streams)) {
      GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;

      GST_DEBUG_OBJECT (bin, "clearing stream %p", stream);
      g_signal_emit_by_name (stream->buffer, "clear-pt-map", NULL);
Marc Leeman's avatar
Marc Leeman committed
794
795
      if (stream->demux)
        g_signal_emit_by_name (stream->demux, "clear-pt-map", NULL);
796
    }
797
798
799
    GST_RTP_SESSION_UNLOCK (session);
  }
  GST_RTP_BIN_UNLOCK (bin);
800
801
802

  /* reset sync too */
  gst_rtp_bin_reset_sync (bin);
803
804
}

805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
static RTPSession *
gst_rtp_bin_get_internal_session (GstRtpBin * bin, guint session_id)
{
  RTPSession *internal_session = NULL;
  GstRtpBinSession *session;

  GST_RTP_BIN_LOCK (bin);
  GST_DEBUG_OBJECT (bin, "retrieving internal RTPSession object, index: %d",
      session_id);
  session = find_session_by_id (bin, (gint) session_id);
  if (session) {
    g_object_get (session->session, "internal-session", &internal_session,
        NULL);
  }
  GST_RTP_BIN_UNLOCK (bin);

  return internal_session;
}

824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
static void
gst_rtp_bin_propagate_property_to_jitterbuffer (GstRtpBin * bin,
    const gchar * name, const GValue * value)
{
  GSList *sessions, *streams;

  GST_RTP_BIN_LOCK (bin);
  for (sessions = bin->sessions; sessions; sessions = g_slist_next (sessions)) {
    GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;

    GST_RTP_SESSION_LOCK (session);
    for (streams = session->streams; streams; streams = g_slist_next (streams)) {
      GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;

      g_object_set_property (G_OBJECT (stream->buffer), name, value);
    }
    GST_RTP_SESSION_UNLOCK (session);
  }
  GST_RTP_BIN_UNLOCK (bin);
}

845
/* get a client with the given SDES name. Must be called with RTP_BIN_LOCK */
846
static GstRtpBinClient *
847
get_client (GstRtpBin * bin, guint8 len, guint8 * data, gboolean * created)
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
{
  GstRtpBinClient *result = NULL;
  GSList *walk;

  for (walk = bin->clients; walk; walk = g_slist_next (walk)) {
    GstRtpBinClient *client = (GstRtpBinClient *) walk->data;

    if (len != client->cname_len)
      continue;

    if (!strncmp ((gchar *) data, client->cname, client->cname_len)) {
      GST_DEBUG_OBJECT (bin, "found existing client %p with CNAME %s", client,
          client->cname);
      result = client;
      break;
    }
  }

  /* nothing found, create one */
  if (result == NULL) {
    result = g_new0 (GstRtpBinClient, 1);
    result->cname = g_strndup ((gchar *) data, len);
    result->cname_len = len;
    bin->clients = g_slist_prepend (bin->clients, result);
    GST_DEBUG_OBJECT (bin, "created new client %p with CNAME %s", result,
        result->cname);
  }
  return result;
}

878
static void
879
free_client (GstRtpBinClient * client, GstRtpBin * bin)
880
{
881
  GST_DEBUG_OBJECT (bin, "freeing client %p", client);
882
  g_slist_free (client->streams);
883
884
885
886
  g_free (client->cname);
  g_free (client);
}

887
/* associate a stream to the given CNAME. This will make sure all streams for
888
889
 * that CNAME are synchronized together.
 * Must be called with GST_RTP_BIN_LOCK */
890
891
static void
gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len,
892
893
    guint8 * data, guint64 last_unix, guint64 last_extrtptime,
    guint64 clock_base, guint64 clock_base_time, guint clock_rate)
894
895
896
897
{
  GstRtpBinClient *client;
  gboolean created;
  GSList *walk;
898
899
  guint64 local_unix;
  guint64 local_rtp;
900
901

  /* first find or create the CNAME */
902
  client = get_client (bin, len, data, &created);
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923

  /* find stream in the client */
  for (walk = client->streams; walk; walk = g_slist_next (walk)) {
    GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;

    if (ostream == stream)
      break;
  }
  /* not found, add it to the list */
  if (walk == NULL) {
    GST_DEBUG_OBJECT (bin,
        "new association of SSRC %08x with client %p with CNAME %s",
        stream->ssrc, client, client->cname);
    client->streams = g_slist_prepend (client->streams, stream);
    client->nstreams++;
  } else {
    GST_DEBUG_OBJECT (bin,
        "found association of SSRC %08x with client %p with CNAME %s",
        stream->ssrc, client, client->cname);
  }

924
925
926
  /* take the extended rtptime we found in the SR packet and map it to the
   * local rtptime. The local rtp time is used to construct timestamps on the
   * buffers. */
927
  local_rtp = last_extrtptime - clock_base;
928
929
930

  GST_DEBUG_OBJECT (bin,
      "base %" G_GUINT64_FORMAT ", extrtptime %" G_GUINT64_FORMAT
931
932
      ", local RTP %" G_GUINT64_FORMAT ", clock-rate %d", clock_base,
      last_extrtptime, local_rtp, clock_rate);
933

934
935
936
  /* calculate local NTP time in gstreamer timestamp, we essentially perform the
   * same conversion that a jitterbuffer would use to convert an rtp timestamp
   * into a corresponding gstreamer timestamp. */
937
938
939
  local_unix = gst_util_uint64_scale_int (local_rtp, GST_SECOND, clock_rate);
  local_unix += clock_base_time;

940
941
942
  /* calculate delta between server and receiver. last_unix is created by
   * converting the ntptime in the last SR packet to a gstreamer timestamp. This
   * delta expresses the difference to our timeline and the server timeline. */
943
944
  stream->unix_delta = last_unix - local_unix;
  stream->have_sync = TRUE;
945
946
947

  GST_DEBUG_OBJECT (bin,
      "local UNIX %" G_GUINT64_FORMAT ", remote UNIX %" G_GUINT64_FORMAT
948
      ", delta %" G_GINT64_FORMAT, local_unix, last_unix, stream->unix_delta);
949

950
  /* recalc inter stream playout offset, but only if there is more than one
951
952
953
954
   * stream. */
  if (client->nstreams > 1) {
    gint64 min;

955
956
    /* calculate the min of all deltas, ignoring streams that did not yet have a
     * valid unix_delta because we did not yet receive an SR packet for those
Wim Taymans's avatar
Wim Taymans committed
957
     * streams.
958
959
960
961
     * We calculate the mininum because we would like to only apply positive
     * offsets to streams, delaying their playback instead of trying to speed up
     * other streams (which might be imposible when we have to create negative
     * latencies).
962
     * The stream that has the smallest diff is selected as the reference stream,
963
     * all other streams will have a positive offset to this difference. */
964
965
966
967
    min = G_MAXINT64;
    for (walk = client->streams; walk; walk = g_slist_next (walk)) {
      GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;

968
969
970
971
      if (!ostream->have_sync)
        continue;

      if (ostream->unix_delta < min)
972
973
974
975
976
977
978
979
980
        min = ostream->unix_delta;
    }

    GST_DEBUG_OBJECT (bin, "client %p min delta %" G_GINT64_FORMAT, client,
        min);

    /* calculate offsets for each stream */
    for (walk = client->streams; walk; walk = g_slist_next (walk)) {
      GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
981
      gint64 ts_offset, prev_ts_offset;
982

983
984
985
986
987
988
989
990
      /* ignore streams for which we didn't receive an SR packet yet, we
       * can't synchronize them yet. We can however sync other streams just
       * fine. */
      if (!ostream->have_sync)
        continue;

      /* calculate offset to our reference stream, this should always give a
       * positive number. */
991
      ts_offset = ostream->unix_delta - min;
992

993
994
      g_object_get (ostream->buffer, "ts-offset", &prev_ts_offset, NULL);

995
      /* delta changed, see how much */
996
      if (prev_ts_offset != ts_offset) {
997
998
        gint64 diff;

999
1000
        if (prev_ts_offset > ts_offset)
          diff = prev_ts_offset - ts_offset;
1001
        else
1002
          diff = ts_offset - prev_ts_offset;
1003

1004
1005
        GST_DEBUG_OBJECT (bin,
            "ts-offset %" G_GUINT64_FORMAT ", prev %" G_GUINT64_FORMAT
1006
            ", diff: %" G_GINT64_FORMAT, ts_offset, prev_ts_offset, diff);
1007

1008
        /* only change diff when it changed more than 4 milliseconds. This
1009
1010
         * compensates for rounding errors in NTP to RTP timestamp
         * conversions */
1011
        if (diff > 4 * GST_MSECOND && diff < (3 * GST_SECOND)) {
1012
          g_object_set (ostream->buffer, "ts-offset", ts_offset, NULL);
1013
        }
1014
1015
      }
      GST_DEBUG_OBJECT (bin, "stream SSRC %08x, delta %" G_GINT64_FORMAT,
1016
          ostream->ssrc, ts_offset);
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
    }
  }
  return;
}

#define GST_RTCP_BUFFER_FOR_PACKETS(b,buffer,packet) \
  for ((b) = gst_rtcp_buffer_get_first_packet ((buffer), (packet)); (b); \
          (b) = gst_rtcp_packet_move_to_next ((packet)))

#define GST_RTCP_SDES_FOR_ITEMS(b,packet) \
  for ((b) = gst_rtcp_packet_sdes_first_item ((packet)); (b); \
          (b) = gst_rtcp_packet_sdes_next_item ((packet)))

#define GST_RTCP_SDES_FOR_ENTRIES(b,packet) \
  for ((b) = gst_rtcp_packet_sdes_first_entry ((packet)); (b); \
          (b) = gst_rtcp_packet_sdes_next_entry ((packet)))

1034
1035
1036
static void
gst_rtp_bin_handle_sync (GstElement * jitterbuffer, GstStructure * s,
    GstRtpBinStream * stream)
1037
1038
1039
1040
1041
1042
1043
{
  GstRtpBin *bin;
  GstRTCPPacket packet;
  guint32 ssrc;
  guint64 ntptime;
  gboolean have_sr, have_sdes;
  gboolean more;
1044
  guint64 clock_base;
1045
  guint64 clock_base_time;
1046
  guint clock_rate;
1047
1048
  guint64 extrtptime;
  GstBuffer *buffer;
1049
1050
1051

  bin = stream->bin;

1052
  GST_DEBUG_OBJECT (bin, "sync handler called");
1053

1054
1055
  /* get the last relation between the rtp timestamps and the gstreamer
   * timestamps. We get this info directly from the jitterbuffer which
1056
1057
   * constructs gstreamer timestamps from rtp timestamps and so it know exactly
   * what the current situation is. */
1058
1059
1060
1061
1062
1063
1064
  clock_base = g_value_get_uint64 (gst_structure_get_value (s, "base-rtptime"));
  clock_base_time =
      g_value_get_uint64 (gst_structure_get_value (s, "base-time"));
  clock_rate = g_value_get_uint (gst_structure_get_value (s, "clock-rate"));
  extrtptime =
      g_value_get_uint64 (gst_structure_get_value (s, "sr-ext-rtptime"));
  buffer = gst_value_get_buffer (gst_structure_get_value (s, "sr-buffer"));