Commit 3d507cae authored by Youness Alaoui's avatar Youness Alaoui
Browse files

Add nice_agent_new_reliable to the libnice API which uses pseudotcp

parent d3278562
......@@ -120,6 +120,7 @@ struct _NiceAgent
GSource *upnp_timer_source; /* source of upnp timeout timer */
#endif
gchar *software_attribute; /* SOFTWARE attribute */
gboolean reliable; /* property: reliable */
/* XXX: add pointer to internal data struct for ABI-safe extensions */
};
......
......@@ -7,4 +7,6 @@ VOID:UINT,UINT,STRING
# candidate-gathering-done
# initial-binding-request-received
VOID:UINT
# reliable-transport-writable
VOID:UINT,UINT
......@@ -71,6 +71,8 @@
#include "stream.h"
#include "interfaces.h"
#include "pseudotcp.h"
/* This is the max size of a UDP packet
* will it work tcp relaying??
*/
......@@ -78,6 +80,8 @@
#define DEFAULT_STUN_PORT 3478
#define DEFAULT_UPNP_TIMEOUT 200
#define MAX_TCP_MTU 1400 /* Use 1400 because of VPNs and we assume IEE 802.3 */
G_DEFINE_TYPE (NiceAgent, nice_agent, G_TYPE_OBJECT);
enum
......@@ -96,7 +100,8 @@ enum
PROP_PROXY_USERNAME,
PROP_PROXY_PASSWORD,
PROP_UPNP,
PROP_UPNP_TIMEOUT
PROP_UPNP_TIMEOUT,
PROP_RELIABLE
};
......@@ -108,6 +113,7 @@ enum
SIGNAL_NEW_CANDIDATE,
SIGNAL_NEW_REMOTE_CANDIDATE,
SIGNAL_INITIAL_BINDING_REQUEST_RECEIVED,
SIGNAL_RELIABLE_TRANSPORT_WRITABLE,
N_SIGNALS,
};
......@@ -396,6 +402,15 @@ nice_agent_class_init (NiceAgentClass *klass)
DEFAULT_UPNP_TIMEOUT,
G_PARAM_READWRITE | G_PARAM_CONSTRUCT));
g_object_class_install_property (gobject_class, PROP_RELIABLE,
g_param_spec_boolean (
"reliable",
"reliable mode",
"Whether agent should use PseudoTcp to ensure a reliable transport"
"of messages",
FALSE,
G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY));
/* install signals */
/**
......@@ -532,6 +547,34 @@ nice_agent_class_init (NiceAgentClass *klass)
G_TYPE_UINT,
G_TYPE_INVALID);
/**
* NiceAgent::reliable-transport-writable
* @stream_id: The ID of the stream
* @component_id: The ID of the component
*
* This signal is fired on the reliable #NiceAgent when the underlying reliable
* transport becomes writable.
* This signal is only emitted when the nice_agent_send() function returns less
* bytes than requested to send (or -1) and once when the connection
* is established.
*/
/*TODO: transform into a property to allow people to check if it's writable
* before doing a send... */
signals[SIGNAL_RELIABLE_TRANSPORT_WRITABLE] =
g_signal_new (
"reliable-transport-writable",
G_OBJECT_CLASS_TYPE (klass),
G_SIGNAL_RUN_LAST,
0,
NULL,
NULL,
agent_marshal_VOID__UINT_UINT,
G_TYPE_NONE,
2,
G_TYPE_UINT, G_TYPE_UINT,
G_TYPE_INVALID);
/* Init debug options depending on env variables */
nice_debug_init ();
}
......@@ -562,6 +605,7 @@ nice_agent_init (NiceAgent *agent)
agent->software_attribute = NULL;
agent->compatibility = NICE_COMPATIBILITY_DRAFT19;
agent->reliable = FALSE;
stun_agent_init (&agent->stun_agent, STUN_ALL_KNOWN_ATTRIBUTES,
STUN_COMPATIBILITY_RFC5389,
......@@ -579,6 +623,20 @@ nice_agent_new (GMainContext *ctx, NiceCompatibility compat)
NiceAgent *agent = g_object_new (NICE_TYPE_AGENT,
"compatibility", compat,
"main-context", ctx,
"reliable", FALSE,
NULL);
return agent;
}
NICEAPI_EXPORT NiceAgent *
nice_agent_new_reliable (GMainContext *ctx, NiceCompatibility compat)
{
NiceAgent *agent = g_object_new (NICE_TYPE_AGENT,
"compatibility", compat,
"main-context", ctx,
"reliable", TRUE,
NULL);
return agent;
......@@ -667,6 +725,10 @@ nice_agent_get_property (
#endif
break;
case PROP_RELIABLE:
g_value_set_boolean (value, agent->reliable);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
}
......@@ -774,6 +836,11 @@ nice_agent_set_property (
agent->upnp_enabled = g_value_get_boolean (value);
#endif
break;
case PROP_RELIABLE:
agent->reliable = g_value_get_boolean (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
}
......@@ -782,6 +849,162 @@ nice_agent_set_property (
}
static void adjust_tcp_clock (NiceAgent *agent, Component *component);
static void
pseudo_tcp_socket_opened (PseudoTcpSocket *sock, gpointer user_data)
{
TcpUserData *data = (TcpUserData *)user_data;
NiceAgent *agent = data->agent;
Component *component = data->component;
Stream *stream = data->stream;
nice_debug ("Agent %p: s%d:%d pseudo Tcp socket Opened", data->agent,
stream->id, component->id);
g_signal_emit (agent, signals[SIGNAL_RELIABLE_TRANSPORT_WRITABLE], 0,
stream->id, component->id);
}
static void
pseudo_tcp_socket_readable (PseudoTcpSocket *sock, gpointer user_data)
{
TcpUserData *data = (TcpUserData *)user_data;
NiceAgent *agent = data->agent;
Component *component = data->component;
Stream *stream = data->stream;
gchar buf[MAX_TCP_MTU];
gint len;
nice_debug ("Agent %p: s%d:%d pseudo Tcp socket readable", agent,
stream->id, component->id);
do {
len = pseudo_tcp_socket_recv (sock, buf, sizeof(buf));
if (len > 0 && component->g_source_io_cb) {
gpointer data = component->data;
gint sid = stream->id;
gint cid = component->id;
NiceAgentRecvFunc callback = component->g_source_io_cb;
/* Unlock the agent before calling the callback */
agent_unlock();
callback (agent, sid, cid, len, buf, data);
agent_lock();
} else if (len < 0 &&
pseudo_tcp_socket_get_error (sock) != EWOULDBLOCK) {
/* Signal error */
/* TODO: close the pseudotcp ?*/
}
} while (len > 0);
adjust_tcp_clock (agent, component);
}
static void
pseudo_tcp_socket_writable (PseudoTcpSocket *sock, gpointer user_data)
{
TcpUserData *data = (TcpUserData *)user_data;
NiceAgent *agent = data->agent;
Component *component = data->component;
Stream *stream = data->stream;
nice_debug ("Agent %p: s%d:%d pseudo Tcp socket writable", data->agent,
data->stream->id, data->component->id);
g_signal_emit (agent, signals[SIGNAL_RELIABLE_TRANSPORT_WRITABLE], 0,
stream->id, component->id);
}
static void
pseudo_tcp_socket_closed (PseudoTcpSocket *sock, guint32 err,
gpointer user_data)
{
TcpUserData *data = (TcpUserData *)user_data;
nice_debug ("Agent %p: s%d:%d pseudo Tcp socket closed", data->agent,
data->stream->id, data->component->id);
/* TODO: do something ? detach source... */
}
static PseudoTcpWriteResult
pseudo_tcp_socket_write_packet (PseudoTcpSocket *sock,
const gchar *buffer, guint32 len, gpointer user_data)
{
TcpUserData *data = (TcpUserData *)user_data;
NiceAgent *agent = data->agent;
Component *component = data->component;
Stream *stream = data->stream;
if (component->selected_pair.local != NULL) {
NiceSocket *sock;
NiceAddress *addr;
#ifndef NDEBUG
gchar tmpbuf[INET6_ADDRSTRLEN];
nice_address_to_string (&component->selected_pair.remote->addr, tmpbuf);
nice_debug ("Agent %p : s%d:%d: sending %d bytes to [%s]:%d", agent,
stream->id, component->id, len, tmpbuf,
nice_address_get_port (&component->selected_pair.remote->addr));
#endif
sock = component->selected_pair.local->sockptr;
addr = &component->selected_pair.remote->addr;
if (nice_socket_send (sock, addr, len, buffer)) {
return WR_SUCCESS;
}
}
return WR_FAIL;
}
static gboolean
notify_pseudo_tcp_socket_clock (gpointer user_data)
{
TcpUserData *data = (TcpUserData *)user_data;
Component *component = data->component;
NiceAgent *agent = data->agent;
agent_lock();
if (component->tcp_clock) {
g_source_destroy (component->tcp_clock);
g_source_unref (component->tcp_clock);
component->tcp_clock = NULL;
}
pseudo_tcp_socket_notify_clock (component->tcp);
adjust_tcp_clock (agent, component);
agent_unlock();
return FALSE;
}
static void adjust_tcp_clock (NiceAgent *agent, Component *component)
{
long timeout = 0;
if (pseudo_tcp_socket_get_next_clock (component->tcp, &timeout)) {
if (component->tcp_clock) {
g_source_destroy (component->tcp_clock);
g_source_unref (component->tcp_clock);
component->tcp_clock = NULL;
}
component->tcp_clock = agent_timeout_add_with_context (agent,
timeout, notify_pseudo_tcp_socket_clock, component->tcp_data);
} else {
nice_debug ("Agent %p: component %d pseudo tcp socket should be destroyed",
agent, component->id);
g_object_unref (component->tcp);
component->tcp = NULL;
/* TODO: error ?*/
}
}
void agent_gathering_done (NiceAgent *agent)
{
......@@ -868,6 +1091,11 @@ void agent_signal_new_selected_pair (NiceAgent *agent, guint stream_id, guint co
&component->selected_pair.remote->addr);
}
if (component->tcp) {
pseudo_tcp_socket_connect (component->tcp);
adjust_tcp_clock (agent, component);
}
g_signal_emit (agent, signals[SIGNAL_NEW_SELECTED_PAIR], 0,
stream_id, component_id, lf_copy, rf_copy);
......@@ -1093,15 +1321,40 @@ nice_agent_add_stream (
Stream *stream;
GSList *modified_list = NULL;
guint ret = 0;
guint i;
agent_lock();
stream = stream_new (n_components);
if (stream) {
modified_list = g_slist_append (agent->streams, stream);
if (modified_list) {
stream->id = agent->next_stream_id++;
nice_debug ("Agent %p : allocating stream id %u (%p)", agent, stream->id, stream);
if (agent->reliable) {
nice_debug ("Agent %p : reliable stream", agent);
for (i = 0; i < n_components; i++) {
Component *component = stream_find_component_by_id (stream, i + 1);
if (component) {
TcpUserData *data = g_slice_new0 (TcpUserData);
PseudoTcpCallbacks tcp_callbacks = {data,
pseudo_tcp_socket_opened,
pseudo_tcp_socket_readable,
pseudo_tcp_socket_writable,
pseudo_tcp_socket_closed,
pseudo_tcp_socket_write_packet};
data->agent = agent;
data->stream = stream;
data->component = component;
component->tcp_data = data;
component->tcp = pseudo_tcp_socket_new (0, &tcp_callbacks);
adjust_tcp_clock (agent, component);
nice_debug ("Agent %p: Create Pseudo Tcp Socket for component %d",
agent, i+1);
} else {
nice_debug ("Agent %p: couldn't find component %d", agent, i+1);
}
}
}
stream_initialize_credentials (stream, agent->rng);
......@@ -1514,7 +1767,23 @@ nice_agent_remove_stream (
/* remove the stream itself */
for (i = stream->components; i; i = i->next) {
priv_detach_stream_component (stream, (Component *) i->data);
Component *component = (Component *) i->data;
priv_detach_stream_component (stream, component);
if (component->tcp_clock) {
g_source_destroy (component->tcp_clock);
g_source_unref (component->tcp_clock);
component->tcp_clock = NULL;
}
if (component->tcp_data != NULL) {
g_slice_free (TcpUserData, component->tcp_data);
component->tcp_data = NULL;
}
if (component->tcp) {
pseudo_tcp_socket_close (component->tcp, TRUE);
g_object_unref (component->tcp);
component->tcp = NULL;
}
}
agent->streams = g_slist_remove (agent->streams, stream);
......@@ -1866,35 +2135,42 @@ nice_agent_send (
{
Stream *stream;
Component *component;
guint ret = -1;
gint ret = -1;
agent_lock();
if (!agent_find_component (agent, stream_id, component_id, &stream, &component)) {
if (!agent_find_component (agent, stream_id, component_id,
&stream, &component)) {
goto done;
}
if (component->selected_pair.local != NULL)
{
NiceSocket *sock;
NiceAddress *addr;
if (component->tcp != NULL) {
ret = pseudo_tcp_socket_send (component->tcp, buf, len);
adjust_tcp_clock (agent, component);
if (ret == -1 &&
pseudo_tcp_socket_get_error (component->tcp) != EWOULDBLOCK) {
/* TODO: close? error ?*/
}
} else if (component->selected_pair.local != NULL) {
NiceSocket *sock;
NiceAddress *addr;
#ifndef NDEBUG
gchar tmpbuf[INET6_ADDRSTRLEN];
nice_address_to_string (&component->selected_pair.remote->addr, tmpbuf);
gchar tmpbuf[INET6_ADDRSTRLEN];
nice_address_to_string (&component->selected_pair.remote->addr, tmpbuf);
nice_debug ("Agent %p : s%d:%d: sending %d bytes to [%s]:%d", agent, stream_id, component_id,
len, tmpbuf,
nice_address_get_port (&component->selected_pair.remote->addr));
nice_debug ("Agent %p : s%d:%d: sending %d bytes to [%s]:%d", agent, stream_id, component_id,
len, tmpbuf,
nice_address_get_port (&component->selected_pair.remote->addr));
#endif
sock = component->selected_pair.local->sockptr;
addr = &component->selected_pair.remote->addr;
if (nice_socket_send (sock, addr, len, buf)) {
ret = len;
}
goto done;
sock = component->selected_pair.local->sockptr;
addr = &component->selected_pair.remote->addr;
if (nice_socket_send (sock, addr, len, buf)) {
ret = len;
}
goto done;
}
done:
agent_unlock();
......@@ -1913,6 +2189,7 @@ nice_agent_get_local_candidates (
GSList * item = NULL;
agent_lock();
if (!agent_find_component (agent, stream_id, component_id, NULL, &component)) {
goto done;
}
......@@ -1950,7 +2227,7 @@ nice_agent_get_remote_candidates (
}
gboolean
gboolean
nice_agent_restart (
NiceAgent *agent)
{
......@@ -2105,7 +2382,10 @@ nice_agent_g_source_cb (
len = _nice_agent_recv (agent, stream, component, ctx->socket,
MAX_BUFFER_SIZE, buf);
if (len > 0 && component->g_source_io_cb) {
if (len > 0 && component->tcp) {
pseudo_tcp_socket_notify_packet (component->tcp, buf, len);
adjust_tcp_clock (agent, component);
} else if (len > 0 && component->g_source_io_cb) {
gpointer data = component->data;
gint sid = stream->id;
gint cid = component->id;
......
......@@ -277,6 +277,21 @@ typedef void (*NiceAgentRecvFunc) (
NiceAgent *
nice_agent_new (GMainContext *ctx, NiceCompatibility compat);
/**
* nice_agent_new_reliable:
* @ctx: The Glib Mainloop Context to use for timers
* @compat: The compatibility mode of the agent
*
* Create a new #NiceAgent in reliable mode, which uses #PseudoTcpSocket to
* assure reliability of the messages.
* The returned object must be freed with g_object_unref()
*
* Returns: The new agent GObject
*/
NiceAgent *
nice_agent_new_reliable (GMainContext *ctx, NiceCompatibility compat);
/**
* nice_agent_add_local_address:
* @agent: The #NiceAgent Object
......
......@@ -53,9 +53,7 @@
#include "agent-priv.h"
Component *
component_new (
G_GNUC_UNUSED
guint id)
component_new (guint id)
{
Component *component;
......@@ -63,6 +61,8 @@ component_new (
component->id = id;
component->state = NICE_COMPONENT_STATE_DISCONNECTED;
component->restart_candidate = NULL;
component->tcp = NULL;
return component;
}
......@@ -213,8 +213,9 @@ void component_update_selected_pair (Component *component, const CandidatePair *
{
g_assert (component);
g_assert (pair);
nice_debug ("setting SELECTED PAIR for component %u: %s:%s (prio:%lu).",
component->id, pair->local->foundation, pair->remote->foundation, (long unsigned)pair->priority);
nice_debug ("setting SELECTED PAIR for component %u: %s:%s (prio:%"
G_GUINT64_FORMAT ").", component->id, pair->local->foundation,
pair->remote->foundation, pair->priority);
if (component->selected_pair.keepalive.tick_source != NULL) {
g_source_destroy (component->selected_pair.keepalive.tick_source);
......
......@@ -41,10 +41,14 @@
#include <glib.h>
typedef struct _Component Component;
#include "agent.h"
#include "candidate.h"
#include "stun/stunagent.h"
#include "stun/usages/timer.h"
#include "pseudotcp.h"
#include "stream.h"
G_BEGIN_DECLS
......@@ -55,7 +59,6 @@ G_BEGIN_DECLS
* would end up with 2*K host candidates if an agent has K interfaces.""
*/
typedef struct _Component Component;
typedef struct _CandidatePair CandidatePair;
typedef struct _CandidatePairKeepalive CandidatePairKeepalive;
typedef struct _IncomingCheck IncomingCheck;
......@@ -89,6 +92,12 @@ struct _IncomingCheck
uint16_t username_len;
};
typedef struct {
NiceAgent *agent;
Stream *stream;
Component *component;
} TcpUserData;
struct _Component
{
NiceComponentType type;
......@@ -107,12 +116,13 @@ struct _Component
gpointer data; /**< data passed to the io function */
GMainContext *ctx; /**< context for data callbacks for this
component */
PseudoTcpSocket *tcp;
GSource* tcp_clock;
TcpUserData *tcp_data;
};
Component *
component_new (
G_GNUC_UNUSED
guint component_id);
component_new (guint component_id);
void
component_free (Component *cmp);
......
......@@ -40,6 +40,8 @@
#include <glib.h>
typedef struct _Stream Stream;
#include "component.h"
#include "random.h"
......@@ -68,8 +70,6 @@ typedef enum
} NiceCheckListState;
typedef struct _Stream Stream;
struct _Stream
{
guint id;
......@@ -86,6 +86,7 @@ struct _Stream
gint tos;
};
Stream *
stream_new (guint n_components);
......
......@@ -9,6 +9,7 @@ NiceCompatibility
NiceAgentRecvFunc
NICE_AGENT_MAX_REMOTE_CANDIDATES
nice_agent_new
nice_agent_new_reliable
nice_agent_add_local_address
nice_agent_add_stream
nice_agent_remove_stream
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment