gstmultihandlesink.c 74.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/* GStreamer
 * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
 * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
 * Copyright (C) 2006 Wim Taymans <wim at fluendo dot com>
 * Copyright (C) <2011> Collabora Ltd.
 *     Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
Tim-Philipp Müller's avatar
Tim-Philipp Müller committed
20 21
 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
 * Boston, MA 02110-1301, USA.
22 23 24
 */

/**
25
 * SECTION:element-multihandlesink
26 27 28
 * @see_also: tcpserversink
 *
 * This plugin writes incoming data to a set of file descriptors. The
29
 * file descriptors can be added to multihandlesink by emitting the #GstMultiHandleSink::add signal. 
30 31
 * For each descriptor added, the #GstMultiHandleSink::client-added signal will be called.
 *
32
 * A client can also be added with the #GstMultiHandleSink::add-full signal
33 34 35
 * that allows for more control over what and how much data a client 
 * initially receives.
 *
36
 * Clients can be removed from multihandlesink by emitting the #GstMultiHandleSink::remove signal. For
37
 * each descriptor removed, the #GstMultiHandleSink::client-removed signal will be called. The
38
 * #GstMultiHandleSink::client-removed signal can also be fired when multihandlesink decides that a
39 40
 * client is not active anymore or, depending on the value of the
 * #GstMultiHandleSink:recover-policy property, if the client is reading too slowly.
41 42
 * In all cases, multihandlesink will never close a file descriptor itself.
 * The user of multihandlesink is responsible for closing all file descriptors.
43
 * This can for example be done in response to the #GstMultiHandleSink::client-fd-removed signal.
44
 * Note that multihandlesink still has a reference to the file descriptor when the
45 46 47 48 49 50 51 52 53 54
 * #GstMultiHandleSink::client-removed signal is emitted, so that "get-stats" can be performed on
 * the descriptor; it is therefore not safe to close the file descriptor in
 * the #GstMultiHandleSink::client-removed signal handler, and you should use the 
 * #GstMultiHandleSink::client-fd-removed signal to safely close the fd.
 *
 * Multisocketsink internally keeps a queue of the incoming buffers and uses a
 * separate thread to send the buffers to the clients. This ensures that no
 * client write can block the pipeline and that clients can read with different
 * speeds.
 *
55
 * When adding a client to multihandlesink, the #GstMultiHandleSink:sync-method property will define
56 57 58
 * which buffer in the queued buffers will be sent first to the client. Clients 
 * can be sent the most recent buffer (which might not be decodable by the 
 * client if it is not a keyframe), the next keyframe received in 
59
 * multihandlesink (which can take some time depending on the keyframe rate), or the
60 61 62 63
 * last received keyframe (which will cause a simple burst-on-connect). 
 * Multisocketsink will always keep at least one keyframe in its internal buffers
 * when the sync-mode is set to latest-keyframe.
 *
64
 * There are additional values for the #GstMultiHandleSink:sync-method
65 66 67 68 69 70 71 72 73 74 75 76 77 78
 * property to allow finer control over burst-on-connect behaviour. By selecting
 * the 'burst' method a minimum burst size can be chosen, 'burst-keyframe'
 * additionally requires that the burst begin with a keyframe, and 
 * 'burst-with-keyframe' attempts to burst beginning with a keyframe, but will
 * prefer a minimum burst size even if it requires not starting with a keyframe.
 *
 * Multisocketsink can be instructed to keep at least a minimum amount of data
 * expressed in time or byte units in its internal queues with the 
 * #GstMultiHandleSink:time-min and #GstMultiHandleSink:bytes-min properties respectively.
 * These properties are useful if the application adds clients with the 
 * #GstMultiHandleSink::add-full signal to make sure that a burst connect can
 * actually be honored. 
 *
 * When streaming data, clients are allowed to read at a different rate than
79 80
 * the rate at which multihandlesink receives data. If the client is reading too
 * fast, no data will be send to the client until multihandlesink receives more
81
 * data. If the client, however, reads too slowly, data for that client will be 
82 83
 * queued up in multihandlesink. Two properties control the amount of data 
 * (buffers) that is queued in multihandlesink: #GstMultiHandleSink:buffers-max and 
84
 * #GstMultiHandleSink:buffers-soft-max. A client that falls behind by
85
 * #GstMultiHandleSink:buffers-max is removed from multihandlesink forcibly.
86 87 88 89 90 91 92 93 94
 *
 * A client with a lag of at least #GstMultiHandleSink:buffers-soft-max enters the recovery
 * procedure which is controlled with the #GstMultiHandleSink:recover-policy property.
 * A recover policy of NONE will do nothing, RESYNC_LATEST will send the most recently
 * received buffer as the next buffer for the client, RESYNC_SOFT_LIMIT
 * positions the client to the soft limit in the buffer queue and
 * RESYNC_KEYFRAME positions the client at the most recent keyframe in the
 * buffer queue.
 *
95
 * multihandlesink will by default synchronize on the clock before serving the 
96 97 98 99 100 101 102 103 104 105 106
 * buffers to the clients. This behaviour can be disabled by setting the sync 
 * property to FALSE. Multisocketsink will by default not do QoS and will never
 * drop late buffers.
 */

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include <gst/gst-i18n-plugin.h>

107
#include "gstmultihandlesink.h"
108

109 110 111 112
#ifdef HAVE_SYS_SOCKET_H
#include <sys/socket.h>
#endif

113 114 115 116
#ifndef G_OS_WIN32
#include <netinet/in.h>
#endif

117 118
#include <string.h>

119 120 121 122 123 124 125
#define NOT_IMPLEMENTED 0

static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
    GST_PAD_SINK,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS_ANY);

126 127
GST_DEBUG_CATEGORY_STATIC (multihandlesink_debug);
#define GST_CAT_DEFAULT (multihandlesink_debug)
128 129 130 131

/* MultiHandleSink signals and args */
enum
{
132 133
  GST_MULTI_SINK_LAST_SIGNAL,

134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
  /* methods */
  SIGNAL_ADD,
  SIGNAL_ADD_BURST,
  SIGNAL_CLEAR,

  /* signals */
  SIGNAL_CLIENT_ADDED,
  SIGNAL_CLIENT_REMOVED,
  SIGNAL_CLIENT_SOCKET_REMOVED,

  LAST_SIGNAL
};


/* this is really arbitrarily chosen */
#define DEFAULT_BUFFERS_MAX             -1
#define DEFAULT_BUFFERS_SOFT_MAX        -1
#define DEFAULT_TIME_MIN                -1
#define DEFAULT_BYTES_MIN               -1
#define DEFAULT_BUFFERS_MIN             -1
154
#define DEFAULT_UNIT_FORMAT               GST_FORMAT_BUFFERS
155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174
#define DEFAULT_UNITS_MAX               -1
#define DEFAULT_UNITS_SOFT_MAX          -1
#define DEFAULT_RECOVER_POLICY          GST_RECOVER_POLICY_NONE
#define DEFAULT_TIMEOUT                 0
#define DEFAULT_SYNC_METHOD             GST_SYNC_METHOD_LATEST

#define DEFAULT_BURST_FORMAT            GST_FORMAT_UNDEFINED
#define DEFAULT_BURST_VALUE             0

#define DEFAULT_QOS_DSCP                -1

#define DEFAULT_RESEND_STREAMHEADER      TRUE

enum
{
  PROP_0,
  PROP_BUFFERS_QUEUED,
  PROP_BYTES_QUEUED,
  PROP_TIME_QUEUED,

175
  PROP_UNIT_FORMAT,
176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198
  PROP_UNITS_MAX,
  PROP_UNITS_SOFT_MAX,

  PROP_BUFFERS_MAX,
  PROP_BUFFERS_SOFT_MAX,

  PROP_TIME_MIN,
  PROP_BYTES_MIN,
  PROP_BUFFERS_MIN,

  PROP_RECOVER_POLICY,
  PROP_TIMEOUT,
  PROP_SYNC_METHOD,
  PROP_BYTES_TO_SERVE,
  PROP_BYTES_SERVED,

  PROP_BURST_FORMAT,
  PROP_BURST_VALUE,

  PROP_QOS_DSCP,

  PROP_RESEND_STREAMHEADER,

199
  PROP_NUM_HANDLES
200 201
};

202
GType
203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225
gst_multi_handle_sink_recover_policy_get_type (void)
{
  static GType recover_policy_type = 0;
  static const GEnumValue recover_policy[] = {
    {GST_RECOVER_POLICY_NONE,
        "Do not try to recover", "none"},
    {GST_RECOVER_POLICY_RESYNC_LATEST,
        "Resync client to latest buffer", "latest"},
    {GST_RECOVER_POLICY_RESYNC_SOFT_LIMIT,
        "Resync client to soft limit", "soft-limit"},
    {GST_RECOVER_POLICY_RESYNC_KEYFRAME,
        "Resync client to most recent keyframe", "keyframe"},
    {0, NULL, NULL},
  };

  if (!recover_policy_type) {
    recover_policy_type =
        g_enum_register_static ("GstMultiHandleSinkRecoverPolicy",
        recover_policy);
  }
  return recover_policy_type;
}

226
GType
227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254
gst_multi_handle_sink_sync_method_get_type (void)
{
  static GType sync_method_type = 0;
  static const GEnumValue sync_method[] = {
    {GST_SYNC_METHOD_LATEST,
        "Serve starting from the latest buffer", "latest"},
    {GST_SYNC_METHOD_NEXT_KEYFRAME,
        "Serve starting from the next keyframe", "next-keyframe"},
    {GST_SYNC_METHOD_LATEST_KEYFRAME,
          "Serve everything since the latest keyframe (burst)",
        "latest-keyframe"},
    {GST_SYNC_METHOD_BURST, "Serve burst-value data to client", "burst"},
    {GST_SYNC_METHOD_BURST_KEYFRAME,
          "Serve burst-value data starting on a keyframe",
        "burst-keyframe"},
    {GST_SYNC_METHOD_BURST_WITH_KEYFRAME,
          "Serve burst-value data preferably starting on a keyframe",
        "burst-with-keyframe"},
    {0, NULL, NULL},
  };

  if (!sync_method_type) {
    sync_method_type =
        g_enum_register_static ("GstMultiHandleSinkSyncMethod", sync_method);
  }
  return sync_method_type;
}

255
GType
256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278
gst_multi_handle_sink_client_status_get_type (void)
{
  static GType client_status_type = 0;
  static const GEnumValue client_status[] = {
    {GST_CLIENT_STATUS_OK, "ok", "ok"},
    {GST_CLIENT_STATUS_CLOSED, "Closed", "closed"},
    {GST_CLIENT_STATUS_REMOVED, "Removed", "removed"},
    {GST_CLIENT_STATUS_SLOW, "Too slow", "slow"},
    {GST_CLIENT_STATUS_ERROR, "Error", "error"},
    {GST_CLIENT_STATUS_DUPLICATE, "Duplicate", "duplicate"},
    {GST_CLIENT_STATUS_FLUSHING, "Flushing", "flushing"},
    {0, NULL, NULL},
  };

  if (!client_status_type) {
    client_status_type =
        g_enum_register_static ("GstMultiHandleSinkClientStatus",
        client_status);
  }
  return client_status_type;
}

static void gst_multi_handle_sink_finalize (GObject * object);
279
static void gst_multi_handle_sink_clear (GstMultiHandleSink * mhsink);
280 281 282

static GstFlowReturn gst_multi_handle_sink_render (GstBaseSink * bsink,
    GstBuffer * buf);
283 284 285 286
static void gst_multi_handle_sink_queue_buffer (GstMultiHandleSink * mhsink,
    GstBuffer * buffer);
static gboolean gst_multi_handle_sink_client_queue_buffer (GstMultiHandleSink *
    mhsink, GstMultiHandleClient * mhclient, GstBuffer * buffer);
287 288 289 290 291 292 293 294 295 296 297 298 299
static GstStateChangeReturn gst_multi_handle_sink_change_state (GstElement *
    element, GstStateChange transition);

static void gst_multi_handle_sink_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec);
static void gst_multi_handle_sink_get_property (GObject * object, guint prop_id,
    GValue * value, GParamSpec * pspec);

#define gst_multi_handle_sink_parent_class parent_class
G_DEFINE_TYPE (GstMultiHandleSink, gst_multi_handle_sink, GST_TYPE_BASE_SINK);

static guint gst_multi_handle_sink_signals[LAST_SIGNAL] = { 0 };

300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317
static gint
find_syncframe (GstMultiHandleSink * sink, gint idx, gint direction);
#define find_next_syncframe(s,i) 	find_syncframe(s,i,1)
#define find_prev_syncframe(s,i) 	find_syncframe(s,i,-1)
static gboolean is_sync_frame (GstMultiHandleSink * sink, GstBuffer * buffer);
static gboolean gst_multi_handle_sink_stop (GstBaseSink * bsink);
static gboolean gst_multi_handle_sink_start (GstBaseSink * bsink);
static gint get_buffers_max (GstMultiHandleSink * sink, gint64 max);
static gint
gst_multi_handle_sink_recover_client (GstMultiHandleSink * sink,
    GstMultiHandleClient * client);
static void gst_multi_handle_sink_setup_dscp (GstMultiHandleSink * mhsink);
static gboolean
find_limits (GstMultiHandleSink * sink,
    gint * min_idx, gint bytes_min, gint buffers_min, gint64 time_min,
    gint * max_idx, gint bytes_max, gint buffers_max, gint64 time_max);


318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359
static void
gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass)
{
  GObjectClass *gobject_class;
  GstElementClass *gstelement_class;
  GstBaseSinkClass *gstbasesink_class;

  gobject_class = (GObjectClass *) klass;
  gstelement_class = (GstElementClass *) klass;
  gstbasesink_class = (GstBaseSinkClass *) klass;

  gobject_class->set_property = gst_multi_handle_sink_set_property;
  gobject_class->get_property = gst_multi_handle_sink_get_property;
  gobject_class->finalize = gst_multi_handle_sink_finalize;

  g_object_class_install_property (gobject_class, PROP_BUFFERS_MAX,
      g_param_spec_int ("buffers-max", "Buffers max",
          "max number of buffers to queue for a client (-1 = no limit)", -1,
          G_MAXINT, DEFAULT_BUFFERS_MAX,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  g_object_class_install_property (gobject_class, PROP_BUFFERS_SOFT_MAX,
      g_param_spec_int ("buffers-soft-max", "Buffers soft max",
          "Recover client when going over this limit (-1 = no limit)", -1,
          G_MAXINT, DEFAULT_BUFFERS_SOFT_MAX,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  g_object_class_install_property (gobject_class, PROP_BYTES_MIN,
      g_param_spec_int ("bytes-min", "Bytes min",
          "min number of bytes to queue (-1 = as little as possible)", -1,
          G_MAXINT, DEFAULT_BYTES_MIN,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  g_object_class_install_property (gobject_class, PROP_TIME_MIN,
      g_param_spec_int64 ("time-min", "Time min",
          "min number of time to queue (-1 = as little as possible)", -1,
          G_MAXINT64, DEFAULT_TIME_MIN,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  g_object_class_install_property (gobject_class, PROP_BUFFERS_MIN,
      g_param_spec_int ("buffers-min", "Buffers min",
          "min number of buffers to queue (-1 = as few as possible)", -1,
          G_MAXINT, DEFAULT_BUFFERS_MIN,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

360 361
  g_object_class_install_property (gobject_class, PROP_UNIT_FORMAT,
      g_param_spec_enum ("unit-format", "Units format",
362
          "The unit to measure the max/soft-max/queued properties",
363
          GST_TYPE_FORMAT, DEFAULT_UNIT_FORMAT,
364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  g_object_class_install_property (gobject_class, PROP_UNITS_MAX,
      g_param_spec_int64 ("units-max", "Units max",
          "max number of units to queue (-1 = no limit)", -1, G_MAXINT64,
          DEFAULT_UNITS_MAX, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  g_object_class_install_property (gobject_class, PROP_UNITS_SOFT_MAX,
      g_param_spec_int64 ("units-soft-max", "Units soft max",
          "Recover client when going over this limit (-1 = no limit)", -1,
          G_MAXINT64, DEFAULT_UNITS_SOFT_MAX,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  g_object_class_install_property (gobject_class, PROP_BUFFERS_QUEUED,
      g_param_spec_uint ("buffers-queued", "Buffers queued",
          "Number of buffers currently queued", 0, G_MAXUINT, 0,
          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
#if NOT_IMPLEMENTED
  g_object_class_install_property (gobject_class, PROP_BYTES_QUEUED,
      g_param_spec_uint ("bytes-queued", "Bytes queued",
          "Number of bytes currently queued", 0, G_MAXUINT, 0,
          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
  g_object_class_install_property (gobject_class, PROP_TIME_QUEUED,
      g_param_spec_uint64 ("time-queued", "Time queued",
          "Number of time currently queued", 0, G_MAXUINT64, 0,
          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
#endif

  g_object_class_install_property (gobject_class, PROP_RECOVER_POLICY,
      g_param_spec_enum ("recover-policy", "Recover Policy",
          "How to recover when client reaches the soft max",
          GST_TYPE_RECOVER_POLICY, DEFAULT_RECOVER_POLICY,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  g_object_class_install_property (gobject_class, PROP_TIMEOUT,
      g_param_spec_uint64 ("timeout", "Timeout",
          "Maximum inactivity timeout in nanoseconds for a client (0 = no limit)",
          0, G_MAXUINT64, DEFAULT_TIMEOUT,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  g_object_class_install_property (gobject_class, PROP_SYNC_METHOD,
      g_param_spec_enum ("sync-method", "Sync Method",
          "How to sync new clients to the stream", GST_TYPE_SYNC_METHOD,
          DEFAULT_SYNC_METHOD, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  g_object_class_install_property (gobject_class, PROP_BYTES_TO_SERVE,
      g_param_spec_uint64 ("bytes-to-serve", "Bytes to serve",
          "Number of bytes received to serve to clients", 0, G_MAXUINT64, 0,
          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
  g_object_class_install_property (gobject_class, PROP_BYTES_SERVED,
      g_param_spec_uint64 ("bytes-served", "Bytes served",
          "Total number of bytes send to all clients", 0, G_MAXUINT64, 0,
          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));

  g_object_class_install_property (gobject_class, PROP_BURST_FORMAT,
      g_param_spec_enum ("burst-format", "Burst format",
          "The format of the burst units (when sync-method is burst[[-with]-keyframe])",
          GST_TYPE_FORMAT, DEFAULT_BURST_FORMAT,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  g_object_class_install_property (gobject_class, PROP_BURST_VALUE,
      g_param_spec_uint64 ("burst-value", "Burst value",
420
          "The amount of burst expressed in burst-format", 0, G_MAXUINT64,
421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439
          DEFAULT_BURST_VALUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  g_object_class_install_property (gobject_class, PROP_QOS_DSCP,
      g_param_spec_int ("qos-dscp", "QoS diff srv code point",
          "Quality of Service, differentiated services code point (-1 default)",
          -1, 63, DEFAULT_QOS_DSCP,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  /**
   * GstMultiHandleSink::resend-streamheader
   *
   * Resend the streamheaders to existing clients when they change.
   */
  g_object_class_install_property (gobject_class, PROP_RESEND_STREAMHEADER,
      g_param_spec_boolean ("resend-streamheader", "Resend streamheader",
          "Resend the streamheader if it changes in the caps",
          DEFAULT_RESEND_STREAMHEADER,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

440 441 442
  g_object_class_install_property (gobject_class, PROP_NUM_HANDLES,
      g_param_spec_uint ("num-handles", "Number of handles",
          "The current number of client handles",
443 444 445 446
          0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));

  /**
   * GstMultiHandleSink::clear:
447
   * @gstmultihandlesink: the multihandlesink element to emit this signal on
448
   *
449
   * Remove all sockets from multihandlesink.  Since multihandlesink did not
450 451 452 453 454 455 456
   * open sockets itself, it does not explicitly close the sockets. The application
   * should do so by connecting to the client-socket-removed callback.
   */
  gst_multi_handle_sink_signals[SIGNAL_CLEAR] =
      g_signal_new ("clear", G_TYPE_FROM_CLASS (klass),
      G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
      G_STRUCT_OFFSET (GstMultiHandleSinkClass, clear), NULL, NULL,
457
      g_cclosure_marshal_generic, G_TYPE_NONE, 0);
458

459
  gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
460

461
  gst_element_class_set_static_metadata (gstelement_class,
462 463 464 465 466 467 468 469 470 471
      "Multi socket sink", "Sink/Network",
      "Send data to multiple sockets",
      "Thomas Vander Stichele <thomas at apestaart dot org>, "
      "Wim Taymans <wim@fluendo.com>, "
      "Sebastian Dröge <sebastian.droege@collabora.co.uk>");

  gstelement_class->change_state =
      GST_DEBUG_FUNCPTR (gst_multi_handle_sink_change_state);

  gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_multi_handle_sink_render);
472 473
  klass->client_queue_buffer =
      GST_DEBUG_FUNCPTR (gst_multi_handle_sink_client_queue_buffer);
474

475
#if 0
476 477 478 479
  klass->add = GST_DEBUG_FUNCPTR (gst_multi_handle_sink_add);
  klass->add_full = GST_DEBUG_FUNCPTR (gst_multi_handle_sink_add_full);
  klass->remove = GST_DEBUG_FUNCPTR (gst_multi_handle_sink_remove);
  klass->remove_flush = GST_DEBUG_FUNCPTR (gst_multi_handle_sink_remove_flush);
480 481
#endif

482 483
  klass->clear = GST_DEBUG_FUNCPTR (gst_multi_handle_sink_clear);

484
  GST_DEBUG_CATEGORY_INIT (multihandlesink_debug, "multihandlesink", 0,
485 486 487 488 489 490 491 492 493 494 495 496
      "Multi socket sink");
}

static void
gst_multi_handle_sink_init (GstMultiHandleSink * this)
{
  GST_OBJECT_FLAG_UNSET (this, GST_MULTI_HANDLE_SINK_OPEN);

  CLIENTS_LOCK_INIT (this);
  this->clients = NULL;

  this->bufqueue = g_array_new (FALSE, TRUE, sizeof (GstBuffer *));
497
  this->unit_format = DEFAULT_UNIT_FORMAT;
498 499 500 501 502 503 504 505 506
  this->units_max = DEFAULT_UNITS_MAX;
  this->units_soft_max = DEFAULT_UNITS_SOFT_MAX;
  this->time_min = DEFAULT_TIME_MIN;
  this->bytes_min = DEFAULT_BYTES_MIN;
  this->buffers_min = DEFAULT_BUFFERS_MIN;
  this->recover_policy = DEFAULT_RECOVER_POLICY;

  this->timeout = DEFAULT_TIMEOUT;
  this->def_sync_method = DEFAULT_SYNC_METHOD;
507

508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524
  this->def_burst_format = DEFAULT_BURST_FORMAT;
  this->def_burst_value = DEFAULT_BURST_VALUE;

  this->qos_dscp = DEFAULT_QOS_DSCP;

  this->resend_streamheader = DEFAULT_RESEND_STREAMHEADER;
}

static void
gst_multi_handle_sink_finalize (GObject * object)
{
  GstMultiHandleSink *this;

  this = GST_MULTI_HANDLE_SINK (object);

  CLIENTS_LOCK_CLEAR (this);
  g_array_free (this->bufqueue, TRUE);
525
  g_hash_table_destroy (this->handle_hash);
526 527 528 529

  G_OBJECT_CLASS (parent_class)->finalize (object);
}

530 531 532
gint
gst_multi_handle_sink_setup_dscp_client (GstMultiHandleSink * sink,
    GstMultiHandleClient * client)
533
{
534
#if !defined(IP_TOS) || !defined(HAVE_SYS_SOCKET_H)
535 536 537 538 539 540 541 542 543 544 545 546 547
  return 0;
#else
  gint tos;
  gint ret;
  int fd;
  union gst_sockaddr
  {
    struct sockaddr sa;
    struct sockaddr_in6 sa_in6;
    struct sockaddr_storage sa_stor;
  } sa;
  socklen_t slen = sizeof (sa);
  gint af;
548
  GstMultiHandleSinkClass *mhsinkclass = GST_MULTI_HANDLE_SINK_GET_CLASS (sink);
549 550 551 552 553

  /* don't touch */
  if (sink->qos_dscp < 0)
    return 0;

554
  fd = mhsinkclass->client_get_fd (client);
555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596

  if ((ret = getsockname (fd, &sa.sa, &slen)) < 0) {
    GST_DEBUG_OBJECT (sink, "could not get sockname: %s", g_strerror (errno));
    return ret;
  }

  af = sa.sa.sa_family;

  /* if this is an IPv4-mapped address then do IPv4 QoS */
  if (af == AF_INET6) {

    GST_DEBUG_OBJECT (sink, "check IP6 socket");
    if (IN6_IS_ADDR_V4MAPPED (&(sa.sa_in6.sin6_addr))) {
      GST_DEBUG_OBJECT (sink, "mapped to IPV4");
      af = AF_INET;
    }
  }

  /* extract and shift 6 bits of the DSCP */
  tos = (sink->qos_dscp & 0x3f) << 2;

  switch (af) {
    case AF_INET:
      ret = setsockopt (fd, IPPROTO_IP, IP_TOS, &tos, sizeof (tos));
      break;
    case AF_INET6:
#ifdef IPV6_TCLASS
      ret = setsockopt (fd, IPPROTO_IPV6, IPV6_TCLASS, &tos, sizeof (tos));
      break;
#endif
    default:
      ret = 0;
      GST_ERROR_OBJECT (sink, "unsupported AF");
      break;
  }
  if (ret)
    GST_DEBUG_OBJECT (sink, "could not set DSCP: %s", g_strerror (errno));

  return ret;
#endif
}

597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623
void
gst_multi_handle_sink_client_init (GstMultiHandleClient * client,
    GstSyncMethod sync_method)
{
  GTimeVal now;

  client->status = GST_CLIENT_STATUS_OK;
  client->bufpos = -1;
  client->flushcount = -1;
  client->bufoffset = 0;
  client->sending = NULL;
  client->bytes_sent = 0;
  client->dropped_buffers = 0;
  client->avg_queue_size = 0;
  client->first_buffer_ts = GST_CLOCK_TIME_NONE;
  client->last_buffer_ts = GST_CLOCK_TIME_NONE;
  client->new_connection = TRUE;
  client->sync_method = sync_method;
  client->currently_removing = FALSE;

  /* update start time */
  g_get_current_time (&now);
  client->connect_time = GST_TIMEVAL_TO_TIME (now);
  client->disconnect_time = 0;
  /* set last activity time to connect time */
  client->last_activity_time = client->connect_time;
}
624

625
static void
626 627 628 629 630 631 632 633 634 635 636 637 638 639 640
gst_multi_handle_sink_setup_dscp (GstMultiHandleSink * mhsink)
{
  GList *clients;

  CLIENTS_LOCK (mhsink);
  for (clients = mhsink->clients; clients; clients = clients->next) {
    GstMultiHandleClient *client;

    client = clients->data;

    gst_multi_handle_sink_setup_dscp_client (mhsink, client);
  }
  CLIENTS_UNLOCK (mhsink);
}

641
void
642 643 644
gst_multi_handle_sink_add_full (GstMultiHandleSink * sink,
    GstMultiSinkHandle handle, GstSyncMethod sync_method, GstFormat min_format,
    guint64 min_value, GstFormat max_format, guint64 max_value)
645
{
646
  GstMultiHandleClient *mhclient;
647
  GList *clink;
648 649 650 651
  GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
  gchar debug[30];
  GstMultiHandleSinkClass *mhsinkclass =
      GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
652

653 654 655 656 657 658
  if (!sink->running) {
    g_warning ("Element %s must be set to READY, PAUSED or PLAYING state "
        "before clients can be added", GST_OBJECT_NAME (sink));
    return;
  }

659 660
  mhsinkclass->handle_debug (handle, debug);
  GST_DEBUG_OBJECT (sink, "%s adding client, sync_method %d, "
661
      "min_format %d, min_value %" G_GUINT64_FORMAT
662
      ", max_format %d, max_value %" G_GUINT64_FORMAT, debug,
663 664 665 666 667 668 669 670 671 672
      sync_method, min_format, min_value, max_format, max_value);

  /* do limits check if we can */
  if (min_format == max_format) {
    if (max_value != -1 && min_value != -1 && max_value < min_value)
      goto wrong_limits;
  }

  CLIENTS_LOCK (sink);

673 674 675
  /* check the hash to find a duplicate handle */
  clink = g_hash_table_lookup (mhsink->handle_hash,
      mhsinkclass->handle_hash_key (handle));
676 677 678
  if (clink != NULL)
    goto duplicate;

679 680 681 682
  /* We do not take ownership of @handle in this function, but we can't take a
   * reference directly as we don't know the concrete type of the handle.
   * GstMultiHandleSink relies on the derived class to take a reference for us
   * in new_client: */
683
  mhclient = mhsinkclass->new_client (mhsink, handle, sync_method);
684

685 686 687 688 689 690 691 692 693 694 695
  /* we can add the handle now */
  clink = mhsink->clients = g_list_prepend (mhsink->clients, mhclient);
  g_hash_table_insert (mhsink->handle_hash,
      mhsinkclass->handle_hash_key (mhclient->handle), clink);
  mhsink->clients_cookie++;


  mhclient->burst_min_format = min_format;
  mhclient->burst_min_value = min_value;
  mhclient->burst_max_format = max_format;
  mhclient->burst_max_value = max_value;
696

697 698
  if (mhsinkclass->hash_changed)
    mhsinkclass->hash_changed (mhsink);
699 700 701

  CLIENTS_UNLOCK (sink);

702
  mhsinkclass->emit_client_added (mhsink, handle);
703 704 705 706 707 708 709

  return;

  /* errors */
wrong_limits:
  {
    GST_WARNING_OBJECT (sink,
710 711 712
        "%s wrong values min =%" G_GUINT64_FORMAT ", max=%"
        G_GUINT64_FORMAT ", unit %d specified when adding client",
        debug, min_value, max_value, min_format);
713 714 715 716 717
    return;
  }
duplicate:
  {
    CLIENTS_UNLOCK (sink);
718
    GST_WARNING_OBJECT (sink, "%s duplicate client found, refusing", debug);
719 720
    mhsinkclass->emit_client_removed (mhsink, handle,
        GST_CLIENT_STATUS_DUPLICATE);
721 722 723 724 725 726
    return;
  }
}

/* "add" signal implementation */
void
727
gst_multi_handle_sink_add (GstMultiHandleSink * sink, GstMultiSinkHandle handle)
728
{
729
  gst_multi_handle_sink_add_full (sink, handle, sink->def_sync_method,
730 731 732 733 734 735
      sink->def_burst_format, sink->def_burst_value, sink->def_burst_format,
      -1);
}

/* "remove" signal implementation */
void
736 737
gst_multi_handle_sink_remove (GstMultiHandleSink * sink,
    GstMultiSinkHandle handle)
738 739
{
  GList *clink;
740 741 742 743
  GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
  GstMultiHandleSinkClass *mhsinkclass =
      GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
  gchar debug[30];
744

745 746 747
  mhsinkclass->handle_debug (handle, debug);

  GST_DEBUG_OBJECT (sink, "%s removing client", debug);
748 749

  CLIENTS_LOCK (sink);
750 751
  clink = g_hash_table_lookup (mhsink->handle_hash,
      mhsinkclass->handle_hash_key (handle));
752
  if (clink != NULL) {
753
    GstMultiHandleClient *mhclient = (GstMultiHandleClient *) clink->data;
754

755
    if (mhclient->status != GST_CLIENT_STATUS_OK) {
756
      GST_INFO_OBJECT (sink,
757 758
          "%s Client already disconnecting with status %d",
          debug, mhclient->status);
759 760 761
      goto done;
    }

762 763 764 765 766
    mhclient->status = GST_CLIENT_STATUS_REMOVED;
    gst_multi_handle_sink_remove_client_link (GST_MULTI_HANDLE_SINK (sink),
        clink);
    if (mhsinkclass->hash_changed)
      mhsinkclass->hash_changed (mhsink);
767
  } else {
768
    GST_WARNING_OBJECT (sink, "%s no client with this handle found!", debug);
769 770 771 772 773 774 775 776
  }

done:
  CLIENTS_UNLOCK (sink);
}

/* "remove-flush" signal implementation */
void
777 778
gst_multi_handle_sink_remove_flush (GstMultiHandleSink * sink,
    GstMultiSinkHandle handle)
779 780
{
  GList *clink;
781 782 783 784 785 786
  GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
  GstMultiHandleSinkClass *mhsinkclass =
      GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
  gchar debug[30];

  mhsinkclass->handle_debug (handle, debug);
787

788
  GST_DEBUG_OBJECT (sink, "%s flushing client", debug);
789 790

  CLIENTS_LOCK (sink);
791 792
  clink = g_hash_table_lookup (mhsink->handle_hash,
      mhsinkclass->handle_hash_key (handle));
793
  if (clink != NULL) {
794
    GstMultiHandleClient *mhclient = (GstMultiHandleClient *) clink->data;
795

796
    if (mhclient->status != GST_CLIENT_STATUS_OK) {
797
      GST_INFO_OBJECT (sink,
798 799
          "%s Client already disconnecting with status %d",
          mhclient->debug, mhclient->status);
800 801 802 803 804 805
      goto done;
    }

    /* take the position of the client as the number of buffers left to flush.
     * If the client was at position -1, we flush 0 buffers, 0 == flush 1
     * buffer, etc... */
806
    mhclient->flushcount = mhclient->bufpos + 1;
807 808
    /* mark client as flushing. We can not remove the client right away because
     * it might have some buffers to flush in the ->sending queue. */
809
    mhclient->status = GST_CLIENT_STATUS_FLUSHING;
810
  } else {
811
    GST_WARNING_OBJECT (sink, "%s no client with this handle found!", debug);
812 813 814 815 816 817 818
  }
done:
  CLIENTS_UNLOCK (sink);
}

/* can be called both through the signal (i.e. from any thread) or when 
 * stopping, after the writing thread has shut down */
819 820
static void
gst_multi_handle_sink_clear (GstMultiHandleSink * mhsink)
821
{
822
  GList *clients, *next;
823
  guint32 cookie;
824 825
  GstMultiHandleSinkClass *mhsinkclass =
      GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
826

827
  GST_DEBUG_OBJECT (mhsink, "clearing all clients");
828

829
  CLIENTS_LOCK (mhsink);
830
restart:
831 832 833
  cookie = mhsink->clients_cookie;
  for (clients = mhsink->clients; clients; clients = next) {
    GstMultiHandleClient *mhclient;
834

835 836
    if (cookie != mhsink->clients_cookie) {
      GST_DEBUG_OBJECT (mhsink, "cookie changed while removing all clients");
837 838 839
      goto restart;
    }

840 841 842 843 844 845
    mhclient = (GstMultiHandleClient *) clients->data;
    next = g_list_next (clients);

    mhclient->status = GST_CLIENT_STATUS_REMOVED;
    /* the next call changes the list, which is why we iterate
     * with a temporary next pointer */
846
    gst_multi_handle_sink_remove_client_link (mhsink, clients);
847
  }
848 849
  if (mhsinkclass->hash_changed)
    mhsinkclass->hash_changed (mhsink);
850

851
  CLIENTS_UNLOCK (mhsink);
852 853
}

854

855 856 857
/* "get-stats" signal implementation
 */
GstStructure *
858 859
gst_multi_handle_sink_get_stats (GstMultiHandleSink * sink,
    GstMultiSinkHandle handle)
860
{
861
  GstMultiHandleClient *client;
862 863
  GstStructure *result = NULL;
  GList *clink;
864 865 866 867 868 869
  GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
  GstMultiHandleSinkClass *mhsinkclass =
      GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
  gchar debug[30];

  mhsinkclass->handle_debug (handle, debug);
870 871

  CLIENTS_LOCK (sink);
872 873
  clink = g_hash_table_lookup (mhsink->handle_hash,
      mhsinkclass->handle_hash_key (handle));
874 875 876 877 878
  if (clink == NULL)
    goto noclient;

  client = clink->data;
  if (client != NULL) {
879
    GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
880 881
    guint64 interval;

882
    result = gst_structure_new_empty ("multihandlesink-stats");
883

884
    if (mhclient->disconnect_time == 0) {
885 886 887 888
      GTimeVal nowtv;

      g_get_current_time (&nowtv);

889
      interval = GST_TIMEVAL_TO_TIME (nowtv) - mhclient->connect_time;
890
    } else {
891
      interval = mhclient->disconnect_time - mhclient->connect_time;
892 893 894
    }

    gst_structure_set (result,
895 896 897 898 899 900 901 902
        "bytes-sent", G_TYPE_UINT64, mhclient->bytes_sent,
        "connect-time", G_TYPE_UINT64, mhclient->connect_time,
        "disconnect-time", G_TYPE_UINT64, mhclient->disconnect_time,
        "connect-duration", G_TYPE_UINT64, interval,
        "last-activitity-time", G_TYPE_UINT64, mhclient->last_activity_time,
        "buffers-dropped", G_TYPE_UINT64, mhclient->dropped_buffers,
        "first-buffer-ts", G_TYPE_UINT64, mhclient->first_buffer_ts,
        "last-buffer-ts", G_TYPE_UINT64, mhclient->last_buffer_ts, NULL);
903 904 905 906 907 908 909
  }

noclient:
  CLIENTS_UNLOCK (sink);

  /* python doesn't like a NULL pointer yet */
  if (result == NULL) {
910
    GST_WARNING_OBJECT (sink, "%s no client with this found!", debug);
911
    result = gst_structure_new_empty ("multihandlesink-stats");
912 913 914 915 916
  }

  return result;
}

917
/* should be called with the clientslock held.
918 919 920 921
 * Note that we don't close the fd as we didn't open it in the first
 * place. An application should connect to the client-fd-removed signal and
 * close the fd itself.
 */
922
void
923 924 925 926
gst_multi_handle_sink_remove_client_link (GstMultiHandleSink * sink,
    GList * link)
{
  GTimeVal now;
927 928
  GstMultiHandleClient *mhclient = (GstMultiHandleClient *) link->data;
  GstMultiHandleSinkClass *mhsinkclass = GST_MULTI_HANDLE_SINK_GET_CLASS (sink);
929

930 931 932
  if (mhclient->currently_removing) {
    GST_WARNING_OBJECT (sink, "%s client is already being removed",
        mhclient->debug);
933 934
    return;
  } else {
935
    mhclient->currently_removing = TRUE;
936 937 938
  }

  /* FIXME: if we keep track of ip we can log it here and signal */
939
  switch (mhclient->status) {
940
    case GST_CLIENT_STATUS_OK:
941 942
      GST_WARNING_OBJECT (sink, "%s removing client %p for no reason",
          mhclient->debug, mhclient);
943 944
      break;
    case GST_CLIENT_STATUS_CLOSED:
945 946
      GST_DEBUG_OBJECT (sink, "%s removing client %p because of close",
          mhclient->debug, mhclient);
947 948 949
      break;
    case GST_CLIENT_STATUS_REMOVED:
      GST_DEBUG_OBJECT (sink,
950 951
          "%s removing client %p because the app removed it", mhclient->debug,
          mhclient);
952 953 954
      break;
    case GST_CLIENT_STATUS_SLOW:
      GST_INFO_OBJECT (sink,
955 956
          "%s removing client %p because it was too slow", mhclient->debug,
          mhclient);
957 958 959
      break;
    case GST_CLIENT_STATUS_ERROR:
      GST_WARNING_OBJECT (sink,
960
          "%s removing client %p because of error", mhclient->debug, mhclient);
961 962 963 964
      break;
    case GST_CLIENT_STATUS_FLUSHING:
    default:
      GST_WARNING_OBJECT (sink,
965 966
          "%s removing client %p with invalid reason %d", mhclient->debug,
          mhclient, mhclient->status);
967 968 969
      break;
  }

970
  mhsinkclass->hash_removing (sink, mhclient);
971 972

  g_get_current_time (&now);
973
  mhclient->disconnect_time = GST_TIMEVAL_TO_TIME (now);
974 975

  /* free client buffers */
976 977 978
  g_slist_foreach (mhclient->sending, (GFunc) gst_mini_object_unref, NULL);
  g_slist_free (mhclient->sending);
  mhclient->sending = NULL;
979

980 981 982
  if (mhclient->caps)
    gst_caps_unref (mhclient->caps);
  mhclient->caps = NULL;
983 984 985 986 987

  /* unlock the mutex before signaling because the signal handler
   * might query some properties */
  CLIENTS_UNLOCK (sink);

988
  mhsinkclass->emit_client_removed (sink, mhclient->handle, mhclient->status);
989 990 991 992

  /* lock again before we remove the client completely */
  CLIENTS_LOCK (sink);

993
  /* handle cannot be reused in the above signal callback so we can safely
994
   * remove it from the hashtable here */
995 996
  if (!g_hash_table_remove (sink->handle_hash,
          mhsinkclass->handle_hash_key (mhclient->handle))) {
997
    GST_WARNING_OBJECT (sink,
998
        "%s error removing client %p from hash", mhclient->debug, mhclient);
999 1000 1001 1002 1003 1004
  }
  /* after releasing the lock above, the link could be invalid, more
   * precisely, the next and prev pointers could point to invalid list
   * links. One optimisation could be to add a cookie to the linked list
   * and take a shortcut when it did not change between unlocking and locking
   * our mutex. For now we just walk the list again. */
1005
  sink->clients = g_list_remove (sink->clients, mhclient);
1006 1007
  sink->clients_cookie++;

1008 1009
  if (mhsinkclass->removed)
    mhsinkclass->removed (sink, mhclient->handle);
1010 1011 1012

  CLIENTS_UNLOCK (sink);

1013 1014 1015
  /* sub-class must implement this to emit the client-$handle-removed signal */
  g_assert (mhsinkclass->client_free != NULL);

1016
  /* and the handle is really gone now */
1017
  mhsinkclass->client_free (sink, mhclient);
1018

1019
  g_free (mhclient);
1020

1021
  CLIENTS_LOCK (sink);
1022 1023 1024
}

static gboolean
1025 1026
gst_multi_handle_sink_client_queue_buffer (GstMultiHandleSink * mhsink,
    GstMultiHandleClient * mhclient, GstBuffer * buffer)
1027
{
1028
  GstMultiHandleSink *sink = GST_MULTI_HANDLE_SINK (mhsink);
1029 1030 1031 1032 1033 1034 1035 1036 1037 1038
  GstCaps *caps;

  /* TRUE: send them if the new caps have them */
  gboolean send_streamheader = FALSE;
  GstStructure *s;

  /* before we queue the buffer, we check if we need to queue streamheader
   * buffers (because it's a new client, or because they changed) */
  caps = gst_pad_get_current_caps (GST_BASE_SINK_PAD (sink));

1039
  if (!mhclient->caps) {
1040
    GST_DEBUG_OBJECT (sink,
1041 1042
        "%s no previous caps for this client, send streamheader",
        mhclient->debug);
1043
    send_streamheader = TRUE;
1044
    mhclient->caps = gst_caps_ref (caps);
1045 1046
  } else {
    /* there were previous caps recorded, so compare */
1047
    if (!gst_caps_is_equal (caps, mhclient->caps)) {
1048 1049 1050 1051 1052 1053 1054
      const GValue *sh1, *sh2;

      /* caps are not equal, but could still have the same streamheader */
      s = gst_caps_get_structure (caps, 0);
      if (!gst_structure_has_field (s, "streamheader")) {
        /* no new streamheader, so nothing new to send */
        GST_DEBUG_OBJECT (sink,
1055 1056
            "%s new caps do not have streamheader, not sending",
            mhclient->debug);
1057 1058
      } else {
        /* there is a new streamheader */
1059
        s = gst_caps_get_structure (mhclient->caps, 0);
1060 1061 1062
        if (!gst_structure_has_field (s, "streamheader")) {
          /* no previous streamheader, so send the new one */
          GST_DEBUG_OBJECT (sink,
1063 1064
              "%s previous caps did not have streamheader, sending",
              mhclient->debug);
1065 1066 1067
          send_streamheader = TRUE;
        } else {
          /* both old and new caps have streamheader set */
1068
          if (!mhsink->resend_streamheader) {
1069
            GST_DEBUG_OBJECT (sink,
1070 1071
                "%s asked to not resend the streamheader, not sending",
                mhclient->debug);
1072 1073 1074 1075 1076 1077 1078
            send_streamheader = FALSE;
          } else {
            sh1 = gst_structure_get_value (s, "streamheader");
            s = gst_caps_get_structure (caps, 0);
            sh2 = gst_structure_get_value (s, "streamheader");
            if (gst_value_compare (sh1, sh2) != GST_VALUE_EQUAL) {
              GST_DEBUG_OBJECT (sink,
1079 1080
                  "%s new streamheader different from old, sending",
                  mhclient->debug);
1081 1082 1083 1084 1085 1086 1087
              send_streamheader = TRUE;
            }
          }
        }
      }
    }
    /* Replace the old caps */
1088 1089
    gst_caps_unref (mhclient->caps);
    mhclient->caps = gst_caps_ref (caps);
1090 1091 1092 1093 1094 1095 1096 1097
  }

  if (G_UNLIKELY (send_streamheader)) {
    const GValue *sh;
    GArray *buffers;
    int i;

    GST_LOG_OBJECT (sink,
1098 1099
        "%s sending streamheader from caps %" GST_PTR_FORMAT,
        mhclient->debug, caps);
1100 1101 1102
    s = gst_caps_get_structure (caps, 0);
    if (!gst_structure_has_field (s, "streamheader")) {
      GST_DEBUG_OBJECT (sink,
1103
          "%s no new streamheader, so nothing to send", mhclient->debug);
1104 1105
    } else {
      GST_LOG_OBJECT (sink,
1106 1107
          "%s sending streamheader from caps %" GST_PTR_FORMAT,
          mhclient->debug, caps);
1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119
      sh = gst_structure_get_value (s, "streamheader");
      g_assert (G_VALUE_TYPE (sh) == GST_TYPE_ARRAY);
      buffers = g_value_peek_pointer (sh);
      GST_DEBUG_OBJECT (sink, "%d streamheader buffers", buffers->len);
      for (i = 0; i < buffers->len; ++i) {
        GValue *bufval;
        GstBuffer *buffer;

        bufval = &g_array_index (buffers, GValue, i);
        g_assert (G_VALUE_TYPE (bufval) == GST_TYPE_BUFFER);
        buffer = g_value_peek_pointer (bufval);
        GST_DEBUG_OBJECT (sink,
1120 1121
            "%s queueing streamheader buffer of length %" G_GSIZE_FORMAT,
            mhclient->debug, gst_buffer_get_size (buffer));
1122 1123
        gst_buffer_ref (buffer);

1124
        mhclient->sending = g_slist_append (mhclient->sending, buffer);
1125 1126 1127 1128 1129 1130 1131
      }
    }
  }

  gst_caps_unref (caps);
  caps = NULL;

1132 1133
  GST_LOG_OBJECT (sink, "%s queueing buffer of length %" G_GSIZE_FORMAT,
      mhclient->debug, gst_buffer_get_size (buffer));
1134 1135

  gst_buffer_ref (buffer);
1136
  mhclient->sending = g_slist_append (mhclient->sending, buffer);
1137 1138 1139

  return TRUE;
}
1140

1141
static gboolean
1142 1143
is_sync_frame (GstMultiHandleSink * sink, GstBuffer * buffer)
{
1144
  if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT))
1145
    return FALSE;
1146
  return TRUE;
1147
}
1148 1149 1150 1151 1152 1153

/* find the keyframe in the list of buffers starting the
 * search from @idx. @direction as -1 will search backwards, 
 * 1 will search forwards.
 * Returns: the index or -1 if there is no keyframe after idx.
 */
1154
gint
1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183
find_syncframe (GstMultiHandleSink * sink, gint idx, gint direction)
{
  gint i, len, result;

  /* take length of queued buffers */
  len = sink->bufqueue->len;

  /* assume we don't find a keyframe */
  result = -1;

  /* then loop over all buffers to find the first keyframe */
  for (i = idx; i >= 0 && i < len; i += direction) {
    GstBuffer *buf;

    buf = g_array_index (sink->bufqueue, GstBuffer *, i);
    if (is_sync_frame (sink, buf)) {
      GST_LOG_OBJECT (sink, "found keyframe at %d from %d, direction %d",
          i, idx, direction);
      result = i;
      break;
    }
  }
  return result;
}

/* Get the number of buffers from the buffer queue needed to satisfy
 * the maximum max in the configured units.
 * If units are not BUFFERS, and there are insufficient buffers in the
 * queue to satify the limit, return len(queue) + 1 */
1184
gint
1185 1186
get_buffers_max (GstMultiHandleSink * sink, gint64 max)
{
1187
  switch (sink->unit_format) {
1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248
    case GST_FORMAT_BUFFERS:
      return max;
    case GST_FORMAT_TIME:
    {
      GstBuffer *buf;
      int i;
      int len;
      gint64 diff;
      GstClockTime first = GST_CLOCK_TIME_NONE;

      len = sink->bufqueue->len;

      for (i = 0; i < len; i++) {
        buf = g_array_index (sink->bufqueue, GstBuffer *, i);
        if (GST_BUFFER_TIMESTAMP_IS_VALID (buf)) {
          if (first == -1)
            first = GST_BUFFER_TIMESTAMP (buf);

          diff = first - GST_BUFFER_TIMESTAMP (buf);

          if (diff > max)
            return i + 1;
        }
      }
      return len + 1;
    }
    case GST_FORMAT_BYTES:
    {
      GstBuffer *buf;
      int i;
      int len;
      gint acc = 0;

      len = sink->bufqueue->len;

      for (i = 0; i < len; i++) {
        buf = g_array_index (sink->bufqueue, GstBuffer *, i);
        acc += gst_buffer_get_size (buf);

        if (acc > max)
          return i + 1;
      }
      return len + 1;
    }
    default:
      return max;
  }
}

/* find the positions in t