Commit 790974e0 authored by Philip Withnall's avatar Philip Withnall Committed by Olivier Crête

agent: Move socket/source handling from NiceAgent into Component

This compartmentalises it a little more, reducing the spread of
state-changing code from three files down to one.

The key change is the switch from using two GSLists of NiceSockets and
GSources in Component, to using a single GSList of a struct {
NiceSocket, GSource }. This is possible because there is at most one
GSource per NiceSocket. This change reduces memory overhead (from the
GSList structures) slightly, and makes the relationship between sockets
and sources much clearer.
parent 7cca1250
...@@ -125,10 +125,9 @@ static GRecMutex agent_mutex; /* Mutex used for thread-safe lib */ ...@@ -125,10 +125,9 @@ static GRecMutex agent_mutex; /* Mutex used for thread-safe lib */
static GStaticRecMutex agent_mutex = G_STATIC_REC_MUTEX_INIT; static GStaticRecMutex agent_mutex = G_STATIC_REC_MUTEX_INIT;
#endif #endif
static gboolean priv_attach_stream_component (NiceAgent *agent, static void priv_attach_stream_component (NiceAgent *agent,
Stream *stream, Stream *stream,
Component *component); Component *component);
static void priv_detach_stream_component (Stream *stream, Component *component);
static void priv_free_upnp (NiceAgent *agent); static void priv_free_upnp (NiceAgent *agent);
...@@ -1012,7 +1011,7 @@ static void priv_pseudo_tcp_error (NiceAgent *agent, Stream *stream, ...@@ -1012,7 +1011,7 @@ static void priv_pseudo_tcp_error (NiceAgent *agent, Stream *stream,
if (component->tcp) { if (component->tcp) {
agent_signal_component_state_change (agent, stream->id, agent_signal_component_state_change (agent, stream->id,
component->id, NICE_COMPONENT_STATE_FAILED); component->id, NICE_COMPONENT_STATE_FAILED);
priv_detach_stream_component (stream, component); component_detach_socket_sources (component);
} }
priv_destroy_component_tcp (component); priv_destroy_component_tcp (component);
} }
...@@ -1442,7 +1441,6 @@ priv_add_new_candidate_discovery_turn (NiceAgent *agent, ...@@ -1442,7 +1441,6 @@ priv_add_new_candidate_discovery_turn (NiceAgent *agent,
_priv_set_socket_tos (agent, new_socket, stream->tos); _priv_set_socket_tos (agent, new_socket, stream->tos);
agent_attach_stream_component_socket (agent, stream, agent_attach_stream_component_socket (agent, stream,
component, new_socket); component, new_socket);
component->sockets= g_slist_append (component->sockets, new_socket);
socket = new_socket; socket = new_socket;
} }
} }
...@@ -1492,7 +1490,6 @@ priv_add_new_candidate_discovery_turn (NiceAgent *agent, ...@@ -1492,7 +1490,6 @@ priv_add_new_candidate_discovery_turn (NiceAgent *agent,
agent_attach_stream_component_socket (agent, stream, agent_attach_stream_component_socket (agent, stream,
component, cdisco->nicesock); component, cdisco->nicesock);
component->sockets = g_slist_append (component->sockets, cdisco->nicesock);
} }
cdisco->turn = turn; cdisco->turn = turn;
...@@ -1971,20 +1968,14 @@ nice_agent_gather_candidates ( ...@@ -1971,20 +1968,14 @@ nice_agent_gather_candidates (
for (n = 0; n < stream->n_components; n++) { for (n = 0; n < stream->n_components; n++) {
Component *component = stream_find_component_by_id (stream, n + 1); Component *component = stream_find_component_by_id (stream, n + 1);
priv_detach_stream_component (stream, component); component_free_socket_sources (component);
for (i = component->local_candidates; i; i = i->next) { for (i = component->local_candidates; i; i = i->next) {
NiceCandidate *candidate = i->data; NiceCandidate *candidate = i->data;
nice_candidate_free (candidate); nice_candidate_free (candidate);
} }
for (i = component->sockets; i; i = i->next) {
NiceSocket *udpsocket = i->data;
nice_socket_free (udpsocket);
}
g_slist_free (component->local_candidates); g_slist_free (component->local_candidates);
component->local_candidates = NULL; component->local_candidates = NULL;
g_slist_free (component->sockets);
component->sockets = NULL;
} }
discovery_prune_stream (agent, stream_id); discovery_prune_stream (agent, stream_id);
} }
...@@ -2708,16 +2699,12 @@ nice_agent_g_source_cb ( ...@@ -2708,16 +2699,12 @@ nice_agent_g_source_cb (
MAX_BUFFER_SIZE, buf); MAX_BUFFER_SIZE, buf);
if (len < 0) { if (len < 0) {
GSource *source = ctx->source; /* Error. Detach the source but don’t close the socket. We don’t close the
* socket because it would be way too complicated to take care of every path
component->gsources = g_slist_remove (component->gsources, source); * where it might still be used. */
g_source_destroy (source);
g_source_unref (source);
/* We don't close the socket because it would be way too complicated to
* take care of every path where the socket might still be used.. */
nice_debug ("Agent %p: unable to recv from socket %p. Detaching", nice_debug ("Agent %p: unable to recv from socket %p. Detaching",
ctx->agent, ctx->socket); ctx->agent, ctx->socket);
component_detach_socket_source (component, ctx->socket);
} else if (len > 0 && component->g_source_io_cb) { } else if (len > 0 && component->g_source_io_cb) {
gpointer data = ctx->component->data; gpointer data = ctx->component->data;
gint sid = ctx->stream->id; gint sid = ctx->stream->id;
...@@ -2739,7 +2726,9 @@ done: ...@@ -2739,7 +2726,9 @@ done:
} }
/* /*
* Attaches one socket handle to the main loop event context * Attaches one socket handle to the main loop event context.
*
* Takes ownership of the socket.
*/ */
void void
...@@ -2751,19 +2740,23 @@ agent_attach_stream_component_socket (NiceAgent *agent, ...@@ -2751,19 +2740,23 @@ agent_attach_stream_component_socket (NiceAgent *agent,
GSource *source; GSource *source;
IOCtx *ctx; IOCtx *ctx;
if (!component->ctx) if (!component->ctx) {
component_add_detached_socket (component, socket);
return; return;
}
/* note: without G_IO_ERR the glib mainloop goes into /* note: without G_IO_ERR the glib mainloop goes into
* busyloop if errors are encountered */ * busyloop if errors are encountered */
source = g_socket_create_source(socket->fileno, G_IO_IN | G_IO_ERR, NULL); source = g_socket_create_source (socket->fileno, G_IO_IN | G_IO_ERR, NULL);
ctx = io_ctx_new (agent, stream, component, socket, source); ctx = io_ctx_new (agent, stream, component, socket, source);
g_source_set_callback (source, (GSourceFunc) nice_agent_g_source_cb, g_source_set_callback (source, (GSourceFunc) nice_agent_g_source_cb,
ctx, (GDestroyNotify) io_ctx_free); ctx, (GDestroyNotify) io_ctx_free);
nice_debug ("Agent %p : Attach source %p (stream %u).", agent, source, stream->id); nice_debug ("Agent %p : Attach source %p (stream %u).", agent, source,
g_source_attach (source, component->ctx); stream->id);
component->gsources = g_slist_append (component->gsources, source);
/* Add the pair to the component. */
component_add_socket_source (component, socket, source);
} }
...@@ -2772,37 +2765,22 @@ agent_attach_stream_component_socket (NiceAgent *agent, ...@@ -2772,37 +2765,22 @@ agent_attach_stream_component_socket (NiceAgent *agent,
* context. * context.
* *
*/ */
static gboolean static void
priv_attach_stream_component (NiceAgent *agent, priv_attach_stream_component (NiceAgent *agent,
Stream *stream, Stream *stream,
Component *component) Component *component)
{ {
GSList *i; GSList *i;
for (i = component->sockets; i; i = i->next) /* Don’t bother if there is no main context. */
agent_attach_stream_component_socket (agent, stream, component, i->data); if (component->ctx == NULL)
return;
return TRUE;
}
/*
* Detaches socket handles of 'stream' from the main eventloop
* context.
*
*/
static void priv_detach_stream_component (Stream *stream, Component *component)
{
GSList *i;
for (i = component->gsources; i; i = i->next) { for (i = component->socket_sources; i != NULL; i = i->next) {
GSource *source = i->data; SocketSource *socket_source = i->data;
nice_debug ("Detach source %p (stream %u).", source, stream->id); agent_attach_stream_component_socket (agent, stream, component,
g_source_destroy (source); socket_source->socket);
g_source_unref (source);
} }
g_slist_free (component->gsources);
component->gsources = NULL;
} }
NICEAPI_EXPORT gboolean NICEAPI_EXPORT gboolean
...@@ -2818,6 +2796,9 @@ nice_agent_attach_recv ( ...@@ -2818,6 +2796,9 @@ nice_agent_attach_recv (
Stream *stream = NULL; Stream *stream = NULL;
gboolean ret = FALSE; gboolean ret = FALSE;
/* ctx must be non-NULL if func is non-NULL. */
g_return_val_if_fail (func == NULL || ctx != NULL, FALSE);
agent_lock(); agent_lock();
/* attach candidates */ /* attach candidates */
...@@ -2830,7 +2811,7 @@ nice_agent_attach_recv ( ...@@ -2830,7 +2811,7 @@ nice_agent_attach_recv (
} }
if (component->g_source_io_cb) if (component->g_source_io_cb)
priv_detach_stream_component (stream, component); component_detach_socket_sources (component);
ret = TRUE; ret = TRUE;
......
...@@ -53,6 +53,27 @@ ...@@ -53,6 +53,27 @@
#include "component.h" #include "component.h"
#include "agent-priv.h" #include "agent-priv.h"
static void
socket_source_detach (SocketSource *source)
{
if (source->source != NULL) {
g_source_destroy (source->source);
g_source_unref (source->source);
}
source->source = NULL;
}
static void
socket_source_free (SocketSource *source)
{
nice_socket_free (source->socket);
socket_source_detach (source);
g_slice_free (SocketSource, source);
}
Component * Component *
component_new (guint id) component_new (guint id)
{ {
...@@ -67,7 +88,6 @@ component_new (guint id) ...@@ -67,7 +88,6 @@ component_new (guint id)
return component; return component;
} }
void void
component_free (Component *cmp) component_free (Component *cmp)
{ {
...@@ -88,17 +108,6 @@ component_free (Component *cmp) ...@@ -88,17 +108,6 @@ component_free (Component *cmp)
nice_candidate_free (cmp->restart_candidate), nice_candidate_free (cmp->restart_candidate),
cmp->restart_candidate = NULL; cmp->restart_candidate = NULL;
for (i = cmp->sockets; i; i = i->next) {
NiceSocket *udpsocket = i->data;
nice_socket_free (udpsocket);
}
for (i = cmp->gsources; i; i = i->next) {
GSource *source = i->data;
g_source_destroy (source);
g_source_unref (source);
}
for (i = cmp->incoming_checks; i; i = i->next) { for (i = cmp->incoming_checks; i; i = i->next) {
IncomingCheck *icheck = i->data; IncomingCheck *icheck = i->data;
g_free (icheck->username); g_free (icheck->username);
...@@ -107,8 +116,7 @@ component_free (Component *cmp) ...@@ -107,8 +116,7 @@ component_free (Component *cmp)
g_slist_free (cmp->local_candidates); g_slist_free (cmp->local_candidates);
g_slist_free (cmp->remote_candidates); g_slist_free (cmp->remote_candidates);
g_slist_free (cmp->sockets); g_slist_free_full (cmp->socket_sources, (GDestroyNotify) socket_source_free);
g_slist_free (cmp->gsources);
g_slist_free (cmp->incoming_checks); g_slist_free (cmp->incoming_checks);
for (item = cmp->turn_servers; item; item = g_list_next (item)) { for (item = cmp->turn_servers; item; item = g_list_next (item)) {
...@@ -334,3 +342,108 @@ component_set_selected_remote_candidate (NiceAgent *agent, Component *component, ...@@ -334,3 +342,108 @@ component_set_selected_remote_candidate (NiceAgent *agent, Component *component,
return local; return local;
} }
static gint
_find_socket_source (gconstpointer a, gconstpointer b)
{
const SocketSource *source_a = a;
const NiceSocket *socket_b = b;
return (source_a->socket == socket_b) ? 0 : 1;
}
/* This takes ownership of socket and source.
* It attaches the source to the component’s context. */
void
component_add_socket_source (Component *component, NiceSocket *socket,
GSource *source)
{
GSList *l;
SocketSource *socket_source;
g_assert (component != NULL);
g_assert (socket != NULL);
g_assert (source != NULL);
/* Find an existing SocketSource in the component which contains socket, or
* create a new one. */
l = g_slist_find_custom (component->socket_sources, socket,
_find_socket_source);
if (l != NULL) {
socket_source = l->data;
} else {
socket_source = g_slice_new0 (SocketSource);
socket_source->socket = socket;
component->socket_sources =
g_slist_prepend (component->socket_sources, socket_source);
}
/* Add the source. */
g_assert (socket_source->source == NULL);
g_assert (component->ctx != NULL);
socket_source->source = source;
g_source_attach (source, component->ctx);
}
void
component_add_detached_socket (Component *component, NiceSocket *socket)
{
SocketSource *socket_source;
socket_source = g_slice_new0 (SocketSource);
socket_source->socket = socket;
socket_source->source = NULL;
component->socket_sources =
g_slist_prepend (component->socket_sources, socket_source);
}
/**
* component_detach_socket_source:
* @component: a #Component
* @socket: the socket to detach the source for
*
* Detach the #GSource for the single specified @socket. Leave the socket itself
* untouched.
*
* If the @socket doesn’t exist in this @component, do nothing.
*/
void
component_detach_socket_source (Component *component, NiceSocket *socket)
{
GSList *l;
SocketSource *socket_source;
/* Find the SocketSource for the socket. */
l = g_slist_find_custom (component->socket_sources, socket,
_find_socket_source);
if (l == NULL)
return;
/* Detach the source. */
socket_source = l->data;
socket_source_detach (socket_source);
}
/*
* Detaches socket handles of @component from the main context. Leaves the
* sockets themselves untouched.
*/
void
component_detach_socket_sources (Component *component)
{
GSList *i;
for (i = component->socket_sources; i != NULL; i = i->next) {
SocketSource *socket_source = i->data;
nice_debug ("Detach source %p.", socket_source->source);
socket_source_detach (socket_source);
}
}
void
component_free_socket_sources (Component *component)
{
g_slist_free_full (component->socket_sources,
(GDestroyNotify) socket_source_free);
component->socket_sources = NULL;
}
...@@ -101,6 +101,16 @@ typedef struct { ...@@ -101,6 +101,16 @@ typedef struct {
Component *component; Component *component;
} TcpUserData; } TcpUserData;
/* A pair of a socket and the GSource which polls it from the main loop. All
* GSources in a Component must be attached to the same main context:
* component->ctx.
*
* Socket must be non-NULL, but source may be NULL if it has been detached. */
typedef struct {
NiceSocket *socket;
GSource *source;
} SocketSource;
struct _Component struct _Component
{ {
NiceComponentType type; NiceComponentType type;
...@@ -108,8 +118,7 @@ struct _Component ...@@ -108,8 +118,7 @@ struct _Component
NiceComponentState state; NiceComponentState state;
GSList *local_candidates; /**< list of Candidate objs */ GSList *local_candidates; /**< list of Candidate objs */
GSList *remote_candidates; /**< list of Candidate objs */ GSList *remote_candidates; /**< list of Candidate objs */
GSList *sockets; /**< list of NiceSocket objs */ GSList *socket_sources; /**< list of SocketSource objs */
GSList *gsources; /**< list of GSource objs */
GSList *incoming_checks; /**< list of IncomingCheck objs */ GSList *incoming_checks; /**< list of IncomingCheck objs */
GList *turn_servers; /**< List of TURN servers */ GList *turn_servers; /**< List of TURN servers */
CandidatePair selected_pair; /**< independent from checklists, CandidatePair selected_pair; /**< independent from checklists,
...@@ -149,6 +158,19 @@ NiceCandidate * ...@@ -149,6 +158,19 @@ NiceCandidate *
component_set_selected_remote_candidate (NiceAgent *agent, Component *component, component_set_selected_remote_candidate (NiceAgent *agent, Component *component,
NiceCandidate *candidate); NiceCandidate *candidate);
void
component_add_socket_source (Component *component, NiceSocket *socket,
GSource *source);
void
component_add_detached_socket (Component *component, NiceSocket *socket);
void
component_detach_socket_source (Component *component, NiceSocket *socket);
void
component_detach_socket_sources (Component *component);
void
component_free_socket_sources (Component *component);
G_END_DECLS G_END_DECLS
#endif /* _NICE_COMPONENT_H */ #endif /* _NICE_COMPONENT_H */
......
...@@ -481,11 +481,6 @@ NiceCandidate *discovery_add_local_host_candidate ( ...@@ -481,11 +481,6 @@ NiceCandidate *discovery_add_local_host_candidate (
if (!udp_socket) if (!udp_socket)
goto errors; goto errors;
_priv_set_socket_tos (agent, udp_socket, stream->tos);
agent_attach_stream_component_socket (agent, stream,
component, udp_socket);
candidate->sockptr = udp_socket; candidate->sockptr = udp_socket;
candidate->addr = udp_socket->addr; candidate->addr = udp_socket->addr;
candidate->base_addr = udp_socket->addr; candidate->base_addr = udp_socket->addr;
...@@ -493,7 +488,9 @@ NiceCandidate *discovery_add_local_host_candidate ( ...@@ -493,7 +488,9 @@ NiceCandidate *discovery_add_local_host_candidate (
if (!priv_add_local_candidate_pruned (agent, stream_id, component, candidate)) if (!priv_add_local_candidate_pruned (agent, stream_id, component, candidate))
goto errors; goto errors;
component->sockets = g_slist_append (component->sockets, udp_socket); _priv_set_socket_tos (agent, udp_socket, stream->tos);
agent_attach_stream_component_socket (agent, stream,
component, udp_socket);
return candidate; return candidate;
...@@ -624,7 +621,7 @@ discovery_add_relay_candidate ( ...@@ -624,7 +621,7 @@ discovery_add_relay_candidate (
if (!priv_add_local_candidate_pruned (agent, stream_id, component, candidate)) if (!priv_add_local_candidate_pruned (agent, stream_id, component, candidate))
goto errors; goto errors;
component->sockets = g_slist_append (component->sockets, relay_socket); component_add_detached_socket (component, relay_socket);
agent_signal_new_candidate (agent, candidate); agent_signal_new_candidate (agent, candidate);
return candidate; return candidate;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment