gstrtpbin.c 98.6 KB
Newer Older
1
/* GStreamer
2
 * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

/**
21 22
 * SECTION:element-gstrtpbin
 * @see_also: gstrtpjitterbuffer, gstrtpsession, gstrtpptdemux, gstrtpssrcdemux
23
 *
24
 * RTP bin combines the functions of #GstRtpSession, #GstRtpSsrcDemux,
25 26
 * #GstRtpJitterBuffer and #GstRtpPtDemux in one element. It allows for multiple
 * RTP sessions that will be synchronized together using RTCP SR packets.
Wim Taymans's avatar
Wim Taymans committed
27
 *
28 29
 * #GstRtpBin is configured with a number of request pads that define the
 * functionality that is activated, similar to the #GstRtpSession element.
Wim Taymans's avatar
Wim Taymans committed
30
 *
31
 * To use #GstRtpBin as an RTP receiver, request a recv_rtp_sink_\%u pad. The session
Wim Taymans's avatar
Wim Taymans committed
32
 * number must be specified in the pad name.
33
 * Data received on the recv_rtp_sink_\%u pad will be processed in the #GstRtpSession
34
 * manager and after being validated forwarded on #GstRtpSsrcDemux element. Each
35
 * RTP stream is demuxed based on the SSRC and sent to a #GstRtpJitterBuffer. After
36
 * the packets are released from the jitterbuffer, they will be forwarded to a
Wim Taymans's avatar
Wim Taymans committed
37
 * #GstRtpPtDemux element. The #GstRtpPtDemux element will demux the packets based
38
 * on the payload type and will create a unique pad recv_rtp_src_\%u_\%u_\%u on
39
 * gstrtpbin with the session number, SSRC and payload type respectively as the pad
Wim Taymans's avatar
Wim Taymans committed
40
 * name.
Wim Taymans's avatar
Wim Taymans committed
41
 *
42
 * To also use #GstRtpBin as an RTCP receiver, request a recv_rtcp_sink_\%u pad. The
Wim Taymans's avatar
Wim Taymans committed
43
 * session number must be specified in the pad name.
Wim Taymans's avatar
Wim Taymans committed
44
 *
Wim Taymans's avatar
Wim Taymans committed
45
 * If you want the session manager to generate and send RTCP packets, request
46
 * the send_rtcp_src_\%u pad with the session number in the pad name. Packets pushed
Wim Taymans's avatar
Wim Taymans committed
47 48
 * on this pad contain SR/RR RTCP reports that should be sent to all participants
 * in the session.
Wim Taymans's avatar
Wim Taymans committed
49
 *
50 51
 * To use #GstRtpBin as a sender, request a send_rtp_sink_\%u pad, which will
 * automatically create a send_rtp_src_\%u pad. If the session number is not provided,
52
 * the pad from the lowest available session will be returned. The session manager will modify the
Wim Taymans's avatar
Wim Taymans committed
53
 * SSRC in the RTP packets to its own SSRC and will forward the packets on the
54
 * send_rtp_src_\%u pad after updating its internal state.
Wim Taymans's avatar
Wim Taymans committed
55
 *
Wim Taymans's avatar
Wim Taymans committed
56
 * The session manager needs the clock-rate of the payload types it is handling
57 58
 * and will signal the #GstRtpSession::request-pt-map signal when it needs such a
 * mapping. One can clear the cached values with the #GstRtpSession::clear-pt-map
Wim Taymans's avatar
Wim Taymans committed
59
 * signal.
Wim Taymans's avatar
Wim Taymans committed
60
 *
61 62 63 64 65
 * Access to the internal statistics of gstrtpbin is provided with the
 * get-internal-session action signal. This signal gives access to the
 * RTPSession object which further provides action signals to retrieve the
 * internal source and other sources.
 *
66
 * <refsect2>
67
 * <title>Example pipelines</title>
68
 * |[
Wim Taymans's avatar
Wim Taymans committed
69
 * gst-launch udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink_0 \
70
 *     gstrtpbin ! rtptheoradepay ! theoradec ! xvimagesink
71 72
 * ]| Receive RTP data from port 5000 and send to the session 0 in gstrtpbin.
 * |[
73 74
 * gst-launch gstrtpbin name=rtpbin \
 *         v4l2src ! ffmpegcolorspace ! ffenc_h263 ! rtph263ppay ! rtpbin.send_rtp_sink_0 \
75 76 77 78 79 80
 *                   rtpbin.send_rtp_src_0 ! udpsink port=5000                            \
 *                   rtpbin.send_rtcp_src_0 ! udpsink port=5001 sync=false async=false    \
 *                   udpsrc port=5005 ! rtpbin.recv_rtcp_sink_0                           \
 *         audiotestsrc ! amrnbenc ! rtpamrpay ! rtpbin.send_rtp_sink_1                   \
 *                   rtpbin.send_rtp_src_1 ! udpsink port=5002                            \
 *                   rtpbin.send_rtcp_src_1 ! udpsink port=5003 sync=false async=false    \
81
 *                   udpsrc port=5007 ! rtpbin.recv_rtcp_sink_1
82
 * ]| Encode and payload H263 video captured from a v4l2src. Encode and payload AMR
83 84 85 86 87 88
 * audio generated from audiotestsrc. The video is sent to session 0 in rtpbin
 * and the audio is sent to session 1. Video packets are sent on UDP port 5000
 * and audio packets on port 5002. The video RTCP packets for session 0 are sent
 * on port 5001 and the audio RTCP packets for session 1 are sent on port 5003.
 * RTCP packets for session 0 are received on port 5005 and RTCP for session 1
 * is received on port 5007. Since RTCP packets from the sender should be sent
Wim Taymans's avatar
Wim Taymans committed
89
 * as soon as possible and do not participate in preroll, sync=false and
90
 * async=false is configured on udpsink
91 92
 * |[
 * gst-launch -v gstrtpbin name=rtpbin                                          \
93
 *     udpsrc caps="application/x-rtp,media=(string)video,clock-rate=(int)90000,encoding-name=(string)H263-1998" \
94 95 96 97 98 99 100 101 102
 *             port=5000 ! rtpbin.recv_rtp_sink_0                                \
 *         rtpbin. ! rtph263pdepay ! ffdec_h263 ! xvimagesink                    \
 *      udpsrc port=5001 ! rtpbin.recv_rtcp_sink_0                               \
 *      rtpbin.send_rtcp_src_0 ! udpsink port=5005 sync=false async=false        \
 *     udpsrc caps="application/x-rtp,media=(string)audio,clock-rate=(int)8000,encoding-name=(string)AMR,encoding-params=(string)1,octet-align=(string)1" \
 *             port=5002 ! rtpbin.recv_rtp_sink_1                                \
 *         rtpbin. ! rtpamrdepay ! amrnbdec ! alsasink                           \
 *      udpsrc port=5003 ! rtpbin.recv_rtcp_sink_1                               \
 *      rtpbin.send_rtcp_src_1 ! udpsink port=5007 sync=false async=false
103
 * ]| Receive H263 on port 5000, send it through rtpbin in session 0, depayload,
104 105 106 107 108 109 110 111
 * decode and display the video.
 * Receive AMR on port 5002, send it through rtpbin in session 1, depayload,
 * decode and play the audio.
 * Receive server RTCP packets for session 0 on port 5001 and RTCP packets for
 * session 1 on port 5003. These packets will be used for session management and
 * synchronisation.
 * Send RTCP reports for session 0 on port 5005 and RTCP reports for session 1
 * on port 5007.
112 113
 * </refsect2>
 *
114
 * Last reviewed on 2007-08-30 (0.10.6)
115 116 117 118 119
 */

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
120
#include <stdio.h>
121 122
#include <string.h>

123 124 125
#include <gst/rtp/gstrtpbuffer.h>
#include <gst/rtp/gstrtcpbuffer.h>

126
#include "gstrtpbin-marshal.h"
127
#include "gstrtpbin.h"
128
#include "rtpsession.h"
129
#include "gstrtpsession.h"
130
#include "gstrtpjitterbuffer.h"
131

132 133 134
GST_DEBUG_CATEGORY_STATIC (gst_rtp_bin_debug);
#define GST_CAT_DEFAULT gst_rtp_bin_debug

135 136
/* sink pads */
static GstStaticPadTemplate rtpbin_recv_rtp_sink_template =
137
GST_STATIC_PAD_TEMPLATE ("recv_rtp_sink_%u",
138 139 140 141 142 143
    GST_PAD_SINK,
    GST_PAD_REQUEST,
    GST_STATIC_CAPS ("application/x-rtp")
    );

static GstStaticPadTemplate rtpbin_recv_rtcp_sink_template =
144
GST_STATIC_PAD_TEMPLATE ("recv_rtcp_sink_%u",
145 146 147 148 149 150
    GST_PAD_SINK,
    GST_PAD_REQUEST,
    GST_STATIC_CAPS ("application/x-rtcp")
    );

static GstStaticPadTemplate rtpbin_send_rtp_sink_template =
151
GST_STATIC_PAD_TEMPLATE ("send_rtp_sink_%u",
152 153 154 155 156 157 158
    GST_PAD_SINK,
    GST_PAD_REQUEST,
    GST_STATIC_CAPS ("application/x-rtp")
    );

/* src pads */
static GstStaticPadTemplate rtpbin_recv_rtp_src_template =
159
GST_STATIC_PAD_TEMPLATE ("recv_rtp_src_%u_%u_%u",
160 161 162 163 164
    GST_PAD_SRC,
    GST_PAD_SOMETIMES,
    GST_STATIC_CAPS ("application/x-rtp")
    );

165
static GstStaticPadTemplate rtpbin_send_rtcp_src_template =
166
GST_STATIC_PAD_TEMPLATE ("send_rtcp_src_%u",
167 168 169 170 171 172
    GST_PAD_SRC,
    GST_PAD_REQUEST,
    GST_STATIC_CAPS ("application/x-rtcp")
    );

static GstStaticPadTemplate rtpbin_send_rtp_src_template =
173
GST_STATIC_PAD_TEMPLATE ("send_rtp_src_%u",
174 175 176 177 178 179
    GST_PAD_SRC,
    GST_PAD_SOMETIMES,
    GST_STATIC_CAPS ("application/x-rtp")
    );

/* Fetch the GstRtpBinPrivate instance data that belongs to @obj. */
#define GST_RTP_BIN_GET_PRIVATE(obj)                                         \
   (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTP_BIN, GstRtpBinPrivate))
181

182 183 184
/* main bin lock; the session/client lookup helpers below must be called
 * with it held */
#define GST_RTP_BIN_LOCK(bin)        g_mutex_lock ((bin)->priv->bin_lock)
#define GST_RTP_BIN_UNLOCK(bin)      g_mutex_unlock ((bin)->priv->bin_lock)

/* lock to protect dynamic callbacks, like pad-added and new ssrc. */
#define GST_RTP_BIN_DYN_LOCK(bin)    g_mutex_lock ((bin)->priv->dyn_lock)
#define GST_RTP_BIN_DYN_UNLOCK(bin)  g_mutex_unlock ((bin)->priv->dyn_lock)

189 190 191 192 193
/* lock for shutdown
 *
 * Takes the dynamic lock of @bin unless the bin is shutting down, in which
 * case control jumps to @label WITHOUT the lock held. The shutdown flag is
 * tested once before locking (so a bin that is already shutting down is left
 * without blocking on the mutex) and once more after the lock was obtained
 * (in case shutdown started while we were waiting for it).
 *
 * @bin is evaluated several times; do not pass an expression with side
 * effects. */
#define GST_RTP_BIN_SHUTDOWN_LOCK(bin,label)       \
G_STMT_START {                                     \
  if (g_atomic_int_get (&(bin)->priv->shutdown))   \
    goto label;                                    \
  GST_RTP_BIN_DYN_LOCK (bin);                      \
  if (g_atomic_int_get (&(bin)->priv->shutdown)) { \
    GST_RTP_BIN_DYN_UNLOCK (bin);                  \
    goto label;                                    \
  }                                                \
} G_STMT_END

/* unlock for shutdown
 *
 * Releases the dynamic lock taken by GST_RTP_BIN_SHUTDOWN_LOCK. Expands to a
 * single expression without a trailing ';' so that the usual
 * "GST_RTP_BIN_SHUTDOWN_UNLOCK (bin);" is exactly one statement and can be
 * used as the body of an if/else. */
#define GST_RTP_BIN_SHUTDOWN_UNLOCK(bin)         \
  GST_RTP_BIN_DYN_UNLOCK (bin)
204

205
struct _GstRtpBinPrivate
206
{
207
  GMutex *bin_lock;
208

209 210 211
  /* lock protecting dynamic adding/removing */
  GMutex *dyn_lock;

212 213
  /* if we are shutting down or not */
  gint shutdown;
214 215

  gboolean autoremove;
216 217 218

  /* UNIX (ntp) time of last SR sync used */
  guint64 last_unix;
219 220 221 222 223
};

/* signals and args
 *
 * Indices into gst_rtp_bin_signals[]; LAST_SIGNAL is the size of that array
 * and must stay last. */
enum
{
  SIGNAL_REQUEST_PT_MAP,
  SIGNAL_PAYLOAD_TYPE_CHANGE,
  SIGNAL_CLEAR_PT_MAP,
  SIGNAL_RESET_SYNC,
  SIGNAL_GET_INTERNAL_SESSION,

  /* emitted by the on_*() callbacks with the session id and the SSRC */
  SIGNAL_ON_NEW_SSRC,
  SIGNAL_ON_SSRC_COLLISION,
  SIGNAL_ON_SSRC_VALIDATED,
  SIGNAL_ON_SSRC_ACTIVE,
  SIGNAL_ON_SSRC_SDES,
  SIGNAL_ON_BYE_SSRC,
  SIGNAL_ON_BYE_TIMEOUT,
  SIGNAL_ON_TIMEOUT,
  SIGNAL_ON_SENDER_TIMEOUT,
  SIGNAL_ON_NPT_STOP,
  LAST_SIGNAL
};

243
/* Default values; each name mirrors one of the PROP_* identifiers, so these
 * are presumably the initial property values (the code applying them is not
 * in this part of the file). */
#define DEFAULT_LATENCY_MS           200
#define DEFAULT_SDES                 NULL
#define DEFAULT_DO_LOST              FALSE
#define DEFAULT_IGNORE_PT            FALSE
#define DEFAULT_NTP_SYNC             FALSE
#define DEFAULT_AUTOREMOVE           FALSE
#define DEFAULT_BUFFER_MODE          RTP_JITTER_BUFFER_MODE_SLAVE
#define DEFAULT_USE_PIPELINE_CLOCK   FALSE
#define DEFAULT_RTCP_SYNC            GST_RTP_BIN_RTCP_SYNC_ALWAYS
#define DEFAULT_RTCP_SYNC_INTERVAL   0
253

254 255
/* Property identifiers. PROP_0 occupies id 0, so the first real property is
 * numbered 1; PROP_LAST must stay last. */
enum
{
  PROP_0,
  PROP_LATENCY,
  PROP_SDES,
  PROP_DO_LOST,
  PROP_IGNORE_PT,
  PROP_NTP_SYNC,
  PROP_RTCP_SYNC,
  PROP_RTCP_SYNC_INTERVAL,
  PROP_AUTOREMOVE,
  PROP_BUFFER_MODE,
  PROP_USE_PIPELINE_CLOCK,
  PROP_LAST
};

270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294
/* Values of the "GstRTCPSync" enum type registered by
 * gst_rtp_bin_rtcp_sync_get_type(). */
enum
{
  GST_RTP_BIN_RTCP_SYNC_ALWAYS,         /* nick "always" */
  GST_RTP_BIN_RTCP_SYNC_INITIAL,        /* nick "initial" */
  GST_RTP_BIN_RTCP_SYNC_RTP             /* nick "rtp-info" */
};

#define GST_RTP_BIN_RTCP_SYNC_TYPE (gst_rtp_bin_rtcp_sync_get_type())

/* Return the GType of the "GstRTCPSync" enumeration. The type is registered
 * on the first call and the cached id is handed out afterwards. */
static GType
gst_rtp_bin_rtcp_sync_get_type (void)
{
  static const GEnumValue rtcp_sync_types[] = {
    {GST_RTP_BIN_RTCP_SYNC_ALWAYS, "always", "always"},
    {GST_RTP_BIN_RTCP_SYNC_INITIAL, "initial", "initial"},
    {GST_RTP_BIN_RTCP_SYNC_RTP, "rtp-info", "rtp-info"},
    {0, NULL, NULL},
  };
  static GType registered_type = 0;

  if (registered_type == 0)
    registered_type =
        g_enum_register_static ("GstRTCPSync", rtcp_sync_types);

  return registered_type;
}

295
/* helper objects */
296 297 298
typedef struct _GstRtpBinSession GstRtpBinSession;
typedef struct _GstRtpBinStream GstRtpBinStream;
typedef struct _GstRtpBinClient GstRtpBinClient;
299

300 301
static guint gst_rtp_bin_signals[LAST_SIGNAL] = { 0 };

302
static GstCaps *pt_map_requested (GstElement * element, guint pt,
303
    GstRtpBinSession * session);
304 305
static void payload_type_change (GstElement * element, guint pt,
    GstRtpBinSession * session);
306
static void free_client (GstRtpBinClient * client, GstRtpBin * bin);
307 308
static void free_stream (GstRtpBinStream * stream);

309 310 311 312
/* Manages the RTP stream for one SSRC.
 *
 * We pipe the stream (comming from the SSRC demuxer) into a jitterbuffer.
 * If we see an SDES RTCP packet that links multiple SSRCs together based on a
313
 * common CNAME, we create a GstRtpBinClient structure to group the SSRCs
314 315
 * together (see below).
 */
316
struct _GstRtpBinStream
317 318 319
{
  /* the SSRC of this stream */
  guint32 ssrc;
320

321
  /* parent bin */
322
  GstRtpBin *bin;
323

324
  /* the session this SSRC belongs to */
325
  GstRtpBinSession *session;
326

327 328
  /* the jitterbuffer of the SSRC */
  GstElement *buffer;
329 330 331
  gulong buffer_handlesync_sig;
  gulong buffer_ptreq_sig;
  gulong buffer_ntpstop_sig;
Wim Taymans's avatar
Wim Taymans committed
332
  gint percent;
333

334 335 336
  /* the PT demuxer of the SSRC */
  GstElement *demux;
  gulong demux_newpad_sig;
337
  gulong demux_padremoved_sig;
338
  gulong demux_ptreq_sig;
339
  gulong demux_ptchange_sig;
340

Wim Taymans's avatar
Wim Taymans committed
341
  /* if we have calculated a valid rt_delta for this stream */
342 343
  gboolean have_sync;
  /* mapping to local RTP and NTP time */
Wim Taymans's avatar
Wim Taymans committed
344
  gint64 rt_delta;
345 346 347
  gint64 rtp_delta;
  /* base rtptime in gst time */
  gint64 clock_base;
348 349
};

350 351 352
/* per-session lock, guards the streams list and the ptmap of a
 * GstRtpBinSession */
#define GST_RTP_SESSION_LOCK(sess)    g_mutex_lock ((sess)->lock)
#define GST_RTP_SESSION_UNLOCK(sess)  g_mutex_unlock ((sess)->lock)

353 354 355 356 357 358
/* Manages the receiving end of the packets.
 *
 * There is one such structure for each RTP session (audio/video/...).
 * We get the RTP/RTCP packets and stuff them into the session manager. From
 * there they are pushed into an SSRC demuxer that splits the stream based on
 * SSRC. Each of the SSRC streams go into their own jitterbuffer (managed with
359
 * the GstRtpBinStream above).
360
 */
361
struct _GstRtpBinSession
362 363 364
{
  /* session id */
  gint id;
365
  /* the parent bin */
366
  GstRtpBin *bin;
367 368 369
  /* the session element */
  GstElement *session;
  /* the SSRC demuxer */
370 371
  GstElement *demux;
  gulong demux_newpad_sig;
372
  gulong demux_padremoved_sig;
373

374 375
  GMutex *lock;

376
  /* list of GstRtpBinStream */
377
  GSList *streams;
378

379 380 381
  /* mapping of payload type to caps */
  GHashTable *ptmap;

382 383
  /* the pads of the session */
  GstPad *recv_rtp_sink;
384
  GstPad *recv_rtp_sink_ghost;
385
  GstPad *recv_rtp_src;
386
  GstPad *recv_rtcp_sink;
387
  GstPad *recv_rtcp_sink_ghost;
388
  GstPad *sync_src;
389
  GstPad *send_rtp_sink;
390
  GstPad *send_rtp_sink_ghost;
391
  GstPad *send_rtp_src;
392
  GstPad *send_rtp_src_ghost;
393
  GstPad *send_rtcp_src;
394
  GstPad *send_rtcp_src_ghost;
395
};
396

397 398 399 400 401 402 403 404 405 406 407 408 409 410
/* Groups the RTP streams that originate from one client and therefore have
 * to be synchronized with each other.
 */
struct _GstRtpBinClient
{
  /* the common CNAME for the streams, and its length */
  gchar *cname;
  guint cname_len;

  /* the streams: number of entries, and the list itself */
  guint nstreams;
  GSList *streams;
};

411
/* find a session with the given id. Must be called with RTP_BIN_LOCK */
412 413
static GstRtpBinSession *
find_session_by_id (GstRtpBin * rtpbin, gint id)
414
{
415
  GSList *walk;
416

417
  for (walk = rtpbin->sessions; walk; walk = g_slist_next (walk)) {
418
    GstRtpBinSession *sess = (GstRtpBinSession *) walk->data;
419 420 421 422 423 424 425

    if (sess->id == id)
      return sess;
  }
  return NULL;
}

426 427 428 429 430 431 432 433 434
/* Look up the session that owns request pad @pad, i.e. the session for which
 * @pad is one of the recv_rtp_sink, recv_rtcp_sink, send_rtp_sink or
 * send_rtcp_src ghost pads. Returns NULL when no session matches.
 * Must be called with RTP_BIN_LOCK */
static GstRtpBinSession *
find_session_by_pad (GstRtpBin * rtpbin, GstPad * pad)
{
  GSList *item = rtpbin->sessions;

  while (item != NULL) {
    GstRtpBinSession *candidate = (GstRtpBinSession *) item->data;

    if (candidate->recv_rtp_sink_ghost == pad)
      return candidate;
    if (candidate->recv_rtcp_sink_ghost == pad)
      return candidate;
    if (candidate->send_rtp_sink_ghost == pad)
      return candidate;
    if (candidate->send_rtcp_src_ghost == pad)
      return candidate;

    item = item->next;
  }

  return NULL;
}

444
static void
445
on_new_ssrc (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
446 447 448 449 450 451
{
  g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_NEW_SSRC], 0,
      sess->id, ssrc);
}

static void
452
on_ssrc_collision (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
453 454 455 456 457 458
{
  g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_COLLISION], 0,
      sess->id, ssrc);
}

static void
459
on_ssrc_validated (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
460 461 462 463 464
{
  g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
      sess->id, ssrc);
}

465 466 467 468 469 470 471
/* Handler for the session element's "on-ssrc-active" signal (connected in
 * create_session): re-emits it on the bin with the session id prepended. */
static void
on_ssrc_active (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
{
  const guint signal_id = gst_rtp_bin_signals[SIGNAL_ON_SSRC_ACTIVE];

  g_signal_emit (sess->bin, signal_id, 0, sess->id, ssrc);
}

472 473 474 475 476 477 478
/* Handler for the session element's "on-ssrc-sdes" signal (connected in
 * create_session): re-emits it on the bin with the session id prepended. */
static void
on_ssrc_sdes (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
{
  const guint signal_id = gst_rtp_bin_signals[SIGNAL_ON_SSRC_SDES];

  g_signal_emit (sess->bin, signal_id, 0, sess->id, ssrc);
}

479
static void
480
on_bye_ssrc (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
481 482 483 484 485 486
{
  g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_BYE_SSRC], 0,
      sess->id, ssrc);
}

static void
487
on_bye_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
488 489 490
{
  g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_BYE_TIMEOUT], 0,
      sess->id, ssrc);
491 492 493

  if (sess->bin->priv->autoremove)
    g_signal_emit_by_name (sess->demux, "clear-ssrc", ssrc, NULL);
494 495 496
}

static void
497
on_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
498 499 500
{
  g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_TIMEOUT], 0,
      sess->id, ssrc);
501 502 503

  if (sess->bin->priv->autoremove)
    g_signal_emit_by_name (sess->demux, "clear-ssrc", ssrc, NULL);
504 505
}

506 507 508 509 510 511 512
/* Handler for the session element's "on-sender-timeout" signal (connected in
 * create_session): re-emits it on the bin with the session id prepended. */
static void
on_sender_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
{
  const guint signal_id = gst_rtp_bin_signals[SIGNAL_ON_SENDER_TIMEOUT];

  g_signal_emit (sess->bin, signal_id, 0, sess->id, ssrc);
}

Wim Taymans's avatar
Wim Taymans committed
513 514 515 516 517 518 519
/* Callback receiving a jitterbuffer element and the stream it belongs to:
 * emits the bin's SIGNAL_ON_NPT_STOP signal with the id of the stream's
 * session and the stream's SSRC. */
static void
on_npt_stop (GstElement * jbuf, GstRtpBinStream * stream)
{
  const guint signal_id = gst_rtp_bin_signals[SIGNAL_ON_NPT_STOP];

  g_signal_emit (stream->bin, signal_id, 0, stream->session->id,
      stream->ssrc);
}

520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535
/* Return the stream of @session that has SSRC @ssrc, or NULL when the
 * session has no such stream.
 * must be called with the SESSION lock */
static GstRtpBinStream *
find_stream_by_ssrc (GstRtpBinSession * session, guint32 ssrc)
{
  GSList *item = session->streams;

  while (item != NULL) {
    GstRtpBinStream *candidate = (GstRtpBinStream *) item->data;

    if (candidate->ssrc == ssrc)
      return candidate;

    item = item->next;
  }

  return NULL;
}

static void
536
ssrc_demux_pad_removed (GstElement * element, guint ssrc, GstPad * pad,
537 538 539 540 541
    GstRtpBinSession * session)
{
  GstRtpBinStream *stream = NULL;

  GST_RTP_SESSION_LOCK (session);
542
  if ((stream = find_stream_by_ssrc (session, ssrc)))
543 544
    session->streams = g_slist_remove (session->streams, stream);
  GST_RTP_SESSION_UNLOCK (session);
545 546 547

  if (stream)
    free_stream (stream);
548 549
}

550
/* create a session with the given id.  Must be called with RTP_BIN_LOCK */
551 552
static GstRtpBinSession *
create_session (GstRtpBin * rtpbin, gint id)
553
{
554
  GstRtpBinSession *sess;
555
  GstElement *session, *demux;
556
  GstState target;
557

558
  if (!(session = gst_element_factory_make ("rtpsession", NULL)))
559 560
    goto no_session;

561
  if (!(demux = gst_element_factory_make ("rtpssrcdemux", NULL)))
562 563
    goto no_demux;

564
  sess = g_new0 (GstRtpBinSession, 1);
565
  sess->lock = g_mutex_new ();
566
  sess->id = id;
567
  sess->bin = rtpbin;
568
  sess->session = session;
569
  sess->demux = demux;
570 571
  sess->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
      (GDestroyNotify) gst_caps_unref);
572 573
  rtpbin->sessions = g_slist_prepend (rtpbin->sessions, sess);

574 575
  /* configure SDES items */
  GST_OBJECT_LOCK (rtpbin);
576 577
  g_object_set (session, "sdes", rtpbin->sdes, "use-pipeline-clock",
      rtpbin->use_pipeline_clock, NULL);
578
  GST_OBJECT_UNLOCK (rtpbin);
579

580 581 582 583
  /* provide clock_rate to the session manager when needed */
  g_signal_connect (session, "request-pt-map",
      (GCallback) pt_map_requested, sess);

584 585 586 587 588 589
  g_signal_connect (sess->session, "on-new-ssrc",
      (GCallback) on_new_ssrc, sess);
  g_signal_connect (sess->session, "on-ssrc-collision",
      (GCallback) on_ssrc_collision, sess);
  g_signal_connect (sess->session, "on-ssrc-validated",
      (GCallback) on_ssrc_validated, sess);
590 591
  g_signal_connect (sess->session, "on-ssrc-active",
      (GCallback) on_ssrc_active, sess);
592 593
  g_signal_connect (sess->session, "on-ssrc-sdes",
      (GCallback) on_ssrc_sdes, sess);
594 595 596 597 598
  g_signal_connect (sess->session, "on-bye-ssrc",
      (GCallback) on_bye_ssrc, sess);
  g_signal_connect (sess->session, "on-bye-timeout",
      (GCallback) on_bye_timeout, sess);
  g_signal_connect (sess->session, "on-timeout", (GCallback) on_timeout, sess);
599 600
  g_signal_connect (sess->session, "on-sender-timeout",
      (GCallback) on_sender_timeout, sess);
601

602
  gst_bin_add (GST_BIN_CAST (rtpbin), session);
603
  gst_bin_add (GST_BIN_CAST (rtpbin), demux);
604 605 606 607 608 609 610 611

  GST_OBJECT_LOCK (rtpbin);
  target = GST_STATE_TARGET (rtpbin);
  GST_OBJECT_UNLOCK (rtpbin);

  /* change state only to what's needed */
  gst_element_set_state (demux, target);
  gst_element_set_state (session, target);
612 613 614 615 616 617

  return sess;

  /* ERRORS */
no_session:
  {
618
    g_warning ("rtpbin: could not create gstrtpsession element");
619 620
    return NULL;
  }
621 622
no_demux:
  {
623
    gst_object_unref (session);
624
    g_warning ("rtpbin: could not create gstrtpssrcdemux element");
625 626 627 628
    return NULL;
  }
}

629
static void
630
free_session (GstRtpBinSession * sess, GstRtpBin * bin)
631
{
632 633
  GSList *client_walk;

634 635
  GST_DEBUG_OBJECT (bin, "freeing session %p", sess);

636 637 638
  gst_element_set_locked_state (sess->demux, TRUE);
  gst_element_set_locked_state (sess->session, TRUE);

639
  gst_element_set_state (sess->demux, GST_STATE_NULL);
640
  gst_element_set_state (sess->session, GST_STATE_NULL);
641

642
  if (sess->recv_rtp_sink != NULL) {
643
    gst_element_release_request_pad (sess->session, sess->recv_rtp_sink);
644 645
    gst_object_unref (sess->recv_rtp_sink);
  }
646 647
  if (sess->recv_rtp_src != NULL)
    gst_object_unref (sess->recv_rtp_src);
648
  if (sess->recv_rtcp_sink != NULL) {
649
    gst_element_release_request_pad (sess->session, sess->recv_rtcp_sink);
650 651
    gst_object_unref (sess->recv_rtcp_sink);
  }
652 653
  if (sess->sync_src != NULL)
    gst_object_unref (sess->sync_src);
654
  if (sess->send_rtp_sink != NULL) {
655
    gst_element_release_request_pad (sess->session, sess->send_rtp_sink);
656 657
    gst_object_unref (sess->send_rtp_sink);
  }
658 659
  if (sess->send_rtp_src != NULL)
    gst_object_unref (sess->send_rtp_src);
660
  if (sess->send_rtcp_src != NULL) {
661
    gst_element_release_request_pad (sess->session, sess->send_rtcp_src);
662 663
    gst_object_unref (sess->send_rtcp_src);
  }
664

665 666 667
  gst_bin_remove (GST_BIN_CAST (bin), sess->session);
  gst_bin_remove (GST_BIN_CAST (bin), sess->demux);

668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700
  /* remove any references in bin->clients to the streams in sess->streams */
  client_walk = bin->clients;
  while (client_walk) {
    GSList *client_node = client_walk;
    GstRtpBinClient *client = (GstRtpBinClient *) client_node->data;
    GSList *stream_walk = client->streams;

    while (stream_walk) {
      GSList *stream_node = stream_walk;
      GstRtpBinStream *stream = (GstRtpBinStream *) stream_node->data;
      GSList *inner_walk;

      stream_walk = g_slist_next (stream_walk);

      for (inner_walk = sess->streams; inner_walk;
          inner_walk = g_slist_next (inner_walk)) {
        if ((GstRtpBinStream *) inner_walk->data == stream) {
          client->streams = g_slist_delete_link (client->streams, stream_node);
          --client->nstreams;
          break;
        }
      }
    }
    client_walk = g_slist_next (client_walk);

    g_assert ((client->streams && client->nstreams > 0) || (!client->streams
            && client->streams == 0));
    if (client->nstreams == 0) {
      free_client (client, bin);
      bin->clients = g_slist_delete_link (bin->clients, client_node);
    }
  }

701 702 703 704 705 706 707 708 709
  g_slist_foreach (sess->streams, (GFunc) free_stream, NULL);
  g_slist_free (sess->streams);

  g_mutex_free (sess->lock);
  g_hash_table_destroy (sess->ptmap);

  g_free (sess);
}

710 711
/* get the payload type caps for the specific payload @pt in @session */
static GstCaps *
712
get_pt_map (GstRtpBinSession * session, guint pt)
713 714
{
  GstCaps *caps = NULL;
715
  GstRtpBin *bin;
716 717
  GValue ret = { 0 };
  GValue args[3] = { {0}, {0}, {0} };
718

719
  GST_DEBUG ("searching pt %d in cache", pt);
720

721 722
  GST_RTP_SESSION_LOCK (session);

723 724
  /* first look in the cache */
  caps = g_hash_table_lookup (session->ptmap, GINT_TO_POINTER (pt));
725 726
  if (caps) {
    gst_caps_ref (caps);
727
    goto done;
728
  }
729 730 731 732 733 734

  bin = session->bin;

  GST_DEBUG ("emiting signal for pt %d in session %d", pt, session->id);

  /* not in cache, send signal to request caps */
735 736 737 738 739 740 741 742 743 744
  g_value_init (&args[0], GST_TYPE_ELEMENT);
  g_value_set_object (&args[0], bin);
  g_value_init (&args[1], G_TYPE_UINT);
  g_value_set_uint (&args[1], session->id);
  g_value_init (&args[2], G_TYPE_UINT);
  g_value_set_uint (&args[2], pt);

  g_value_init (&ret, GST_TYPE_CAPS);
  g_value_set_boxed (&ret, NULL);

745 746
  GST_RTP_SESSION_UNLOCK (session);

747 748
  g_signal_emitv (args, gst_rtp_bin_signals[SIGNAL_REQUEST_PT_MAP], 0, &ret);

749 750
  GST_RTP_SESSION_LOCK (session);

751 752 753
  g_value_unset (&args[0]);
  g_value_unset (&args[1]);
  g_value_unset (&args[2]);
754 755 756 757 758 759 760 761 762

  /* look in the cache again because we let the lock go */
  caps = g_hash_table_lookup (session->ptmap, GINT_TO_POINTER (pt));
  if (caps) {
    gst_caps_ref (caps);
    g_value_unset (&ret);
    goto done;
  }

763 764
  caps = (GstCaps *) g_value_dup_boxed (&ret);
  g_value_unset (&ret);
765 766 767
  if (!caps)
    goto no_caps;

768 769
  GST_DEBUG ("caching pt %d as %" GST_PTR_FORMAT, pt, caps);

770 771 772
  /* store in cache, take additional ref */
  g_hash_table_insert (session->ptmap, GINT_TO_POINTER (pt),
      gst_caps_ref (caps));
773 774

done:
775 776
  GST_RTP_SESSION_UNLOCK (session);

777 778 779 780 781
  return caps;

  /* ERRORS */
no_caps:
  {
782
    GST_RTP_SESSION_UNLOCK (session);
783 784 785 786 787
    GST_DEBUG ("no pt map could be obtained");
    return NULL;
  }
}

788 789 790 791 792 793
/* Predicate that accepts every entry; gst_rtp_bin_clear_pt_map() passes it
 * to g_hash_table_foreach_remove() to empty a pt map. */
static gboolean
return_true (gpointer key, gpointer value, gpointer user_data)
{
  (void) key;
  (void) value;
  (void) user_data;

  return TRUE;
}

794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811
/* Forget the synchronisation state of every stream of every client, under
 * the bin lock. */
static void
gst_rtp_bin_reset_sync (GstRtpBin * rtpbin)
{
  GSList *client_item;

  GST_DEBUG_OBJECT (rtpbin, "Reset sync on all clients");

  GST_RTP_BIN_LOCK (rtpbin);
  client_item = rtpbin->clients;
  while (client_item != NULL) {
    GstRtpBinClient *client = (GstRtpBinClient *) client_item->data;
    GSList *stream_item = client->streams;

    /* reset sync on all streams for this client */
    while (stream_item != NULL) {
      GstRtpBinStream *stream = (GstRtpBinStream *) stream_item->data;

      /* require a new SR packet for this stream before a new lip-sync is
       * attempted */
      stream->have_sync = FALSE;
      stream->rt_delta = 0;
      stream->rtp_delta = 0;
      stream->clock_base = -100 * GST_SECOND;

      stream_item = stream_item->next;
    }
    client_item = client_item->next;
  }
  GST_RTP_BIN_UNLOCK (rtpbin);
}

820
static void
821
gst_rtp_bin_clear_pt_map (GstRtpBin * bin)
822
{
823
  GSList *sessions, *streams;
824 825

  GST_RTP_BIN_LOCK (bin);
826
  GST_DEBUG_OBJECT (bin, "clearing pt map");
827 828 829 830 831
  for (sessions = bin->sessions; sessions; sessions = g_slist_next (sessions)) {
    GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;

    GST_DEBUG_OBJECT (bin, "clearing session %p", session);
    g_signal_emit_by_name (session->session, "clear-pt-map", NULL);
832 833

    GST_RTP_SESSION_LOCK (session);
834
    g_hash_table_foreach_remove (session->ptmap, return_true, NULL);
835 836 837 838 839 840

    for (streams = session->streams; streams; streams = g_slist_next (streams)) {
      GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;

      GST_DEBUG_OBJECT (bin, "clearing stream %p", stream);
      g_signal_emit_by_name (stream->buffer, "clear-pt-map", NULL);
Marc Leeman's avatar
Marc Leeman committed
841 842
      if (stream->demux)
        g_signal_emit_by_name (stream->demux, "clear-pt-map", NULL);
843
    }
844 845 846
    GST_RTP_SESSION_UNLOCK (session);
  }
  GST_RTP_BIN_UNLOCK (bin);
847 848 849

  /* reset sync too */
  gst_rtp_bin_reset_sync (bin);
850 851
}

852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870
/* Fetch the internal RTPSession object of the session numbered @session_id.
 *
 * The lookup is done with GST_RTP_BIN_LOCK held. Returns the value of the
 * session element's "internal-session" property, or NULL when no session
 * with that id exists.
 * NOTE(review): g_object_get() presumably hands back a new reference that
 * the caller must release - confirm against the property definition. */
static RTPSession *
gst_rtp_bin_get_internal_session (GstRtpBin * bin, guint session_id)
{
  RTPSession *internal_session = NULL;
  GstRtpBinSession *session;

  GST_RTP_BIN_LOCK (bin);
  /* session_id is unsigned, so it is printed with %u */
  GST_DEBUG_OBJECT (bin, "retrieving internal RTPSession object, index: %u",
      session_id);
  /* find_session_by_id() is called with a gint id, hence the cast */
  session = find_session_by_id (bin, (gint) session_id);
  if (session) {
    g_object_get (session->session, "internal-session", &internal_session,
        NULL);
  }
  GST_RTP_BIN_UNLOCK (bin);

  return internal_session;
}

871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891
/* Set the property @name to @value on the buffer element of every stream of
 * every session in @bin.
 *
 * The session list is walked with GST_RTP_BIN_LOCK held and each session's
 * stream list with that session's GST_RTP_SESSION_LOCK held. */
static void
gst_rtp_bin_propagate_property_to_jitterbuffer (GstRtpBin * bin,
    const gchar * name, const GValue * value)
{
  GSList *session_walk;

  GST_RTP_BIN_LOCK (bin);
  session_walk = bin->sessions;
  while (session_walk != NULL) {
    GstRtpBinSession *sess = (GstRtpBinSession *) session_walk->data;
    GSList *stream_walk;

    GST_RTP_SESSION_LOCK (sess);
    stream_walk = sess->streams;
    while (stream_walk != NULL) {
      GstRtpBinStream *strm = (GstRtpBinStream *) stream_walk->data;

      g_object_set_property (G_OBJECT (strm->buffer), name, value);
      stream_walk = g_slist_next (stream_walk);
    }
    GST_RTP_SESSION_UNLOCK (sess);

    session_walk = g_slist_next (session_walk);
  }
  GST_RTP_BIN_UNLOCK (bin);
}

892
/* get a client with the given SDES name. Must be called with RTP_BIN_LOCK */
893
static GstRtpBinClient *
894
get_client (GstRtpBin * bin, guint8 len, guint8 * data, gboolean * created)
895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924
{
  GstRtpBinClient *result = NULL;
  GSList *walk;

  for (walk = bin->clients; walk; walk = g_slist_next (walk)) {
    GstRtpBinClient *client = (GstRtpBinClient *) walk->data;

    if (len != client->cname_len)
      continue;

    if (!strncmp ((gchar *) data, client->cname, client->cname_len)) {
      GST_DEBUG_OBJECT (bin, "found existing client %p with CNAME %s", client,
          client->cname);
      result = client;
      break;
    }
  }

  /* nothing found, create one */
  if (result == NULL) {
    result = g_new0 (GstRtpBinClient, 1);
    result->cname = g_strndup ((gchar *) data, len);
    result->cname_len = len;
    bin->clients = g_slist_prepend (bin->clients, result);
    GST_DEBUG_OBJECT (bin, "created new client %p with CNAME %s", result,
        result->cname);
  }
  return result;
}

925
static void
926
free_client (GstRtpBinClient * client, GstRtpBin * bin)
927
{
928
  GST_DEBUG_OBJECT (bin, "freeing client %p", client);
929
  g_slist_free (client->streams);
930 931 932 933
  g_free (client->cname);
  g_free (client);
}

Wim Taymans's avatar
Wim Taymans committed
934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012
/* Sample the current running time and NTP time (in nanoseconds) of @bin.
 *
 * The element clock and base time are read under GST_OBJECT_LOCK; the clock
 * is referenced so it can be queried after the lock is dropped. The NTP time
 * is based on the clock time when bin->use_pipeline_clock is set and on the
 * current system time otherwise, shifted from the 1970 to the 1900 epoch.
 *
 * @running_time and @ntpnstime may each be NULL. When the element has no
 * clock both results are -1. */
static void
get_current_times (GstRtpBin * bin, GstClockTime * running_time,
    guint64 * ntpnstime)
{
  GstClock *clk;
  GstClockTime base = 0;
  GstClockTime result_rt = -1;
  guint64 result_ntp = -1;

  GST_OBJECT_LOCK (bin);
  clk = GST_ELEMENT_CLOCK (bin);
  if (clk != NULL) {
    base = GST_ELEMENT_CAST (bin)->base_time;
    gst_object_ref (clk);
  }
  GST_OBJECT_UNLOCK (bin);

  if (clk != NULL) {
    GstClockTime now = gst_clock_get_time (clk);

    if (!bin->use_pipeline_clock) {
      GTimeVal wallclock;

      /* get current NTP time from the system clock */
      g_get_current_time (&wallclock);
      result_ntp = GST_TIMEVAL_TO_TIME (wallclock);
    } else {
      result_ntp = now;
    }

    /* add constant to convert from 1970 based time to 1900 based time */
    result_ntp += (2208988800LL * GST_SECOND);

    /* convert the clock time to running time */
    result_rt = now - base;

    gst_object_unref (clk);
  }

  if (running_time)
    *running_time = result_rt;
  if (ntpnstime)
    *ntpnstime = result_ntp;
}

/* Apply @ts_offset to the "ts-offset" property of @stream's buffer element,
 * but only for plausible changes.
 *
 * The new offset is written when it differs from the current one by more
 * than 4 milliseconds and by less than 3 seconds. Smaller differences are
 * ignored to compensate for rounding errors in NTP to RTP timestamp
 * conversions; larger ones are ignored with a warning. */
static void
stream_set_ts_offset (GstRtpBin * bin, GstRtpBinStream * stream,
    gint64 ts_offset)
{
  gint64 current_offset;

  g_object_get (stream->buffer, "ts-offset", &current_offset, NULL);

  /* delta changed, see how much */
  if (current_offset != ts_offset) {
    gint64 change = current_offset - ts_offset;
    gint64 magnitude;

    GST_DEBUG_OBJECT (bin,
        "ts-offset %" G_GINT64_FORMAT ", prev %" G_GINT64_FORMAT
        ", diff: %" G_GINT64_FORMAT, ts_offset, current_offset, change);

    magnitude = ABS (change);
    if (magnitude <= 4 * GST_MSECOND) {
      /* within the rounding error of the timestamp conversions */
      GST_DEBUG_OBJECT (bin, "offset too small, ignoring");
    } else if (magnitude >= (3 * GST_SECOND)) {
      GST_WARNING_OBJECT (bin, "offset unusually large, ignoring");
    } else {
      g_object_set (stream->buffer, "ts-offset", ts_offset, NULL);
    }
  }
  GST_DEBUG_OBJECT (bin, "stream SSRC %08x, delta %" G_GINT64_FORMAT,
      stream->ssrc, ts_offset);
}

1013
/* associate a stream to the given CNAME. This will make sure all streams for
1014 1015
 * that CNAME are synchronized together.
 * Must be called with GST_RTP_BIN_LOCK */
1016 1017
static void
gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len,
Wim Taymans's avatar
Wim Taymans committed
1018
    guint8 * data, guint64 ntptime, guint64 last_extrtptime,
1019 1020
    guint64 base_rtptime, guint64 base_time, guint clock_rate,
    gint64 rtp_clock_base)
1021 1022 1023 1024
{
  GstRtpBinClient *client;
  gboolean created;
  GSList *walk;
Wim Taymans's avatar
Wim Taymans committed
1025
  guint64 local_rt;
1026
  guint64 local_rtp;
Wim Taymans's avatar
Wim Taymans committed
1027 1028 1029 1030
  GstClockTime running_time;
  guint64 ntpnstime;
  gint64 ntpdiff, rtdiff;
  guint64 last_unix;
1031 1032

  /* first find or create the CNAME */
1033
  client = get_client (bin, len, data, &created);
1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054