Commit 39c481ea authored by Philip Withnall's avatar Philip Withnall Committed by Olivier Crête
Browse files

agent: Port PseudoTcpSocket to GDatagramBased

Including porting the tests.

WIP: Still needs testing and documenting.

tests: Port test-pseudotcp to GTest and add additional tests

WIP work to rebase PseudoTcpSocket from GCommunicable to GIOStream

WIP update for GDatagramBased API changes
parent 418bfbcb
......@@ -48,6 +48,10 @@ libagent_la_SOURCES = \
pseudotcp.c \
datagram-based.c \
datagram-based.h \
pseudo-tcp-input-stream.c \
pseudo-tcp-input-stream.h \
pseudo-tcp-output-stream.c \
pseudo-tcp-output-stream.h \
$(BUILT_SOURCES)
libagent_la_LIBADD = \
......
......@@ -1443,6 +1443,12 @@ void agent_signal_new_selected_pair (NiceAgent *agent, guint stream_id,
"PEER-RFLX" : "???");
}
/* TODO: process_queued_tcp_packets (agent, stream, component);
1721 -
1722 - pseudo_tcp_socket_connect (component->tcp);
1723 - pseudo_tcp_socket_notify_mtu (component->tcp, MAX_TCP_MTU);
1724 - adjust_tcp_clock (agent, stream, component); */
agent_queue_signal (agent, signals[SIGNAL_NEW_SELECTED_PAIR_FULL],
stream_id, component_id, lcandidate, rcandidate);
agent_queue_signal (agent, signals[SIGNAL_NEW_SELECTED_PAIR],
......@@ -1507,6 +1513,8 @@ void agent_signal_component_state_change (NiceAgent *agent, guint stream_id, gui
return;
}
/* TODO process_queued_tcp_packets (agent, stream, component); */
nice_debug ("Agent %p : stream %u component %u STATE-CHANGE %s -> %s.", agent,
stream_id, component_id, nice_component_state_to_string (old_state),
nice_component_state_to_string (new_state));
......
......@@ -2144,7 +2144,6 @@ int conn_check_send (NiceAgent *agent, CandidateCheckPair *pair)
if (new_socket) {
pair->sockptr = new_socket;
_priv_set_socket_tos (agent, pair->sockptr, stream2->tos);
nice_component_attach_socket (component2, new_socket);
}
}
......
......@@ -89,10 +89,10 @@ static void nice_datagram_based_set_property (GObject *object, guint prop_id,
const GValue *value, GParamSpec *pspec);
static gint nice_datagram_based_receive_messages (GDatagramBased *datagram_based,
GInputMessage *messages, guint num_messages, gint flags, gboolean blocking,
GInputMessage *messages, guint num_messages, gint flags, gint64 timeout,
GCancellable *cancellable, GError **error);
static gint nice_datagram_based_send_messages (GDatagramBased *datagram_based,
GOutputMessage *messages, guint num_messages, gint flags, gboolean blocking,
GOutputMessage *messages, guint num_messages, gint flags, gint64 timeout,
GCancellable *cancellable, GError **error);
static gboolean nice_datagram_based_shutdown (GDatagramBased *datagram_based,
gboolean shutdown_read, gboolean shutdown_write, gint64 timeout,
......@@ -172,8 +172,6 @@ nice_datagram_based_class_init (NiceDatagramBasedClass *klass)
0, G_MAXUINT,
0,
G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/* TODO: Implement GDatagramBased properties. */
}
static void
......@@ -292,7 +290,7 @@ nice_datagram_based_set_property (GObject *object, guint prop_id,
static gint
nice_datagram_based_receive_messages (GDatagramBased *datagram_based,
GInputMessage *messages, guint num_messages, gint flags, gboolean blocking,
GInputMessage *messages, guint num_messages, gint flags, gint64 timeout,
GCancellable *cancellable, GError **error)
{
NiceDatagramBased *self;
......@@ -332,8 +330,8 @@ nice_datagram_based_receive_messages (GDatagramBased *datagram_based,
nice_messages[i].length = 0;
}
/* Receive the messages. */
if (blocking) {
/* Receive the messages. FIXME: Support the timeout. */
if (timeout < 0) {
retval = nice_agent_recv_messages (agent, priv->stream_id,
priv->component_id, nice_messages, num_messages, cancellable, error);
} else {
......@@ -375,7 +373,7 @@ nice_datagram_based_receive_messages (GDatagramBased *datagram_based,
static gint
nice_datagram_based_send_messages (GDatagramBased *datagram_based,
GOutputMessage *messages, guint num_messages, gint flags, gboolean blocking,
GOutputMessage *messages, guint num_messages, gint flags, gint64 timeout,
GCancellable *cancellable, GError **error)
{
NiceDatagramBased *self;
......@@ -430,7 +428,7 @@ nice_datagram_based_send_messages (GDatagramBased *datagram_based,
}
/* Send the messages. */
g_assert (!blocking); /* FIXME: support blocking mode */
g_assert (timeout == 0); /* FIXME: support blocking mode and timeouts */
retval = nice_agent_send_messages_nonblocking (agent, priv->stream_id,
priv->component_id, nice_messages, num_messages, cancellable, error);
......
/*
* This file is part of the Nice GLib ICE library.
*
* © 2015 Collabora Ltd.
* Contact: Philip Withnall
*
* The contents of this file are subject to the Mozilla Public License Version
* 1.1 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.mozilla.org/MPL/
*
* Software distributed under the License is distributed on an "AS IS" basis,
* WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
* for the specific language governing rights and limitations under the
* License.
*
* The Original Code is the Nice GLib ICE library.
*
* The Initial Developers of the Original Code are Collabora Ltd and Nokia
* Corporation. All Rights Reserved.
*
* Contributors:
* Philip Withnall, Collabora Ltd.
*
* Alternatively, the contents of this file may be used under the terms of the
* the GNU Lesser General Public License Version 2.1 (the "LGPL"), in which
* case the provisions of LGPL are applicable instead of those above. If you
* wish to allow use of your version of this file only under the terms of the
* LGPL and not to allow others to use your version of this file under the
* MPL, indicate your decision by deleting the provisions above and replace
* them with the notice and other provisions required by the LGPL. If you do
* not delete the provisions above, a recipient may use your version of this
* file under either the MPL or the LGPL.
*/
/**
* SECTION:nicepseudotcpinputstream
* @short_description: #NicePseudoTcpInputStream implementation for libnice
* @see_also: #PseudoTcpSocket
* @include: pseudo-tcp-input-stream.h
* @stability: Stable
*
* TODO
*
* Since: UNRELEASED
*/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include <errno.h>
#include "pseudo-tcp-input-stream.h"
enum
{
PROP_SOCKET = 1,
};
typedef struct
{
GWeakRef/*<PseudoTcpSocket>*/ socket_ref;
} NicePseudoTcpInputStreamPrivate;
static void nice_pseudo_tcp_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface);
static void nice_pseudo_tcp_input_stream_dispose (GObject *object);
static void nice_pseudo_tcp_input_stream_get_property (GObject *object, guint prop_id,
GValue *value, GParamSpec *pspec);
static void nice_pseudo_tcp_input_stream_set_property (GObject *object, guint prop_id,
const GValue *value, GParamSpec *pspec);
static gssize
nice_pseudo_tcp_input_stream_read (GInputStream *stream, void *buffer,
gsize count, GCancellable *cancellable, GError **error);
static gboolean
nice_pseudo_tcp_input_stream_close (GInputStream *stream,
GCancellable *cancellable, GError **error);
static gboolean
nice_pseudo_tcp_input_stream_can_poll (GPollableInputStream *stream);
static gboolean
nice_pseudo_tcp_input_stream_is_readable (GPollableInputStream *stream);
static GSource *
nice_pseudo_tcp_input_stream_create_source (GPollableInputStream *stream,
GCancellable *cancellable);
static gssize
nice_pseudo_tcp_input_stream_read_nonblocking (GPollableInputStream *stream,
void *buffer, gsize count, GError **error);
G_DEFINE_TYPE_WITH_CODE (NicePseudoTcpInputStream, nice_pseudo_tcp_input_stream,
G_TYPE_INPUT_STREAM,
G_ADD_PRIVATE (NicePseudoTcpInputStream)
G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
nice_pseudo_tcp_input_stream_pollable_iface_init));
static void
nice_pseudo_tcp_input_stream_class_init (NicePseudoTcpInputStreamClass *klass)
{
GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
GInputStreamClass *input_stream_class = G_INPUT_STREAM_CLASS (klass);
gobject_class->set_property = nice_pseudo_tcp_input_stream_set_property;
gobject_class->get_property = nice_pseudo_tcp_input_stream_get_property;
gobject_class->dispose = nice_pseudo_tcp_input_stream_dispose;
input_stream_class->read_fn = nice_pseudo_tcp_input_stream_read;
input_stream_class->close_fn = nice_pseudo_tcp_input_stream_close;
/*
* NicePseudoTcpInputStream:socket:
*
* The owning #PseudoTcpSocket.
*
* A reference is not held on the #PseudoTcpSocket. If the socket is destroyed
* before the #NicePseudoTcpInputStream, %G_IO_ERROR_CLOSED will be returned
* for all subsequent operations on the stream.
*
* Since: UNRELEASED
*/
g_object_class_install_property (gobject_class, PROP_SOCKET,
g_param_spec_object ("socket",
"PseudoTcpSocket",
"The owning PseudoTcpSocket",
PSEUDO_TCP_SOCKET_TYPE,
G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
}
static void
nice_pseudo_tcp_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface)
{
iface->can_poll = nice_pseudo_tcp_input_stream_can_poll;
iface->is_readable = nice_pseudo_tcp_input_stream_is_readable;
iface->create_source = nice_pseudo_tcp_input_stream_create_source;
iface->read_nonblocking = nice_pseudo_tcp_input_stream_read_nonblocking;
}
static void
nice_pseudo_tcp_input_stream_init (NicePseudoTcpInputStream *self)
{
NicePseudoTcpInputStreamPrivate *priv;
priv = nice_pseudo_tcp_input_stream_get_instance_private (self);
g_weak_ref_init (&priv->socket_ref, NULL);
}
static void
nice_pseudo_tcp_input_stream_dispose (GObject *object)
{
NicePseudoTcpInputStream *self;
NicePseudoTcpInputStreamPrivate *priv;
self = NICE_PSEUDO_TCP_INPUT_STREAM (object);
priv = nice_pseudo_tcp_input_stream_get_instance_private (self);
/* Ensure the stream is closed before continuing. */
g_input_stream_close (G_INPUT_STREAM (object), NULL, NULL);
g_weak_ref_clear (&priv->socket_ref);
G_OBJECT_CLASS (nice_pseudo_tcp_input_stream_parent_class)->dispose (object);
}
static void
nice_pseudo_tcp_input_stream_get_property (GObject *object, guint prop_id,
GValue *value, GParamSpec *pspec)
{
NicePseudoTcpInputStream *self;
NicePseudoTcpInputStreamPrivate *priv;
self = NICE_PSEUDO_TCP_INPUT_STREAM (object);
priv = nice_pseudo_tcp_input_stream_get_instance_private (self);
switch (prop_id) {
case PROP_SOCKET:
g_value_take_object (value, g_weak_ref_get (&priv->socket_ref));
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
}
}
static void
nice_pseudo_tcp_input_stream_set_property (GObject *object, guint prop_id,
const GValue *value, GParamSpec *pspec)
{
NicePseudoTcpInputStream *self;
NicePseudoTcpInputStreamPrivate *priv;
self = NICE_PSEUDO_TCP_INPUT_STREAM (object);
priv = nice_pseudo_tcp_input_stream_get_instance_private (self);
switch (prop_id) {
case PROP_SOCKET:
/* Construct only. */
g_weak_ref_set (&priv->socket_ref, g_value_get_object (value));
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
}
}
static void
set_g_error_from_pseudo_tcp_error (GError **error, gint err_no)
{
g_set_error (error, G_IO_ERROR, g_io_error_from_errno (err_no), "%s",
g_strerror (err_no));
}
static gssize
nice_pseudo_tcp_input_stream_read (GInputStream *stream, void *buffer,
gsize count, GCancellable *cancellable, GError **error)
{
NicePseudoTcpInputStream *self;
NicePseudoTcpInputStreamPrivate *priv;
PseudoTcpSocket *socket = NULL;
gssize len;
self = NICE_PSEUDO_TCP_INPUT_STREAM (stream);
priv = nice_pseudo_tcp_input_stream_get_instance_private (self);
socket = g_weak_ref_get (&priv->socket_ref);
if (socket == NULL) {
g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED, "Socket was destroyed.");
return -1;
}
do {
len = pseudo_tcp_socket_recv (socket, buffer, count);
if (len < 0) {
if (pseudo_tcp_socket_get_error (socket) == EWOULDBLOCK) {
/* Block until data is available. */
/* FIXME: Use something like g_socket_condition_wait(). */
while (!g_pollable_input_stream_is_readable (G_POLLABLE_INPUT_STREAM (stream)))
g_main_context_iteration (NULL, TRUE);
continue;
} else {
/* Fatal error (ENOTCONN). */
set_g_error_from_pseudo_tcp_error (error,
pseudo_tcp_socket_get_error (socket));
break;
}
}
} while (len < 0);
g_object_unref (socket);
return len;
}
static gboolean
nice_pseudo_tcp_input_stream_close (GInputStream *stream,
GCancellable *cancellable, GError **error)
{
NicePseudoTcpInputStream *self;
NicePseudoTcpInputStreamPrivate *priv;
PseudoTcpSocket *socket = NULL;
self = NICE_PSEUDO_TCP_INPUT_STREAM (stream);
priv = nice_pseudo_tcp_input_stream_get_instance_private (self);
if (g_cancellable_set_error_if_cancelled (cancellable, error))
return FALSE;
socket = g_weak_ref_get (&priv->socket_ref);
if (socket == NULL) {
g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED, "Socket was destroyed.");
return FALSE;
}
pseudo_tcp_socket_shutdown (socket, PSEUDO_TCP_SHUTDOWN_RD);
g_object_unref (socket);
return TRUE;
}
static gboolean
nice_pseudo_tcp_input_stream_can_poll (GPollableInputStream *stream)
{
return TRUE;
}
static gboolean
nice_pseudo_tcp_input_stream_is_readable (GPollableInputStream *stream)
{
NicePseudoTcpInputStream *self;
NicePseudoTcpInputStreamPrivate *priv;
PseudoTcpSocket *socket = NULL;
gboolean is_readable;
self = NICE_PSEUDO_TCP_INPUT_STREAM (stream);
priv = nice_pseudo_tcp_input_stream_get_instance_private (self);
socket = g_weak_ref_get (&priv->socket_ref);
if (socket == NULL)
return FALSE;
is_readable = (pseudo_tcp_socket_get_available_bytes (socket) > 0);
g_object_unref (socket);
return is_readable;
}
static GSource *
nice_pseudo_tcp_input_stream_create_source (GPollableInputStream *stream,
GCancellable *cancellable)
{
NicePseudoTcpInputStream *self;
NicePseudoTcpInputStreamPrivate *priv;
PseudoTcpSocket *socket = NULL;
GSource *source = NULL;
self = NICE_PSEUDO_TCP_INPUT_STREAM (stream);
priv = nice_pseudo_tcp_input_stream_get_instance_private (self);
socket = g_weak_ref_get (&priv->socket_ref);
if (socket == NULL)
return NULL;
source = pseudo_tcp_socket_create_source (socket, G_IO_IN | G_IO_OUT,
cancellable);
g_object_unref (socket);
return source;
}
static gssize
nice_pseudo_tcp_input_stream_read_nonblocking (GPollableInputStream *stream,
void *buffer, gsize count, GError **error)
{
NicePseudoTcpInputStream *self;
NicePseudoTcpInputStreamPrivate *priv;
PseudoTcpSocket *socket = NULL;
gssize len;
self = NICE_PSEUDO_TCP_INPUT_STREAM (stream);
priv = nice_pseudo_tcp_input_stream_get_instance_private (self);
socket = g_weak_ref_get (&priv->socket_ref);
if (socket == NULL) {
g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED, "Socket was destroyed.");
return -1;
}
len = pseudo_tcp_socket_recv (socket, buffer, count);
if (len < 0) {
set_g_error_from_pseudo_tcp_error (error,
pseudo_tcp_socket_get_error (socket));
}
g_object_unref (socket);
return len;
}
/**
* nice_pseudo_tcp_input_stream_new:
* @socket: a #PseudoTcpSocket
*
* Create a new #NicePseudoTcpInputStream for the given @socket.
*
* The constructed #NicePseudoTcpInputStream will not hold a reference to
* @socket. If @socket is destroyed before the input stream, %G_IO_ERROR_CLOSED
* will be returned for all subsequent operations on the stream.
*
* Returns: The new #NicePseudoTcpInputStream object
*
* Since: UNRELEASED
*/
GInputStream *
nice_pseudo_tcp_input_stream_new (PseudoTcpSocket *socket)
{
g_return_val_if_fail (IS_PSEUDO_TCP_SOCKET (socket), NULL);
return g_object_new (NICE_TYPE_PSEUDO_TCP_INPUT_STREAM,
"socket", socket,
NULL);
}
/*
* This file is part of the Nice GLib ICE library.
*
* © 2015 Collabora Ltd.
* Contact: Philip Withnall
*
* The contents of this file are subject to the Mozilla Public License Version
* 1.1 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.mozilla.org/MPL/
*
* Software distributed under the License is distributed on an "AS IS" basis,
* WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
* for the specific language governing rights and limitations under the
* License.
*
* The Original Code is the Nice GLib ICE library.
*
* The Initial Developers of the Original Code are Collabora Ltd and Nokia
* Corporation. All Rights Reserved.
*
* Contributors:
* Philip Withnall, Collabora Ltd.
*
* Alternatively, the contents of this file may be used under the terms of the
* the GNU Lesser General Public License Version 2.1 (the "LGPL"), in which
* case the provisions of LGPL are applicable instead of those above. If you
* wish to allow use of your version of this file only under the terms of the
* LGPL and not to allow others to use your version of this file under the
* MPL, indicate your decision by deleting the provisions above and replace
* them with the notice and other provisions required by the LGPL. If you do
* not delete the provisions above, a recipient may use your version of this
* file under either the MPL or the LGPL.
*/
#ifndef __NICE_PSEUDO_TCP_INPUT_STREAM_H__
#define __NICE_PSEUDO_TCP_INPUT_STREAM_H__
#include <glib-object.h>
#include <gio/gio.h>
#include "pseudotcp.h"
G_BEGIN_DECLS
#define NICE_TYPE_PSEUDO_TCP_INPUT_STREAM \
(nice_pseudo_tcp_input_stream_get_type ())
#define NICE_PSEUDO_TCP_INPUT_STREAM(obj) \
(G_TYPE_CHECK_INSTANCE_CAST((obj), NICE_TYPE_PSEUDO_TCP_INPUT_STREAM, \
NicePseudoTcpInputStream))
#define NICE_PSEUDO_TCP_INPUT_STREAM_CLASS(klass) \
(G_TYPE_CHECK_CLASS_CAST((klass), NICE_TYPE_PSEUDO_TCP_INPUT_STREAM, \
NicePseudoTcpInputStreamClass))
#define NICE_IS_PSEUDO_TCP_INPUT_STREAM(obj) \
(G_TYPE_CHECK_INSTANCE_TYPE((obj), NICE_TYPE_PSEUDO_TCP_INPUT_STREAM))
#define NICE_IS_PSEUDO_TCP_INPUT_STREAM_CLASS(klass) \
(G_TYPE_CHECK_CLASS_TYPE((klass), NICE_TYPE_PSEUDO_TCP_INPUT_STREAM))
#define NICE_PSEUDO_TCP_INPUT_STREAM_GET_CLASS(obj) \
(G_TYPE_INSTANCE_GET_CLASS ((obj), NICE_TYPE_PSEUDO_TCP_INPUT_STREAM, \
NicePseudoTcpInputStreamClass))
typedef struct _NicePseudoTcpInputStreamClass NicePseudoTcpInputStreamClass;
typedef struct _NicePseudoTcpInputStream NicePseudoTcpInputStream;
#include "agent.h"
GType nice_pseudo_tcp_input_stream_get_type (void);
struct _NicePseudoTcpInputStreamClass
{
GInputStreamClass parent_class;
};
struct _NicePseudoTcpInputStream
{
GInputStream parent_instance;
};
GInputStream *nice_pseudo_tcp_input_stream_new (PseudoTcpSocket *socket);
G_END_DECLS
#endif /* __NICE_PSEUDO_TCP_INPUT_STREAM_H__ */
/*
* This file is part of the Nice GLib ICE library.
*
* © 2015 Collabora Ltd.
* Contact: Philip Withnall
*
* The contents of this file are subject to the Mozilla Public License Version
* 1.1 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.mozilla.org/MPL/
*
* Software distributed under the License is distributed on an "AS IS" basis,
* WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
* for the specific language governing rights and limitations under the
* License.
*
* The Original Code is the Nice GLib ICE library.
*
* The Initial Developers of the Original Code are Collabora Ltd and Nokia
* Corporation. All Rights Reserved.
*
* Contributors:
* Philip Withnall, Collabora Ltd.
*
* Alternatively, the contents of this file may be used under the terms of the
* the GNU Lesser General Public License Version 2.1 (the "LGPL"), in which
* case the provisions of LGPL are applicable instead of those above. If you
* wish to allow use of your version of this file only under the terms of the
* LGPL and not to allow others to use your version of this file under the
* MPL, indicate your decision by deleting the provisions above and replace
* them with the notice and other provisions required by the LGPL. If you do
* not delete the provisions above, a recipient may use your version of this
* file under either the MPL or the LGPL.
*/
/**
* SECTION:nicepseudotcpoutputstream
* @short_description: #NicePseudoTcpOutputStream implementation for libnice
* @see_also: #PseudoTcpSocket
* @include: pseudo-tcp-output-stream.h
* @stability: Stable
*
* TODO
*