Commit 448174e5 authored by Olivier Crête's avatar Olivier Crête

agent: First try to the sockets that were awoken by last GMainContext

If there were any sources that were notified by the last GMainContext,
first look through those, if they had nothign, then go for the others.
parent bf905793
......@@ -3480,6 +3480,61 @@ nice_agent_recv_messages_blocking_or_nonblocking (NiceAgent *agent,
g_mutex_unlock (&component->io_mutex);
{
GSList *item;
guint age;
NiceInputMessageIter prev_recv_messages_iter;
memcpy (&prev_recv_messages_iter, &component->recv_messages_iter,
sizeof (NiceInputMessageIter));
restart:
age = component->socket_sources_age;
for (item = component->socket_sources;
item && !received_enough && !error_reported && !reached_eos;
item = item->next) {
SocketSource *socket_source = item->data;
if (socket_source->last_condition & G_IO_IN) {
agent_unlock ();
component_io_cb (NULL, socket_source->last_condition, socket_source);
agent_lock();
if (!agent_find_component (agent, stream_id, component_id,
&stream, &component)) {
g_clear_error (&child_error);
g_set_error (&child_error, G_IO_ERROR, G_IO_ERROR_BROKEN_PIPE,
"Component removed during call.");
component = NULL;
error_reported = TRUE;
goto recv_error;
}
if (component->socket_sources_age != age) {
goto restart;
}
if (child_error)
socket_source->last_condition = 0;
if (child_error && g_error_matches (child_error,
G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
g_clear_error (&child_error);
received_enough =
nice_input_message_iter_is_at_end (&component->recv_messages_iter,
component->recv_messages, component->n_recv_messages);
error_reported = (child_error != NULL);
reached_eos = FALSE;
}
}
}
/* Each iteration of the main context will either receive some data, a
* cancellation error or a socket error. The iter’s
* @message counter will be incremented after each read.
......@@ -4098,7 +4153,7 @@ component_io_cb (GSocket *gsocket, GIOCondition condition, gpointer user_data)
agent_lock ();
if (g_source_is_destroyed (g_main_current_source ())) {
if (gsocket && g_source_is_destroyed (g_main_current_source ())) {
/* Silently return FALSE. */
nice_debug ("%s: source %p destroyed", G_STRFUNC, g_main_current_source ());
......
......@@ -116,13 +116,29 @@ socket_source_detach (SocketSource *source)
source->source = NULL;
}
static SocketSource *
socket_source_ref (SocketSource *source)
{
g_atomic_int_inc (&source->ref_count);
return source;
}
static void
socket_source_unref (SocketSource *source)
{
if (g_atomic_int_dec_and_test (&source->ref_count)) {
g_slice_free (SocketSource, source);
}
}
static void
socket_source_free (SocketSource *source)
socket_source_release (SocketSource *source)
{
socket_source_detach (source);
nice_socket_free (source->socket);
source->socket = NULL;
g_slice_free (SocketSource, source);
socket_source_unref (source);
}
static gboolean
......@@ -577,6 +593,7 @@ component_attach_socket (Component *component, NiceSocket *nicesock)
socket_source = l->data;
} else {
socket_source = g_slice_new0 (SocketSource);
socket_source_ref (socket_source);
socket_source->socket = nicesock;
socket_source->component = component;
component->socket_sources =
......@@ -650,8 +667,7 @@ component_detach_socket (Component *component, NiceSocket *nicesock)
component->socket_sources = g_slist_delete_link (component->socket_sources, l);
component->socket_sources_age++;
socket_source_detach (socket_source);
socket_source_free (socket_source);
socket_source_release (socket_source);
}
/*
......@@ -680,7 +696,7 @@ component_free_socket_sources (Component *component)
nice_debug ("Free socket sources for component %p.", component);
g_slist_free_full (component->socket_sources,
(GDestroyNotify) socket_source_free);
(GDestroyNotify) socket_source_release);
component->socket_sources = NULL;
component->socket_sources_age++;
......@@ -1027,14 +1043,33 @@ typedef struct {
} ComponentSource;
typedef struct {
NiceSocket *socket;
GSource *source;
SocketSource *parent_socket_source;
ComponentSource *component_source;
} ChildSocketSource;
static void
free_child_socket_source (gpointer data)
{
ChildSocketSource *child_socket_source = data;
socket_source_unref (child_socket_source->parent_socket_source);
g_slice_free (ChildSocketSource, data);
}
static gboolean
component_child_cb (GSocket *socket,
GIOCondition condition,
gpointer user_data)
{
ComponentSource *component_source = user_data;
ChildSocketSource *child_socket_source = user_data;
SocketSource *parent_socket_source =
(SocketSource *) child_socket_source->parent_socket_source;
component_source->current_condition |= condition;
child_socket_source->component_source->current_condition |= condition;
parent_socket_source->last_condition |= condition;
return G_SOURCE_CONTINUE;
}
......@@ -1075,7 +1110,7 @@ component_source_prepare (GSource *source, gint *timeout_)
for (parentl = component->socket_sources; parentl; parentl = parentl->next) {
SocketSource *parent_socket_source = parentl->data;
SocketSource *child_socket_source;
ChildSocketSource *child_socket_source;
/* Iterating the list of socket sources every time isn't a big problem
* because the number of pairs is limited ~100 normally, so there will
......@@ -1090,13 +1125,16 @@ component_source_prepare (GSource *source, gint *timeout_)
if (childl)
break;
child_socket_source = g_slice_new0 (SocketSource);
child_socket_source = g_slice_new0 (ChildSocketSource);
child_socket_source->component_source = component_source;
child_socket_source->parent_socket_source =
socket_source_ref (parent_socket_source);
child_socket_source->socket = parent_socket_source->socket;
child_socket_source->source =
g_socket_create_source (child_socket_source->socket->fileno,
component_source->condition, NULL);
g_source_set_callback (child_socket_source->source,
(GSourceFunc) component_child_cb, component_source, NULL);
(GSourceFunc) component_child_cb, child_socket_source, NULL);
g_source_add_child_source (source, child_socket_source->source);
g_source_unref (child_socket_source->source);
component_source->socket_sources =
......@@ -1106,7 +1144,7 @@ component_source_prepare (GSource *source, gint *timeout_)
for (childl = component_source->socket_sources;
childl;) {
SocketSource *child_socket_source = childl->data;
ChildSocketSource *child_socket_source = childl->data;
GSList *next = childl->next;
parentl = g_slist_find_custom (component->socket_sources,
......@@ -1115,7 +1153,7 @@ component_source_prepare (GSource *source, gint *timeout_)
/* If this is not a currently used socket, remove the relevant source */
if (!parentl) {
g_source_remove_child_source (source, child_socket_source->source);
g_slice_free (SocketSource, child_socket_source);
free_child_socket_source (child_socket_source);
component_source->socket_sources =
g_slist_delete_link (component_source->socket_sources, childl);
}
......@@ -1148,12 +1186,6 @@ component_source_dispatch (GSource *source, GSourceFunc callback,
component_source->current_condition, user_data);
}
static void
free_child_socket_source (gpointer data)
{
g_slice_free (SocketSource, data);
}
static void
component_source_finalize (GSource *source)
{
......
......@@ -108,9 +108,11 @@ incoming_check_free (IncomingCheck *icheck);
* The Component is stored so this may be used as the user data for a GSource
* callback. */
typedef struct {
volatile gint ref_count;
NiceSocket *socket;
GSource *source;
Component *component;
GIOCondition last_condition;
} SocketSource;
......
......@@ -3111,7 +3111,9 @@ attempt_send(PseudoTcpSocket *self, SendFlags sflags)
guint32 now = get_current_time (self);
gboolean bFirst = TRUE;
#if 0
DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Attempting send with flags %u.", sflags);
#endif
if (time_diff(now, priv->lastsend) > (long) priv->rx_rto) {
priv->cwnd = priv->mss;
......@@ -3152,13 +3154,15 @@ attempt_send(PseudoTcpSocket *self, SendFlags sflags)
}
if (bFirst) {
gsize available_space = pseudo_tcp_fifo_get_write_remaining (&priv->sbuf);
bFirst = FALSE;
#if 0
gsize available_space = pseudo_tcp_fifo_get_write_remaining (&priv->sbuf);
DEBUG (PSEUDO_TCP_DEBUG_VERBOSE, "[cwnd: %u nWindow: %u nInFlight: %u "
"nAvailable: %u nQueued: %" G_GSIZE_FORMAT " nEmpty: %" G_GSIZE_FORMAT
" ssthresh: %u]",
priv->cwnd, nWindow, nInFlight, nAvailable, snd_buffered,
available_space, priv->ssthresh);
#endif
}
if (nAvailable == 0 && sflags != sfFin && sflags != sfRst) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment