Commit 6d3a32a0 authored by Philip Withnall's avatar Philip Withnall Committed by Olivier Crête

agent: Add a nice_agent_recv_nonblocking() function

This is a non-blocking variant of nice_agent_recv(), and will be used
internally by the GPollableInputStream implementation. External
implementations may use it as well.

It reserves the right to iterate the main context, but doesn’t currently
do so.
parent 11f04d6c
......@@ -2636,6 +2636,181 @@ done:
return len;
}
/**
* nice_agent_recv_nonblocking:
* @agent: a #NiceAgent
* @stream_id: the ID of the stream to receive on
* @component_id: the ID of the component to receive on
* @buf: (array length=buf_len) (out caller-allocates): caller-allocated buffer
* to write the received data into, of length at least @buf_len
* @buf_len: length of @buf
* @cancellable: (allow-none): a #GCancellable to allow the operation to be
* cancelled from another thread, or %NULL
* @error: (allow-none): return location for a #GError, or %NULL
*
* Try to receive data from the given stream/component combination on @agent,
* without blocking. If receiving data would block, -1 is returned and a
* %G_IO_ERROR_WOULD_BLOCK is set in @error. If any other error occurs, -1 is
* returned. Otherwise, 0 is returned if (and only if) @buf_len is 0. In all
* other cases, the number of bytes read into @buf is returned, and will be
* greater than 0.
*
* For a reliable @agent, this function will receive as many bytes as possible
* up to @buf_len. For a non-reliable @agent, it will receive a single message.
* In this case, @buf must be big enough to contain the entire message (65536
* bytes), or any excess data may be silently dropped.
*
* As this function is non-blocking, @cancellable is included only for parity
* with nice_agent_recv(). If @cancellable is cancelled before this function is
* called, a %G_IO_ERROR_CANCELLED error will be returned immediately.
*
* This must not be used in combination with nice_agent_attach_recv() on the
* same stream/component pair.
*
* Internally, this may iterate the current thread’s default main context.
*
* If the stream/component pair doesn’t exist, or if a suitable candidate socket
* hasn’t yet been selected for it, a %G_IO_ERROR_BROKEN_PIPE error will be
* returned. A %G_IO_ERROR_CANCELLED error will be returned if the operation was
* cancelled. %G_IO_ERROR_FAILED will be returned for other errors.
*
* Returns: the number of bytes received into @buf on success (guaranteed to be
* greater than 0 unless @buf_len is 0), or -1 on error
*
* Since: 0.1.5
*/
NICEAPI_EXPORT gssize
nice_agent_recv_nonblocking (NiceAgent *agent, guint stream_id,
guint component_id, guint8 *buf, gsize buf_len, GCancellable *cancellable,
GError **error)
{
Component *component;
Stream *stream;
gssize total_len = 0;
gboolean received_enough = FALSE, error_reported = FALSE;
gboolean all_sockets_would_block = FALSE;
GError *child_error = NULL;
if (buf_len == 0)
return 0;
/* Support cancellation at the beginning only. */
if (g_cancellable_set_error_if_cancelled (cancellable, error))
return -1;
/* Try and receive some data. */
agent_lock ();
if (!agent_find_component (agent, stream_id, component_id,
&stream, &component)) {
g_set_error (&child_error, G_IO_ERROR, G_IO_ERROR_BROKEN_PIPE,
"Invalid stream/component.");
total_len = -1;
goto done;
}
/* For a reliable stream, grab any data from the pseudo-TCP input buffer
* before trying the sockets (which we try to see if there’s any more data
* available to read without blocking). */
if (agent->reliable && component->tcp != NULL &&
pseudo_tcp_socket_get_available_bytes (component->tcp) > 0) {
gssize len;
len = pseudo_tcp_socket_recv (component->tcp, (gchar *) buf, buf_len);
adjust_tcp_clock (agent, stream, component);
nice_debug ("%s: Received %" G_GSSIZE_FORMAT " bytes from pseudo-TCP read "
"buffer.", G_STRFUNC, len);
if (len < 0 &&
pseudo_tcp_socket_get_error (component->tcp) == EWOULDBLOCK) {
len = 0;
} else if (len < 0 &&
pseudo_tcp_socket_get_error (component->tcp) == ENOTCONN) {
g_set_error (&child_error, G_IO_ERROR, G_IO_ERROR_BROKEN_PIPE,
"Error reading data from pseudo-TCP socket: not connected.");
} else if (len < 0) {
g_set_error (&child_error, G_IO_ERROR, G_IO_ERROR_FAILED,
"Error reading data from pseudo-TCP socket.");
} else if (len > 0) {
/* Got some data! */
buf += len;
buf_len -= len;
total_len += len;
}
received_enough = ((gsize) total_len == buf_len);
error_reported = (len < 0);
}
/* Each call to agent_recv_locked() will either receive some data or a socket
* error (including EWOULDBLOCK). (Cancellation is not supported.) If *any*
* socket returns an error, discard all the data in @buf and return an error
* from nice_agent_recv_nonblocking() overall.
*
* In reliable mode, iterate the loop enough to receive at least one byte.
* In non-reliable mode, iterate the loop to receive a single message. */
while (!received_enough && !error_reported && !all_sockets_would_block) {
GSList *i;
gssize len = 0;
for (i = component->socket_sources; i != NULL; i = i->next) {
SocketSource *socket_source = i->data;
/* Actually read the data. This will return 0 if the data has already been
* handled (e.g. for STUN control packets). */
len = agent_recv_locked (agent, stream, component,
socket_source->socket, buf, buf_len);
nice_debug ("%s: Received %" G_GSSIZE_FORMAT " bytes from socket %p.",
G_STRFUNC, len, socket_source->socket);
if (len < 0) {
g_set_error (&child_error, G_IO_ERROR, G_IO_ERROR_FAILED,
"Unable to receive from socket %p. Detaching.",
socket_source->socket);
break;
} else if (len > 0) {
/* Got some data! */
buf += len;
buf_len -= len;
total_len += len;
break;
}
}
received_enough =
((agent->reliable && (gsize) total_len == buf_len) ||
(!agent->reliable && total_len > 0));
error_reported = (len < 0);
all_sockets_would_block = (len == 0);
}
nice_debug ("%s: total_len: %" G_GSSIZE_FORMAT ", buf_len: %" G_GSIZE_FORMAT,
G_STRFUNC, total_len, buf_len);
if (error_reported) {
total_len = -1;
} else if (total_len == 0 && all_sockets_would_block) {
g_set_error_literal (&child_error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
g_strerror (EAGAIN));
total_len = -1;
}
done:
g_assert ((child_error != NULL) == (total_len == -1));
g_assert (total_len != 0);
if (child_error != NULL)
g_propagate_error (error, child_error);
agent_unlock ();
return total_len;
}
/**
* nice_agent_send_full:
* @agent: a #NiceAgent
......
......@@ -711,6 +711,16 @@ nice_agent_recv (
GCancellable *cancellable,
GError **error);
gssize
nice_agent_recv_nonblocking (
NiceAgent *agent,
guint stream_id,
guint component_id,
guint8 *buf,
gsize buf_len,
GCancellable *cancellable,
GError **error);
/**
* nice_agent_set_selected_pair:
* @agent: The #NiceAgent Object
......
......@@ -25,6 +25,7 @@ nice_agent_get_selected_pair
nice_agent_send
nice_agent_send_full
nice_agent_recv
nice_agent_recv_nonblocking
nice_agent_attach_recv
nice_agent_set_selected_pair
nice_agent_set_selected_remote_candidate
......
......@@ -17,6 +17,7 @@ nice_address_to_string
nice_agent_add_local_address
nice_agent_add_stream
nice_agent_recv
nice_agent_recv_nonblocking
nice_agent_attach_recv
nice_agent_gather_candidates
nice_agent_generate_local_candidate_sdp
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment