Commit b560a86f authored by Philip Withnall's avatar Philip Withnall Committed by Olivier Crête
Browse files

agent: Combine nice_agent_recv() and nice_agent_recv_nonblocking()

Sharing is caring.
parent 73124323
......@@ -2463,47 +2463,10 @@ nice_agent_recv_cancelled_cb (GCancellable *cancellable, gpointer user_data)
return !g_cancellable_set_error_if_cancelled (cancellable, error);
}
/**
* nice_agent_recv:
* @agent: a #NiceAgent
* @stream_id: the ID of the stream to receive on
* @component_id: the ID of the component to receive on
* @buf: (array length=buf_len) (out caller-allocates): caller-allocated buffer
* to write the received data into, of length at least @buf_len
* @buf_len: length of @buf
* @cancellable: (allow-none): a #GCancellable to allow the operation to be
* cancelled from another thread, or %NULL
* @error: (allow-none): return location for a #GError, or %NULL
*
* Block on receiving data from the given stream/component combination on
* @agent, returning only once at least 1 byte has been received and written
* into @buf, the stream is closed by the other end or by calling
* nice_agent_remove_stream(), or @cancellable is cancelled.
*
* In the non-error case, in reliable mode, this will block until exactly
* @buf_len bytes have been received. In non-reliable mode, it will block until
* a single message has been received. In this case, @buf must be big enough to
* contain an entire message (65536 bytes), or any excess data may be silently
* dropped.
*
* This must not be used in combination with nice_agent_attach_recv() on the
* same stream/component pair.
*
* Internally, this may iterate the current thread’s default main context.
*
* If the stream/component pair doesn’t exist, or if a suitable candidate socket
* hasn’t yet been selected for it, a %G_IO_ERROR_BROKEN_PIPE error will be
* returned. A %G_IO_ERROR_CANCELLED error will be returned if the operation was
* cancelled. %G_IO_ERROR_FAILED will be returned for other errors.
*
* Returns: the number of bytes written to @buf on success (guaranteed to be
* greater than 0 unless @buf_len is 0), or -1 on error
*
* Since: 0.1.5
*/
NICEAPI_EXPORT gssize
nice_agent_recv (NiceAgent *agent, guint stream_id, guint component_id,
guint8 *buf, gsize buf_len, GCancellable *cancellable, GError **error)
static gssize
nice_agent_recv_blocking_or_nonblocking (NiceAgent *agent, guint stream_id,
guint component_id, gboolean blocking, guint8 *buf, gsize buf_len,
GCancellable *cancellable, GError **error)
{
GMainContext *context;
Stream *stream;
......@@ -2511,6 +2474,7 @@ nice_agent_recv (NiceAgent *agent, guint stream_id, guint component_id,
gssize len = -1;
GSource *cancellable_source = NULL;
gboolean received_enough = FALSE, error_reported = FALSE;
gboolean all_sockets_would_block = FALSE;
GError *child_error = NULL;
g_return_val_if_fail (NICE_IS_AGENT (agent), -1);
......@@ -2588,21 +2552,57 @@ nice_agent_recv (NiceAgent *agent, guint stream_id, guint component_id,
g_mutex_unlock (&component->io_mutex);
/* For a reliable stream, grab any data from the pseudo-TCP input buffer
* before trying the sockets. */
if (agent->reliable && component->tcp != NULL &&
pseudo_tcp_socket_get_available_bytes (component->tcp) > 0) {
len = pseudo_tcp_socket_recv (component->tcp, (gchar *) component->recv_buf,
component->recv_buf_len);
adjust_tcp_clock (agent, stream, component);
nice_debug ("%s: Received %" G_GSSIZE_FORMAT " bytes from pseudo-TCP read "
"buffer.", G_STRFUNC, len);
if (len < 0 &&
pseudo_tcp_socket_get_error (component->tcp) == EWOULDBLOCK) {
len = 0;
} else if (len < 0 &&
pseudo_tcp_socket_get_error (component->tcp) == ENOTCONN) {
g_set_error (&child_error, G_IO_ERROR, G_IO_ERROR_BROKEN_PIPE,
"Error reading data from pseudo-TCP socket: not connected.");
} else if (len < 0) {
g_set_error (&child_error, G_IO_ERROR, G_IO_ERROR_FAILED,
"Error reading data from pseudo-TCP socket.");
} else if (len > 0) {
/* Got some data! */
component->recv_buf_valid_len += len;
}
received_enough = (component->recv_buf_valid_len == buf_len);
error_reported = (child_error != NULL);
}
/* Each iteration of the main context will either receive some data, a
* cancellation error or a socket error.
*
* In reliable mode, iterate the loop enough to receive exactly @buf_len
* bytes. In non-reliable mode, iterate the loop to receive a single message.
* In blocking, reliable mode, iterate the loop enough to receive exactly
* @buf_len bytes. In blocking, non-reliable mode, iterate the loop to receive
* a single message. In non-blocking mode, stop iterating the loop if all
* sockets would block (i.e. if no data was received for an iteration).
*/
while (!received_enough && !error_reported) {
while (!received_enough && !error_reported && !all_sockets_would_block) {
gsize prev_recv_buf_valid_len = component->recv_buf_valid_len;
agent_unlock ();
g_main_context_iteration (context, TRUE);
g_main_context_iteration (context, blocking);
agent_lock ();
received_enough =
((agent->reliable && component->recv_buf_valid_len >= buf_len) ||
((agent->reliable && component->recv_buf_valid_len == buf_len) ||
(!agent->reliable && component->recv_buf_valid_len > 0));
error_reported = (child_error != NULL);
all_sockets_would_block =
!blocking && (component->recv_buf_valid_len == prev_recv_buf_valid_len);
}
len = component->recv_buf_valid_len;
......@@ -2619,14 +2619,18 @@ nice_agent_recv (NiceAgent *agent, guint stream_id, guint component_id,
g_main_context_unref (context);
/* Handle errors and cancellations. */
if (!received_enough) {
g_assert (error_reported);
if (error_reported) {
len = -1;
} else if (len == 0 && all_sockets_would_block) {
g_set_error_literal (&child_error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
g_strerror (EAGAIN));
len = -1;
}
done:
g_assert ((child_error != NULL) == (len == -1));
g_assert (len != 0);
g_assert (len < 0 || (gsize) len <= buf_len);
if (child_error != NULL)
g_propagate_error (error, child_error);
......@@ -2636,6 +2640,52 @@ done:
return len;
}
/**
* nice_agent_recv:
* @agent: a #NiceAgent
* @stream_id: the ID of the stream to receive on
* @component_id: the ID of the component to receive on
* @buf: (array length=buf_len) (out caller-allocates): caller-allocated buffer
* to write the received data into, of length at least @buf_len
* @buf_len: length of @buf
* @cancellable: (allow-none): a #GCancellable to allow the operation to be
* cancelled from another thread, or %NULL
* @error: (allow-none): return location for a #GError, or %NULL
*
* Block on receiving data from the given stream/component combination on
* @agent, returning only once at least 1 byte has been received and written
* into @buf, the stream is closed by the other end or by calling
* nice_agent_remove_stream(), or @cancellable is cancelled.
*
* In the non-error case, in reliable mode, this will block until exactly
* @buf_len bytes have been received. In non-reliable mode, it will block until
* a single message has been received. In this case, @buf must be big enough to
* contain an entire message (65536 bytes), or any excess data may be silently
* dropped.
*
* This must not be used in combination with nice_agent_attach_recv() on the
* same stream/component pair.
*
* Internally, this may iterate the current thread’s default main context.
*
* If the stream/component pair doesn’t exist, or if a suitable candidate socket
* hasn’t yet been selected for it, a %G_IO_ERROR_BROKEN_PIPE error will be
* returned. A %G_IO_ERROR_CANCELLED error will be returned if the operation was
* cancelled. %G_IO_ERROR_FAILED will be returned for other errors.
*
* Returns: the number of bytes written to @buf on success (guaranteed to be
* greater than 0 unless @buf_len is 0), or -1 on error
*
* Since: 0.1.5
*/
NICEAPI_EXPORT gssize
nice_agent_recv (NiceAgent *agent, guint stream_id, guint component_id,
guint8 *buf, gsize buf_len, GCancellable *cancellable, GError **error)
{
return nice_agent_recv_blocking_or_nonblocking (agent, stream_id,
component_id, TRUE, buf, buf_len, cancellable, error);
}
/**
* nice_agent_recv_nonblocking:
* @agent: a #NiceAgent
......@@ -2684,131 +2734,8 @@ nice_agent_recv_nonblocking (NiceAgent *agent, guint stream_id,
guint component_id, guint8 *buf, gsize buf_len, GCancellable *cancellable,
GError **error)
{
Component *component;
Stream *stream;
gssize total_len = 0;
gboolean received_enough = FALSE, error_reported = FALSE;
gboolean all_sockets_would_block = FALSE;
GError *child_error = NULL;
if (buf_len == 0)
return 0;
/* Support cancellation at the beginning only. */
if (g_cancellable_set_error_if_cancelled (cancellable, error))
return -1;
/* Try and receive some data. */
agent_lock ();
if (!agent_find_component (agent, stream_id, component_id,
&stream, &component)) {
g_set_error (&child_error, G_IO_ERROR, G_IO_ERROR_BROKEN_PIPE,
"Invalid stream/component.");
total_len = -1;
goto done;
}
/* For a reliable stream, grab any data from the pseudo-TCP input buffer
* before trying the sockets (which we try to see if there’s any more data
* available to read without blocking). */
if (agent->reliable && component->tcp != NULL &&
pseudo_tcp_socket_get_available_bytes (component->tcp) > 0) {
gssize len;
len = pseudo_tcp_socket_recv (component->tcp, (gchar *) buf, buf_len);
adjust_tcp_clock (agent, stream, component);
nice_debug ("%s: Received %" G_GSSIZE_FORMAT " bytes from pseudo-TCP read "
"buffer.", G_STRFUNC, len);
if (len < 0 &&
pseudo_tcp_socket_get_error (component->tcp) == EWOULDBLOCK) {
len = 0;
} else if (len < 0 &&
pseudo_tcp_socket_get_error (component->tcp) == ENOTCONN) {
g_set_error (&child_error, G_IO_ERROR, G_IO_ERROR_BROKEN_PIPE,
"Error reading data from pseudo-TCP socket: not connected.");
} else if (len < 0) {
g_set_error (&child_error, G_IO_ERROR, G_IO_ERROR_FAILED,
"Error reading data from pseudo-TCP socket.");
} else if (len > 0) {
/* Got some data! */
buf += len;
buf_len -= len;
total_len += len;
}
received_enough = ((gsize) total_len == buf_len);
error_reported = (len < 0);
}
/* Each call to agent_recv_locked() will either receive some data or a socket
* error (including EWOULDBLOCK). (Cancellation is not supported.) If *any*
* socket returns an error, discard all the data in @buf and return an error
* from nice_agent_recv_nonblocking() overall.
*
* In reliable mode, iterate the loop enough to receive at least one byte.
* In non-reliable mode, iterate the loop to receive a single message. */
while (!received_enough && !error_reported && !all_sockets_would_block) {
GSList *i;
gssize len = 0;
for (i = component->socket_sources; i != NULL; i = i->next) {
SocketSource *socket_source = i->data;
/* Actually read the data. This will return 0 if the data has already been
* handled (e.g. for STUN control packets). */
len = agent_recv_locked (agent, stream, component,
socket_source->socket, buf, buf_len);
nice_debug ("%s: Received %" G_GSSIZE_FORMAT " bytes from socket %p.",
G_STRFUNC, len, socket_source->socket);
if (len < 0) {
g_set_error (&child_error, G_IO_ERROR, G_IO_ERROR_FAILED,
"Unable to receive from socket %p. Detaching.",
socket_source->socket);
break;
} else if (len > 0) {
/* Got some data! */
buf += len;
buf_len -= len;
total_len += len;
break;
}
}
received_enough =
((agent->reliable && (gsize) total_len == buf_len) ||
(!agent->reliable && total_len > 0));
error_reported = (len < 0);
all_sockets_would_block = (len == 0);
}
nice_debug ("%s: total_len: %" G_GSSIZE_FORMAT ", buf_len: %" G_GSIZE_FORMAT,
G_STRFUNC, total_len, buf_len);
if (error_reported) {
total_len = -1;
} else if (total_len == 0 && all_sockets_would_block) {
g_set_error_literal (&child_error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
g_strerror (EAGAIN));
total_len = -1;
}
done:
g_assert ((child_error != NULL) == (total_len == -1));
g_assert (total_len != 0);
if (child_error != NULL)
g_propagate_error (error, child_error);
agent_unlock ();
return total_len;
return nice_agent_recv_blocking_or_nonblocking (agent, stream_id,
component_id, FALSE, buf, buf_len, cancellable, error);
}
/**
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment