Commit cf9d3f18 authored by Philip Withnall's avatar Philip Withnall Committed by Olivier Crête

agent: Add GPollableOutputStream support to NiceOutputStream

parent 0ac9f3f9
......@@ -1131,6 +1131,8 @@ nice_agent_parse_remote_candidate_sdp (
*
* Build a #GIOStream wrapper around the given stream and component in
* @agent. The I/O stream will be valid for as long as @stream_id is valid.
* The #GInputStream and #GOutputStream implement #GPollableInputStream and
* #GPollableOutputStream.
*
* This function may only be called on reliable #NiceAgents. It is an error to
* try and create an I/O stream wrapper for an unreliable stream.
......
......@@ -65,13 +65,20 @@
# include "config.h"
#endif
#include <errno.h>
#include "outputstream.h"
#include "agent-priv.h"
static void nice_output_stream_init_pollable (
GPollableOutputStreamInterface *iface);
static void streams_removed_cb (NiceAgent *agent, guint *stream_ids,
gpointer user_data);
G_DEFINE_TYPE (NiceOutputStream,
nice_output_stream, G_TYPE_OUTPUT_STREAM);
G_DEFINE_TYPE_WITH_CODE (NiceOutputStream,
nice_output_stream, G_TYPE_OUTPUT_STREAM,
G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM,
nice_output_stream_init_pollable));
enum
{
......@@ -96,7 +103,12 @@ static void nice_output_stream_set_property (GObject *object, guint prop_id,
static gssize nice_output_stream_write (GOutputStream *stream,
const void *buffer, gsize count, GCancellable *cancellable, GError **error);
static gboolean nice_output_stream_is_writable (GPollableOutputStream *stream);
static gssize nice_output_stream_write_nonblocking (
GPollableOutputStream *stream, const void *buffer, gsize count,
GError **error);
static GSource *nice_output_stream_create_source (GPollableOutputStream *stream,
GCancellable *cancellable);
/* Output Stream */
static void
......@@ -248,6 +260,14 @@ nice_output_stream_init (NiceOutputStream *stream)
g_weak_ref_init (&stream->priv->agent_ref, NULL);
}
static void
nice_output_stream_init_pollable (GPollableOutputStreamInterface *iface)
{
iface->is_writable = nice_output_stream_is_writable;
iface->write_nonblocking = nice_output_stream_write_nonblocking;
iface->create_source = nice_output_stream_create_source;
}
/**
* nice_output_stream_new:
* @agent: A #NiceAgent
......@@ -401,6 +421,148 @@ done:
return len;
}
static gboolean
nice_output_stream_is_writable (GPollableOutputStream *stream)
{
NiceOutputStreamPrivate *priv = NICE_OUTPUT_STREAM (stream)->priv;
Component *component = NULL;
Stream *_stream = NULL;
gboolean retval = FALSE;
GSList *i;
NiceAgent *agent; /* owned */
/* Closed streams are not writeable. */
if (g_output_stream_is_closed (G_OUTPUT_STREAM (stream)))
return FALSE;
/* Has the agent disappeared? */
agent = g_weak_ref_get (&priv->agent_ref);
if (agent == NULL)
return FALSE;
agent_lock ();
if (!agent_find_component (agent, priv->stream_id, priv->component_id,
&_stream, &component)) {
g_warning ("Could not find component %u in stream %u", priv->component_id,
priv->stream_id);
goto done;
}
/* If it’s a reliable agent, see if there’s any space in the pseudo-TCP output
* buffer. */
if (agent->reliable && component->tcp != NULL &&
pseudo_tcp_socket_can_send (component->tcp)) {
retval = TRUE;
goto done;
}
/* Check whether any of the component’s FDs are pollable. */
for (i = component->socket_sources; i != NULL; i = i->next) {
SocketSource *socket_source = i->data;
NiceSocket *socket = socket_source->socket;
if (g_socket_condition_check (socket->fileno, G_IO_OUT) != 0) {
retval = TRUE;
break;
}
}
done:
agent_unlock ();
g_object_unref (agent);
return retval;
}
static gssize
nice_output_stream_write_nonblocking (GPollableOutputStream *stream,
const void *buffer, gsize count, GError **error)
{
NiceOutputStreamPrivate *priv = NICE_OUTPUT_STREAM (stream)->priv;
NiceAgent *agent; /* owned */
gssize len;
/* Closed streams are not writeable. */
if (g_output_stream_is_closed (G_OUTPUT_STREAM (stream))) {
g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
"Stream is closed.");
return -1;
}
/* Has the agent disappeared? */
agent = g_weak_ref_get (&priv->agent_ref);
if (agent == NULL) {
g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
"Stream is closed due to the NiceAgent being finalised.");
return -1;
}
if (count == 0)
return 0;
/* This is equivalent to the default GPollableOutputStream implementation. */
if (!g_pollable_output_stream_is_writable (stream)) {
g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
g_strerror (EAGAIN));
return -1;
}
len = nice_agent_send_full (agent, priv->stream_id, priv->component_id,
buffer, count, NULL, error);
g_object_unref (agent);
return len;
}
static GSource *
nice_output_stream_create_source (GPollableOutputStream *stream,
GCancellable *cancellable)
{
NiceOutputStreamPrivate *priv = NICE_OUTPUT_STREAM (stream)->priv;
GSource *component_source = NULL;
Component *component = NULL;
Stream *_stream = NULL;
NiceAgent *agent; /* owned */
/* Closed streams cannot have sources. */
if (g_output_stream_is_closed (G_OUTPUT_STREAM (stream)))
return g_pollable_source_new (G_OBJECT (stream)); /* dummy */
/* Has the agent disappeared? */
agent = g_weak_ref_get (&priv->agent_ref);
if (agent == NULL)
return g_pollable_source_new (G_OBJECT (stream)); /* dummy */
agent_lock ();
/* Grab the socket for this component. */
if (!agent_find_component (agent, priv->stream_id, priv->component_id,
&_stream, &component)) {
g_warning ("Could not find component %u in stream %u", priv->component_id,
priv->stream_id);
component_source = g_pollable_source_new (G_OBJECT (stream)); /* dummy */
goto done;
}
/* Note: We need G_IO_IN here to handle pseudo-TCP streams. If our TCP
* transmit buffer is full, but the kernel's receive buffer has pending ACKs
* sitting in it, we need to receive those ACKs so we can transmit the head
* bytes in the transmit buffer, and hence free up space in the tail of the
* buffer so the stream is writeable again. */
component_source = component_source_new (component, G_OBJECT (stream),
G_IO_IN | G_IO_OUT, cancellable);
done:
agent_unlock ();
g_object_unref (agent);
return component_source;
}
static void
streams_removed_cb (NiceAgent *agent, guint *stream_ids, gpointer user_data)
{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment