/* GStreamer
 * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
 * Copyright (C) <2009> Jarkko Palviainen <jarkko.palviainen@sesca.com>
 * Copyright (C) <2012> Collabora Ltd.
 *   Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
 * Boston, MA 02110-1301, USA.
 */

/**
 * SECTION:element-multiudpsink
 * @see_also: udpsink, multifdsink
 *
 * multiudpsink is a network sink that sends UDP packets to multiple
 * clients.
 * It can be combined with RTP payload encoders to implement RTP streaming.
 */

32 33 34 35 36
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "gstmultiudpsink.h"

37
#include <string.h>
Raimo Järvi's avatar
Raimo Järvi committed
38
#ifdef HAVE_SYS_SOCKET_H
39
#include <sys/socket.h>
Raimo Järvi's avatar
Raimo Järvi committed
40
#endif
41

42 43 44 45
#ifndef G_OS_WIN32
#include <netinet/in.h>
#endif

46 47
#include "gst/glib-compat-private.h"

48
/* Debug category used by all GST_* logging macros in this file. */
GST_DEBUG_CATEGORY_STATIC (multiudpsink_debug);
#define GST_CAT_DEFAULT (multiudpsink_debug)

/* Largest payload we expect to fit in a single UDP packet; presumably
 * 65535 minus the 20-byte IPv4 header and the 8-byte UDP header. */
#define UDP_MAX_SIZE 65507

53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
/* Template for the single, always-present "sink" pad; it accepts any caps. */
static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
    GST_PAD_SINK,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS_ANY);

/* MultiUDPSink signals and args.
 *
 * Indices into gst_multiudpsink_signals[]; LAST_SIGNAL is the array size. */
enum
{
  /* methods: registered as action signals in class_init */
  SIGNAL_ADD,
  SIGNAL_REMOVE,
  SIGNAL_CLEAR,
  SIGNAL_GET_STATS,

  /* signals: notifications emitted by the element */
  SIGNAL_CLIENT_ADDED,
  SIGNAL_CLIENT_REMOVED,

  /* FILL ME */
  LAST_SIGNAL
};

75 76 77
/* Default values for the element's properties. */
#define DEFAULT_SOCKET             NULL
#define DEFAULT_CLOSE_SOCKET       TRUE
#define DEFAULT_USED_SOCKET        NULL
#define DEFAULT_CLIENTS            NULL
/* FIXME, this should be disabled by default, we don't need to join a multicast
 * group for sending, if this socket is also used for receiving, it should
 * be configured in the element that does the receive. */
#define DEFAULT_AUTO_MULTICAST     TRUE
#define DEFAULT_MULTICAST_IFACE    NULL
#define DEFAULT_TTL                64
#define DEFAULT_TTL_MC             1
#define DEFAULT_LOOP               TRUE
#define DEFAULT_FORCE_IPV4         FALSE
#define DEFAULT_QOS_DSCP           -1
#define DEFAULT_SEND_DUPLICATES    TRUE
#define DEFAULT_BUFFER_SIZE        0
#define DEFAULT_BIND_ADDRESS       NULL
#define DEFAULT_BIND_PORT          0
93

94 95 96
/* Property identifiers passed to g_object_class_install_property() and
 * dispatched on in the set_property/get_property handlers. */
enum
{
  PROP_0,
  PROP_BYTES_TO_SERVE,
  PROP_BYTES_SERVED,
  PROP_SOCKET,
  PROP_SOCKET_V6,
  PROP_CLOSE_SOCKET,
  PROP_USED_SOCKET,
  PROP_USED_SOCKET_V6,
  PROP_CLIENTS,
  PROP_AUTO_MULTICAST,
  PROP_MULTICAST_IFACE,
  PROP_TTL,
  PROP_TTL_MC,
  PROP_LOOP,
  PROP_FORCE_IPV4,
  PROP_QOS_DSCP,
  PROP_SEND_DUPLICATES,
  PROP_BUFFER_SIZE,
  PROP_BIND_ADDRESS,
  PROP_BIND_PORT
};

static void gst_multiudpsink_finalize (GObject * object);

static GstFlowReturn gst_multiudpsink_render (GstBaseSink * sink,
    GstBuffer * buffer);
122 123
static GstFlowReturn gst_multiudpsink_render_list (GstBaseSink * bsink,
    GstBufferList * buffer_list);
124 125 126 127 128

static gboolean gst_multiudpsink_start (GstBaseSink * bsink);
static gboolean gst_multiudpsink_stop (GstBaseSink * bsink);
static gboolean gst_multiudpsink_unlock (GstBaseSink * bsink);
static gboolean gst_multiudpsink_unlock_stop (GstBaseSink * bsink);
129 130 131 132 133 134

static void gst_multiudpsink_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec);
static void gst_multiudpsink_get_property (GObject * object, guint prop_id,
    GValue * value, GParamSpec * pspec);

135 136 137 138 139
static void gst_multiudpsink_add_internal (GstMultiUDPSink * sink,
    const gchar * host, gint port, gboolean lock);
static void gst_multiudpsink_clear_internal (GstMultiUDPSink * sink,
    gboolean lock);

140 141
static guint gst_multiudpsink_signals[LAST_SIGNAL] = { 0 };

Wim Taymans's avatar
Wim Taymans committed
142 143
/* Alias so the parent-class pointer defined by G_DEFINE_TYPE can be
 * referred to as plain `parent_class` (as done in finalize). */
#define gst_multiudpsink_parent_class parent_class
/* Registers the GstMultiUDPSink type, derived from GstBaseSink. */
G_DEFINE_TYPE (GstMultiUDPSink, gst_multiudpsink, GST_TYPE_BASE_SINK);
144 145

static void
146
gst_multiudpsink_class_init (GstMultiUDPSinkClass * klass)
147 148 149 150 151 152 153 154 155 156 157 158 159
{
  GObjectClass *gobject_class;
  GstElementClass *gstelement_class;
  GstBaseSinkClass *gstbasesink_class;

  gobject_class = (GObjectClass *) klass;
  gstelement_class = (GstElementClass *) klass;
  gstbasesink_class = (GstBaseSinkClass *) klass;

  gobject_class->set_property = gst_multiudpsink_set_property;
  gobject_class->get_property = gst_multiudpsink_get_property;
  gobject_class->finalize = gst_multiudpsink_finalize;

160 161 162 163 164
  /**
   * GstMultiUDPSink::add:
   * @gstmultiudpsink: the sink on which the signal is emitted
   * @host: the hostname/IP address of the client to add
   * @port: the port of the client to add
165 166
   *
   * Add a client with destination @host and @port to the list of
167 168 169 170 171 172 173
   * clients. When the same host/port pair is added multiple times, the
   * send-duplicates property defines if the packets are sent multiple times to
   * the same host/port pair or not.
   *
   * When a host/port pair is added multiple times, an equal amount of remove
   * calls must be performed to actually remove the host/port pair from the list
   * of destinations.
174
   */
175
  gst_multiudpsink_signals[SIGNAL_ADD] =
176
      g_signal_new ("add", G_TYPE_FROM_CLASS (klass),
177
      G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
178
      G_STRUCT_OFFSET (GstMultiUDPSinkClass, add),
179
      NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2,
180
      G_TYPE_STRING, G_TYPE_INT);
181 182 183 184 185
  /**
   * GstMultiUDPSink::remove:
   * @gstmultiudpsink: the sink on which the signal is emitted
   * @host: the hostname/IP address of the client to remove
   * @port: the port of the client to remove
186 187 188
   *
   * Remove the client with destination @host and @port from the list of
   * clients.
189
   */
190
  gst_multiudpsink_signals[SIGNAL_REMOVE] =
191
      g_signal_new ("remove", G_TYPE_FROM_CLASS (klass),
192
      G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
193
      G_STRUCT_OFFSET (GstMultiUDPSinkClass, remove),
194
      NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2,
195
      G_TYPE_STRING, G_TYPE_INT);
196 197 198 199 200 201
  /**
   * GstMultiUDPSink::clear:
   * @gstmultiudpsink: the sink on which the signal is emitted
   *
   * Clear the list of clients.
   */
202
  gst_multiudpsink_signals[SIGNAL_CLEAR] =
203
      g_signal_new ("clear", G_TYPE_FROM_CLASS (klass),
204
      G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
205
      G_STRUCT_OFFSET (GstMultiUDPSinkClass, clear),
206
      NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 0);
207 208 209 210 211 212
  /**
   * GstMultiUDPSink::get-stats:
   * @gstmultiudpsink: the sink on which the signal is emitted
   * @host: the hostname/IP address of the client to get stats on
   * @port: the port of the client to get stats on
   *
213 214
   * Get the statistics of the client with destination @host and @port.
   *
215
   * Returns: a GstStructure: bytes_sent, packets_sent,
216 217
   *           connect_time (in epoch seconds), disconnect_time (in epoch seconds)
   */
218
  gst_multiudpsink_signals[SIGNAL_GET_STATS] =
219
      g_signal_new ("get-stats", G_TYPE_FROM_CLASS (klass),
220
      G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
221
      G_STRUCT_OFFSET (GstMultiUDPSinkClass, get_stats),
222
      NULL, NULL, g_cclosure_marshal_generic, GST_TYPE_STRUCTURE, 2,
223
      G_TYPE_STRING, G_TYPE_INT);
224 225 226 227 228
  /**
   * GstMultiUDPSink::client-added:
   * @gstmultiudpsink: the sink emitting the signal
   * @host: the hostname/IP address of the added client
   * @port: the port of the added client
229 230 231
   *
   * Signal emited when a new client is added to the list of
   * clients.
232
   */
233 234
  gst_multiudpsink_signals[SIGNAL_CLIENT_ADDED] =
      g_signal_new ("client-added", G_TYPE_FROM_CLASS (klass),
235
      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiUDPSinkClass, client_added),
236
      NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2,
237
      G_TYPE_STRING, G_TYPE_INT);
238 239 240 241 242
  /**
   * GstMultiUDPSink::client-removed:
   * @gstmultiudpsink: the sink emitting the signal
   * @host: the hostname/IP address of the removed client
   * @port: the port of the removed client
243 244 245
   *
   * Signal emited when a client is removed from the list of
   * clients.
246
   */
247 248
  gst_multiudpsink_signals[SIGNAL_CLIENT_REMOVED] =
      g_signal_new ("client-removed", G_TYPE_FROM_CLASS (klass),
249
      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiUDPSinkClass,
250
          client_removed), NULL, NULL, g_cclosure_marshal_generic,
251 252
      G_TYPE_NONE, 2, G_TYPE_STRING, G_TYPE_INT);

253 254 255
  g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BYTES_TO_SERVE,
      g_param_spec_uint64 ("bytes-to-serve", "Bytes to serve",
          "Number of bytes received to serve to clients", 0, G_MAXUINT64, 0,
256
          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
257 258
  g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BYTES_SERVED,
      g_param_spec_uint64 ("bytes-served", "Bytes served",
259
          "Total number of bytes sent to all clients", 0, G_MAXUINT64, 0,
260
          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
261 262 263 264
  g_object_class_install_property (gobject_class, PROP_SOCKET,
      g_param_spec_object ("socket", "Socket Handle",
          "Socket to use for UDP sending. (NULL == allocate)",
          G_TYPE_SOCKET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
265 266 267 268
  g_object_class_install_property (gobject_class, PROP_SOCKET_V6,
      g_param_spec_object ("socket-v6", "Socket Handle IPv6",
          "Socket to use for UDPv6 sending. (NULL == allocate)",
          G_TYPE_SOCKET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
269 270 271 272 273 274 275 276
  g_object_class_install_property (gobject_class, PROP_CLOSE_SOCKET,
      g_param_spec_boolean ("close-socket", "Close socket",
          "Close socket if passed as property on state change",
          DEFAULT_CLOSE_SOCKET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  g_object_class_install_property (gobject_class, PROP_USED_SOCKET,
      g_param_spec_object ("used-socket", "Used Socket Handle",
          "Socket currently in use for UDP sending. (NULL == no socket)",
          G_TYPE_SOCKET, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
277 278 279 280
  g_object_class_install_property (gobject_class, PROP_USED_SOCKET_V6,
      g_param_spec_object ("used-socket-v6", "Used Socket Handle IPv6",
          "Socket currently in use for UDPv6 sending. (NULL == no socket)",
          G_TYPE_SOCKET, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
281 282 283
  g_object_class_install_property (gobject_class, PROP_CLIENTS,
      g_param_spec_string ("clients", "Clients",
          "A comma separated list of host:port pairs with destinations",
284
          DEFAULT_CLIENTS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
285 286 287 288
  g_object_class_install_property (gobject_class, PROP_AUTO_MULTICAST,
      g_param_spec_boolean ("auto-multicast",
          "Automatically join/leave multicast groups",
          "Automatically join/leave the multicast groups, FALSE means user"
289 290
          " has to do it himself", DEFAULT_AUTO_MULTICAST,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
291 292 293 294
  g_object_class_install_property (gobject_class, PROP_MULTICAST_IFACE,
      g_param_spec_string ("multicast-iface", "Multicast Interface",
          "The network interface on which to join the multicast group",
          DEFAULT_MULTICAST_IFACE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
295
  g_object_class_install_property (gobject_class, PROP_TTL,
296 297
      g_param_spec_int ("ttl", "Unicast TTL",
          "Used for setting the unicast TTL parameter",
298
          0, 255, DEFAULT_TTL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
299 300 301
  g_object_class_install_property (gobject_class, PROP_TTL_MC,
      g_param_spec_int ("ttl-mc", "Multicast TTL",
          "Used for setting the multicast TTL parameter",
302
          0, 255, DEFAULT_TTL_MC, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
303 304 305
  g_object_class_install_property (gobject_class, PROP_LOOP,
      g_param_spec_boolean ("loop", "Multicast Loopback",
          "Used for setting the multicast loop parameter. TRUE = enable,"
306 307
          " FALSE = disable", DEFAULT_LOOP,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
308
  /**
309
   * GstMultiUDPSink::force-ipv4:
310 311 312 313 314
   *
   * Force the use of an IPv4 socket.
   *
   * Since: 1.0.2
   */
315
#ifndef GST_REMOVE_DEPRECATED
316 317
  g_object_class_install_property (gobject_class, PROP_FORCE_IPV4,
      g_param_spec_boolean ("force-ipv4", "Force IPv4",
318
          "Forcing the use of an IPv4 socket (DEPRECATED, has no effect anymore)",
319 320 321
          DEFAULT_FORCE_IPV4,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED));
#endif
322
  g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_QOS_DSCP,
323 324
      g_param_spec_int ("qos-dscp", "QoS diff srv code point",
          "Quality of Service, differentiated services code point (-1 default)",
325 326
          -1, 63, DEFAULT_QOS_DSCP,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
327
  /**
328
   * GstMultiUDPSink::send-duplicates:
329 330 331 332 333 334 335 336
   *
   * When a host/port pair is added mutliple times, send the packet to the host
   * multiple times as well.
   */
  g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SEND_DUPLICATES,
      g_param_spec_boolean ("send-duplicates", "Send Duplicates",
          "When a distination/port pair is added multiple times, send packets "
          "multiple times as well", DEFAULT_SEND_DUPLICATES,
337
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
338

339 340 341 342 343
  g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BUFFER_SIZE,
      g_param_spec_int ("buffer-size", "Buffer Size",
          "Size of the kernel send buffer in bytes, 0=default", 0, G_MAXINT,
          DEFAULT_BUFFER_SIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

344 345 346 347 348 349 350 351 352
  g_object_class_install_property (gobject_class, PROP_BIND_ADDRESS,
      g_param_spec_string ("bind-address", "Bind Address",
          "Address to bind the socket to", DEFAULT_BIND_ADDRESS,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  g_object_class_install_property (gobject_class, PROP_BIND_PORT,
      g_param_spec_int ("bind-port", "Bind Port",
          "Port to bind the socket to", 0, G_MAXUINT16,
          DEFAULT_BIND_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

Wim Taymans's avatar
Wim Taymans committed
353 354 355
  gst_element_class_add_pad_template (gstelement_class,
      gst_static_pad_template_get (&sink_template));

356
  gst_element_class_set_static_metadata (gstelement_class, "UDP packet sender",
Wim Taymans's avatar
Wim Taymans committed
357
      "Sink/Network",
358 359
      "Send data over the network via UDP to one or multiple recipients "
      "which can be added or removed at runtime using action signals",
Wim Taymans's avatar
Wim Taymans committed
360 361
      "Wim Taymans <wim.taymans@gmail.com>");

362
  gstbasesink_class->render = gst_multiudpsink_render;
363
  gstbasesink_class->render_list = gst_multiudpsink_render_list;
364 365 366 367
  gstbasesink_class->start = gst_multiudpsink_start;
  gstbasesink_class->stop = gst_multiudpsink_stop;
  gstbasesink_class->unlock = gst_multiudpsink_unlock;
  gstbasesink_class->unlock_stop = gst_multiudpsink_unlock_stop;
368 369 370 371
  klass->add = gst_multiudpsink_add;
  klass->remove = gst_multiudpsink_remove;
  klass->clear = gst_multiudpsink_clear;
  klass->get_stats = gst_multiudpsink_get_stats;
372 373 374 375

  GST_DEBUG_CATEGORY_INIT (multiudpsink_debug, "multiudpsink", 0, "UDP sink");
}

376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395
/* Creates the sink's GCancellable and asks it for a poll fd up front.
 * Whether that succeeded is remembered in made_cancel_fd so that the fd
 * can be released again in gst_multiudpsink_free_cancellable(). */
static void
gst_multiudpsink_create_cancellable (GstMultiUDPSink * sink)
{
  GPollFD cancel_fd;
  GCancellable *cancellable = g_cancellable_new ();

  sink->cancellable = cancellable;
  sink->made_cancel_fd = g_cancellable_make_pollfd (cancellable, &cancel_fd);
}

/* Counterpart of gst_multiudpsink_create_cancellable(): releases the poll fd
 * if one was made, then drops the sink's reference to the cancellable. */
static void
gst_multiudpsink_free_cancellable (GstMultiUDPSink * sink)
{
  GCancellable *cancellable = sink->cancellable;

  if (sink->made_cancel_fd) {
    g_cancellable_release_fd (cancellable);
    sink->made_cancel_fd = FALSE;
  }

  sink->cancellable = NULL;
  g_object_unref (cancellable);
}

396 397 398
/* Instance initialiser: sets up the client lock, resets the client
 * bookkeeping, applies the property defaults, creates the cancellable and
 * pre-allocates the scratch arrays used by the render functions. */
static void
gst_multiudpsink_init (GstMultiUDPSink * sink)
{
  guint max_mem;

  g_mutex_init (&sink->client_lock);
  sink->clients = NULL;
  sink->num_v4_unique = 0;
  sink->num_v4_all = 0;
  sink->num_v6_unique = 0;
  sink->num_v6_all = 0;

  sink->socket = DEFAULT_SOCKET;
  sink->socket_v6 = DEFAULT_SOCKET;
  sink->used_socket = DEFAULT_USED_SOCKET;
  sink->used_socket_v6 = DEFAULT_USED_SOCKET;
  sink->close_socket = DEFAULT_CLOSE_SOCKET;
  sink->external_socket = (sink->socket != NULL);
  sink->auto_multicast = DEFAULT_AUTO_MULTICAST;
  sink->ttl = DEFAULT_TTL;
  sink->ttl_mc = DEFAULT_TTL_MC;
  sink->loop = DEFAULT_LOOP;
  sink->force_ipv4 = DEFAULT_FORCE_IPV4;
  sink->qos_dscp = DEFAULT_QOS_DSCP;
  sink->send_duplicates = DEFAULT_SEND_DUPLICATES;
  sink->multi_iface = g_strdup (DEFAULT_MULTICAST_IFACE);

  gst_multiudpsink_create_cancellable (sink);

  /* pre-allocate OutputVector, MapInfo and OutputMessage arrays
   * for use in the render and render_list functions */
  max_mem = gst_buffer_get_max_memory ();

  sink->n_vecs = max_mem;
  sink->vecs = g_new (GOutputVector, sink->n_vecs);

  sink->n_maps = max_mem;
  sink->maps = g_new (GstMapInfo, sink->n_maps);

  sink->n_messages = 1;
  sink->messages = g_new (GstOutputMessage, sink->n_messages);

  /* we assume that the number of memories per buffer can fit into a guint8 */
  g_warn_if_fail (max_mem <= G_MAXUINT8);
}

static GstUDPClient *
443
gst_udp_client_new (GstMultiUDPSink * sink, const gchar * host, gint port)
444 445
{
  GstUDPClient *client;
446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471
  GInetAddress *addr;
  GResolver *resolver;
  GError *err = NULL;

  addr = g_inet_address_new_from_string (host);
  if (!addr) {
    GList *results;

    resolver = g_resolver_get_default ();
    results =
        g_resolver_lookup_by_name (resolver, host, sink->cancellable, &err);
    if (!results)
      goto name_resolve;
    addr = G_INET_ADDRESS (g_object_ref (results->data));

    g_resolver_free_addresses (results);
    g_object_unref (resolver);
  }
#ifndef GST_DISABLE_GST_DEBUG
  {
    gchar *ip = g_inet_address_to_string (addr);

    GST_DEBUG_OBJECT (sink, "IP address for host %s is %s", host, ip);
    g_free (ip);
  }
#endif
472 473

  client = g_slice_new0 (GstUDPClient);
474 475
  client->ref_count = 1;
  client->add_count = 0;
476 477
  client->host = g_strdup (host);
  client->port = port;
478 479
  client->addr = g_inet_socket_address_new (addr, port);
  g_object_unref (addr);
480 481

  return client;
482 483 484 485 486 487 488

name_resolve:
  {
    g_object_unref (resolver);

    return NULL;
  }
489 490
}

491
/* call with client lock held */
492
static void
493
gst_udp_client_unref (GstUDPClient * client)
494
{
495 496 497 498 499 500 501 502 503 504 505 506 507
  if (--client->ref_count == 0) {
    g_object_unref (client->addr);
    g_free (client->host);
    g_slice_free (GstUDPClient, client);
  }
}

/* Takes an additional reference on @client and returns it for convenience.
 *
 * call with client lock held */
static inline GstUDPClient *
gst_udp_client_ref (GstUDPClient * client)
{
  ++client->ref_count;
  return client;
}

/* Equality comparator for clients: returns 0 when both the port and the
 * host string match, and 1 otherwise (it does not define an ordering). */
static gint
client_compare (GstUDPClient * a, GstUDPClient * b)
{
  if ((a->port == b->port) && (strcmp (a->host, b->host) == 0))
    return 0;

  return 1;
}

/* GObject finalize: releases every client, the configured and in-use
 * sockets, the cancellable, the scratch arrays and the owned strings, then
 * chains up to the parent class. */
static void
gst_multiudpsink_finalize (GObject * object)
{
  GstMultiUDPSink *sink;

  sink = GST_MULTIUDPSINK (object);

  g_list_foreach (sink->clients, (GFunc) gst_udp_client_unref, NULL);
  g_list_free (sink->clients);
  sink->clients = NULL;

  /* g_clear_object() is a no-op on NULL and resets the pointer for us */
  g_clear_object (&sink->socket);
  g_clear_object (&sink->socket_v6);
  g_clear_object (&sink->used_socket);
  g_clear_object (&sink->used_socket_v6);

  gst_multiudpsink_free_cancellable (sink);

  g_free (sink->multi_iface);
  sink->multi_iface = NULL;

  g_free (sink->vecs);
  sink->vecs = NULL;
  g_free (sink->maps);
  sink->maps = NULL;
  g_free (sink->messages);
  sink->messages = NULL;

  g_free (sink->bind_address);
  sink->bind_address = NULL;

  g_mutex_clear (&sink->client_lock);

  G_OBJECT_CLASS (parent_class)->finalize (object);
}

565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791
/* replacement until we can depend unconditionally on the real one in GLib */
#ifndef HAVE_G_SOCKET_SEND_MESSAGES
#define g_socket_send_messages gst_socket_send_messages

/* Fallback for g_socket_send_messages(): sends @messages one at a time with
 * g_socket_send_message() and stores each result in the message's
 * bytes_sent field.
 *
 * Returns: the number of messages sent. If a send fails with
 * G_IO_ERROR_WOULD_BLOCK after at least one message went out, the count so
 * far is returned and the error is dropped; on any other failure @error is
 * set and -1 is returned. */
static gint
gst_socket_send_messages (GSocket * socket, GstOutputMessage * messages,
    guint num_messages, gint flags, GCancellable * cancellable, GError ** error)
{
  guint i;

  for (i = 0; i < num_messages; ++i) {
    GstOutputMessage *msg = &messages[i];
    GError *msg_error = NULL;
    gssize result;

    result = g_socket_send_message (socket, msg->address,
        msg->vectors, msg->num_vectors,
        msg->control_messages, msg->num_control_messages,
        flags, cancellable, &msg_error);

    if (result < 0) {
      /* if we couldn't send all messages, just return how many we did
       * manage to send, provided we managed to send at least one.
       * g_error_matches() checks the error domain as well as the code and
       * copes with a NULL error. */
      if (i > 0 && g_error_matches (msg_error, G_IO_ERROR,
              G_IO_ERROR_WOULD_BLOCK)) {
        g_error_free (msg_error);
        return (gint) i;
      }

      g_propagate_error (error, msg_error);
      return -1;
    }

    msg->bytes_sent = result;
  }

  return (gint) i;
}
#endif /* HAVE_G_SOCKET_SEND_MESSAGES */

/* Maps each of the @n memories of @buf for reading and points the
 * corresponding output vector at the mapped data. A memory that cannot be
 * mapped gets an empty (zero-sized) vector and a warning is logged.
 *
 * Returns: the sum of the sizes of all filled vectors. */
static gsize
fill_vectors (GOutputVector * vecs, GstMapInfo * maps, guint n, GstBuffer * buf)
{
  gsize total = 0;
  guint idx;

  g_assert (gst_buffer_n_memory (buf) == n);

  for (idx = 0; idx < n; idx++) {
    GOutputVector *vec = &vecs[idx];
    GstMapInfo *info = &maps[idx];
    GstMemory *memory = gst_buffer_peek_memory (buf, idx);

    if (!gst_memory_map (memory, info, GST_MAP_READ)) {
      GST_WARNING ("Failed to map memory %p for reading", memory);
      vec->buffer = "";
      vec->size = 0;
      /* contributes nothing to the total */
      continue;
    }

    vec->buffer = info->data;
    vec->size = info->size;
    total += vec->size;
  }

  return total;
}

/* Returns the payload size of @msg, i.e. the sum of the sizes of all of its
 * output vectors. */
static gsize
gst_udp_calc_message_size (GstOutputMessage * msg)
{
  gsize total = 0;
  guint remaining = msg->num_vectors;

  while (remaining > 0) {
    remaining--;
    total += msg->vectors[remaining].size;
  }

  return total;
}

/* Finds the first message in @messages that has a non-empty payload but a
 * bytes_sent of zero.
 *
 * Returns: the index of that message, or -1 if there is none. */
static gint
gst_udp_messsages_find_first_not_sent (GstOutputMessage * messages,
    guint num_messages)
{
  guint idx = 0;

  while (idx < num_messages) {
    GstOutputMessage *candidate = messages + idx;

    /* skip messages that were sent or that carry no data at all */
    if (candidate->bytes_sent != 0
        || gst_udp_calc_message_size (candidate) == 0) {
      idx++;
      continue;
    }

    return idx;
  }

  return -1;
}

/* Formats the inet socket address @addr as "address:port" into the
 * caller-provided buffer @s of @size bytes and returns @s. */
static inline gchar *
gst_udp_address_get_string (GSocketAddress * addr, gchar * s, gsize size)
{
  GInetSocketAddress *inet_sock = G_INET_SOCKET_ADDRESS (addr);
  gchar *host_str;

  host_str =
      g_inet_address_to_string (g_inet_socket_address_get_address (inet_sock));
  g_snprintf (s, size, "%s:%u", host_str,
      g_inet_socket_address_get_port (inet_sock));
  g_free (host_str);

  return s;
}

/* Wrapper around g_socket_send_messages() plus error handling (ignoring).
 *
 * Keeps calling g_socket_send_messages() until all @num_messages have been
 * handed to the socket. When a send fails, the first unsent message is
 * looked up, a warning is posted, and that message is skipped; for errors
 * other than an oversized packet, the directly following messages to the
 * same address are skipped too. Sending then resumes after the skipped
 * messages.
 *
 * Returns FALSE if we got cancelled, otherwise TRUE. */
static gboolean
gst_multiudpsink_send_messages (GstMultiUDPSink * sink, GSocket * socket,
    GstOutputMessage * messages, guint num_messages)
{
  gboolean sent_max_size_warning = FALSE;

  while (num_messages > 0) {
    gchar astr[64] G_GNUC_UNUSED;
    GError *err = NULL;
    guint msg_size, skip, i;
    gint ret, err_idx;

    ret = g_socket_send_messages (socket, messages, num_messages, 0,
        sink->cancellable, &err);

    if (G_UNLIKELY (ret < 0)) {
      GstOutputMessage *msg;
      const gchar *reason;

      if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
        g_clear_error (&err);
        return FALSE;
      }

      err_idx = gst_udp_messsages_find_first_not_sent (messages, num_messages);
      if (err_idx < 0) {
        /* nothing left that could be (re)tried; don't leak the error */
        g_clear_error (&err);
        break;
      }

      msg = &messages[err_idx];
      msg_size = gst_udp_calc_message_size (msg);
      reason = (err != NULL) ? err->message : "unknown reason";

      GST_LOG_OBJECT (sink, "error sending %u bytes to client %s: %s", msg_size,
          gst_udp_address_get_string (msg->address, astr, sizeof (astr)),
          reason);

      skip = 1;
      if (msg_size > UDP_MAX_SIZE) {
        /* only post the oversized-packet warning once per call */
        if (!sent_max_size_warning) {
          GST_ELEMENT_WARNING (sink, RESOURCE, WRITE,
              ("Attempting to send a UDP packets larger than maximum size "
                  "(%u > %d)", msg_size, UDP_MAX_SIZE),
              ("Reason: %s", reason));
          sent_max_size_warning = TRUE;
        }
      } else {
        GST_ELEMENT_WARNING (sink, RESOURCE, WRITE,
            ("Error sending UDP packets"), ("client %s, reason: %s",
                gst_udp_address_get_string (msg->address, astr, sizeof (astr)),
                reason));

        /* the following messages to the same client would most likely fail
         * in the same way, so skip those as well */
        for (i = err_idx + 1; i < num_messages; ++i, ++skip) {
          if (messages[i].address != msg->address)
            break;
        }
        GST_DEBUG_OBJECT (sink, "skipping %u message(s) to same client", skip);
      }

      /* ignore any errors and try sending the rest. Everything before
       * err_idx counts as sent already, so resume after the skipped
       * messages rather than after the first `skip` ones, which would
       * re-send packets that already went out. */
      g_clear_error (&err);
      ret = err_idx + skip;
    }

    g_assert ((guint) ret <= num_messages);

    messages += ret;
    num_messages -= ret;
  }

  return TRUE;
}

/* Sends @num_buffers buffers to every client of @sink.
 *
 * @mem_nums holds the number of memories of each buffer and @total_mem_num
 * their sum. A snapshot of the client list is taken under the client lock
 * (with duplicates expanded if send-duplicates is set), one output message
 * per (client, buffer) pair is built from the mapped memories, the messages
 * are sent from the IPv4 and/or IPv6 socket, and finally the per-client and
 * global statistics are updated.
 *
 * Returns: GST_FLOW_OK, or GST_FLOW_FLUSHING if sending was cancelled. */
static GstFlowReturn
gst_multiudpsink_render_buffers (GstMultiUDPSink * sink, GstBuffer ** buffers,
    guint num_buffers, guint8 * mem_nums, guint total_mem_num)
{
  GstOutputMessage *msgs;
  gboolean send_duplicates;
  GstUDPClient **clients;
  GOutputVector *vecs;
  GstMapInfo *map_infos;
  GstFlowReturn flow_ret;
  guint num_addr_v4, num_addr_v6;
  guint num_addr, num_msgs;
  guint i, j, mem;
  gsize size = 0;
  GList *l;

  send_duplicates = sink->send_duplicates;

  g_mutex_lock (&sink->client_lock);

  if (send_duplicates) {
    num_addr_v4 = sink->num_v4_all;
    num_addr_v6 = sink->num_v6_all;
  } else {
    num_addr_v4 = sink->num_v4_unique;
    num_addr_v6 = sink->num_v6_unique;
  }
  num_addr = num_addr_v4 + num_addr_v6;

  if (num_addr == 0)
    goto no_clients;

  /* take a referenced snapshot of the clients so the lock does not have to
   * be held while sending */
  clients = g_newa (GstUDPClient *, num_addr);
  for (l = sink->clients, i = 0; l != NULL; l = l->next) {
    GstUDPClient *client = l->data;

    clients[i++] = gst_udp_client_ref (client);
    for (j = 1; send_duplicates && j < client->add_count; ++j)
      clients[i++] = gst_udp_client_ref (client);
  }
  g_assert_cmpuint (i, ==, num_addr);

  g_mutex_unlock (&sink->client_lock);

  GST_LOG_OBJECT (sink, "%u buffers, %u memories -> to be sent to %u clients",
      num_buffers, total_mem_num, num_addr);

  /* ensure our pre-allocated scratch space arrays are large enough */
  if (sink->n_vecs < total_mem_num) {
    sink->n_vecs = GST_ROUND_UP_16 (total_mem_num);
    g_free (sink->vecs);
    sink->vecs = g_new (GOutputVector, sink->n_vecs);
  }
  vecs = sink->vecs;

  if (sink->n_maps < total_mem_num) {
    sink->n_maps = GST_ROUND_UP_16 (total_mem_num);
    g_free (sink->maps);
    sink->maps = g_new (GstMapInfo, sink->n_maps);
  }
  map_infos = sink->maps;

  num_msgs = num_addr * num_buffers;
  if (sink->n_messages < num_msgs) {
    sink->n_messages = GST_ROUND_UP_16 (num_msgs);
    g_free (sink->messages);
    sink->messages = g_new (GstOutputMessage, sink->n_messages);
  }
  msgs = sink->messages;

  /* populate first num_buffers messages with output vectors for the buffers */
  for (i = 0, mem = 0; i < num_buffers; ++i) {
    size += fill_vectors (&vecs[mem], &map_infos[mem], mem_nums[i], buffers[i]);
    msgs[i].vectors = &vecs[mem];
    msgs[i].num_vectors = mem_nums[i];
    msgs[i].num_control_messages = 0;
    msgs[i].control_messages = NULL;
    /* the message array is reused and not zeroed on allocation, but the
     * error handling and the stats below rely on bytes_sent being 0 for
     * messages that were not sent */
    msgs[i].bytes_sent = 0;
    msgs[i].address = clients[0]->addr;
    mem += mem_nums[i];
  }

  /* FIXME: how about some locking? (there wasn't any before either, but..) */
  sink->bytes_to_serve += size;

  /* now copy the pre-filled num_buffer messages over to the next num_buffer
   * messages for the next client, where we also change the target address */
  for (i = 1; i < num_addr; ++i) {
    for (j = 0; j < num_buffers; ++j) {
      msgs[i * num_buffers + j] = msgs[j];
      msgs[i * num_buffers + j].address = clients[i]->addr;
    }
  }

  /* now send it! */
  {
    gboolean ret;

    /* no IPv4 socket? Send it all from the IPv6 socket then.. */
    if (sink->used_socket == NULL) {
      ret = gst_multiudpsink_send_messages (sink, sink->used_socket_v6,
          msgs, num_msgs);
    } else {
      guint num_msgs_v4 = num_buffers * num_addr_v4;
      guint num_msgs_v6 = num_buffers * num_addr_v6;

      /* our client list is sorted with IPv4 clients first and IPv6 ones last */
      ret = gst_multiudpsink_send_messages (sink, sink->used_socket,
          msgs, num_msgs_v4);

      if (!ret)
        goto cancelled;

      ret = gst_multiudpsink_send_messages (sink, sink->used_socket_v6,
          msgs + num_msgs_v4, num_msgs_v6);
    }

    if (!ret)
      goto cancelled;
  }

  flow_ret = GST_FLOW_OK;

  /* now update stats */
  g_mutex_lock (&sink->client_lock);

  for (i = 0; i < num_addr; ++i) {
    GstUDPClient *client = clients[i];

    for (j = 0; j < num_buffers; ++j) {
      gsize bytes_sent;

      bytes_sent = msgs[i * num_buffers + j].bytes_sent;

      client->bytes_sent += bytes_sent;
      client->packets_sent++;
      sink->bytes_served += bytes_sent;
    }
    gst_udp_client_unref (client);
  }

  g_mutex_unlock (&sink->client_lock);

out:

  /* release the mappings set up by fill_vectors() */
  for (i = 0; i < mem; ++i)
    gst_memory_unmap (map_infos[i].memory, &map_infos[i]);

  return flow_ret;

no_clients:
  {
    g_mutex_unlock (&sink->client_lock);
    GST_LOG_OBJECT (sink, "no clients");
    return GST_FLOW_OK;
  }
cancelled:
  {
    GST_INFO_OBJECT (sink, "cancelled");
    flow_ret = GST_FLOW_FLUSHING;

    /* drop the references taken for the snapshot */
    g_mutex_lock (&sink->client_lock);
    for (i = 0; i < num_addr; ++i)
      gst_udp_client_unref (clients[i]);
    g_mutex_unlock (&sink->client_lock);
    goto out;
  }
}

/* GstBaseSink render_list handler: flattens @buffer_list into an array of
 * buffers plus a per-buffer memory count and hands everything to
 * gst_multiudpsink_render_buffers() in one call. An empty list is not an
 * error and yields GST_FLOW_OK. */
static GstFlowReturn
gst_multiudpsink_render_list (GstBaseSink * bsink, GstBufferList * buffer_list)
{
  GstMultiUDPSink *sink = GST_MULTIUDPSINK_CAST (bsink);
  GstBuffer **bufs;
  guint8 *mems_per_buf;
  guint mem_sum = 0;
  guint count, idx;

  count = gst_buffer_list_length (buffer_list);
  if (count == 0) {
    GST_LOG_OBJECT (sink, "empty buffer");
    return GST_FLOW_OK;
  }

  /* scratch arrays are allocated with g_newa(), no explicit free needed */
  bufs = g_newa (GstBuffer *, count);
  mems_per_buf = g_newa (guint8, count);

  idx = 0;
  while (idx < count) {
    bufs[idx] = gst_buffer_list_get (buffer_list, idx);
    mems_per_buf[idx] = gst_buffer_n_memory (bufs[idx]);
    mem_sum += mems_per_buf[idx];
    ++idx;
  }

  return gst_multiudpsink_render_buffers (sink, bufs, count, mems_per_buf,
      mem_sum);
}

950
static GstFlowReturn
951
gst_multiudpsink_render (GstBaseSink * bsink, GstBuffer * buffer)
952 953
{
  GstMultiUDPSink *sink;
954 955
  GstFlowReturn flow;
  guint8 n_mem;
956

957
  sink = GST_MULTIUDPSINK_CAST (bsink);
958

959
  n_mem = gst_buffer_n_memory (buffer);
960

961 962 963 964
  if (n_mem > 0)
    flow = gst_multiudpsink_render_buffers (sink, &buffer, 1, &n_mem, n_mem);
  else
    flow = GST_FLOW_OK;
Ognyan Tonchev's avatar
Ognyan Tonchev committed
965

966
  return flow;
967 968
}

969 970 971 972 973 974 975 976 977
/* Replaces the current client list with the one described by @string.
 *
 * @string is a comma-separated list of "host:port" entries. All existing
 * clients are cleared first, then every entry with a usable port is added
 * through gst_multiudpsink_add_internal(). Entries without a ':' or with a
 * port outside 1..65535 are skipped. A NULL @string is treated like an
 * empty list, i.e. it only clears the existing clients.
 *
 * The client lock is held while the list is rebuilt.
 */
static void
gst_multiudpsink_set_clients_string (GstMultiUDPSink * sink,
    const gchar * string)
{
  gchar **clients;
  gint i;

  /* g_strsplit() refuses a NULL string and returns NULL, which the loop
   * below would dereference; an empty string yields an empty vector */
  clients = g_strsplit (string != NULL ? string : "", ",", 0);

  g_mutex_lock (&sink->client_lock);
  /* clear all existing clients */
  gst_multiudpsink_clear_internal (sink, FALSE);
  for (i = 0; clients[i]; i++) {
    gchar *host, *p;
    gint64 port = 0;

    host = clients[i];
    /* the host part ends at the first ':' of the entry */
    p = strchr (clients[i], ':');
    if (p != NULL) {
      *p = '\0';
      port = g_ascii_strtoll (p + 1, NULL, 10);
    }

    /* 0 means "no port given or not a number": silently skipped */
    if (port == 0)
      continue;

    /* do not forward values that cannot be a UDP port */
    if (port < 0 || port > G_MAXUINT16) {
      GST_WARNING_OBJECT (sink,
          "ignoring client %s with invalid port %" G_GINT64_FORMAT, host,
          port);
      continue;
    }

    gst_multiudpsink_add_internal (sink, host, (gint) port, FALSE);
  }
  g_mutex_unlock (&sink->client_lock);

  g_strfreev (clients);
}

/* Serialises the client list into a comma-separated "host:port" string.
 *
 * Each client is written add_count times, so the result lists one entry per
 * time the client was added. The client lock is held while the list is
 * walked.
 *
 * Returns: a newly allocated string, owned by the caller.
 */
static gchar *
gst_multiudpsink_get_clients_string (GstMultiUDPSink * sink)
{
  GString *str;
  GList *clients;

  str = g_string_new ("");

  g_mutex_lock (&sink->client_lock);
  clients = sink->clients;
  while (clients) {
    GstUDPClient *client;
    gint remaining;

    client = (GstUDPClient *) clients->data;

    /* advance first so that `clients` tells us whether more clients follow */
    clients = g_list_next (clients);

    for (remaining = client->add_count; remaining > 0; remaining--) {
      /* a separator is needed unless this is the very last entry overall:
       * either another client follows, or this client still has further
       * duplicate entries to emit after the current one */
      g_string_append_printf (str, "%s:%d%s", client->host, client->port,
          (clients || remaining > 1 ? "," : ""));
    }
  }
  g_mutex_unlock (&sink->client_lock);

  return g_string_free (str, FALSE);
}

1028
static void
1029
gst_multiudpsink_setup_qos_dscp (GstMultiUDPSink * sink, GSocket * socket)
1030
{
1031 1032 1033 1034
  /* don't touch on -1 */
  if (sink->qos_dscp < 0)
    return;

1035
  if (socket == NULL)
1036 1037
    return;

1038 1039 1040 1041 1042
#ifdef IP_TOS
  {
    gint tos;
    gint fd;

1043
    fd = g_socket_get_fd (socket);
1044

1045
    GST_DEBUG_OBJECT (sink, "setting TOS to %d", sink->qos_dscp);
1046

1047 1048 1049 1050 1051 1052
    /* Extract and shift 6 bits of DSFIELD */
    tos = (sink->qos_dscp & 0x3f) << 2;

    if (setsockopt (fd, IPPROTO_IP, IP_TOS, &tos, sizeof (tos)) < 0) {
      GST_ERROR_OBJECT (sink, "could not set TOS: %s", g_strerror (errno));
    }
1053
#ifdef IPV6_TCLASS
1054 1055 1056
    if (setsockopt (fd, IPPROTO_IPV6, IPV6_TCLASS, &tos, sizeof (tos)) < 0) {
      GST_ERROR_OBJECT (sink, "could not set TCLASS: %s", g_strerror (errno));
    }
1057
#endif
1058
  }
1059
#endif
1060 1061
}

1062 1063 1064 1065 1066 1067 1068 1069 1070
/* GObject set_property implementation: stores @value in the field that
 * corresponds to @prop_id. Unknown ids are reported through
 * G_OBJECT_WARN_INVALID_PROPERTY_ID(). */
static void
gst_multiudpsink_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec)
{
  GstMultiUDPSink *self = GST_MULTIUDPSINK (object);

  switch (prop_id) {
    case PROP_SOCKET:{
      GSocket *old = self->socket;

      if (old != NULL) {
        /* only close the previous socket when asked to and when it is not
         * the one currently stored as used_socket */
        if (self->close_socket && old != self->used_socket) {
          GError *err = NULL;

          if (!g_socket_close (old, &err)) {
            GST_ERROR ("failed to close socket %p: %s", old, err->message);
            g_clear_error (&err);
          }
        }
        g_object_unref (old);
      }
      self->socket = g_value_dup_object (value);
      GST_DEBUG_OBJECT (self, "setting socket to %p", self->socket);
      break;
    }
    case PROP_SOCKET_V6:{
      GSocket *old = self->socket_v6;

      if (old != NULL) {
        /* same rule as for PROP_SOCKET, applied to the IPv6 socket */
        if (self->close_socket && old != self->used_socket_v6) {
          GError *err = NULL;

          if (!g_socket_close (old, &err)) {
            GST_ERROR ("failed to close socket %p: %s", old, err->message);
            g_clear_error (&err);
          }
        }
        g_object_unref (old);
      }
      self->socket_v6 = g_value_dup_object (value);
      GST_DEBUG_OBJECT (self, "setting socket to %p", self->socket_v6);
      break;
    }
    case PROP_CLOSE_SOCKET:
      self->close_socket = g_value_get_boolean (value);
      break;
    case PROP_CLIENTS:
      gst_multiudpsink_set_clients_string (self, g_value_get_string (value));
      break;
    case PROP_AUTO_MULTICAST:
      self->auto_multicast = g_value_get_boolean (value);
      break;
    case PROP_MULTICAST_IFACE:{
      const gchar *iface;

      g_free (self->multi_iface);

      /* an unset value falls back to the default interface */
      iface = g_value_get_string (value);
      if (iface != NULL)
        self->multi_iface = g_strdup (iface);
      else
        self->multi_iface = g_strdup (DEFAULT_MULTICAST_IFACE);
      break;
    }
    case PROP_TTL:
      self->ttl = g_value_get_int (value);
      break;
    case PROP_TTL_MC:
      self->ttl_mc = g_value_get_int (value);
      break;
    case PROP_LOOP:
      self->loop = g_value_get_boolean (value);
      break;
    case PROP_FORCE_IPV4:
      self->force_ipv4 = g_value_get_boolean (value);
      break;
    case PROP_QOS_DSCP:
      self->qos_dscp = g_value_get_int (value);
      /* push the new value to both sockets; the helper ignores NULL ones */
      gst_multiudpsink_setup_qos_dscp (self, self->used_socket);
      gst_multiudpsink_setup_qos_dscp (self, self->used_socket_v6);
      break;
    case PROP_SEND_DUPLICATES:
      self->send_duplicates = g_value_get_boolean (value);
      break;
    case PROP_BUFFER_SIZE:
      self->buffer_size = g_value_get_int (value);
      break;
    case PROP_BIND_ADDRESS:
      g_free (self->bind_address);
      self->bind_address = g_value_dup_string (value);
      break;
    case PROP_BIND_PORT:
      self->bind_port = g_value_get_int (value);
      break;
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }
}

/* GObject get_property implementation: copies the field that corresponds
 * to @prop_id into @value. Unknown ids are reported through
 * G_OBJECT_WARN_INVALID_PROPERTY_ID(). */
static void
gst_multiudpsink_get_property (GObject * object, guint prop_id, GValue * value,
    GParamSpec * pspec)
{
  GstMultiUDPSink *self = GST_MULTIUDPSINK (object);

  switch (prop_id) {
      /* statistics */
    case PROP_BYTES_TO_SERVE:
      g_value_set_uint64 (value, self->bytes_to_serve);
      break;
    case PROP_BYTES_SERVED:
      g_value_set_uint64 (value, self->bytes_served);
      break;

      /* sockets */
    case PROP_SOCKET:
      g_value_set_object (value, self->socket);
      break;
    case PROP_SOCKET_V6:
      g_value_set_object (value, self->socket_v6);
      break;
    case PROP_CLOSE_SOCKET:
      g_value_set_boolean (value, self->close_socket);
      break;
    case PROP_USED_SOCKET:
      g_value_set_object (value, self->used_socket);
      break;
    case PROP_USED_SOCKET_V6:
      g_value_set_object (value, self->used_socket_v6);
      break;

      /* client list: the freshly built string is handed over to @value */
    case PROP_CLIENTS:
      g_value_take_string (value, gst_multiudpsink_get_clients_string (self));
      break;

      /* remaining plain settings */
    case PROP_AUTO_MULTICAST:
      g_value_set_boolean (value, self->auto_multicast);
      break;
    case PROP_MULTICAST_IFACE:
      g_value_set_string (value, self->multi_iface);
      break;
    case PROP_TTL:
      g_value_set_int (value, self->ttl);
      break;
    case PROP_TTL_MC:
      g_value_set_int (value, self->ttl_mc);
      break;
    case PROP_LOOP:
      g_value_set_boolean (value, self->loop);
      break;
    case PROP_FORCE_IPV4:
      g_value_set_boolean (value, self->force_ipv4);
      break;
    case PROP_QOS_DSCP:
      g_value_set_int (value, self->qos_dscp);
      break;
    case PROP_SEND_DUPLICATES:
      g_value_set_boolean (value, self->send_duplicates);
      break;
    case PROP_BUFFER_SIZE:
      g_value_set_int (value, self->buffer_size);
      break;
    case PROP_BIND_ADDRESS:
      g_value_set_string (value, self->bind_address);
      break;
    case PROP_BIND_PORT:
      g_value_set_int (value, self->bind_port);
      break;
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }
}

1230 1231 1232 1233
static gboolean
gst_multiudpsink_configure_client (GstMultiUDPSink * sink,
    GstUDPClient * client)
{
1234 1235
  GInetSocketAddress *saddr = G_INET_SOCKET_ADDRESS (client->addr);
  GInetAddress *addr = g_inet_socket_address_get_address (saddr);
1236 1237
  GSocketFamily family = g_socket_address_get_family (G_SOCKET_ADDRESS (saddr));
  GSocket *socket;
1238 1239
  GError *err = NULL;

1240 1241
  GST_DEBUG_OBJECT (sink, "configuring client %p", client);

1242 1243 1244