Commit 3724af1a authored by Philip Withnall's avatar Philip Withnall Committed by Olivier Crête
Browse files

agent: Add a ComponentSource to Component

This is a type of GSource which proxies all poll events from the sockets
in a Component. It’s necessary for the implementation of
GPollableInputStream and GPollableOutputStream.

This adds no new external API, but does add ComponentSource and
component_source_new() as new internal API.
parent 5d63de5a
......@@ -416,7 +416,18 @@ component_attach_socket (Component *component, NiceSocket *socket)
g_assert (component->ctx != NULL);
/* Find an existing SocketSource in the component which contains @socket, or
* create a new one. */
* create a new one.
* In order for socket_sources_age to work properly, socket_sources must only
* grow monotonically, or be entirely cleared. i.e. New SocketSources must be
* prepended to socket_sources, and all other existing SocketSource must be
* left untouched; *or* the whole of socket_sources must be cleared. If
* socket_sources is cleared, age is reset to 0 and *must not* be incremented
* again or the new sockets will not be picked up by ComponentSocket. This is
* guaranteed by the fact that socket_sources is only cleared on disconnection
* or discovery failure, which are both unrecoverable states.
* An empty socket_sources corresponds to age 0. */
l = g_slist_find_custom (component->socket_sources, socket,
if (l != NULL) {
......@@ -427,6 +438,7 @@ component_attach_socket (Component *component, NiceSocket *socket)
socket_source->component = component;
component->socket_sources =
g_slist_prepend (component->socket_sources, socket_source);
/* Create and attach a source */
......@@ -509,6 +521,7 @@ component_free_socket_sources (Component *component)
g_slist_free_full (component->socket_sources,
(GDestroyNotify) socket_source_free);
component->socket_sources = NULL;
component->socket_sources_age = 0;
GMainContext *
......@@ -775,3 +788,196 @@ component_deschedule_io_callback (Component *component)
g_source_remove (component->io_callback_id);
component->io_callback_id = 0;
* ComponentSource:
* This is a GSource which wraps a single Component and is dispatched whenever
* any of its NiceSockets are dispatched, i.e. it proxies all poll() events for
* every socket in the Component. It is designed for use by GPollableInputStream
* and GPollableOutputStream, so that a Component can be incorporated into a
* custom main context iteration.
* The callbacks dispatched by a ComponentSource have type GPollableSourceFunc.
* ComponentSource supports adding a GCancellable child source which will
* additionally dispatch if a provided GCancellable is cancelled.
* Internally, ComponentSource adds a new GSocketSource for each socket in the
* Component. Changes to the Component’s list of sockets are detected on each
* call to component_source_prepare(), which compares a stored age with the
* current age of the Component’s socket list — if the socket list has changed,
* the age will have increased (indicating added sockets) or will have been
* reset to 0 (indicating all sockets have been closed).
typedef struct {
GSource parent;
GObject *pollable_stream; /* owned */
GIOCondition condition;
Component *component; /* unowned */
guint component_socket_sources_age;
} ComponentSource;
static gboolean
component_source_prepare (GSource *source, gint *timeout_)
ComponentSource *component_source = (ComponentSource *) source;
gint age_diff;
/* Needed due to accessing the Component. */
agent_lock ();
age_diff =
component_source->component->socket_sources_age -
/* If the age has changed, either:
* • a new socket has been *prepended* to component->socket_sources (and
* age_diff > 0); or
* • component->socket_sources has been emptied (and age_diff < 0).
* We can’t remove any child sources without destroying them, so must
* monotonically add new ones, or remove everything.
* Removing everything only happens on shutdown or failure, in which case
* the ComponentSource itself can be destroyed, automatically destroying all
* the child sources. */
if (age_diff < 0) {
g_source_destroy (source);
} else if (age_diff > 0) {
/* Add the new child sources. The difference between the two ages gives
* the number of new child sources. */
guint i;
GSList *l;
for (i = 0, l = component_source->component->socket_sources;
i < (guint) age_diff && l != NULL;
i++, l = l->next) {
GSource *child_source;
SocketSource *socket_source;
socket_source = l->data;
child_source = g_socket_create_source (socket_source->socket->fileno,
component_source->condition, NULL);
g_source_set_dummy_callback (child_source);
g_source_add_child_source (source, child_source);
g_source_unref (child_source);
/* Update the age. */
component_source->component_socket_sources_age =
agent_unlock ();
/* We can’t be sure if the ComponentSource itself needs to be dispatched until
* poll() is called on all the child sources. */
return FALSE;
static gboolean
component_source_dispatch (GSource *source, GSourceFunc callback,
gpointer user_data)
ComponentSource *component_source = (ComponentSource *) source;
GPollableSourceFunc func = (GPollableSourceFunc) callback;
return func (component_source->pollable_stream, user_data);
static void
component_source_finalize (GSource *source)
ComponentSource *component_source = (ComponentSource *) source;
g_object_unref (component_source->pollable_stream);
component_source->pollable_stream = NULL;
static gboolean
component_source_closure_callback (GObject *pollable_stream, gpointer user_data)
GClosure *closure = user_data;
GValue param_value = G_VALUE_INIT;
GValue result_value = G_VALUE_INIT;
gboolean retval;
g_value_init (&result_value, G_TYPE_BOOLEAN);
g_value_init (&param_value, G_TYPE_OBJECT);
g_value_set_object (&param_value, pollable_stream);
g_closure_invoke (closure, &result_value, 1, &param_value, NULL);
retval = g_value_get_boolean (&result_value);
g_value_unset (&param_value);
g_value_unset (&result_value);
return retval;
static GSourceFuncs component_source_funcs = {
NULL, /* check */
(GSourceFunc) component_source_closure_callback,
* component_source_new:
* @component: a #Component
* @pollable_stream: a #GPollableInputStream or #GPollableOutputStream to pass
* to dispatched callbacks
* @condition: the I/O condition to poll for (e.g. %G_IO_IN or %G_IO_OUT)
* @cancellable: (allow-none): a #GCancellable, or %NULL
* Create a new #ComponentSource, a type of #GSource which proxies poll events
* from all sockets in the given @component.
* A callback function of type #GPollableSourceFunc must be connected to the
* returned #GSource using g_source_set_callback(). @pollable_stream is passed
* to all callbacks dispatched from the #GSource, and a reference is held on it
* by the #GSource.
* The #GSource will automatically update to poll sockets as they’re added to
* the @component (e.g. during peer discovery).
* Returns: (transfer full): a new #ComponentSource; unref with g_source_unref()
GSource *
component_source_new (Component *component, GObject *pollable_stream,
GIOCondition condition, GCancellable *cancellable)
ComponentSource *component_source;
g_assert (component != NULL);
g_assert (G_IS_POLLABLE_INPUT_STREAM (pollable_stream) ||
G_IS_POLLABLE_OUTPUT_STREAM (pollable_stream));
component_source =
(ComponentSource *)
g_source_new (&component_source_funcs, sizeof (ComponentSource));
g_source_set_name ((GSource *) component_source, "ComponentSource");
component_source->component = component;
component_source->component_socket_sources_age = 0;
component_source->pollable_stream = g_object_ref (pollable_stream);
component_source->condition = condition;
/* Add a cancellable source. */
if (cancellable != NULL) {
GSource *cancellable_source;
cancellable_source = g_cancellable_source_new (cancellable);
g_source_set_dummy_callback (cancellable_source);
g_source_add_child_source ((GSource *) component_source,
g_source_unref (cancellable_source);
return (GSource *) component_source;
......@@ -141,7 +141,8 @@ struct _Component
NiceComponentState state;
GSList *local_candidates; /**< list of Candidate objs */
GSList *remote_candidates; /**< list of Candidate objs */
GSList *socket_sources; /**< list of SocketSource objs */
GSList *socket_sources; /**< list of SocketSource objs; must only grow monotonically */
guint socket_sources_age; /**< incremented when socket_sources changes */
GSList *incoming_checks; /**< list of IncomingCheck objs */
GList *turn_servers; /**< List of TURN servers */
CandidatePair selected_pair; /**< independent from checklists,
......@@ -220,6 +221,10 @@ component_detach_all_sockets (Component *component);
component_free_socket_sources (Component *component);
GSource *
component_source_new (Component *component, GObject *pollable_stream,
GIOCondition condition, GCancellable *cancellable);
GMainContext *
component_dup_io_context (Component *component);
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment