Commit 6fe29601 authored by Philip Withnall's avatar Philip Withnall Committed by Olivier Crête

agent: Move I/O callback handling from NiceAgent into Component

Compartmentalise the handling of setting and clearing the user-provided
per-Component I/O callbacks.

Data flows from the socket, through nice_agent_g_source_cb(), to
component_emit_io_callback() which calls the callback provided by the
user when originally attaching to the stream/component.
parent 790974e0
......@@ -125,10 +125,6 @@ static GRecMutex agent_mutex; /* Mutex used for thread-safe lib */
static GStaticRecMutex agent_mutex = G_STATIC_REC_MUTEX_INIT;
#endif
static void priv_attach_stream_component (NiceAgent *agent,
Stream *stream,
Component *component);
static void priv_free_upnp (NiceAgent *agent);
#if GLIB_CHECK_VERSION(2,31,8)
......@@ -1053,20 +1049,17 @@ pseudo_tcp_socket_readable (PseudoTcpSocket *sock, gpointer user_data)
g_object_add_weak_pointer (G_OBJECT (agent), (gpointer *)&agent);
do {
if (component->g_source_io_cb)
if (component->io_callback != NULL)
len = pseudo_tcp_socket_recv (sock, buf, sizeof(buf));
else
len = 0;
if (len > 0) {
gpointer data = component->data;
gint sid = stream->id;
gint cid = component->id;
NiceAgentRecvFunc callback = component->g_source_io_cb;
/* Unlock the agent before calling the callback */
agent_unlock();
callback (agent, sid, cid, len, buf, data);
agent_lock();
component_emit_io_callback (component, agent, sid, cid,
(guint8 *) buf, len);
if (sock == NULL) {
nice_debug ("PseudoTCP socket got destroyed in readable callback!");
break;
......@@ -2705,23 +2698,16 @@ nice_agent_g_source_cb (
nice_debug ("Agent %p: unable to recv from socket %p. Detaching",
ctx->agent, ctx->socket);
component_detach_socket_source (component, ctx->socket);
} else if (len > 0 && component->g_source_io_cb) {
gpointer data = ctx->component->data;
gint sid = ctx->stream->id;
gint cid = ctx->component->id;
NiceAgentRecvFunc callback = ctx->component->g_source_io_cb;
/* Unlock the agent before calling the callback */
agent_unlock ();
callback (agent, sid, cid, len, buf, data);
} else if (len > 0 && component->io_callback) {
gint sid = stream->id;
gint cid = component->id;
goto done;
component_emit_io_callback (component, agent, sid, cid,
(guint8 *) buf, len);
}
agent_unlock ();
done:
return TRUE;
}
......@@ -2759,30 +2745,6 @@ agent_attach_stream_component_socket (NiceAgent *agent,
component_add_socket_source (component, socket, source);
}
/*
* Attaches socket handles of 'stream' to the main eventloop
* context.
*
*/
static void
priv_attach_stream_component (NiceAgent *agent,
Stream *stream,
Component *component)
{
GSList *i;
/* Don’t bother if there is no main context. */
if (component->ctx == NULL)
return;
for (i = component->socket_sources; i != NULL; i = i->next) {
SocketSource *socket_source = i->data;
agent_attach_stream_component_socket (agent, stream, component,
socket_source->socket);
}
}
NICEAPI_EXPORT gboolean
nice_agent_attach_recv (
NiceAgent *agent,
......@@ -2810,25 +2772,19 @@ nice_agent_attach_recv (
goto done;
}
if (component->g_source_io_cb)
component_detach_socket_sources (component);
/* Set the component’s I/O callback. */
component_set_io_callback (component, func, data, ctx);
ret = TRUE;
component->g_source_io_cb = NULL;
component->data = NULL;
if (component->ctx)
g_main_context_unref (component->ctx);
component->ctx = NULL;
if (func) {
component->g_source_io_cb = func;
component->data = data;
component->ctx = ctx;
if (ctx)
g_main_context_ref (ctx);
GSList *i;
priv_attach_stream_component (agent, stream, component);
/* Attach any detached sockets to the new main context. */
for (i = component->socket_sources; i != NULL; i = i->next) {
SocketSource *socket_source = i->data;
agent_attach_stream_component_socket (agent, stream, component,
socket_source->socket);
}
/* If we got detached, maybe our readable callback didn't finish reading
* all available data in the pseudotcp, so we need to make sure we free
......@@ -2838,7 +2794,6 @@ nice_agent_attach_recv (
* trigger an error in the initial, pre-connection attach. */
if (component->tcp && component->tcp_data && component->tcp_readable)
pseudo_tcp_socket_readable (component->tcp, component->tcp_data);
}
done:
......
......@@ -447,3 +447,56 @@ component_free_socket_sources (Component *component)
(GDestroyNotify) socket_source_free);
component->socket_sources = NULL;
}
void
component_set_io_callback (Component *component, NiceAgentRecvFunc func,
gpointer user_data, GMainContext *context)
{
/* Reference the context early so we don’t accidentally free it below. */
if (context != NULL && func != NULL)
g_main_context_ref (context);
if (component->io_callback != NULL)
component_detach_socket_sources (component);
component->io_callback = NULL;
component->io_user_data = NULL;
if (component->ctx != NULL)
g_main_context_unref (component->ctx);
component->ctx = NULL;
if (func != NULL) {
component->io_callback = func;
component->io_user_data = user_data;
component->ctx = context; /* referenced above */
}
}
/* This must be called with the agent lock *held*. */
void
component_emit_io_callback (Component *component, NiceAgent *agent,
gint stream_id, gint component_id, const guint8 *buf, gsize buf_len)
{
NiceAgentRecvFunc io_callback;
gpointer io_user_data;
g_assert (component != NULL);
g_assert (NICE_IS_AGENT (agent));
g_assert (stream_id > 0);
g_assert (component_id > 0);
g_assert (buf != NULL);
g_assert (buf_len > 0);
g_assert (component->io_callback != NULL);
io_callback = component->io_callback;
io_user_data = component->io_user_data;
agent_unlock ();
io_callback (agent, stream_id, component_id,
buf_len, (gchar *) buf, io_user_data);
agent_lock ();
}
......@@ -124,10 +124,12 @@ struct _Component
CandidatePair selected_pair; /**< independent from checklists,
see ICE 11.1. "Sending Media" (ID-19) */
NiceCandidate *restart_candidate; /**< for storing active remote candidate during a restart */
NiceAgentRecvFunc g_source_io_cb; /**< function called on io cb */
gpointer data; /**< data passed to the io function */
GMainContext *ctx; /**< context for data callbacks for this
NiceAgentRecvFunc io_callback; /**< function called on io cb */
gpointer io_user_data; /**< data passed to the io function */
GMainContext *ctx; /**< context for GSources for this
component */
PseudoTcpSocket *tcp;
GSource* tcp_clock;
TcpUserData *tcp_data;
......@@ -171,6 +173,13 @@ component_detach_socket_sources (Component *component);
void
component_free_socket_sources (Component *component);
void
component_set_io_callback (Component *component, NiceAgentRecvFunc func,
gpointer user_data, GMainContext *context);
void
component_emit_io_callback (Component *component, NiceAgent *agent,
gint stream_id, gint component_id, const guint8 *buf, gsize buf_len);
G_END_DECLS
#endif /* _NICE_COMPONENT_H */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment