Commit 919b2c46 authored by Olivier Crête's avatar Olivier Crête
Browse files

agent: Restore the ability nice_agent_send() to send partial buffers

This is very important for reliable mode.

Also use it in the GOutputStream so as to not get into the case where
there is still some space in the TCP buffer, but not enough for one message.
Also warn against this problem.
parent 430e8db5
...@@ -1035,23 +1035,29 @@ pseudo_tcp_socket_opened (PseudoTcpSocket *sock, gpointer user_data) ...@@ -1035,23 +1035,29 @@ pseudo_tcp_socket_opened (PseudoTcpSocket *sock, gpointer user_data)
* *
* Returns the number of messages successfully sent on success (which may be * Returns the number of messages successfully sent on success (which may be
* zero if sending the first buffer of the message would have blocked), or * zero if sending the first buffer of the message would have blocked), or
* a negative number on error. */ * a negative number on error. If "allow_partial" is TRUE, then it returns
* the number of bytes sent
*/
static gint static gint
pseudo_tcp_socket_send_messages (PseudoTcpSocket *self, pseudo_tcp_socket_send_messages (PseudoTcpSocket *self,
const NiceOutputMessage *messages, guint n_messages, GError **error) const NiceOutputMessage *messages, guint n_messages, gboolean allow_partial,
GError **error)
{ {
guint i; guint i;
gint bytes_sent = 0;
for (i = 0; i < n_messages; i++) { for (i = 0; i < n_messages; i++) {
const NiceOutputMessage *message = &messages[i]; const NiceOutputMessage *message = &messages[i];
guint j; guint j;
/* If there’s not enough space for the entire message, bail now before /* If allow_partial is FALSE and there’s not enough space for the
* queuing anything. This doesn’t gel with the fact this function is only * entire message, bail now before queuing anything. This doesn’t
* used in reliable mode, and there is no concept of a ‘message’, but is * gel with the fact this function is only used in reliable mode,
* necessary because the calling API has no way of returning to the client * and there is no concept of a ‘message’, but is necessary
* because the calling API has no way of returning to the client
* and indicating that a message was partially sent. */ * and indicating that a message was partially sent. */
if (output_message_get_size (message) > if (!allow_partial &&
output_message_get_size (message) >
pseudo_tcp_socket_get_available_send_space (self)) { pseudo_tcp_socket_get_available_send_space (self)) {
return i; return i;
} }
...@@ -1068,22 +1074,26 @@ pseudo_tcp_socket_send_messages (PseudoTcpSocket *self, ...@@ -1068,22 +1074,26 @@ pseudo_tcp_socket_send_messages (PseudoTcpSocket *self,
/* In case of -1, the error is either EWOULDBLOCK or ENOTCONN, which both /* In case of -1, the error is either EWOULDBLOCK or ENOTCONN, which both
* need the user to wait for the reliable-transport-writable signal */ * need the user to wait for the reliable-transport-writable signal */
if (ret < 0 && pseudo_tcp_socket_get_error (self) == EWOULDBLOCK) { if (ret < 0) {
ret = 0; if (pseudo_tcp_socket_get_error (self) == EWOULDBLOCK)
return i; goto out;
} else if (ret < 0 && pseudo_tcp_socket_get_error (self) == ENOTCONN) {
g_set_error (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK, if (pseudo_tcp_socket_get_error (self) == ENOTCONN)
"TCP connection is not yet established."); g_set_error (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
return ret; "TCP connection is not yet established.");
} else if (ret < 0) { else
g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
"Error writing data to pseudo-TCP socket."); "Error writing data to pseudo-TCP socket.");
return ret; return -1;
} else {
bytes_sent += ret;
} }
} }
} }
return i; out:
return allow_partial ? bytes_sent : (gint) i;
} }
/* Will fill up @messages from the first free byte onwards (as determined using /* Will fill up @messages from the first free byte onwards (as determined using
...@@ -2881,7 +2891,7 @@ output_message_get_size (const NiceOutputMessage *message) ...@@ -2881,7 +2891,7 @@ output_message_get_size (const NiceOutputMessage *message)
return message_len; return message_len;
} }
/** /*
* nice_input_message_iter_reset: * nice_input_message_iter_reset:
* @iter: a #NiceInputMessageIter * @iter: a #NiceInputMessageIter
* *
...@@ -2898,7 +2908,7 @@ nice_input_message_iter_reset (NiceInputMessageIter *iter) ...@@ -2898,7 +2908,7 @@ nice_input_message_iter_reset (NiceInputMessageIter *iter)
iter->offset = 0; iter->offset = 0;
} }
/** /*
* nice_input_message_iter_is_at_end: * nice_input_message_iter_is_at_end:
* @iter: a #NiceInputMessageIter * @iter: a #NiceInputMessageIter
* @messages: (array length=n_messages): an array of #NiceInputMessages * @messages: (array length=n_messages): an array of #NiceInputMessages
...@@ -2920,7 +2930,7 @@ nice_input_message_iter_is_at_end (NiceInputMessageIter *iter, ...@@ -2920,7 +2930,7 @@ nice_input_message_iter_is_at_end (NiceInputMessageIter *iter,
iter->buffer == 0 && iter->offset == 0); iter->buffer == 0 && iter->offset == 0);
} }
/** /*
* nice_input_message_iter_get_n_valid_messages: * nice_input_message_iter_get_n_valid_messages:
* @iter: a #NiceInputMessageIter * @iter: a #NiceInputMessageIter
* *
...@@ -3227,31 +3237,29 @@ nice_agent_recv_nonblocking (NiceAgent *agent, guint stream_id, ...@@ -3227,31 +3237,29 @@ nice_agent_recv_nonblocking (NiceAgent *agent, guint stream_id,
return local_messages.length; return local_messages.length;
} }
NICEAPI_EXPORT gint /* nice_agent_send_messages_nonblocking_internal:
nice_agent_send_messages_nonblocking ( *
* Returns: number of bytes sent if allow_partial is %TRUE, the number
* of messages otherwise.
*/
static gint
nice_agent_send_messages_nonblocking_internal (
NiceAgent *agent, NiceAgent *agent,
guint stream_id, guint stream_id,
guint component_id, guint component_id,
const NiceOutputMessage *messages, const NiceOutputMessage *messages,
guint n_messages, guint n_messages,
GCancellable *cancellable, gboolean allow_partial,
GError **error) GError **error)
{ {
Stream *stream; Stream *stream;
Component *component; Component *component;
gint n_sent_messages = -1; gint n_sent = -1; /* is in bytes if allow_partial is TRUE,
otherwise in messages */
GError *child_error = NULL; GError *child_error = NULL;
g_return_val_if_fail (NICE_IS_AGENT (agent), -1); g_assert (n_messages == 1 || !allow_partial);
g_return_val_if_fail (stream_id >= 1, -1);
g_return_val_if_fail (component_id >= 1, -1);
g_return_val_if_fail (n_messages == 0 || messages != NULL, -1);
g_return_val_if_fail (
cancellable == NULL || G_IS_CANCELLABLE (cancellable), -1);
g_return_val_if_fail (error == NULL || *error == NULL, -1);
if (g_cancellable_set_error_if_cancelled (cancellable, error))
return -1;
agent_lock (); agent_lock ();
...@@ -3267,15 +3275,15 @@ nice_agent_send_messages_nonblocking ( ...@@ -3267,15 +3275,15 @@ nice_agent_send_messages_nonblocking (
if (component->tcp != NULL) { if (component->tcp != NULL) {
/* Send on the pseudo-TCP socket. */ /* Send on the pseudo-TCP socket. */
n_sent_messages = pseudo_tcp_socket_send_messages (component->tcp, messages, n_sent = pseudo_tcp_socket_send_messages (component->tcp, messages,
n_messages, &child_error); n_messages, allow_partial, &child_error);
adjust_tcp_clock (agent, stream, component); adjust_tcp_clock (agent, stream, component);
if (!pseudo_tcp_socket_can_send (component->tcp)) if (!pseudo_tcp_socket_can_send (component->tcp))
g_cancellable_reset (component->tcp_writable_cancellable); g_cancellable_reset (component->tcp_writable_cancellable);
if (n_sent < 0 && !g_error_matches (child_error, G_IO_ERROR,
if (n_sent_messages < 0) { G_IO_ERROR_WOULD_BLOCK)) {
/* Signal error */ /* Signal errors */
priv_pseudo_tcp_error (agent, stream, component); priv_pseudo_tcp_error (agent, stream, component);
} }
} else if (agent->reliable) { } else if (agent->reliable) {
...@@ -3297,39 +3305,69 @@ nice_agent_send_messages_nonblocking ( ...@@ -3297,39 +3305,69 @@ nice_agent_send_messages_nonblocking (
sock = component->selected_pair.local->sockptr; sock = component->selected_pair.local->sockptr;
addr = &component->selected_pair.remote->addr; addr = &component->selected_pair.remote->addr;
n_sent_messages = nice_socket_send_messages (sock, addr, messages, n_sent = nice_socket_send_messages (sock, addr, messages, n_messages);
n_messages);
if (n_sent_messages < 0) { if (n_sent < 0) {
g_set_error (&child_error, G_IO_ERROR, G_IO_ERROR_FAILED, g_set_error (&child_error, G_IO_ERROR, G_IO_ERROR_FAILED,
"Error writing data to socket."); "Error writing data to socket.");
} else if (allow_partial) {
g_assert (n_messages == 1);
n_sent = output_message_get_size (messages);
} }
} else { } else {
/* Socket isn’t properly open yet. */ /* Socket isn’t properly open yet. */
n_sent_messages = 0; /* EWOULDBLOCK */ n_sent = 0; /* EWOULDBLOCK */
} }
/* Handle errors and cancellations. */ /* Handle errors and cancellations. */
if (n_sent_messages == 0) { if (n_sent == 0) {
g_set_error_literal (&child_error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK, g_set_error_literal (&child_error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
g_strerror (EAGAIN)); g_strerror (EAGAIN));
n_sent_messages = -1; n_sent = -1;
} }
nice_debug ("%s: n_sent_messages: %d, n_messages: %u", G_STRFUNC, nice_debug ("%s: n_sent: %d, n_messages: %u", G_STRFUNC,
n_sent_messages, n_messages); n_sent, n_messages);
done: done:
g_assert ((child_error != NULL) == (n_sent_messages == -1)); g_assert ((child_error != NULL) == (n_sent == -1));
g_assert (n_sent_messages != 0); g_assert (n_sent != 0);
g_assert (n_sent_messages < 0 || (guint) n_sent_messages <= n_messages); g_assert (n_sent < 0 ||
(!allow_partial && (guint) n_sent <= n_messages) ||
(allow_partial && n_messages == 1 &&
(gsize) n_sent <= output_message_get_size (&messages[0])));
if (child_error != NULL) if (child_error != NULL)
g_propagate_error (error, child_error); g_propagate_error (error, child_error);
agent_unlock (); agent_unlock ();
return n_sent_messages; return n_sent;
}
NICEAPI_EXPORT gint
nice_agent_send_messages_nonblocking (
NiceAgent *agent,
guint stream_id,
guint component_id,
const NiceOutputMessage *messages,
guint n_messages,
GCancellable *cancellable,
GError **error)
{
g_return_val_if_fail (NICE_IS_AGENT (agent), -1);
g_return_val_if_fail (stream_id >= 1, -1);
g_return_val_if_fail (component_id >= 1, -1);
g_return_val_if_fail (n_messages == 0 || messages != NULL, -1);
g_return_val_if_fail (
cancellable == NULL || G_IS_CANCELLABLE (cancellable), -1);
g_return_val_if_fail (error == NULL || *error == NULL, -1);
if (g_cancellable_set_error_if_cancelled (cancellable, error))
return -1;
return nice_agent_send_messages_nonblocking_internal (agent, stream_id,
component_id, messages, n_messages, FALSE, error);
} }
NICEAPI_EXPORT gint NICEAPI_EXPORT gint
...@@ -3341,15 +3379,18 @@ nice_agent_send ( ...@@ -3341,15 +3379,18 @@ nice_agent_send (
const gchar *buf) const gchar *buf)
{ {
GOutputVector local_buf = { buf, len }; GOutputVector local_buf = { buf, len };
gint n_sent_messages;
NiceOutputMessage local_message = { &local_buf, 1 }; NiceOutputMessage local_message = { &local_buf, 1 };
gint n_sent_bytes;
g_return_val_if_fail (NICE_IS_AGENT (agent), -1);
g_return_val_if_fail (stream_id >= 1, -1);
g_return_val_if_fail (component_id >= 1, -1);
g_return_val_if_fail (buf != NULL, -1);
n_sent_messages = nice_agent_send_messages_nonblocking (agent, stream_id, n_sent_bytes = nice_agent_send_messages_nonblocking_internal (agent,
component_id, &local_message, 1, NULL, NULL); stream_id, component_id, &local_message, 1, TRUE, NULL);
if (n_sent_messages == 1) return n_sent_bytes;
return len;
return n_sent_messages;
} }
NICEAPI_EXPORT GSList * NICEAPI_EXPORT GSList *
......
...@@ -678,6 +678,13 @@ nice_agent_send ( ...@@ -678,6 +678,13 @@ nice_agent_send (
* part-way through. Zero will be returned if @n_messages is zero, or if * part-way through. Zero will be returned if @n_messages is zero, or if
* transmission would have blocked on the first message. * transmission would have blocked on the first message.
* *
* In reliable mode, it is instead recommended to use
* nice_agent_send(). The return value can be less than @n_messages
* or 0 even if it is still possible to send a partial message. In
* this case, "nice-agent-writable" will never be triggered, so the
* application would have to use nice_agent_sent() to fill the buffer or have
* to retry sending at a later point.
*
* On failure, -1 will be returned and @error will be set. If the #NiceAgent is * On failure, -1 will be returned and @error will be set. If the #NiceAgent is
* reliable and the socket is not yet connected, %G_IO_ERROR_BROKEN_PIPE will be * reliable and the socket is not yet connected, %G_IO_ERROR_BROKEN_PIPE will be
* returned; if the write buffer is full, %G_IO_ERROR_WOULD_BLOCK will be * returned; if the write buffer is full, %G_IO_ERROR_WOULD_BLOCK will be
......
...@@ -304,9 +304,9 @@ typedef struct { ...@@ -304,9 +304,9 @@ typedef struct {
GCond cond; GCond cond;
GMutex mutex; GMutex mutex;
GError *error;
gboolean writable; gboolean writable;
gboolean cancelled;
} WriteData; } WriteData;
static void static void
...@@ -315,7 +315,6 @@ write_data_unref (WriteData *write_data) ...@@ -315,7 +315,6 @@ write_data_unref (WriteData *write_data)
if (g_atomic_int_dec_and_test (&write_data->ref_count)) { if (g_atomic_int_dec_and_test (&write_data->ref_count)) {
g_cond_clear (&write_data->cond); g_cond_clear (&write_data->cond);
g_mutex_clear (&write_data->mutex); g_mutex_clear (&write_data->mutex);
g_clear_error (&write_data->error);
g_slice_free (WriteData, write_data); g_slice_free (WriteData, write_data);
} }
} }
...@@ -326,8 +325,8 @@ write_cancelled_cb (GCancellable *cancellable, gpointer user_data) ...@@ -326,8 +325,8 @@ write_cancelled_cb (GCancellable *cancellable, gpointer user_data)
WriteData *write_data = user_data; WriteData *write_data = user_data;
g_mutex_lock (&write_data->mutex); g_mutex_lock (&write_data->mutex);
g_cancellable_set_error_if_cancelled (cancellable, &write_data->error);
g_cond_broadcast (&write_data->cond); g_cond_broadcast (&write_data->cond);
write_data->cancelled = TRUE;
g_mutex_unlock (&write_data->mutex); g_mutex_unlock (&write_data->mutex);
} }
...@@ -349,8 +348,7 @@ nice_output_stream_write (GOutputStream *stream, const void *buffer, gsize count ...@@ -349,8 +348,7 @@ nice_output_stream_write (GOutputStream *stream, const void *buffer, gsize count
{ {
NiceOutputStream *self = NICE_OUTPUT_STREAM (stream); NiceOutputStream *self = NICE_OUTPUT_STREAM (stream);
gssize len = -1; gssize len = -1;
gint n_sent_messages; gint n_sent;
GError *child_error = NULL;
NiceAgent *agent = NULL; /* owned */ NiceAgent *agent = NULL; /* owned */
gulong cancel_id = 0, writeable_id; gulong cancel_id = 0, writeable_id;
WriteData *write_data; WriteData *write_data;
...@@ -375,14 +373,13 @@ nice_output_stream_write (GOutputStream *stream, const void *buffer, gsize count ...@@ -375,14 +373,13 @@ nice_output_stream_write (GOutputStream *stream, const void *buffer, gsize count
return 0; return 0;
} }
/* FIXME: nice_agent_send_full() is non-blocking, which is a bit unexpected /* FIXME: nice_agent_send() is non-blocking, which is a bit unexpected
* since nice_agent_recv() is blocking. Currently this uses a fairly dodgy * since nice_agent_recv() is blocking. Currently this uses a fairly dodgy
* GCond solution; would be much better for nice_agent_send() to block * GCond solution; would be much better for nice_agent_send() to block
* properly in the main loop. */ * properly in the main loop. */
len = 0; len = 0;
write_data = g_slice_new0 (WriteData); write_data = g_slice_new0 (WriteData);
g_atomic_int_set (&write_data->ref_count, 3); g_atomic_int_set (&write_data->ref_count, 3);
write_data->error = NULL;
g_mutex_init (&write_data->mutex); g_mutex_init (&write_data->mutex);
g_cond_init (&write_data->cond); g_cond_init (&write_data->cond);
...@@ -402,59 +399,43 @@ nice_output_stream_write (GOutputStream *stream, const void *buffer, gsize count ...@@ -402,59 +399,43 @@ nice_output_stream_write (GOutputStream *stream, const void *buffer, gsize count
do { do {
GOutputVector local_buf = { (const guint8 *) buffer + len, count - len };
NiceOutputMessage local_message = {&local_buf, 1};
/* Have to unlock while calling into the agent because /* Have to unlock while calling into the agent because
* it will take the agent lock which will cause a deadlock if one of * it will take the agent lock which will cause a deadlock if one of
* the callbacks is called. * the callbacks is called.
*/ */
if (g_cancellable_is_cancelled (cancellable))
break;
write_data->writable = FALSE; write_data->writable = FALSE;
g_mutex_unlock (&write_data->mutex); g_mutex_unlock (&write_data->mutex);
n_sent_messages = nice_agent_send_messages_nonblocking (agent, n_sent = nice_agent_send (agent, self->priv->stream_id,
self->priv->stream_id, self->priv->component_id, &local_message, 1, self->priv->component_id, count - len, buffer + len);
cancellable, &child_error);
g_mutex_lock (&write_data->mutex); g_mutex_lock (&write_data->mutex);
if (n_sent_messages == -1 && if (n_sent <= 0) {
g_error_matches (child_error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { if (!write_data->writable && !write_data->cancelled)
/* EWOULDBLOCK. */
g_clear_error (&child_error);
if (!write_data->writable && !write_data->error)
g_cond_wait (&write_data->cond, &write_data->mutex); g_cond_wait (&write_data->cond, &write_data->mutex);
} else if (n_sent_messages > 0) { } else if (n_sent > 0) {
/* Success. */ /* Success. */
len = count; len += n_sent;
} else {
/* Other error. */
len = n_sent_messages;
break;
} }
} while ((gsize) len < count); } while ((gsize) len < count);
g_signal_handler_disconnect (G_OBJECT (agent), writeable_id); g_signal_handler_disconnect (G_OBJECT (agent), writeable_id);
g_mutex_unlock (&write_data->mutex); g_mutex_unlock (&write_data->mutex);
if (cancellable != NULL) { if (cancel_id)
g_cancellable_disconnect (cancellable, cancel_id); g_cancellable_disconnect (cancellable, cancel_id);
/* If we were cancelled, but we have no other errors can couldn't write
* anything, return the cancellation error. If we could write if (len == 0) {
* something partial, there is no error. g_cancellable_set_error_if_cancelled (cancellable, error);
*/ len = -1;
if (write_data->error && !child_error && len == 0) {
g_propagate_error (&child_error, write_data->error);
len = -1;
}
} }
write_data_unref (write_data); write_data_unref (write_data);
g_assert ((child_error != NULL) == (len == -1));
if (child_error)
g_propagate_error (error, child_error);
g_object_unref (agent); g_object_unref (agent);
g_assert (len != 0); g_assert (len != 0);
...@@ -522,9 +503,7 @@ nice_output_stream_write_nonblocking (GPollableOutputStream *stream, ...@@ -522,9 +503,7 @@ nice_output_stream_write_nonblocking (GPollableOutputStream *stream,
{ {
NiceOutputStreamPrivate *priv = NICE_OUTPUT_STREAM (stream)->priv; NiceOutputStreamPrivate *priv = NICE_OUTPUT_STREAM (stream)->priv;
NiceAgent *agent; /* owned */ NiceAgent *agent; /* owned */
GOutputVector local_buf = { buffer, count }; gint n_sent;
NiceOutputMessage local_message = { &local_buf, 1 };
gint n_sent_messages;
/* Closed streams are not writeable. */ /* Closed streams are not writeable. */
if (g_output_stream_is_closed (G_OUTPUT_STREAM (stream))) { if (g_output_stream_is_closed (G_OUTPUT_STREAM (stream))) {
...@@ -551,12 +530,18 @@ nice_output_stream_write_nonblocking (GPollableOutputStream *stream, ...@@ -551,12 +530,18 @@ nice_output_stream_write_nonblocking (GPollableOutputStream *stream,
return -1; return -1;
} }
n_sent_messages = nice_agent_send_messages_nonblocking (agent, n_sent = nice_agent_send (agent, priv->stream_id, priv->component_id,
priv->stream_id, priv->component_id, &local_message, 1, NULL, error); count, buffer);
if (n_sent == -1)
g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
g_strerror (EAGAIN));
g_object_unref (agent); g_object_unref (agent);
return (n_sent_messages == 1) ? (gssize) count : n_sent_messages; return n_sent;
} }
static GSource * static GSource *
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment