Commit 18e2e3a2 authored by Philip Withnall's avatar Philip Withnall Committed by Olivier Crête

agent: Queue incoming pseudo-TCP messages until ACKs can be sent

If pseudo-TCP messages are received before a socket has been selected
from all the STUN candidates, they would previously be immediately
passed to the pseudo-TCP state machine, which would attempt to send ACKs
for them. This would fail (due to a lack of an outbound UDP socket), and
would incur a retransmit timeout in the TCP state machine. This slowed
down the tests enormously if one agent in a test completed candidate
selection before the other (which is an entirely reasonable scenario).

This never occurred before because the existing tests artificially run
both agents in lock-step, and never send data packets from one to the
other until both have completed candidate selection. This is basically
cheating.

Fix the problem by queuing incoming pseudo-TCP messages until an
outbound UDP socket is available to send the ACKs or SYNACKs on.
parent d638586c
......@@ -1140,7 +1140,10 @@ pseudo_tcp_socket_write_packet (PseudoTcpSocket *socket,
NiceSocket *sock;
NiceAddress *addr;
sock = component->selected_pair.local->sockptr;
#ifndef NDEBUG
{
gchar tmpbuf[INET6_ADDRSTRLEN];
nice_address_to_string (&component->selected_pair.remote->addr, tmpbuf);
......@@ -1149,13 +1152,16 @@ pseudo_tcp_socket_write_packet (PseudoTcpSocket *socket,
component->agent, component->stream->id, component->id, len,
sock->fileno, g_socket_get_fd (sock->fileno), tmpbuf,
nice_address_get_port (&component->selected_pair.remote->addr));
}
#endif
sock = component->selected_pair.local->sockptr;
addr = &component->selected_pair.remote->addr;
if (nice_socket_send (sock, addr, len, buffer)) {
return WR_SUCCESS;
}
} else {
nice_debug ("%s: WARNING: Failed to send pseudo-TCP packet from agent %p "
"as no pair has been selected yet.", G_STRFUNC, component->agent);
}
return WR_FAIL;
......@@ -1284,6 +1290,52 @@ void agent_signal_initial_binding_request_received (NiceAgent *agent, Stream *st
}
}
/* If the Component now has a selected_pair, and has pending TCP packets which
* it couldn’t receive before due to not being able to send out ACKs (or
* SYNACKs, for the initial SYN packet), handle them now.
*
* Must be called with the agent lock held. */
static void
process_queued_tcp_packets (NiceAgent *agent, Stream *stream,
Component *component)
{
GOutputVector *vec;
if (component->selected_pair.local == NULL || component->tcp == NULL)
return;
nice_debug ("%s: Sending outstanding packets for agent %p.", G_STRFUNC,
agent);
while ((vec = g_queue_peek_head (&component->queued_tcp_packets)) != NULL) {
gboolean retval;
g_object_add_weak_pointer (G_OBJECT (agent), (gpointer *) &agent);
nice_debug ("%s: Sending %" G_GSIZE_FORMAT " bytes.", G_STRFUNC, vec->size);
retval =
pseudo_tcp_socket_notify_packet (component->tcp, vec->buffer,
vec->size);
if (agent != NULL) {
adjust_tcp_clock (agent, stream, component);
g_object_remove_weak_pointer (G_OBJECT (agent), (gpointer *) &agent);
} else {
nice_debug ("%s: Agent %p was destroyed in "
"pseudo_tcp_socket_notify_packet().", G_STRFUNC, agent);
}
if (!retval) {
/* Failed to send; try again later. */
break;
}
g_queue_pop_head (&component->queued_tcp_packets);
g_free ((gpointer) vec->buffer);
g_slice_free (GOutputVector, vec);
}
}
void agent_signal_new_selected_pair (NiceAgent *agent, guint stream_id, guint component_id, const gchar *local_foundation, const gchar *remote_foundation)
{
Component *component;
......@@ -1301,6 +1353,8 @@ void agent_signal_new_selected_pair (NiceAgent *agent, guint stream_id, guint co
}
if (component->tcp) {
process_queued_tcp_packets (agent, stream, component);
pseudo_tcp_socket_connect (component->tcp);
pseudo_tcp_socket_notify_mtu (component->tcp, MAX_TCP_MTU);
adjust_tcp_clock (agent, stream, component);
......@@ -1382,6 +1436,8 @@ void agent_signal_component_state_change (NiceAgent *agent, guint stream_id, gui
component->state = state;
process_queued_tcp_packets (agent, stream, component);
g_signal_emit (agent, signals[SIGNAL_COMPONENT_STATE_CHANGED], 0,
stream_id, component_id, state);
}
......@@ -2428,6 +2484,26 @@ agent_recv_locked (
handle_tcp:
/* Unhandled STUN; try handling TCP data, then pass to the client. */
if (len > 0 && component->tcp) {
/* If we don’t yet have an underlying selected socket, queue up the incoming
* data to handle later. This is because we can’t send ACKs (or, more
* importantly for the first few packets, SYNACKs) without an underlying
* socket. We’d rather wait a little longer for a pair to be selected, then
* process the incoming packets and send out ACKs, than try to process them
* now, fail to send the ACKs, and incur a timeout in our pseudo-TCP state
* machine. */
if (component->selected_pair.local == NULL) {
GOutputVector *vec = g_slice_new (GOutputVector);
vec->buffer = g_memdup (local_buf, len);
vec->size = len;
g_queue_push_tail (&component->queued_tcp_packets, vec);
nice_debug ("%s: Queued %" G_GSSIZE_FORMAT " bytes for agent %p.",
G_STRFUNC, len, agent);
return 0;
} else {
process_queued_tcp_packets (agent, stream, component);
}
/* Received data on a reliable connection. */
g_object_add_weak_pointer (G_OBJECT (agent), (gpointer *) &agent);
......
......@@ -131,6 +131,8 @@ component_new (guint id, NiceAgent *agent, Stream *stream)
component_set_io_context (component, NULL);
component_set_io_callback (component, NULL, NULL, NULL, 0, NULL);
g_queue_init (&component->queued_tcp_packets);
return component;
}
......@@ -140,6 +142,7 @@ component_free (Component *cmp)
GSList *i;
GList *item;
IOCallbackData *data;
GOutputVector *vec;
for (i = cmp->local_candidates; i; i = i->next) {
NiceCandidate *candidate = i->data;
......@@ -201,6 +204,11 @@ component_free (Component *cmp)
cmp->ctx = NULL;
}
while ((vec = g_queue_pop_head (&cmp->queued_tcp_packets)) != NULL) {
g_free ((gpointer) vec->buffer);
g_slice_free (GOutputVector, vec);
}
g_mutex_clear (&cmp->io_mutex);
g_slice_free (Component, cmp);
......
......@@ -188,6 +188,11 @@ struct _Component
guint min_port;
guint max_port;
/* Queue of messages received before a selected socket was available to send
* ACKs on. The messages are dequeued to the pseudo-TCP socket once a selected
* UDP socket is available. This is only used for reliable Components. */
GQueue queued_tcp_packets;
};
Component *
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment