gstmultifdsink.c 97.5 KB
Newer Older
1
2
3
/* GStreamer
 * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
 * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
4
 * Copyright (C) 2006 Wim Taymans <wim at fluendo dot com>
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

22
/**
23
 * SECTION:element-multifdsink
24
 * @see_also: tcpserversink
Wim Taymans's avatar
Wim Taymans committed
25
 *
26
 * This plugin writes incoming data to a set of file descriptors. The
27
28
29
30
 * file descriptors can be added to multifdsink by emitting the #GstMultiFdSink::add signal. 
 * For each descriptor added, the #GstMultiFdSink::client-added signal will be called.
 *
 * As of version 0.10.8, a client can also be added with the #GstMultiFdSink::add-full signal
31
32
 * that allows for more control over what and how much data a client 
 * initially receives.
33
34
35
36
 *
 * Clients can be removed from multifdsink by emitting the #GstMultiFdSink::remove signal. For
 * each descriptor removed, the #GstMultiFdSink::client-removed signal will be called. The
 * #GstMultiFdSink::client-removed signal can also be fired when multifdsink decides that a
37
 * client is not active anymore or, depending on the value of the
38
 * #GstMultiFdSink:recover-policy property, if the client is reading too slowly.
39
 * In all cases, multifdsink will never close a file descriptor itself.
40
 * The user of multifdsink is responsible for closing all file descriptors.
41
 * This can for example be done in response to the #GstMultiFdSink::client-fd-removed signal.
42
 * Note that multifdsink still has a reference to the file descriptor when the
43
 * #GstMultiFdSink::client-removed signal is emitted, so that "get-stats" can be performed on
44
 * the descriptor; it is therefore not safe to close the file descriptor in
45
46
47
 * the #GstMultiFdSink::client-removed signal handler, and you should use the 
 * #GstMultiFdSink::client-fd-removed signal to safely close the fd.
 *
48
 * Multifdsink internally keeps a queue of the incoming buffers and uses a
49
50
51
 * separate thread to send the buffers to the clients. This ensures that no
 * client write can block the pipeline and that clients can read with different
 * speeds.
52
53
 *
 * When adding a client to multifdsink, the #GstMultiFdSink:sync-method property will define
54
 * which buffer in the queued buffers will be sent first to the client. Clients 
55
56
57
58
 * can be sent the most recent buffer (which might not be decodable by the 
 * client if it is not a keyframe), the next keyframe received in 
 * multifdsink (which can take some time depending on the keyframe rate), or the
 * last received keyframe (which will cause a simple burst-on-connect). 
59
60
 * Multifdsink will always keep at least one keyframe in its internal buffers
 * when the sync-mode is set to latest-keyframe.
61
62
 *
 * As of version 0.10.8, there are additional values for the #GstMultiFdSink:sync-method 
63
64
65
66
67
 * property to allow finer control over burst-on-connect behaviour. By selecting
 * the 'burst' method a minimum burst size can be chosen, 'burst-keyframe'
 * additionally requires that the burst begin with a keyframe, and 
 * 'burst-with-keyframe' attempts to burst beginning with a keyframe, but will
 * prefer a minimum burst size even if it requires not starting with a keyframe.
68
 *
69
70
 * Multifdsink can be instructed to keep at least a minimum amount of data
 * expressed in time or byte units in its internal queues with the the 
71
72
73
74
75
 * #GstMultiFdSink:time-min and #GstMultiFdSink:bytes-min properties respectively.
 * These properties are useful if the application adds clients with the 
 * #GstMultiFdSink::add-full signal to make sure that a burst connect can
 * actually be honored. 
 *
76
77
78
 * When streaming data, clients are allowed to read at a different rate than
 * the rate at which multifdsink receives data. If the client is reading too
 * fast, no data will be send to the client until multifdsink receives more
79
80
 * data. If the client, however, reads too slowly, data for that client will be 
 * queued up in multifdsink. Two properties control the amount of data 
81
82
83
84
85
86
87
 * (buffers) that is queued in multifdsink: #GstMultiFdSink:buffers-max and 
 * #GstMultiFdSink:buffers-soft-max. A client that falls behind by
 * #GstMultiFdSink:buffers-max is removed from multifdsink forcibly.
 *
 * A client with a lag of at least #GstMultiFdSink:buffers-soft-max enters the recovery
 * procedure which is controlled with the #GstMultiFdSink:recover-policy property.
 * A recover policy of NONE will do nothing, RESYNC_LATEST will send the most recently
88
89
 * received buffer as the next buffer for the client, RESYNC_SOFT_LIMIT
 * positions the client to the soft limit in the buffer queue and
90
 * RESYNC_KEYFRAME positions the client at the most recent keyframe in the
91
 * buffer queue.
92
 *
93
94
95
 * multifdsink will by default synchronize on the clock before serving the 
 * buffers to the clients. This behaviour can be disabled by setting the sync 
 * property to FALSE. Multifdsink will by default not do QoS and will never
96
 * drop late buffers.
Wim Taymans's avatar
Wim Taymans committed
97
 *
98
 * Last reviewed on 2006-09-12 (0.10.10)
99
100
 */

101
102
103
104
105
106
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <gst/gst-i18n-plugin.h>

#include <sys/ioctl.h>
107
108

#ifdef HAVE_UNISTD_H
109
#include <unistd.h>
110
111
#endif

112
113
114
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
115
#include <sys/stat.h>
116
#include <netinet/in.h>
117
118
119
120
121
122
123
124

#ifdef HAVE_FIONREAD_IN_SYS_FILIO
#include <sys/filio.h>
#endif

#include "gstmultifdsink.h"
#include "gsttcp-marshal.h"

125
126
#define NOT_IMPLEMENTED 0

Wim Taymans's avatar
Wim Taymans committed
127
128
129
130
131
static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
    GST_PAD_SINK,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS_ANY);

132
GST_DEBUG_CATEGORY_STATIC (multifdsink_debug);
133
134
135
136
137
#define GST_CAT_DEFAULT (multifdsink_debug)

/* MultiFdSink signals and args */
enum
{
138
139
  /* methods */
  SIGNAL_ADD,
140
  SIGNAL_ADD_BURST,
141
  SIGNAL_REMOVE,
142
  SIGNAL_REMOVE_FLUSH,
143
  SIGNAL_CLEAR,
144
  SIGNAL_GET_STATS,
145

146
  /* signals */
147
148
  SIGNAL_CLIENT_ADDED,
  SIGNAL_CLIENT_REMOVED,
149
  SIGNAL_CLIENT_FD_REMOVED,
150

151
152
153
  LAST_SIGNAL
};

154

155
/* this is really arbitrarily chosen */
156
#define DEFAULT_PROTOCOL                GST_TCP_PROTOCOL_NONE
157
#define DEFAULT_MODE                    1
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
158
159
#define DEFAULT_BUFFERS_MAX             -1
#define DEFAULT_BUFFERS_SOFT_MAX        -1
160
161
162
#define DEFAULT_TIME_MIN                -1
#define DEFAULT_BYTES_MIN               -1
#define DEFAULT_BUFFERS_MIN             -1
163
#define DEFAULT_UNIT_TYPE               GST_TCP_UNIT_TYPE_BUFFERS
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
164
165
#define DEFAULT_UNITS_MAX               -1
#define DEFAULT_UNITS_SOFT_MAX          -1
166
167
168
169
#define DEFAULT_RECOVER_POLICY          GST_RECOVER_POLICY_NONE
#define DEFAULT_TIMEOUT                 0
#define DEFAULT_SYNC_METHOD             GST_SYNC_METHOD_LATEST

170
#define DEFAULT_BURST_UNIT              GST_TCP_UNIT_TYPE_UNDEFINED
171
#define DEFAULT_BURST_VALUE             0
172

173
#define DEFAULT_QOS_DSCP                -1
174
#define DEFAULT_HANDLE_READ             TRUE
175

176
177
#define DEFAULT_RESEND_STREAMHEADER      TRUE

178
179
enum
{
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
  PROP_0,
  PROP_PROTOCOL,
  PROP_MODE,
  PROP_BUFFERS_QUEUED,
  PROP_BYTES_QUEUED,
  PROP_TIME_QUEUED,

  PROP_UNIT_TYPE,
  PROP_UNITS_MAX,
  PROP_UNITS_SOFT_MAX,

  PROP_BUFFERS_MAX,
  PROP_BUFFERS_SOFT_MAX,

  PROP_TIME_MIN,
  PROP_BYTES_MIN,
  PROP_BUFFERS_MIN,

  PROP_RECOVER_POLICY,
  PROP_TIMEOUT,
  PROP_SYNC_METHOD,
  PROP_BYTES_TO_SERVE,
  PROP_BYTES_SERVED,

  PROP_BURST_UNIT,
  PROP_BURST_VALUE,
206
207
208

  PROP_QOS_DSCP,

209
210
  PROP_HANDLE_READ,

211
212
  PROP_RESEND_STREAMHEADER,

213
214
  PROP_NUM_FDS,

215
  PROP_LAST
216
217
};

218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
/* For backward compat, we can't really select the poll mode anymore with
 * GstPoll. */
#define GST_TYPE_FDSET_MODE (gst_fdset_mode_get_type())
static GType
gst_fdset_mode_get_type (void)
{
  static GType fdset_mode_type = 0;
  static const GEnumValue fdset_mode[] = {
    {0, "Select", "select"},
    {1, "Poll", "poll"},
    {2, "EPoll", "epoll"},
    {0, NULL, NULL},
  };

  if (!fdset_mode_type) {
    fdset_mode_type = g_enum_register_static ("GstFDSetMode", fdset_mode);
  }
  return fdset_mode_type;
}

238
239
240
241
242
#define GST_TYPE_RECOVER_POLICY (gst_recover_policy_get_type())
static GType
gst_recover_policy_get_type (void)
{
  static GType recover_policy_type = 0;
243
  static const GEnumValue recover_policy[] = {
244
245
246
247
248
249
250
251
    {GST_RECOVER_POLICY_NONE,
        "Do not try to recover", "none"},
    {GST_RECOVER_POLICY_RESYNC_LATEST,
        "Resync client to latest buffer", "latest"},
    {GST_RECOVER_POLICY_RESYNC_SOFT_LIMIT,
        "Resync client to soft limit", "soft-limit"},
    {GST_RECOVER_POLICY_RESYNC_KEYFRAME,
        "Resync client to most recent keyframe", "keyframe"},
252
253
254
255
256
    {0, NULL, NULL},
  };

  if (!recover_policy_type) {
    recover_policy_type =
257
        g_enum_register_static ("GstRecoverPolicy", recover_policy);
258
259
260
261
  }
  return recover_policy_type;
}

262
263
264
265
266
#define GST_TYPE_SYNC_METHOD (gst_sync_method_get_type())
static GType
gst_sync_method_get_type (void)
{
  static GType sync_method_type = 0;
267
  static const GEnumValue sync_method[] = {
268
269
270
271
272
    {GST_SYNC_METHOD_LATEST,
        "Serve starting from the latest buffer", "latest"},
    {GST_SYNC_METHOD_NEXT_KEYFRAME,
        "Serve starting from the next keyframe", "next-keyframe"},
    {GST_SYNC_METHOD_LATEST_KEYFRAME,
273
274
          "Serve everything since the latest keyframe (burst)",
        "latest-keyframe"},
275
276
277
278
279
280
281
    {GST_SYNC_METHOD_BURST, "Serve burst-value data to client", "burst"},
    {GST_SYNC_METHOD_BURST_KEYFRAME,
          "Serve burst-value data starting on a keyframe",
        "burst-keyframe"},
    {GST_SYNC_METHOD_BURST_WITH_KEYFRAME,
          "Serve burst-value data preferably starting on a keyframe",
        "burst-with-keyframe"},
282
283
284
285
    {0, NULL, NULL},
  };

  if (!sync_method_type) {
286
    sync_method_type = g_enum_register_static ("GstSyncMethod", sync_method);
287
288
289
290
  }
  return sync_method_type;
}

291
292
293
294
295
#define GST_TYPE_UNIT_TYPE (gst_unit_type_get_type())
static GType
gst_unit_type_get_type (void)
{
  static GType unit_type_type = 0;
296
  static const GEnumValue unit_type[] = {
297
298
299
300
    {GST_TCP_UNIT_TYPE_UNDEFINED, "Undefined", "undefined"},
    {GST_TCP_UNIT_TYPE_BUFFERS, "Buffers", "buffers"},
    {GST_TCP_UNIT_TYPE_BYTES, "Bytes", "bytes"},
    {GST_TCP_UNIT_TYPE_TIME, "Time", "time"},
301
302
303
304
305
306
307
308
309
310
311
312
313
314
    {0, NULL, NULL},
  };

  if (!unit_type_type) {
    unit_type_type = g_enum_register_static ("GstTCPUnitType", unit_type);
  }
  return unit_type_type;
}

#define GST_TYPE_CLIENT_STATUS (gst_client_status_get_type())
static GType
gst_client_status_get_type (void)
{
  static GType client_status_type = 0;
315
  static const GEnumValue client_status[] = {
316
    {GST_CLIENT_STATUS_OK, "ok", "ok"},
317
318
319
320
321
    {GST_CLIENT_STATUS_CLOSED, "Closed", "closed"},
    {GST_CLIENT_STATUS_REMOVED, "Removed", "removed"},
    {GST_CLIENT_STATUS_SLOW, "Too slow", "slow"},
    {GST_CLIENT_STATUS_ERROR, "Error", "error"},
    {GST_CLIENT_STATUS_DUPLICATE, "Duplicate", "duplicate"},
322
    {GST_CLIENT_STATUS_FLUSHING, "Flushing", "flushing"},
323
324
325
326
327
    {0, NULL, NULL},
  };

  if (!client_status_type) {
    client_status_type =
328
        g_enum_register_static ("GstClientStatus", client_status);
329
330
331
332
  }
  return client_status_type;
}

Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
333
static void gst_multi_fd_sink_finalize (GObject * object);
334

Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
335
static void gst_multi_fd_sink_remove_client_link (GstMultiFdSink * sink,
336
    GList * link);
337

Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
338
static GstFlowReturn gst_multi_fd_sink_render (GstBaseSink * bsink,
Wim Taymans's avatar
Wim Taymans committed
339
    GstBuffer * buf);
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
340
static GstStateChangeReturn gst_multi_fd_sink_change_state (GstElement *
341
    element, GstStateChange transition);
342

Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
343
static void gst_multi_fd_sink_set_property (GObject * object, guint prop_id,
344
    const GValue * value, GParamSpec * pspec);
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
345
static void gst_multi_fd_sink_get_property (GObject * object, guint prop_id,
346
347
    GValue * value, GParamSpec * pspec);

Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
348
GST_BOILERPLATE (GstMultiFdSink, gst_multi_fd_sink, GstBaseSink,
349
350
    GST_TYPE_BASE_SINK);

Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
351
static guint gst_multi_fd_sink_signals[LAST_SIGNAL] = { 0 };
352
353

static void
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
354
gst_multi_fd_sink_base_init (gpointer g_class)
355
356
357
{
  GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);

Wim Taymans's avatar
Wim Taymans committed
358
359
360
  gst_element_class_add_pad_template (element_class,
      gst_static_pad_template_get (&sinktemplate));

361
362
363
364
365
  gst_element_class_set_details_simple (element_class,
      "Multi filedescriptor sink", "Sink/Network",
      "Send data to multiple filedescriptors",
      "Thomas Vander Stichele <thomas at apestaart dot org>, "
      "Wim Taymans <wim@fluendo.com>");
366
367
368
}

static void
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
369
gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass)
370
371
372
{
  GObjectClass *gobject_class;
  GstElementClass *gstelement_class;
Wim Taymans's avatar
Wim Taymans committed
373
  GstBaseSinkClass *gstbasesink_class;
374
375
376

  gobject_class = (GObjectClass *) klass;
  gstelement_class = (GstElementClass *) klass;
Wim Taymans's avatar
Wim Taymans committed
377
378
  gstbasesink_class = (GstBaseSinkClass *) klass;

Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
379
380
381
  gobject_class->set_property = gst_multi_fd_sink_set_property;
  gobject_class->get_property = gst_multi_fd_sink_get_property;
  gobject_class->finalize = gst_multi_fd_sink_finalize;
382

383
  g_object_class_install_property (gobject_class, PROP_PROTOCOL,
384
385
      g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in"
          ". GDP protocol here is deprecated. Please use gdppay element.",
386
387
          GST_TYPE_TCP_PROTOCOL, DEFAULT_PROTOCOL,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
388
389
390
391
392
393
394
395
396

  /**
   * GstMultiFdSink::mode
   *
   * The mode for selecting activity on the fds. 
   *
   * This property is deprecated since 0.10.18, if will now automatically
   * select and use the most optimal method.
   */
397
  g_object_class_install_property (gobject_class, PROP_MODE,
398
      g_param_spec_enum ("mode", "Mode",
399
          "The mode for selecting activity on the fds (deprecated)",
400
401
          GST_TYPE_FDSET_MODE, DEFAULT_MODE,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
402

403
  g_object_class_install_property (gobject_class, PROP_BUFFERS_MAX,
404
      g_param_spec_int ("buffers-max", "Buffers max",
405
          "max number of buffers to queue for a client (-1 = no limit)", -1,
406
407
408
409
          G_MAXINT, DEFAULT_BUFFERS_MAX,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  g_object_class_install_property (gobject_class, PROP_BUFFERS_SOFT_MAX,
      g_param_spec_int ("buffers-soft-max", "Buffers soft max",
410
          "Recover client when going over this limit (-1 = no limit)", -1,
411
412
          G_MAXINT, DEFAULT_BUFFERS_SOFT_MAX,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
413

414
  g_object_class_install_property (gobject_class, PROP_BYTES_MIN,
415
416
      g_param_spec_int ("bytes-min", "Bytes min",
          "min number of bytes to queue (-1 = as little as possible)", -1,
417
418
          G_MAXINT, DEFAULT_BYTES_MIN,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
419
  g_object_class_install_property (gobject_class, PROP_TIME_MIN,
420
      g_param_spec_int64 ("time-min", "Time min",
421
          "min number of time to queue (-1 = as little as possible)", -1,
422
423
          G_MAXINT64, DEFAULT_TIME_MIN,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
424
  g_object_class_install_property (gobject_class, PROP_BUFFERS_MIN,
425
426
      g_param_spec_int ("buffers-min", "Buffers min",
          "min number of buffers to queue (-1 = as few as possible)", -1,
427
428
          G_MAXINT, DEFAULT_BUFFERS_MIN,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
429

430
  g_object_class_install_property (gobject_class, PROP_UNIT_TYPE,
431
432
      g_param_spec_enum ("unit-type", "Units type",
          "The unit to measure the max/soft-max/queued properties",
433
434
          GST_TYPE_UNIT_TYPE, DEFAULT_UNIT_TYPE,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
435
  g_object_class_install_property (gobject_class, PROP_UNITS_MAX,
436
437
      g_param_spec_int64 ("units-max", "Units max",
          "max number of units to queue (-1 = no limit)", -1, G_MAXINT64,
438
          DEFAULT_UNITS_MAX, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
439
  g_object_class_install_property (gobject_class, PROP_UNITS_SOFT_MAX,
440
      g_param_spec_int64 ("units-soft-max", "Units soft max",
441
          "Recover client when going over this limit (-1 = no limit)", -1,
442
443
          G_MAXINT64, DEFAULT_UNITS_SOFT_MAX,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
444

445
  g_object_class_install_property (gobject_class, PROP_BUFFERS_QUEUED,
446
447
      g_param_spec_uint ("buffers-queued", "Buffers queued",
          "Number of buffers currently queued", 0, G_MAXUINT, 0,
448
          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
449
#if NOT_IMPLEMENTED
450
  g_object_class_install_property (gobject_class, PROP_BYTES_QUEUED,
451
452
      g_param_spec_uint ("bytes-queued", "Bytes queued",
          "Number of bytes currently queued", 0, G_MAXUINT, 0,
453
          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
454
  g_object_class_install_property (gobject_class, PROP_TIME_QUEUED,
455
456
      g_param_spec_uint64 ("time-queued", "Time queued",
          "Number of time currently queued", 0, G_MAXUINT64, 0,
457
          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
458
#endif
459

460
  g_object_class_install_property (gobject_class, PROP_RECOVER_POLICY,
461
462
      g_param_spec_enum ("recover-policy", "Recover Policy",
          "How to recover when client reaches the soft max",
463
464
          GST_TYPE_RECOVER_POLICY, DEFAULT_RECOVER_POLICY,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
465
  g_object_class_install_property (gobject_class, PROP_TIMEOUT,
466
467
      g_param_spec_uint64 ("timeout", "Timeout",
          "Maximum inactivity timeout in nanoseconds for a client (0 = no limit)",
468
469
          0, G_MAXUINT64, DEFAULT_TIMEOUT,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
470
  g_object_class_install_property (gobject_class, PROP_SYNC_METHOD,
471
      g_param_spec_enum ("sync-method", "Sync Method",
472
473
          "How to sync new clients to the stream", GST_TYPE_SYNC_METHOD,
          DEFAULT_SYNC_METHOD, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
474
  g_object_class_install_property (gobject_class, PROP_BYTES_TO_SERVE,
475
476
      g_param_spec_uint64 ("bytes-to-serve", "Bytes to serve",
          "Number of bytes received to serve to clients", 0, G_MAXUINT64, 0,
477
          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
478
  g_object_class_install_property (gobject_class, PROP_BYTES_SERVED,
479
480
      g_param_spec_uint64 ("bytes-served", "Bytes served",
          "Total number of bytes send to all clients", 0, G_MAXUINT64, 0,
481
          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
482

483
  g_object_class_install_property (gobject_class, PROP_BURST_UNIT,
484
485
      g_param_spec_enum ("burst-unit", "Burst unit",
          "The format of the burst units (when sync-method is burst[[-with]-keyframe])",
486
487
          GST_TYPE_UNIT_TYPE, DEFAULT_BURST_UNIT,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
488
  g_object_class_install_property (gobject_class, PROP_BURST_VALUE,
489
      g_param_spec_uint64 ("burst-value", "Burst value",
490
491
          "The amount of burst expressed in burst-unit", 0, G_MAXUINT64,
          DEFAULT_BURST_VALUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
492

493
  g_object_class_install_property (gobject_class, PROP_QOS_DSCP,
494
      g_param_spec_int ("qos-dscp", "QoS diff srv code point",
495
          "Quality of Service, differentiated services code point (-1 default)",
496
497
          -1, 63, DEFAULT_QOS_DSCP,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
498
499
500
501
502
503
504
505
506
507
  /**
   * GstMultiFdSink::handle-read
   *
   * Handle read requests from clients and discard the data.
   *
   * Since: 0.10.23
   */
  g_object_class_install_property (gobject_class, PROP_HANDLE_READ,
      g_param_spec_boolean ("handle-read", "Handle Read",
          "Handle client reads and discard the data",
508
509
510
511
512
513
514
515
516
517
518
519
520
          DEFAULT_HANDLE_READ, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  /**
   * GstMultiFdSink::resend-streamheader
   *
   * Resend the streamheaders to existing clients when they change.
   *
   * Since: 0.10.23
   */
  g_object_class_install_property (gobject_class, PROP_RESEND_STREAMHEADER,
      g_param_spec_boolean ("resend-streamheader", "Resend streamheader",
          "Resend the streamheader if it changes in the caps",
          DEFAULT_RESEND_STREAMHEADER,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
521

522
523
524
525
526
  g_object_class_install_property (gobject_class, PROP_NUM_FDS,
      g_param_spec_uint ("num-fds", "Number of fds",
          "The current number of client file descriptors.",
          0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));

527
528
529
  /**
   * GstMultiFdSink::add:
   * @gstmultifdsink: the multifdsink element to emit this signal on
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
530
   * @fd:             the file descriptor to add to multifdsink
531
532
533
   *
   * Hand the given open file descriptor to multifdsink to write to.
   */
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
534
  gst_multi_fd_sink_signals[SIGNAL_ADD] =
535
536
537
538
      g_signal_new ("add", G_TYPE_FROM_CLASS (klass),
      G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiFdSinkClass,
          add), NULL, NULL, g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 1,
      G_TYPE_INT);
539
540
541
542
  /**
   * GstMultiFdSink::add-full:
   * @gstmultifdsink: the multifdsink element to emit this signal on
   * @fd:             the file descriptor to add to multifdsink
543
   * @sync:           the sync method to use
544
545
546
547
548
549
   * @unit_type_min:  the unit-type of @value_min
   * @value_min:      the minimum amount of data to burst expressed in
   *                  @unit_type_min units.
   * @unit_type_max:  the unit-type of @value_max
   * @value_max:      the maximum amount of data to burst expressed in
   *                  @unit_type_max units.
550
551
552
553
554
   *
   * Hand the given open file descriptor to multifdsink to write to and
   * specify the burst parameters for the new connection.
   */
  gst_multi_fd_sink_signals[SIGNAL_ADD_BURST] =
555
556
557
      g_signal_new ("add-full", G_TYPE_FROM_CLASS (klass),
      G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiFdSinkClass,
          add_full), NULL, NULL,
558
559
      gst_tcp_marshal_VOID__INT_ENUM_INT_UINT64_INT_UINT64, G_TYPE_NONE, 6,
      G_TYPE_INT, GST_TYPE_SYNC_METHOD, GST_TYPE_UNIT_TYPE, G_TYPE_UINT64,
560
      GST_TYPE_UNIT_TYPE, G_TYPE_UINT64);
561
562
563
  /**
   * GstMultiFdSink::remove:
   * @gstmultifdsink: the multifdsink element to emit this signal on
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
564
   * @fd:             the file descriptor to remove from multifdsink
565
566
567
   *
   * Remove the given open file descriptor from multifdsink.
   */
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
568
  gst_multi_fd_sink_signals[SIGNAL_REMOVE] =
569
570
571
572
      g_signal_new ("remove", G_TYPE_FROM_CLASS (klass),
      G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiFdSinkClass,
          remove), NULL, NULL, gst_tcp_marshal_VOID__INT, G_TYPE_NONE, 1,
      G_TYPE_INT);
573
  /**
574
   * GstMultiFdSink::remove-flush:
575
576
577
578
579
580
581
582
   * @gstmultifdsink: the multifdsink element to emit this signal on
   * @fd:             the file descriptor to remove from multifdsink
   *
   * Remove the given open file descriptor from multifdsink after flushing all
   * the pending data to the fd.
   */
  gst_multi_fd_sink_signals[SIGNAL_REMOVE_FLUSH] =
      g_signal_new ("remove-flush", G_TYPE_FROM_CLASS (klass),
583
584
585
      G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiFdSinkClass,
          remove_flush), NULL, NULL, gst_tcp_marshal_VOID__INT, G_TYPE_NONE, 1,
      G_TYPE_INT);
586
587
588
589
  /**
   * GstMultiFdSink::clear:
   * @gstmultifdsink: the multifdsink element to emit this signal on
   *
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
590
591
   * Remove all file descriptors from multifdsink.  Since multifdsink did not
   * open fd's itself, it does not explicitly close the fd.  The application
592
   * should do so by connecting to the client-fd-removed callback.
593
   */
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
594
  gst_multi_fd_sink_signals[SIGNAL_CLEAR] =
595
596
597
      g_signal_new ("clear", G_TYPE_FROM_CLASS (klass),
      G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiFdSinkClass,
          clear), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
Wim Taymans's avatar
Wim Taymans committed
598
599
600
601
602
603
604
605
606

  /**
   * GstMultiFdSink::get-stats:
   * @gstmultifdsink: the multifdsink element to emit this signal on
   * @fd:             the file descriptor to get stats of from multifdsink
   *
   * Get statistics about @fd. This function returns a GValueArray to ease
   * automatic wrapping for bindings.
   *
607
608
609
610
   * Returns: a GValueArray with the statistics. The array contains guint64
   *     values that represent respectively: total number of bytes sent, time
   *     when the client was added, time when the client was
   *     disconnected/removed, time the client is/was active, last activity
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
611
   *     time (in epoch seconds), number of buffers dropped.
612
   *     All times are expressed in nanoseconds (GstClockTime).
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
613
   *     The array can be 0-length if the client was not found.
Wim Taymans's avatar
Wim Taymans committed
614
   */
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
615
  gst_multi_fd_sink_signals[SIGNAL_GET_STATS] =
616
617
618
619
      g_signal_new ("get-stats", G_TYPE_FROM_CLASS (klass),
      G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiFdSinkClass,
          get_stats), NULL, NULL, gst_tcp_marshal_BOXED__INT,
      G_TYPE_VALUE_ARRAY, 1, G_TYPE_INT);
620

621
622
623
  /**
   * GstMultiFdSink::client-added:
   * @gstmultifdsink: the multifdsink element that emitted this signal
Wim Taymans's avatar
Wim Taymans committed
624
   * @fd:             the file descriptor that was added to multifdsink
625
   *
Wim Taymans's avatar
Wim Taymans committed
626
   * The given file descriptor was added to multifdsink. This signal will
627
   * be emitted from the streaming thread so application should be prepared
Wim Taymans's avatar
Wim Taymans committed
628
   * for that.
629
   */
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
630
  gst_multi_fd_sink_signals[SIGNAL_CLIENT_ADDED] =
631
632
      g_signal_new ("client-added", G_TYPE_FROM_CLASS (klass),
      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiFdSinkClass, client_added),
633
      NULL, NULL, gst_tcp_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT);
634
635
636
  /**
   * GstMultiFdSink::client-removed:
   * @gstmultifdsink: the multifdsink element that emitted this signal
637
   * @fd:             the file descriptor that is to be removed from multifdsink
Wim Taymans's avatar
Wim Taymans committed
638
   * @status:         the reason why the client was removed
639
   *
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
640
   * The given file descriptor is about to be removed from multifdsink. This
641
   * signal will be emitted from the streaming thread so applications should
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
642
   * be prepared for that.
643
644
645
646
   *
   * @gstmultifdsink still holds a handle to @fd so it is possible to call
   * the get-stats signal from this callback. For the same reason it is
   * not safe to close() and reuse @fd in this callback.
647
   */
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
648
  gst_multi_fd_sink_signals[SIGNAL_CLIENT_REMOVED] =
649
650
      g_signal_new ("client-removed", G_TYPE_FROM_CLASS (klass),
      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiFdSinkClass,
651
652
          client_removed), NULL, NULL, gst_tcp_marshal_VOID__INT_BOXED,
      G_TYPE_NONE, 2, G_TYPE_INT, GST_TYPE_CLIENT_STATUS);
653
654
655
656
657
658
  /**
   * GstMultiFdSink::client-fd-removed:
   * @gstmultifdsink: the multifdsink element that emitted this signal
   * @fd:             the file descriptor that was removed from multifdsink
   *
   * The given file descriptor was removed from multifdsink. This signal will
659
   * be emitted from the streaming thread so applications should be prepared
660
661
   * for that.
   *
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
662
   * In this callback, @gstmultifdsink has removed all the information
663
664
665
666
667
668
669
670
671
672
   * associated with @fd and it is therefore not possible to call get-stats
   * with @fd. It is however safe to close() and reuse @fd in the callback.
   *
   * Since: 0.10.7
   */
  gst_multi_fd_sink_signals[SIGNAL_CLIENT_FD_REMOVED] =
      g_signal_new ("client-fd-removed", G_TYPE_FROM_CLASS (klass),
      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiFdSinkClass,
          client_fd_removed), NULL, NULL, gst_tcp_marshal_VOID__INT,
      G_TYPE_NONE, 1, G_TYPE_INT);
673

674
  gstelement_class->change_state =
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
675
      GST_DEBUG_FUNCPTR (gst_multi_fd_sink_change_state);
676

Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
677
  gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_render);
Wim Taymans's avatar
Wim Taymans committed
678

Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
679
  klass->add = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_add);
680
  klass->add_full = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_add_full);
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
681
  klass->remove = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_remove);
682
  klass->remove_flush = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_remove_flush);
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
683
684
  klass->clear = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_clear);
  klass->get_stats = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_get_stats);
685

686
687
688
689
  GST_DEBUG_CATEGORY_INIT (multifdsink_debug, "multifdsink", 0, "FD sink");
}

static void
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
690
gst_multi_fd_sink_init (GstMultiFdSink * this, GstMultiFdSinkClass * klass)
691
{
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
692
  GST_OBJECT_FLAG_UNSET (this, GST_MULTI_FD_SINK_OPEN);
693

694
  this->protocol = DEFAULT_PROTOCOL;
695
  this->mode = DEFAULT_MODE;
696

Wim Taymans's avatar
Wim Taymans committed
697
  CLIENTS_LOCK_INIT (this);
698
  this->clients = NULL;
699
  this->fd_hash = g_hash_table_new (g_int_hash, g_int_equal);
700
701

  this->bufqueue = g_array_new (FALSE, TRUE, sizeof (GstBuffer *));
702
703
704
  this->unit_type = DEFAULT_UNIT_TYPE;
  this->units_max = DEFAULT_UNITS_MAX;
  this->units_soft_max = DEFAULT_UNITS_SOFT_MAX;
705
706
707
  this->time_min = DEFAULT_TIME_MIN;
  this->bytes_min = DEFAULT_BYTES_MIN;
  this->buffers_min = DEFAULT_BUFFERS_MIN;
708
  this->recover_policy = DEFAULT_RECOVER_POLICY;
709

710
  this->timeout = DEFAULT_TIMEOUT;
711
712
713
  this->def_sync_method = DEFAULT_SYNC_METHOD;
  this->def_burst_unit = DEFAULT_BURST_UNIT;
  this->def_burst_value = DEFAULT_BURST_VALUE;
714

715
  this->qos_dscp = DEFAULT_QOS_DSCP;
716
  this->handle_read = DEFAULT_HANDLE_READ;
717

718
719
  this->resend_streamheader = DEFAULT_RESEND_STREAMHEADER;

720
  this->header_flags = 0;
721
722
}

723
static void
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
724
gst_multi_fd_sink_finalize (GObject * object)
725
726
727
{
  GstMultiFdSink *this;

Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
728
  this = GST_MULTI_FD_SINK (object);
729
730
731
732
733
734
735
736

  CLIENTS_LOCK_FREE (this);
  g_hash_table_destroy (this->fd_hash);
  g_array_free (this->bufqueue, TRUE);

  G_OBJECT_CLASS (parent_class)->finalize (object);
}

737
738
739
740
741
static gint
setup_dscp_client (GstMultiFdSink * sink, GstTCPClient * client)
{
  gint tos;
  gint ret;
742
743
  union gst_sockaddr
  {
744
745
746
    struct sockaddr sa;
    struct sockaddr_in6 sa_in6;
    struct sockaddr_storage sa_stor;
747
748
  } sa;
  socklen_t slen = sizeof (sa);
749
750
751
752
753
754
  gint af;

  /* don't touch */
  if (sink->qos_dscp < 0)
    return 0;

755
  if ((ret = getsockname (client->fd.fd, &sa.sa, &slen)) < 0) {
756
757
758
759
    GST_DEBUG_OBJECT (sink, "could not get sockname: %s", g_strerror (errno));
    return ret;
  }

760
  af = sa.sa.sa_family;
761

762
  /* if this is an IPv4-mapped address then do IPv4 QoS */
763
764
765
  if (af == AF_INET6) {

    GST_DEBUG_OBJECT (sink, "check IP6 socket");
766
    if (IN6_IS_ADDR_V4MAPPED (&(sa.sa_in6.sin6_addr))) {
767
      GST_DEBUG_OBJECT (sink, "mapped to IPV4");
768
769
      af = AF_INET;
    }
770
771
  }

772
  /* extract and shift 6 bits of the DSCP */
773
  tos = (sink->qos_dscp & 0x3f) << 2;
774

775
776
  switch (af) {
    case AF_INET:
777
      ret = setsockopt (client->fd.fd, IPPROTO_IP, IP_TOS, &tos, sizeof (tos));
778
779
780
781
      break;
    case AF_INET6:
#ifdef IPV6_TCLASS
      ret =
782
783
          setsockopt (client->fd.fd, IPPROTO_IPV6, IPV6_TCLASS, &tos,
          sizeof (tos));
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
      break;
#endif
    default:
      ret = 0;
      GST_ERROR_OBJECT (sink, "unsupported AF");
      break;
  }
  if (ret)
    GST_DEBUG_OBJECT (sink, "could not set DSCP: %s", g_strerror (errno));

  return ret;
}


static void
setup_dscp (GstMultiFdSink * sink)
{
  GList *clients, *next;

  CLIENTS_LOCK (sink);
  for (clients = sink->clients; clients; clients = next) {
    GstTCPClient *client;

    client = (GstTCPClient *) clients->data;
    next = g_list_next (clients);

    setup_dscp_client (sink, client);
  }
  CLIENTS_UNLOCK (sink);
}

815
/* "add-full" signal implementation */
816
void
817
gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd,
818
819
    GstSyncMethod sync_method, GstTCPUnitType min_unit, guint64 min_value,
    GstTCPUnitType max_unit, guint64 max_value)
820
821
{
  GstTCPClient *client;
822
  GList *clink;
823
  GTimeVal now;
824
825
  gint flags, res;
  struct stat statbuf;
826

827
828
829
830
  GST_DEBUG_OBJECT (sink, "[fd %5d] adding client, sync_method %d, "
      "min_unit %d, min_value %" G_GUINT64_FORMAT
      ", max_unit %d, max_value %" G_GUINT64_FORMAT, fd, sync_method,
      min_unit, min_value, max_unit, max_value);
831

832
833
834
835
836
837
  /* do limits check if we can */
  if (min_unit == max_unit) {
    if (max_value != -1 && min_value != -1 && max_value < min_value)
      goto wrong_limits;
  }

838
839
  /* create client datastructure */
  client = g_new0 (GstTCPClient, 1);
840
  client->fd.fd = fd;
841
  client->status = GST_CLIENT_STATUS_OK;
842
  client->bufpos = -1;
843
  client->flushcount = -1;
844
845
  client->bufoffset = 0;
  client->sending = NULL;
846
847
848
  client->bytes_sent = 0;
  client->dropped_buffers = 0;
  client->avg_queue_size = 0;
849
850
  client->first_buffer_ts = GST_CLOCK_TIME_NONE;
  client->last_buffer_ts = GST_CLOCK_TIME_NONE;
851
  client->new_connection = TRUE;
852
853
854
855
856
  client->burst_min_unit = min_unit;
  client->burst_min_value = min_value;
  client->burst_max_unit = max_unit;
  client->burst_max_value = max_value;
  client->sync_method = sync_method;
857
  client->currently_removing = FALSE;
858
859
860
861

  /* update start time */
  g_get_current_time (&now);
  client->connect_time = GST_TIMEVAL_TO_TIME (now);
862
  client->disconnect_time = 0;
863
864
  /* set last activity time to connect time */
  client->last_activity_time = client->connect_time;
865

Wim Taymans's avatar
Wim Taymans committed
866
  CLIENTS_LOCK (sink);
867

868
869
  /* check the hash to find a duplicate fd */
  clink = g_hash_table_lookup (sink->fd_hash, &client->fd.fd);
870
871
  if (clink != NULL)
    goto duplicate;
872
873
874
875

  /* we can add the fd now */
  clink = sink->clients = g_list_prepend (sink->clients, client);
  g_hash_table_insert (sink->fd_hash, &client->fd.fd, clink);
876
  sink->clients_cookie++;
877

878
  /* set the socket to non blocking */
879
  res = fcntl (fd, F_SETFL, O_NONBLOCK);
880
  /* we always read from a client */
881
  gst_poll_add_fd (sink->fdset, &client->fd);
882
883

  /* we don't try to read from write only fds */
884
885
886
887
888
  if (sink->handle_read) {
    flags = fcntl (fd, F_GETFL, 0);
    if ((flags & O_ACCMODE) != O_WRONLY) {
      gst_poll_fd_ctl_read (sink->fdset, &client->fd, TRUE);
    }
889
890
891
892
893
  }
  /* figure out the mode, can't use send() for non sockets */
  res = fstat (fd, &statbuf);
  if (S_ISSOCK (statbuf.st_mode)) {
    client->is_socket = TRUE;
894
    setup_dscp_client (sink, client);
895
  }
896

897
  gst_poll_restart (sink->fdset);
898

Wim Taymans's avatar
Wim Taymans committed
899
  CLIENTS_UNLOCK (sink);
900
901

  g_signal_emit (G_OBJECT (sink),
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
902
      gst_multi_fd_sink_signals[SIGNAL_CLIENT_ADDED], 0, fd);
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933

  return;

  /* errors */
wrong_limits:
  {
    GST_WARNING_OBJECT (sink,
        "[fd %5d] wrong values min =%" G_GUINT64_FORMAT ", max=%"
        G_GUINT64_FORMAT ", unit %d specified when adding client", fd,
        min_value, max_value, min_unit);
    return;
  }
duplicate:
  {
    client->status = GST_CLIENT_STATUS_DUPLICATE;
    CLIENTS_UNLOCK (sink);
    GST_WARNING_OBJECT (sink, "[fd %5d] duplicate client found, refusing", fd);
    g_signal_emit (G_OBJECT (sink),
        gst_multi_fd_sink_signals[SIGNAL_CLIENT_REMOVED], 0, fd,
        client->status);
    g_free (client);
    return;
  }
}

/* "add" signal implemntation */
void
gst_multi_fd_sink_add (GstMultiFdSink * sink, int fd)
{
  gst_multi_fd_sink_add_full (sink, fd, sink->def_sync_method,
      sink->def_burst_unit, sink->def_burst_value, sink->def_burst_unit, -1);
934
935
}

936
/* "remove" signal implementation */
937
void
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
938
gst_multi_fd_sink_remove (GstMultiFdSink * sink, int fd)
939
{
940
  GList *clink;
941

942
  GST_DEBUG_OBJECT (sink, "[fd %5d] removing client", fd);
943

Wim Taymans's avatar
Wim Taymans committed
944
  CLIENTS_LOCK (sink);
945
946
947
  clink = g_hash_table_lookup (sink->fd_hash, &fd);
  if (clink != NULL) {
    GstTCPClient *client = (GstTCPClient *) clink->data;
948

949
950
951
952
953
954
955
    if (client->status != GST_CLIENT_STATUS_OK) {
      GST_INFO_OBJECT (sink,
          "[fd %5d] Client already disconnecting with status %d",
          fd, client->status);
      goto done;
    }

956
    client->status = GST_CLIENT_STATUS_REMOVED;
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
957
    gst_multi_fd_sink_remove_client_link (sink, clink);
958
    gst_poll_restart (sink->fdset);
959
  } else {
960
    GST_WARNING_OBJECT (sink, "[fd %5d] no client with this fd found!", fd);
961
  }
962
963

done:
Wim Taymans's avatar
Wim Taymans committed
964
  CLIENTS_UNLOCK (sink);
965
966
}

967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/* "remove-flush" signal implementation */
void
gst_multi_fd_sink_remove_flush (GstMultiFdSink * sink, int fd)
{
  GList *clink;

  GST_DEBUG_OBJECT (sink, "[fd %5d] flushing client", fd);

  CLIENTS_LOCK (sink);
  clink = g_hash_table_lookup (sink->fd_hash, &fd);
  if (clink != NULL) {
    GstTCPClient *client = (GstTCPClient *) clink->data;

    if (client->status != GST_CLIENT_STATUS_OK) {
      GST_INFO_OBJECT (sink,
          "[fd %5d] Client already disconnecting with status %d",
          fd, client->status);
      goto done;
    }

    /* take the position of the client as the number of buffers left to flush.
     * If the client was at position -1, we flush 0 buffers, 0 == flush 1
     * buffer, etc... */
    client->flushcount = client->bufpos + 1;
    /* mark client as flushing. We can not remove the client right away because
     * it might have some buffers to flush in the ->sending queue. */
    client->status = GST_CLIENT_STATUS_FLUSHING;
  } else {
    GST_WARNING_OBJECT (sink, "[fd %5d] no client with this fd found!", fd);
  }
done:
  CLIENTS_UNLOCK (sink);
}

1001
1002
/* can be called both through the signal (i.e. from any thread) or when 
 * stopping, after the writing thread has shut down */
1003
void
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
1004
gst_multi_fd_sink_clear (GstMultiFdSink * sink)
1005
{
1006
  GList *clients, *next;
1007
  guint32 cookie;
1008

1009
1010
  GST_DEBUG_OBJECT (sink, "clearing all clients");

Wim Taymans's avatar
Wim Taymans committed
1011
  CLIENTS_LOCK (sink);
1012
1013
restart:
  cookie = sink->clients_cookie;
1014
  for (clients = sink->clients; clients; clients = next) {
1015
1016
    GstTCPClient *client;

1017
1018
1019
1020
1021
    if (cookie != sink->clients_cookie) {
      GST_DEBUG_OBJECT (sink, "cookie changed while removing all clients");
      goto restart;
    }

1022
1023
1024
1025
    client = (GstTCPClient *) clients->data;
    next = g_list_next (clients);

    client->status = GST_CLIENT_STATUS_REMOVED;
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
1026
    gst_multi_fd_sink_remove_client_link (sink, clients);