Commit c5672702 authored by Philip Withnall's avatar Philip Withnall Committed by Olivier Crête

agent: Move GSource handling into Component

Rather than handle GSource creation, attachment and removal in
NiceAgent, handle it inside Component. This brings it closer to the
networking code, and improves encapsulation of the state of each
Component.
parent 12ee430e
......@@ -166,15 +166,16 @@ guint64 agent_candidate_pair_priority (NiceAgent *agent, NiceCandidate *local, N
GSource *agent_timeout_add_with_context (NiceAgent *agent, guint interval, GSourceFunc function, gpointer data);
void agent_attach_stream_component_socket (NiceAgent *agent,
Stream *stream,
Component *component,
NiceSocket *socket);
StunUsageIceCompatibility agent_to_ice_compatibility (NiceAgent *agent);
StunUsageTurnCompatibility agent_to_turn_compatibility (NiceAgent *agent);
NiceTurnSocketCompatibility agent_to_turn_socket_compatibility (NiceAgent *agent);
void _priv_set_socket_tos (NiceAgent *agent, NiceSocket *sock, gint tos);
gboolean
component_io_cb (
GSocket *gsocket,
GIOCondition condition,
gpointer data);
#endif /*_NICE_AGENT_PRIV_H */
......@@ -1003,7 +1003,7 @@ static void priv_pseudo_tcp_error (NiceAgent *agent, Stream *stream,
if (component->tcp) {
agent_signal_component_state_change (agent, stream->id,
component->id, NICE_COMPONENT_STATE_FAILED);
component_detach_socket_sources (component);
component_detach_all_sockets (component);
}
priv_destroy_component_tcp (component);
}
......@@ -1420,8 +1420,7 @@ priv_add_new_candidate_discovery_turn (NiceAgent *agent,
new_socket = nice_udp_bsd_socket_new (&addr);
if (new_socket) {
_priv_set_socket_tos (agent, new_socket, stream->tos);
agent_attach_stream_component_socket (agent, stream,
component, new_socket);
component_attach_socket (component, new_socket);
socket = new_socket;
}
}
......@@ -1469,8 +1468,7 @@ priv_add_new_candidate_discovery_turn (NiceAgent *agent,
cdisco->nicesock = nice_tcp_turn_socket_new (socket,
agent_to_turn_socket_compatibility (agent));
agent_attach_stream_component_socket (agent, stream,
component, cdisco->nicesock);
component_attach_socket (component, cdisco->nicesock);
}
cdisco->turn = turn;
......@@ -2666,79 +2664,95 @@ io_ctx_free (IOCtx *ctx)
g_slice_free (IOCtx, ctx);
}
static gboolean
nice_agent_g_source_cb (
GSocket *gsocket,
GIOCondition condition,
gpointer data)
gboolean
component_io_cb (GSocket *socket, GIOCondition condition, gpointer user_data)
{
IOCtx *ctx = data;
NiceAgent *agent = ctx->agent;
Stream *stream = ctx->stream;
Component *component = ctx->component;
guint8 buf[MAX_BUFFER_SIZE];
SocketSource *socket_source = user_data;
Component *component;
NiceAgent *agent;
Stream *stream;
guint8 local_buf[MAX_BUFFER_SIZE];
gssize len;
guint8 *recv_buf;
gsize recv_buf_len;
gboolean retval = FALSE;
NiceAgentRecvFunc io_callback;
agent_lock ();
component = socket_source->component;
agent = component->agent;
stream = component->stream;
if (g_source_is_destroyed (g_main_current_source ())) {
agent_unlock ();
return FALSE;
/* Silently return FALSE. */
nice_debug ("%s: source %p destroyed", G_STRFUNC, g_main_current_source ());
goto done;
}
/* Actually read the data. This will return 0 if the data has already been
* handled. */
len = _nice_agent_recv_locked (agent, stream, component, ctx->socket,
buf, MAX_BUFFER_SIZE);
/* FIXME: Compartmentalise this in component.c */
g_mutex_lock (&component->io_mutex);
io_callback = component->io_callback;
g_mutex_unlock (&component->io_mutex);
if (len < 0) {
/* Error. Detach the source but don’t close the socket. We don’t close the
* socket because it would be way too complicated to take care of every path
* where it might still be used. */
nice_debug ("Agent %p: unable to recv from socket %p. Detaching",
ctx->agent, ctx->socket);
component_detach_socket_source (component, ctx->socket);
} else if (len > 0) {
component_emit_io_callback (component, buf, len);
/* Choose which receive buffer to use. If we’re reading for
* nice_agent_attach_recv(), use a local static buffer. If we’re reading for
* nice_agent_recv(), use the buffer provided by the client. */
g_assert (io_callback == NULL || component->recv_buf == NULL);
if (io_callback != NULL) {
recv_buf = local_buf;
recv_buf_len = sizeof (local_buf);
} else if (component->recv_buf != NULL) {
recv_buf = component->recv_buf + component->recv_buf_valid_len;
recv_buf_len = component->recv_buf_len - component->recv_buf_valid_len;
} else {
/* I/O is paused. Try again later. */
retval = TRUE;
goto done;
}
agent_unlock ();
nice_debug ("Receiving on source %p (socket %p, FD %d).",
socket_source->source, socket_source->socket,
g_socket_get_fd (socket_source->socket->fileno));
return TRUE;
}
/* Actually read the data. This will return 0 if the data has already been
* handled (e.g. for STUN control packets). */
len = agent_recv_locked (agent, stream, component, socket_source->socket,
recv_buf, recv_buf_len);
/*
* Attaches one socket handle to the main loop event context.
*
* Takes ownership of the socket.
*/
nice_debug ("\tReceived %" G_GSSIZE_FORMAT " bytes.", len);
void
agent_attach_stream_component_socket (NiceAgent *agent,
Stream *stream,
Component *component,
NiceSocket *socket)
{
GSource *source;
IOCtx *ctx;
if (len == 0) {
/* No data was available, probably due to being a reliable connection and
* hence the data is stored in the pseudotcp buffer. */
retval = TRUE;
goto done;
} else if (len < 0) {
/* Error. Detach the source but don’t close the socket. We don’t close the
* socket because it would be way too complicated to take care of every path
* where it might still be used. */
g_set_error (component->recv_buf_error, G_IO_ERROR, G_IO_ERROR_FAILED,
"Unable to receive from socket %p. Detaching.", socket);
nice_debug ("%s: error receiving from socket %p", G_STRFUNC, socket);
goto done;
}
if (!component->ctx) {
component_add_detached_socket (component, socket);
return;
/* Actual data to notify the client about. */
if (io_callback != NULL) {
component_emit_io_callback (component, recv_buf, len);
} else {
/* Data has been stored in the component’s receive buffer to be picked up
* later by nice_agent_recv(). */
component->recv_buf_valid_len += len;
}
/* note: without G_IO_ERR the glib mainloop goes into
* busyloop if errors are encountered */
source = g_socket_create_source (socket->fileno, G_IO_IN | G_IO_ERR, NULL);
retval = TRUE;
ctx = io_ctx_new (agent, stream, component, socket, source);
g_source_set_callback (source, (GSourceFunc) nice_agent_g_source_cb,
ctx, (GDestroyNotify) io_ctx_free);
nice_debug ("Agent %p : Attach source %p (stream %u).", agent, source,
stream->id);
done:
agent_unlock ();
/* Add the pair to the component. */
component_add_socket_source (component, socket, source);
return retval;
}
NICEAPI_EXPORT gboolean
......@@ -2768,20 +2782,12 @@ nice_agent_attach_recv (
goto done;
}
/* Set the component’s I/O callback. */
component_set_io_callback (component, func, data, ctx);
/* Set the component’s I/O context. */
component_set_io_context (component, ctx);
component_set_io_callback (component, func, data);
ret = TRUE;
if (func) {
GSList *i;
/* Attach any detached sockets to the new main context. */
for (i = component->socket_sources; i != NULL; i = i->next) {
SocketSource *socket_source = i->data;
agent_attach_stream_component_socket (agent, stream, component,
socket_source->socket);
}
/* If we got detached, maybe our readable callback didn't finish reading
* all available data in the pseudotcp, so we need to make sure we free
* our recv window, so the readable callback can be triggered again on the
......
......@@ -62,9 +62,36 @@ component_deschedule_io_callback (Component *component);
/* Must *not* take the agent lock, since it’s called from within
* component_set_io_callback(), which holds the Component’s I/O lock. */
static void
socket_source_attach (SocketSource *socket_source, GMainContext *context)
{
GSource *source;
/* Create a source. */
source = g_socket_create_source (socket_source->socket->fileno,
G_IO_IN, NULL);
g_source_set_callback (source, (GSourceFunc) component_io_cb,
socket_source, NULL);
/* Add the source. */
nice_debug ("Attaching source %p (socket %p, FD %d) to context %p", source,
socket_source->socket, g_socket_get_fd (socket_source->socket->fileno),
context);
g_assert (socket_source->source == NULL);
socket_source->source = source;
g_source_attach (source, context);
}
static void
socket_source_detach (SocketSource *source)
{
nice_debug ("Detaching source %p (socket %p, FD %d) from context %p",
source->source, source->socket,
(source->socket->fileno != NULL) ?
g_socket_get_fd (source->socket->fileno) : 0,
(source->source != NULL) ? g_source_get_context (source->source) : 0);
if (source->source != NULL) {
g_source_destroy (source->source);
g_source_unref (source->source);
......@@ -136,7 +163,7 @@ component_free (Component *cmp)
g_slist_free (cmp->local_candidates);
g_slist_free (cmp->remote_candidates);
g_slist_free_full (cmp->socket_sources, (GDestroyNotify) socket_source_free);
component_free_socket_sources (cmp);
g_slist_free (cmp->incoming_checks);
for (item = cmp->turn_servers; item; item = g_list_next (item)) {
......@@ -375,20 +402,20 @@ _find_socket_source (gconstpointer a, gconstpointer b)
return (source_a->socket == socket_b) ? 0 : 1;
}
/* This takes ownership of socket and source.
* It attaches the source to the component’s context. */
/* This takes ownership of the socket.
* It creates and attaches a source to the component’s context. */
void
component_add_socket_source (Component *component, NiceSocket *socket,
GSource *source)
component_attach_socket (Component *component, NiceSocket *socket)
{
GSList *l;
SocketSource *socket_source;
g_assert (component != NULL);
g_assert (socket != NULL);
g_assert (source != NULL);
/* Find an existing SocketSource in the component which contains socket, or
g_assert (component->ctx != NULL);
/* Find an existing SocketSource in the component which contains @socket, or
* create a new one. */
l = g_slist_find_custom (component->socket_sources, socket,
_find_socket_source);
......@@ -397,31 +424,36 @@ component_add_socket_source (Component *component, NiceSocket *socket,
} else {
socket_source = g_slice_new0 (SocketSource);
socket_source->socket = socket;
socket_source->component = component;
component->socket_sources =
g_slist_prepend (component->socket_sources, socket_source);
}
/* Add the source. */
g_assert (socket_source->source == NULL);
g_assert (component->ctx != NULL);
socket_source->source = source;
g_source_attach (source, component->ctx);
/* Create and attach a source */
nice_debug ("Component %p (agent %p): Attach source (stream %u).",
component, component->agent, component->stream->id);
socket_source_attach (socket_source, component->ctx);
}
void
component_add_detached_socket (Component *component, NiceSocket *socket)
/* Reattaches socket handles of @component to the main context.
*
* Must *not* take the agent lock, since it’s called from within
* component_set_io_callback(), which holds the Component’s I/O lock. */
static void
component_reattach_all_sockets (Component *component)
{
SocketSource *socket_source;
GSList *i;
socket_source = g_slice_new0 (SocketSource);
socket_source->socket = socket;
socket_source->source = NULL;
component->socket_sources =
g_slist_prepend (component->socket_sources, socket_source);
for (i = component->socket_sources; i != NULL; i = i->next) {
SocketSource *socket_source = i->data;
nice_debug ("Reattach source %p.", socket_source->source);
socket_source_detach (socket_source);
socket_source_attach (socket_source, component->ctx);
}
}
/**
* component_detach_socket_source:
* component_detach_socket:
* @component: a #Component
* @socket: the socket to detach the source for
*
......@@ -431,11 +463,13 @@ component_add_detached_socket (Component *component, NiceSocket *socket)
* If the @socket doesn’t exist in this @component, do nothing.
*/
void
component_detach_socket_source (Component *component, NiceSocket *socket)
component_detach_socket (Component *component, NiceSocket *socket)
{
GSList *l;
SocketSource *socket_source;
nice_debug ("Detach socket %p.", socket);
/* Find the SocketSource for the socket. */
l = g_slist_find_custom (component->socket_sources, socket,
_find_socket_source);
......@@ -455,13 +489,14 @@ component_detach_socket_source (Component *component, NiceSocket *socket)
* component_set_io_callback(), which holds the Component’s I/O lock.
*/
void
component_detach_socket_sources (Component *component)
component_detach_all_sockets (Component *component)
{
GSList *i;
for (i = component->socket_sources; i != NULL; i = i->next) {
SocketSource *socket_source = i->data;
nice_debug ("Detach source %p.", socket_source->source);
nice_debug ("Detach source %p, socket %p.", socket_source->source,
socket_source->socket);
socket_source_detach (socket_source);
}
}
......@@ -469,38 +504,51 @@ component_detach_socket_sources (Component *component)
void
component_free_socket_sources (Component *component)
{
nice_debug ("Free socket sources for component %p.", component);
g_slist_free_full (component->socket_sources,
(GDestroyNotify) socket_source_free);
component->socket_sources = NULL;
}
/* If @context is %NULL, a fresh context is used, so component->ctx is always
* guaranteed to be non-%NULL. */
void
component_set_io_callback (Component *component, NiceAgentRecvFunc func,
gpointer user_data, GMainContext *context)
component_set_io_context (Component *component, GMainContext *context)
{
g_mutex_lock (&component->io_mutex);
/* Reference the context early so we don’t accidentally free it below. */
if (context != NULL && func != NULL)
g_main_context_ref (context);
if (component->io_callback != NULL)
component_detach_socket_sources (component);
if (component->ctx != context || component->ctx == NULL) {
if (context == NULL)
context = g_main_context_new ();
else
g_main_context_ref (context);
component_detach_all_sockets (component);
if (component->ctx != NULL)
g_main_context_unref (component->ctx);
component->ctx = context;
component_reattach_all_sockets (component);
}
component->io_callback = NULL;
component->io_user_data = NULL;
g_mutex_unlock (&component->io_mutex);
}
if (component->ctx != NULL)
g_main_context_unref (component->ctx);
component->ctx = NULL;
void
component_set_io_callback (Component *component,
NiceAgentRecvFunc func, gpointer user_data)
{
g_mutex_lock (&component->io_mutex);
if (func != NULL) {
component->io_callback = func;
component->io_user_data = user_data;
component->ctx = context; /* referenced above */
component_schedule_io_callback (component);
} else {
component->io_callback = NULL;
component->io_user_data = NULL;
component_deschedule_io_callback (component);
}
......
......@@ -99,10 +99,14 @@ struct _IncomingCheck
* GSources in a Component must be attached to the same main context:
* component->ctx.
*
* Socket must be non-NULL, but source may be NULL if it has been detached. */
* Socket must be non-NULL, but source may be NULL if it has been detached.
*
* The Component is stored so this may be used as the user data for a GSource
* callback. */
typedef struct {
NiceSocket *socket;
GSource *source;
Component *component;
} SocketSource;
......@@ -144,6 +148,9 @@ struct _Component
see ICE 11.1. "Sending Media" (ID-19) */
NiceCandidate *restart_candidate; /**< for storing active remote candidate during a restart */
/* I/O handling. The main context must always be non-NULL, and is used for all
* socket recv() operations. All io_callback emissions are invoked in this
* context too. */
GMutex io_mutex; /**< protects io_callback, io_user_data,
pending_io_messages and io_callback_id.
immutable: can be accessed without
......@@ -197,21 +204,19 @@ component_set_selected_remote_candidate (NiceAgent *agent, Component *component,
NiceCandidate *candidate);
void
component_add_socket_source (Component *component, NiceSocket *socket,
GSource *source);
void
component_add_detached_socket (Component *component, NiceSocket *socket);
component_attach_socket (Component *component, NiceSocket *socket);
void
component_detach_socket_source (Component *component, NiceSocket *socket);
component_detach_socket (Component *component, NiceSocket *socket);
void
component_detach_socket_sources (Component *component);
component_detach_all_sockets (Component *component);
void
component_free_socket_sources (Component *component);
void
component_set_io_callback (Component *component, NiceAgentRecvFunc func,
gpointer user_data, GMainContext *context);
component_set_io_context (Component *component, GMainContext *context);
void
component_set_io_callback (Component *component,
NiceAgentRecvFunc func, gpointer user_data);
void
component_emit_io_callback (Component *component,
const guint8 *buf, gsize buf_len);
......
......@@ -489,8 +489,7 @@ NiceCandidate *discovery_add_local_host_candidate (
goto errors;
_priv_set_socket_tos (agent, udp_socket, stream->tos);
agent_attach_stream_component_socket (agent, stream,
component, udp_socket);
component_attach_socket (component, udp_socket);
return candidate;
......@@ -621,7 +620,7 @@ discovery_add_relay_candidate (
if (!priv_add_local_candidate_pruned (agent, stream_id, component, candidate))
goto errors;
component_add_detached_socket (component, relay_socket);
component_attach_socket (component, relay_socket);
agent_signal_new_candidate (agent, candidate);
return candidate;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment