Commit 253be348 authored by Philip Withnall's avatar Philip Withnall Committed by Olivier Crête

agent: Add support for vectored I/O for receives

Add two new public functions:
 • nice_agent_recv_messages()
 • nice_agent_recv_messages_nonblocking()
which allow receiving multiple messages in a single call, and support
vectors of buffers to receive the messages into.

The existing nice_agent_recv[_nonblocking]() APIs have been left
untouched.

This tidies up a lot of the message handling code internally, and
eliminates a couple of memcpy()s. There are still a few more memcpy()s
on the critical path, which could be eliminated with further work.

In the reliable agent case, every message is memcpy()ed twice: once into
the pseudo-TCP receive buffer, and once out of it. The copy on input
could be eliminated (in the case of in-order delivery of packets) by
receiving directly into the receive buffer. The copy on output can’t be
eliminated except in the I/O callback case (when
nice_agent_attach_recv() has been used), in which case the callback
could be invoked with a pointer directly into the pseudo-TCP receive
buffer.

In the non-reliable agent case, zero memcpy()s are used.

A couple of the more complex socket implementations (TURN and HTTP) have
slow paths during setup, and partially also during normal use. These
could be optimised further, and FIXME comments have been added.
parent 9661150d
......@@ -52,11 +52,44 @@
#include <glib.h>
#include "agent.h"
/**
* NiceInputMessageIter:
* @message: index of the message currently being written into
* @buffer: index of the buffer currently being written into
* @offset: byte offset into the buffer
*
* Iterator for sequentially writing into an array of #NiceInputMessages,
* tracking the current write position (i.e. the index of the next byte to be
* written).
*
* If @message is equal to the number of messages in the associated
* #NiceInputMessage array, and @buffer and @offset are zero, the iterator is at
* the end of the messages array, and the array is (presumably) full.
*
* Since: 0.1.5
*/
typedef struct {
guint message;
guint buffer;
gsize offset;
} NiceInputMessageIter;
void
nice_input_message_iter_reset (NiceInputMessageIter *iter);
gboolean
nice_input_message_iter_is_at_end (NiceInputMessageIter *iter,
NiceInputMessage *messages, guint n_messages);
guint
nice_input_message_iter_get_n_valid_messages (NiceInputMessageIter *iter);
#include "socket.h"
#include "candidate.h"
#include "stream.h"
#include "conncheck.h"
#include "component.h"
#include "random.h"
#include "stun/stunagent.h"
#include "stun/usages/turn.h"
#include "stun/usages/ice.h"
......@@ -178,9 +211,6 @@ component_io_cb (
GIOCondition condition,
gpointer data);
gssize agent_recv_locked (NiceAgent *agent, Stream *stream,
Component *component, NiceSocket *socket, guint8 *buf, gsize buf_len);
gsize
memcpy_buffer_to_input_message (NiceInputMessage *message,
const guint8 *buffer, gsize buffer_length);
......
This diff is collapsed.
......@@ -58,16 +58,16 @@
* for valid streams/components.
*
* Each stream can receive data in one of two ways: using
* nice_agent_attach_recv() or nice_agent_recv() (and the derived
* nice_agent_attach_recv() or nice_agent_recv_messages() (and the derived
* #NiceInputStream and #NiceIOStream classes accessible using
* nice_agent_build_io_stream()). nice_agent_attach_recv() is non-blocking: it
* takes a user-provided callback function and attaches the stream’s socket to
* the provided #GMainContext, invoking the callback in that context for every
* packet received. nice_agent_recv() instead blocks on receiving a packet, and
* writes it directly into a user-provided buffer. This reduces the number of
* callback invokations and (potentially) buffer copies required to receive
* packets. nice_agent_recv() (or #NiceInputStream) is designed to be used in a
* blocking loop in a separate thread.
* packet received. nice_agent_recv_messages() instead blocks on receiving a
* packet, and writes it directly into a user-provided buffer. This reduces the
* number of callback invokations and (potentially) buffer copies required to
* receive packets. nice_agent_recv_messages() (or #NiceInputStream) is designed
* to be used in a blocking loop in a separate thread.
*
<example>
<title>Simple example on how to use libnice</title>
......@@ -721,7 +721,7 @@ nice_agent_restart (
* Attaches the stream's component's sockets to the Glib Mainloop Context in
* order to be notified whenever data becomes available for a component.
*
* This must not be used in combination with nice_agent_recv() (or
* This must not be used in combination with nice_agent_recv_messages() (or
* #NiceIOStream or #NiceInputStream) on the same stream/component pair.
*
* Calling nice_agent_attach_recv() with a %NULL @func will detach any existing
......@@ -751,6 +751,16 @@ nice_agent_recv (
GCancellable *cancellable,
GError **error);
gint
nice_agent_recv_messages (
NiceAgent *agent,
guint stream_id,
guint component_id,
NiceInputMessage *messages,
guint n_messages,
GCancellable *cancellable,
GError **error);
gssize
nice_agent_recv_nonblocking (
NiceAgent *agent,
......@@ -761,6 +771,16 @@ nice_agent_recv_nonblocking (
GCancellable *cancellable,
GError **error);
gint
nice_agent_recv_messages_nonblocking (
NiceAgent *agent,
guint stream_id,
guint component_id,
NiceInputMessage *messages,
guint n_messages,
GCancellable *cancellable,
GError **error);
/**
* nice_agent_set_selected_pair:
* @agent: The #NiceAgent Object
......
......@@ -126,8 +126,8 @@ component_new (guint id, NiceAgent *agent, Stream *stream)
component->io_callback_id = 0;
/* Start off with a fresh main context and all I/O paused. This
* will be updated when nice_agent_attach_recv() or nice_agent_recv() are
* called. */
* will be updated when nice_agent_attach_recv() or nice_agent_recv_messages()
* are called. */
component_set_io_context (component, NULL);
component_set_io_callback (component, NULL, NULL, NULL, 0, NULL);
......@@ -561,9 +561,9 @@ component_set_io_context (Component *component, GMainContext *context)
g_mutex_unlock (&component->io_mutex);
}
/* (func, user_data) and (recv_buf, recv_buf_len) are mutually exclusive.
* At most one of the two must be specified; if both are NULL, the Component
* will not receive any data (i.e. reception is paused).
/* (func, user_data) and (recv_messages, n_recv_messages) are mutually
* exclusive. At most one of the two must be specified; if both are NULL, the
* Component will not receive any data (i.e. reception is paused).
*
* Apart from during setup, this must always be called with the agent lock held,
* and the I/O lock released (because it takes the I/O lock itself). Requiring
......@@ -574,11 +574,11 @@ component_set_io_context (Component *component, GMainContext *context)
void
component_set_io_callback (Component *component,
NiceAgentRecvFunc func, gpointer user_data,
guint8 *recv_buf, gsize recv_buf_len,
NiceInputMessage *recv_messages, guint n_recv_messages,
GError **error)
{
g_assert (func == NULL || recv_buf == NULL);
g_assert (recv_buf != NULL || recv_buf_len == 0);
g_assert (func == NULL || recv_messages == NULL);
g_assert (n_recv_messages == 0 || recv_messages != NULL);
g_assert (error == NULL || *error == NULL);
g_mutex_lock (&component->io_mutex);
......@@ -586,20 +586,20 @@ component_set_io_callback (Component *component,
if (func != NULL) {
component->io_callback = func;
component->io_user_data = user_data;
component->recv_buf = NULL;
component->recv_buf_len = 0;
component->recv_messages = NULL;
component->n_recv_messages = 0;
component_schedule_io_callback (component);
} else {
component->io_callback = NULL;
component->io_user_data = NULL;
component->recv_buf = recv_buf;
component->recv_buf_len = recv_buf_len;
component->recv_messages = recv_messages;
component->n_recv_messages = n_recv_messages;
component_deschedule_io_callback (component);
}
component->recv_buf_valid_len = 0;
nice_input_message_iter_reset (&component->recv_messages_iter);
component->recv_buf_error = error;
g_mutex_unlock (&component->io_mutex);
......
......@@ -45,6 +45,7 @@
typedef struct _Component Component;
#include "agent.h"
#include "agent-priv.h"
#include "candidate.h"
#include "stun/stunagent.h"
#include "stun/usages/timer.h"
......@@ -110,7 +111,7 @@ typedef struct {
} SocketSource;
/* A buffer of data which has been received and processed (so is guaranteed not
/* A message which has been received and processed (so is guaranteed not
* to be a STUN packet, or to contain pseudo-TCP header bytes, for example), but
* which hasn’t yet been sent to the client in an I/O callback. This could be
* due to the main context not being run, or due to the I/O callback being
......@@ -153,8 +154,8 @@ struct _Component
* socket recv() operations. All io_callback emissions are invoked in this
* context too.
*
* recv_buf and io_callback are mutually exclusive, but it is allowed for both
* to be NULL if the Component is not currently ready to receive data. */
* recv_messages and io_callback are mutually exclusive, but it is allowed for
* both to be NULL if the Component is not currently ready to receive data. */
GMutex io_mutex; /**< protects io_callback, io_user_data,
pending_io_messages and io_callback_id.
immutable: can be accessed without
......@@ -163,7 +164,7 @@ struct _Component
taken before this one */
NiceAgentRecvFunc io_callback; /**< function called on io cb */
gpointer io_user_data; /**< data passed to the io function */
GQueue pending_io_messages; /**< queue of packets which have been
GQueue pending_io_messages; /**< queue of messages which have been
received but not passed to the client
in an I/O callback or recv() call yet.
each element is an owned
......@@ -172,9 +173,10 @@ struct _Component
GMainContext *ctx; /**< context for GSources for this
component */
guint8 *recv_buf; /**< unowned buffer for receiving into */
gsize recv_buf_len; /**< allocated size of recv_buf in bytes */
gsize recv_buf_valid_len; /**< length of valid data in recv_buf */
NiceInputMessage *recv_messages; /**< unowned messages for receiving into */
guint n_recv_messages; /**< length of recv_messages */
NiceInputMessageIter recv_messages_iter; /**< current write position in
recv_messages */
GError **recv_buf_error; /**< error information about failed reads */
NiceAgent *agent; /* unowned, immutable: can be accessed without holding the
......@@ -240,7 +242,7 @@ component_set_io_context (Component *component, GMainContext *context);
void
component_set_io_callback (Component *component,
NiceAgentRecvFunc func, gpointer user_data,
guint8 *recv_buf, gsize recv_buf_len,
NiceInputMessage *recv_messages, guint n_recv_messages,
GError **error);
void
component_emit_io_callback (Component *component,
......
......@@ -74,6 +74,7 @@
#endif
#include "pseudotcp.h"
#include "agent-priv.h"
G_DEFINE_TYPE (PseudoTcpSocket, pseudo_tcp_socket, G_TYPE_OBJECT);
......@@ -497,8 +498,9 @@ static guint32 queue(PseudoTcpSocket *self, const gchar * data,
guint32 len, gboolean bCtrl);
static PseudoTcpWriteResult packet(PseudoTcpSocket *self, guint32 seq,
guint8 flags, guint32 offset, guint32 len);
static gboolean parse(PseudoTcpSocket *self,
const guint8 * buffer, guint32 size);
static gboolean parse (PseudoTcpSocket *self,
const guint8 *_header_buf, gsize header_buf_len,
const guint8 *data_buf, gsize data_buf_len);
static gboolean process(PseudoTcpSocket *self, Segment *seg);
static gboolean transmit(PseudoTcpSocket *self, const GList *seg, guint32 now);
static void attempt_send(PseudoTcpSocket *self, SendFlags sflags);
......@@ -882,12 +884,45 @@ pseudo_tcp_socket_notify_packet(PseudoTcpSocket *self,
if (len > MAX_PACKET) {
//LOG_F(WARNING) << "packet too large";
return FALSE;
} else if (len < HEADER_SIZE) {
//LOG_F(WARNING) << "packet too small";
return FALSE;
}
/* Hold a reference to the PseudoTcpSocket during parsing, since it may be
* closed from within a callback. */
g_object_ref (self);
retval = parse (self, (guint8 *) buffer, HEADER_SIZE,
(guint8 *) buffer + HEADER_SIZE, len - HEADER_SIZE);
g_object_unref (self);
return retval;
}
/* Assume there are two buffers in the given #NiceInputMessage: a 24-byte one
* containing the header, and a bigger one for the data. */
gboolean
pseudo_tcp_socket_notify_message (PseudoTcpSocket *self,
NiceInputMessage *message)
{
gboolean retval;
g_assert_cmpuint (message->n_buffers, ==, 2);
g_assert_cmpuint (message->buffers[0].size, ==, HEADER_SIZE);
if (message->length > MAX_PACKET) {
//LOG_F(WARNING) << "packet too large";
return FALSE;
} else if (message->length < HEADER_SIZE) {
//LOG_F(WARNING) << "packet too small";
return FALSE;
}
/* Hold a reference to the PseudoTcpSocket during parsing, since it may be
* closed from within a callback. */
g_object_ref (self);
retval = parse (self, (guint8 *) buffer, len);
retval = parse (self, message->buffers[0].buffer, message->buffers[0].size,
message->buffers[1].buffer, message->length - message->buffers[0].size);
g_object_unref (self);
return retval;
......@@ -1126,7 +1161,8 @@ packet(PseudoTcpSocket *self, guint32 seq, guint8 flags,
}
static gboolean
parse(PseudoTcpSocket *self, const guint8 * _buffer, guint32 size)
parse (PseudoTcpSocket *self, const guint8 *_header_buf, gsize header_buf_len,
const guint8 *data_buf, gsize data_buf_len)
{
Segment seg;
......@@ -1134,24 +1170,24 @@ parse(PseudoTcpSocket *self, const guint8 * _buffer, guint32 size)
const guint8 *u8;
const guint16 *u16;
const guint32 *u32;
} buffer;
} header_buf;
buffer.u8 = _buffer;
header_buf.u8 = _header_buf;
if (size < 12)
if (header_buf_len != 24)
return FALSE;
seg.conv = ntohl(*buffer.u32);
seg.seq = ntohl(*(buffer.u32 + 1));
seg.ack = ntohl(*(buffer.u32 + 2));
seg.flags = buffer.u8[13];
seg.wnd = ntohs(*(buffer.u16 + 7));
seg.conv = ntohl(*header_buf.u32);
seg.seq = ntohl(*(header_buf.u32 + 1));
seg.ack = ntohl(*(header_buf.u32 + 2));
seg.flags = header_buf.u8[13];
seg.wnd = ntohs(*(header_buf.u16 + 7));
seg.tsval = ntohl(*(buffer.u32 + 4));
seg.tsecr = ntohl(*(buffer.u32 + 5));
seg.tsval = ntohl(*(header_buf.u32 + 4));
seg.tsecr = ntohl(*(header_buf.u32 + 5));
seg.data = ((gchar *)buffer.u8) + HEADER_SIZE;
seg.len = size - HEADER_SIZE;
seg.data = (const gchar *) data_buf;
seg.len = data_buf_len;
DEBUG (PSEUDO_TCP_DEBUG_VERBOSE, "--> <CONV=%d><FLG=%d><SEQ=%d:%d><ACK=%d>"
"<WND=%d><TS=%d><TSR=%d><LEN=%d>",
......
......@@ -66,6 +66,8 @@
# define ECONNRESET WSAECONNRESET
#endif
#include "agent.h"
G_BEGIN_DECLS
/**
......@@ -402,6 +404,22 @@ gboolean pseudo_tcp_socket_notify_packet(PseudoTcpSocket *self,
const gchar * buffer, guint32 len);
/**
* pseudo_tcp_socket_notify_message:
* @self: The #PseudoTcpSocket object.
* @message: A #NiceInputMessage containing the received data.
*
* Notify the #PseudoTcpSocket that a new message has arrived, and enqueue the
* data in its buffers to the #PseudoTcpSocket’s receive buffer.
*
* Returns: %TRUE if the packet was processed successfully, %FALSE otherwise
*
* Since: 0.1.5
*/
gboolean pseudo_tcp_socket_notify_message (PseudoTcpSocket *self,
NiceInputMessage *message);
/**
* pseudo_tcp_set_debug_level:
* @level: The level of debug to set
......
......@@ -26,7 +26,9 @@ nice_agent_get_selected_pair
nice_agent_send
nice_agent_send_full
nice_agent_recv
nice_agent_recv_messages
nice_agent_recv_nonblocking
nice_agent_recv_messages_nonblocking
nice_agent_attach_recv
nice_agent_set_selected_pair
nice_agent_set_selected_remote_candidate
......
......@@ -18,7 +18,9 @@ nice_agent_add_local_address
nice_agent_add_stream
nice_agent_build_io_stream
nice_agent_recv
nice_agent_recv_messages
nice_agent_recv_nonblocking
nice_agent_recv_messages_nonblocking
nice_agent_attach_recv
nice_agent_gather_candidates
nice_agent_generate_local_candidate_sdp
......
......@@ -822,6 +822,45 @@ priv_binding_timeout (gpointer data)
return FALSE;
}
guint
nice_turn_socket_parse_recv_message (NiceSocket *sock, NiceSocket **from_sock,
NiceInputMessage *message)
{
/* TODO: Speed this up in the common reliable case of having a 24-byte header
* buffer to begin with, followed by one or more massive buffers. */
guint8 *buf;
gsize buf_len, len;
if (message->n_buffers == 1 ||
(message->n_buffers == -1 &&
message->buffers[0].buffer != NULL &&
message->buffers[1].buffer == NULL)) {
/* Fast path. Single massive buffer. */
g_assert_cmpuint (message->length, <=, message->buffers[0].size);
len = nice_turn_socket_parse_recv (sock, from_sock,
message->from, message->length, message->buffers[0].buffer,
message->from, message->buffers[0].buffer, message->length);
g_assert_cmpuint (len, <=, message->length);
message->length = len;
return (len > 0) ? 1 : 0;
}
/* Slow path. */
nice_debug ("%s: **WARNING: SLOW PATH**", G_STRFUNC);
buf = compact_input_message (message, &buf_len);
len = nice_turn_socket_parse_recv (sock, from_sock,
message->from, buf_len, buf,
message->from, buf, buf_len);
len = memcpy_buffer_to_input_message (message, buf, len);
return (len > 0) ? 1 : 0;
}
gsize
nice_turn_socket_parse_recv (NiceSocket *sock, NiceSocket **from_sock,
NiceAddress *from, gsize len, guint8 *buf,
......
......@@ -52,6 +52,10 @@ typedef enum {
G_BEGIN_DECLS
guint
nice_turn_socket_parse_recv_message (NiceSocket *sock, NiceSocket **from_sock,
NiceInputMessage *message);
gsize
nice_turn_socket_parse_recv (NiceSocket *sock, NiceSocket **from_sock,
NiceAddress *from, gsize len, guint8 *buf,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment