...
 
Commits (30)
......@@ -50,6 +50,14 @@
#define inet_pton inet_pton_win32
#define inet_ntop inet_ntop_win32
/* Defined in recent versions of mingw:
* https://github.com/mirror/mingw-w64/commit/0f4899473c4ba2e34fa447b1931a04e38c1f105e
*/
#ifndef IN6_ARE_ADDR_EQUAL
#define IN6_ARE_ADDR_EQUAL(a, b) \
(memcmp ((const void *) (a), (const void *) (b), sizeof (struct in6_addr)) == 0)
#endif
static const char *
inet_ntop_win32 (int af, const void *src, char *dst, socklen_t cnt)
......
......@@ -158,6 +158,12 @@ void agent_unlock(void)
#endif
static GType _nice_agent_stream_ids_get_type (void);
G_DEFINE_POINTER_TYPE (_NiceAgentStreamIds, _nice_agent_stream_ids);
#define NICE_TYPE_AGENT_STREAM_IDS _nice_agent_stream_ids_get_type ()
typedef struct {
guint signal_id;
GSignalQuery query;
......@@ -170,10 +176,12 @@ free_queued_signal (QueuedSignal *sig)
{
guint i;
g_value_unset (&sig->params[0]);
for (i = 0; i < sig->query.n_params; i++) {
if (G_VALUE_HOLDS_POINTER (&sig->params[i]))
g_free (g_value_get_pointer (&sig->params[i]));
g_value_unset (&sig->params[i]);
if (G_VALUE_HOLDS(&sig->params[i + 1], NICE_TYPE_AGENT_STREAM_IDS))
g_free (g_value_get_pointer (&sig->params[i + 1]));
g_value_unset (&sig->params[i + 1]);
}
g_slice_free1 (sizeof(GValue) * (sig->query.n_params + 1), sig->params);
......@@ -771,7 +779,7 @@ nice_agent_class_init (NiceAgentClass *klass)
g_cclosure_marshal_VOID__POINTER,
G_TYPE_NONE,
1,
G_TYPE_POINTER,
NICE_TYPE_AGENT_STREAM_IDS,
G_TYPE_INVALID);
/* Init debug options depending on env variables */
......@@ -1913,6 +1921,9 @@ nice_agent_set_relay_info(NiceAgent *agent,
{
Component *component = NULL;
Stream *stream = NULL;
gboolean ret = TRUE;
TurnServer *turn;
g_return_val_if_fail (server_ip, FALSE);
g_return_val_if_fail (server_port, FALSE);
......@@ -1922,19 +1933,24 @@ nice_agent_set_relay_info(NiceAgent *agent,
agent_lock();
if (agent_find_component (agent, stream_id, component_id, NULL, &component)) {
TurnServer *turn = g_slice_new0 (TurnServer);
if (!agent_find_component (agent, stream_id, component_id, &stream,
&component)) {
ret = FALSE;
goto done;
}
turn = g_slice_new0 (TurnServer);
nice_address_init (&turn->server);
if (nice_address_set_from_string (&turn->server, server_ip)) {
nice_address_set_port (&turn->server, server_port);
} else {
g_slice_free (TurnServer, turn);
agent_unlock_and_emit (agent);
return FALSE;
ret = FALSE;
goto done;
}
turn->username = g_strdup (username);
turn->password = g_strdup (password);
turn->type = type;
......@@ -1943,10 +1959,28 @@ nice_agent_set_relay_info(NiceAgent *agent,
server_ip, server_port, type);
component->turn_servers = g_list_append (component->turn_servers, turn);
if (stream->gathering_started) {
GSList *i;
for (i = component->local_candidates; i; i = i->next) {
NiceCandidate *candidate = i->data;
if (candidate->type == NICE_CANDIDATE_TYPE_HOST)
priv_add_new_candidate_discovery_turn (agent,
candidate->sockptr, turn, stream,
component_id);
}
if (agent->discovery_unsched_items)
discovery_schedule (agent);
}
done:
agent_unlock_and_emit (agent);
return TRUE;
return ret;
}
#ifdef HAVE_GUPNP
......@@ -2107,6 +2141,12 @@ nice_agent_gather_candidates (
return FALSE;
}
if (stream->gathering_started) {
/* Stream is already gathering, ignore this call */
agent_unlock_and_emit (agent);
return TRUE;
}
nice_debug ("Agent %p : In %s mode, starting candidate gathering.", agent,
agent->full_mode ? "ICE-FULL" : "ICE-LITE");
......@@ -2260,7 +2300,7 @@ nice_agent_gather_candidates (
}
stream->gathering = TRUE;
stream->gathering_started = TRUE;
/* Only signal the new candidates after we're sure that the gathering was
* succesfful. But before sending gathering-done */
......@@ -2378,7 +2418,8 @@ nice_agent_remove_stream (
if (!agent->streams)
priv_remove_keepalive_timer (agent);
agent_queue_signal (agent, signals[SIGNAL_STREAMS_REMOVED], stream_ids);
agent_queue_signal (agent, signals[SIGNAL_STREAMS_REMOVED],
g_memdup (stream_ids, sizeof(stream_ids)));
agent_unlock_and_emit (agent);
return;
......@@ -2388,14 +2429,20 @@ NICEAPI_EXPORT void
nice_agent_set_port_range (NiceAgent *agent, guint stream_id, guint component_id,
guint min_port, guint max_port)
{
Stream *stream;
Component *component;
agent_lock();
if (agent_find_component (agent, stream_id, component_id, NULL, &component)) {
if (agent_find_component (agent, stream_id, component_id, &stream,
&component)) {
if (stream->gathering_started) {
g_critical ("nice_agent_gather_candidates (stream_id=%u) already called for this stream", stream_id);
} else {
component->min_port = min_port;
component->max_port = max_port;
}
}
agent_unlock_and_emit (agent);
}
......@@ -2471,8 +2518,6 @@ static gboolean priv_add_remote_candidate (
g_free (candidate->password);
candidate->password = g_strdup (password);
}
if (conn_check_add_for_candidate (agent, stream_id, component, candidate) < 0)
goto errors;
}
else {
/* case 2: add a new candidate */
......@@ -2509,8 +2554,9 @@ static gboolean priv_add_remote_candidate (
if (foundation)
g_strlcpy (candidate->foundation, foundation,
NICE_CANDIDATE_MAX_FOUNDATION);
}
if (conn_check_add_for_candidate (agent, stream_id, component, candidate) < 0)
if (conn_check_add_for_candidate (agent, stream_id, component, candidate) < 0) {
goto errors;
}
......@@ -2717,7 +2763,12 @@ agent_recv_message_unlocked (
goto done;
}
if (nice_debug_is_enabled () && message->length > 0) {
if (retval == RECV_OOB || message->length == 0) {
retval = RECV_OOB;
goto done;
}
if (nice_debug_is_enabled ()) {
gchar tmpbuf[INET6_ADDRSTRLEN];
nice_address_to_string (message->from, tmpbuf);
nice_debug ("Agent %p : Packet received on local socket %d from [%s]:%u (%" G_GSSIZE_FORMAT " octets).", agent,
......@@ -2741,11 +2792,16 @@ agent_recv_message_unlocked (
if (cand->type == NICE_CANDIDATE_TYPE_RELAYED &&
cand->stream_id == stream->id &&
cand->component_id == component->id) {
nice_turn_socket_parse_recv_message (cand->sockptr, &socket, message);
retval = nice_turn_socket_parse_recv_message (cand->sockptr, &socket,
message);
break;
}
}
}
if (retval == RECV_OOB)
goto done;
agent->media_after_tick = TRUE;
/* If the message’s stated length is equal to its actual length, it’s probably
......@@ -2797,7 +2853,7 @@ agent_recv_message_unlocked (
nice_debug ("%s: Queued %" G_GSSIZE_FORMAT " bytes for agent %p.",
G_STRFUNC, vec->size, agent);
return 0;
return RECV_OOB;
} else {
process_queued_tcp_packets (agent, stream, component);
}
......@@ -2925,7 +2981,6 @@ memcpy_buffer_to_input_message (NiceInputMessage *message,
buffer += len;
buffer_length -= len;
message->buffers[i].size = len;
message->length += len;
}
......@@ -4732,15 +4787,14 @@ nice_agent_get_io_stream (NiceAgent *agent, guint stream_id,
g_return_val_if_fail (stream_id >= 1, NULL);
g_return_val_if_fail (component_id >= 1, NULL);
g_return_val_if_fail (agent->reliable, NULL);
agent_lock ();
if (!agent_find_component (agent, stream_id, component_id, NULL, &component))
goto done;
if (component->iostream == NULL)
component->iostream = nice_io_stream_new (agent, stream_id, component_id);
component->iostream = nice_io_stream_new (agent, stream_id, component_id,
agent->reliable);
iostream = g_object_ref (component->iostream);
......
......@@ -129,6 +129,7 @@ component_new (guint id, NiceAgent *agent, Stream *stream)
component->own_ctx = g_main_context_new ();
component->stop_cancellable = g_cancellable_new ();
src = g_cancellable_source_new (component->stop_cancellable);
g_source_set_dummy_callback (src);
g_source_attach (src, component->own_ctx);
g_source_unref (src);
component->ctx = g_main_context_ref (component->own_ctx);
......@@ -854,6 +855,8 @@ typedef struct {
guint stream_id;
guint component_id;
guint component_socket_sources_age;
GIOCondition condition;
} ComponentSource;
static gboolean
......@@ -983,6 +986,7 @@ static GSourceFuncs component_source_funcs = {
* @component_id: The component's number
* @pollable_stream: a #GPollableInputStream or #GPollableOutputStream to pass
* to dispatched callbacks
* @condition: underlying socket condition to dispatch on
* @cancellable: (allow-none): a #GCancellable, or %NULL
*
* Create a new #ComponentSource, a type of #GSource which proxies poll events
......@@ -999,13 +1003,15 @@ static GSourceFuncs component_source_funcs = {
* Returns: (transfer full): a new #ComponentSource; unref with g_source_unref()
*/
GSource *
component_input_source_new (NiceAgent *agent, guint stream_id,
guint component_id, GPollableInputStream *pollable_istream,
component_source_new (NiceAgent *agent, guint stream_id,
guint component_id, GObject *pollable_stream,
GIOCondition condition,
GCancellable *cancellable)
{
ComponentSource *component_source;
g_assert (G_IS_POLLABLE_INPUT_STREAM (pollable_istream));
g_assert (G_IS_POLLABLE_INPUT_STREAM (pollable_stream) ||
G_IS_POLLABLE_OUTPUT_STREAM (pollable_stream));
component_source =
(ComponentSource *)
......@@ -1013,7 +1019,7 @@ component_input_source_new (NiceAgent *agent, guint stream_id,
g_source_set_name ((GSource *) component_source, "ComponentSource");
component_source->component_socket_sources_age = 0;
component_source->pollable_stream = g_object_ref (pollable_istream);
component_source->pollable_stream = g_object_ref (pollable_stream);
g_weak_ref_init (&component_source->agent_ref, agent);
component_source->stream_id = stream_id;
component_source->component_id = component_id;
......
......@@ -237,9 +237,9 @@ void
component_free_socket_sources (Component *component);
GSource *
component_input_source_new (NiceAgent *agent, guint stream_id,
guint component_id, GPollableInputStream *pollable_istream,
GCancellable *cancellable);
component_source_new (NiceAgent *agent, guint stream_id,
guint component_id, GObject *pollable_istream,
GIOCondition condition, GCancellable *cancellable);
GMainContext *
component_dup_io_context (Component *component);
......
......@@ -587,12 +587,12 @@ static gboolean priv_conn_keepalive_tick_unlocked (NiceAgent *agent)
gchar tmpbuf[INET6_ADDRSTRLEN];
nice_address_to_string (&p->remote->addr, tmpbuf);
nice_debug ("Agent %p : Keepalive STUN-CC REQ to '%s:%u', "
"socket=%u (c-id:%u), username='%s' (%" G_GSIZE_FORMAT "), "
"password='%s' (%" G_GSIZE_FORMAT "), priority=%u.", agent,
"socket=%u (c-id:%u), username='%.*s' (%" G_GSIZE_FORMAT "), "
"password='%.*s' (%" G_GSIZE_FORMAT "), priority=%u.", agent,
tmpbuf, nice_address_get_port (&p->remote->addr),
g_socket_get_fd(((NiceSocket *)p->local->sockptr)->fileno),
component->id, uname, uname_len, password, password_len,
priority);
component->id, (int) uname_len, uname, uname_len,
(int) password_len, password, password_len, priority);
}
if (uname_len > 0) {
buf_len = stun_usage_ice_conncheck_create (&agent->stun_agent,
......@@ -1673,14 +1673,16 @@ int conn_check_send (NiceAgent *agent, CandidateCheckPair *pair)
gchar tmpbuf[INET6_ADDRSTRLEN];
nice_address_to_string (&pair->remote->addr, tmpbuf);
nice_debug ("Agent %p : STUN-CC REQ to '%s:%u', socket=%u, "
"pair=%s (c-id:%u), tie=%llu, username='%s' (%" G_GSIZE_FORMAT "), "
"password='%s' (%" G_GSIZE_FORMAT "), priority=%u.", agent,
"pair=%s (c-id:%u), tie=%llu, username='%.*s' (%" G_GSIZE_FORMAT "), "
"password='%.*s' (%" G_GSIZE_FORMAT "), priority=%u.", agent,
tmpbuf,
nice_address_get_port (&pair->remote->addr),
g_socket_get_fd(((NiceSocket *)pair->local->sockptr)->fileno),
pair->foundation, pair->component_id,
(unsigned long long)agent->tie_breaker,
uname, uname_len, password, password_len, priority);
(int) uname_len, uname, uname_len,
(int) password_len, password, password_len,
priority);
}
......
......@@ -424,8 +424,8 @@ nice_input_stream_create_source (GPollableInputStream *stream,
if (agent == NULL)
goto dummy_source;
component_source = component_input_source_new (agent, priv->stream_id,
priv->component_id, stream, cancellable);
component_source = component_source_new (agent, priv->stream_id,
priv->component_id, G_OBJECT (stream), G_IO_IN, cancellable);
g_object_unref (agent);
......
......@@ -80,6 +80,8 @@ struct _NiceIOStreamPrivate
guint stream_id;
guint component_id;
gboolean is_datagram;
GInputStream *input_stream; /* owned */
GOutputStream *output_stream; /* owned */
};
......@@ -94,6 +96,7 @@ static GOutputStream *nice_io_stream_get_output_stream (GIOStream *stream);
static void streams_removed_cb (NiceAgent *agent, guint *stream_ids,
gpointer user_data);
static gboolean nice_io_stream_is_datagram (GIOStream *stream);
static void
nice_io_stream_class_init (NiceIOStreamClass *klass)
......@@ -109,6 +112,7 @@ nice_io_stream_class_init (NiceIOStreamClass *klass)
stream_class->get_input_stream = nice_io_stream_get_input_stream;
stream_class->get_output_stream = nice_io_stream_get_output_stream;
stream_class->is_datagram = nice_io_stream_is_datagram;
/*
* NiceIOStream:agent:
......@@ -277,17 +281,24 @@ nice_io_stream_set_property (GObject *object, guint prop_id,
* Since: 0.1.5
*/
GIOStream *
nice_io_stream_new (NiceAgent *agent, guint stream_id, guint component_id)
nice_io_stream_new (NiceAgent *agent, guint stream_id, guint component_id,
gboolean reliable)
{
GIOStream *stream;
g_return_val_if_fail (NICE_IS_AGENT (agent), NULL);
g_return_val_if_fail (stream_id > 0, NULL);
g_return_val_if_fail (component_id > 0, NULL);
return g_object_new (NICE_TYPE_IO_STREAM,
stream = g_object_new (NICE_TYPE_IO_STREAM,
"agent", agent,
"stream-id", stream_id,
"component-id", component_id,
NULL);
NICE_IO_STREAM (stream)->priv->is_datagram = !reliable;
return stream;
}
static GInputStream *
......@@ -348,3 +359,12 @@ streams_removed_cb (NiceAgent *agent, guint *stream_ids, gpointer user_data)
}
}
}
static gboolean
nice_io_stream_is_datagram (GIOStream *stream)
{
NiceIOStream *self = NICE_IO_STREAM (stream);
return self->priv->is_datagram;
}
......@@ -84,7 +84,7 @@ struct _NiceIOStream
};
GIOStream *nice_io_stream_new (NiceAgent *agent,
guint stream_id, guint component_id);
guint stream_id, guint component_id, gboolean reliable);
G_END_DECLS
......
......@@ -314,6 +314,13 @@ typedef struct {
gboolean cancelled;
} WriteData;
static WriteData *
write_data_ref (WriteData *write_data)
{
g_atomic_int_inc (&write_data->ref_count);
return write_data;
}
static void
write_data_unref (WriteData *write_data)
{
......@@ -384,26 +391,25 @@ nice_output_stream_write (GOutputStream *stream, const void *buffer, gsize count
* GCond solution; would be much better for nice_agent_send() to block
* properly in the main loop. */
write_data = g_slice_new0 (WriteData);
g_atomic_int_set (&write_data->ref_count, 4);
write_data->ref_count = 1;
g_mutex_init (&write_data->mutex);
g_cond_init (&write_data->cond);
if (cancellable != NULL) {
cancel_id = g_cancellable_connect (cancellable,
(GCallback) write_cancelled_cb, write_data,
(GCallback) write_cancelled_cb, write_data_ref (write_data),
(GDestroyNotify) write_data_unref);
}
closed_cancel_id = g_cancellable_connect (self->priv->closed_cancellable,
(GCallback) write_cancelled_cb, write_data,
(GCallback) write_cancelled_cb, write_data_ref (write_data),
(GDestroyNotify) write_data_unref);
g_mutex_lock (&write_data->mutex);
writeable_id = g_signal_connect_data (G_OBJECT (agent),
"reliable-transport-writable",
(GCallback) reliable_transport_writeable_cb, write_data,
(GCallback) reliable_transport_writeable_cb, write_data_ref (write_data),
(GClosureNotify) write_data_unref, 0);
......@@ -492,6 +498,14 @@ nice_output_stream_is_writable (GPollableOutputStream *stream)
goto done;
}
/* If it's a non-reliable agent, it never blocks, so one can
* always write
*/
if (!agent->reliable) {
retval = TRUE;
goto done;
}
/* Check whether any of the component’s FDs are pollable. */
for (i = component->socket_sources; i != NULL; i = i->next) {
SocketSource *socket_source = i->data;
......@@ -604,6 +618,15 @@ nice_output_stream_create_source (GPollableOutputStream *stream,
g_source_set_dummy_callback (cancellable_source);
g_source_add_child_source (component_source, cancellable_source);
g_source_unref (cancellable_source);
} else if (!agent->reliable) {
/* UDP streams are almost always writeable. */
GSource *child_source;
child_source = component_source_new (agent, priv->stream_id,
priv->component_id, G_OBJECT (stream), G_IO_OUT, cancellable);
g_source_set_dummy_callback (child_source);
g_source_add_child_source (component_source, child_source);
g_source_unref (child_source);
}
done:
......
......@@ -908,6 +908,12 @@ pseudo_tcp_socket_notify_message (PseudoTcpSocket *self,
{
gboolean retval;
g_assert_cmpuint (message->n_buffers, >, 0);
if (message->n_buffers == 1)
return pseudo_tcp_socket_notify_packet (self, message->buffers[0].buffer,
message->buffers[0].size);
g_assert_cmpuint (message->n_buffers, ==, 2);
g_assert_cmpuint (message->buffers[0].size, ==, HEADER_SIZE);
......@@ -1776,6 +1782,9 @@ parse_options (PseudoTcpSocket *self, const guint8 *data, guint32 len)
guint8 kind = TCP_OPT_EOL;
guint8 opt_len;
if (len < pos + 1)
return;
kind = data[pos];
pos++;
......@@ -1787,11 +1796,16 @@ parse_options (PseudoTcpSocket *self, const guint8 *data, guint32 len)
continue;
}
if (len < pos + 1)
return;
// Length of this option.
g_assert(len);
opt_len = data[pos];
pos++;
if (len < pos + opt_len)
return;
// Content of this option.
if (opt_len <= len - pos) {
apply_option (self, kind, data + pos, opt_len);
......@@ -1876,23 +1890,23 @@ pseudo_tcp_socket_get_available_bytes (PseudoTcpSocket *self)
gboolean
pseudo_tcp_socket_can_send (PseudoTcpSocket *self)
{
PseudoTcpSocketPrivate *priv = self->priv;
if (priv->state != TCP_ESTABLISHED) {
return FALSE;
}
return (pseudo_tcp_fifo_get_write_remaining (&priv->sbuf) != 0);
return (pseudo_tcp_socket_get_available_send_space (self) > 0);
}
gsize
pseudo_tcp_socket_get_available_send_space (PseudoTcpSocket *self)
{
PseudoTcpSocketPrivate *priv = self->priv;
gsize ret;
if (priv->state != TCP_ESTABLISHED) {
return 0;
}
return pseudo_tcp_fifo_get_write_remaining (&priv->sbuf);
if (priv->state == TCP_ESTABLISHED)
ret = pseudo_tcp_fifo_get_write_remaining (&priv->sbuf);
else
ret = 0;
if (ret == 0)
priv->bWriteEnable = TRUE;
return ret;
}
......@@ -72,6 +72,7 @@ struct _Stream
gchar remote_ufrag[NICE_STREAM_MAX_UFRAG];
gchar remote_password[NICE_STREAM_MAX_PWD];
gboolean gathering;
gboolean gathering_started;
gint tos;
};
......
......@@ -92,6 +92,7 @@ AC_CHECK_HEADERS([arpa/inet.h net/in.h])
AC_CHECK_HEADERS([ifaddrs.h], \
[AC_DEFINE(HAVE_GETIFADDRS, [1], \
[Whether getifaddrs() is available on the system])])
AC_CHECK_TYPES([size_t, ssize_t])
# Also put matching version in LIBNICE_CFLAGS
GLIB_REQ=2.30
......
......@@ -164,6 +164,8 @@ nice_tcp_bsd_socket_new (GMainContext *ctx, NiceAddress *addr)
sock->priv = priv = g_slice_new0 (TcpPriv);
if (ctx == NULL)
ctx = g_main_context_default ();
priv->context = g_main_context_ref (ctx);
priv->server_addr = *addr;
priv->error = FALSE;
......@@ -389,8 +391,7 @@ socket_send_more (
}
if (ret < 0) {
if (gerr != NULL &&
g_error_matches (gerr, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
if (g_error_matches (gerr, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
GOutputVector local_buf = { tbs->buf, tbs->length };
NiceOutputMessage local_message = {&local_buf, 1};
......@@ -399,7 +400,7 @@ socket_send_more (
g_error_free (gerr);
break;
}
g_error_free (gerr);
g_clear_error (&gerr);
} else if (ret < (int) tbs->length) {
GOutputVector local_buf = { tbs->buf + ret, tbs->length - ret };
NiceOutputMessage local_message = {&local_buf, 1};
......
......@@ -234,6 +234,7 @@ socket_send_message (NiceSocket *sock, const NiceAddress *to,
gint ret;
guint n_bufs;
guint16 header_buf;
guint offset = 0;
/* Count the number of buffers. */
if (message->n_buffers == -1) {
......@@ -247,30 +248,15 @@ socket_send_message (NiceSocket *sock, const NiceAddress *to,
/* Allocate a new array of buffers, covering all the buffers in the input
* @message, but with an additional one for a header and one for a footer. */
local_bufs = g_malloc_n (n_bufs + 2, sizeof (GOutputVector));
local_bufs = g_malloc_n (n_bufs + 1, sizeof (GOutputVector));
local_message.buffers = local_bufs;
local_message.n_buffers = n_bufs + 2;
local_message.n_buffers = n_bufs + 1;
/* Copy the existing buffers across. */
for (j = 0; j < n_bufs; j++) {
local_bufs[j + 1].buffer = message->buffers[j].buffer;
local_bufs[j + 1].size = message->buffers[j].size;
}
/* Header buffer. */
if (priv->compatibility == NICE_TURN_SOCKET_COMPATIBILITY_GOOGLE) {
header_buf = htons (output_message_get_size (message));
local_bufs[0].buffer = &header_buf;
local_bufs[0].size = sizeof (header_buf);
} else {
/* Skip over the allocated header buffer. */
local_message.buffers++;
local_message.n_buffers--;
}
/* Tail buffer. */
if (priv->compatibility == NICE_TURN_SOCKET_COMPATIBILITY_DRAFT9 ||
offset = 1;
} else if (priv->compatibility == NICE_TURN_SOCKET_COMPATIBILITY_DRAFT9 ||
priv->compatibility == NICE_TURN_SOCKET_COMPATIBILITY_RFC5766) {
gsize message_len = output_message_get_size (message);
gsize padlen = (message_len % 4) ? 4 - (message_len % 4) : 0;
......@@ -278,16 +264,23 @@ socket_send_message (NiceSocket *sock, const NiceAddress *to,
local_bufs[n_bufs].buffer = &padbuf;
local_bufs[n_bufs].size = padlen;
} else {
/* Skip over the allocated tail buffer. */
local_message.n_buffers--;
local_message.n_buffers = n_bufs;
}
/* Copy the existing buffers across. */
for (j = 0; j < n_bufs; j++) {
local_bufs[j + offset].buffer = message->buffers[j].buffer;
local_bufs[j + offset].size = message->buffers[j].size;
}
ret = nice_socket_send_messages (priv->base_socket, to, &local_message, 1);
if (ret == 1)
ret = output_message_get_size (&local_message);
g_free (local_bufs);
if (ret == 1)
return output_message_get_size (&local_message);
return ret;
}
......
......@@ -67,7 +67,7 @@ typedef struct {
NiceAddress peer;
uint16_t channel;
gboolean renew;
guint timeout_source;
GSource *timeout_source;
} ChannelBinding;
typedef struct {
......@@ -96,7 +96,7 @@ typedef struct {
there is an installed permission */
GList *sent_permissions; /* ongoing permission installed */
GHashTable *send_data_queues; /* stores a send data queue for per peer */
guint permission_timeout_source; /* timer used to invalidate
GSource *permission_timeout_source; /* timer used to invalidate
permissions */
} TurnPriv;
......@@ -247,8 +247,10 @@ socket_close (NiceSocket *sock)
for (i = priv->channels; i; i = i->next) {
ChannelBinding *b = i->data;
if (b->timeout_source)
g_source_remove (b->timeout_source);
if (b->timeout_source) {
g_source_destroy (b->timeout_source);
g_source_unref (b->timeout_source);
}
g_free (b);
}
g_list_free (priv->channels);
......@@ -288,8 +290,11 @@ socket_close (NiceSocket *sock)
g_list_free (priv->sent_permissions);
g_hash_table_destroy (priv->send_data_queues);
if (priv->permission_timeout_source)
g_source_remove (priv->permission_timeout_source);
if (priv->permission_timeout_source) {
g_source_destroy (priv->permission_timeout_source);
g_source_unref (priv->permission_timeout_source);
priv->permission_timeout_source = NULL;
}
if (priv->ctx)
g_main_context_unref (priv->ctx);
......@@ -339,6 +344,9 @@ socket_recv_messages (NiceSocket *sock,
n_valid_messages = 1;
if (message->length == 0)
continue;
/* Compact the message’s buffers into a single one for parsing. Avoid this
* in the (hopefully) common case of a single-element buffer vector. */
if (message->n_buffers == 1 ||
......@@ -391,12 +399,15 @@ socket_recv_messages (NiceSocket *sock,
static GSource *
priv_timeout_add_with_context (TurnPriv *priv, guint interval,
GSourceFunc function, gpointer data)
gboolean seconds, GSourceFunc function, gpointer data)
{
GSource *source;
g_return_val_if_fail (function != NULL, NULL);
if (seconds)
source = g_timeout_source_new_seconds (interval);
else
source = g_timeout_source_new (interval);
g_source_set_callback (source, function, data, NULL);
......@@ -482,8 +493,13 @@ priv_remove_peer_from_list (GList *list, const NiceAddress *peer)
NiceAddress *address = (NiceAddress *) iter->data;
if (nice_address_equal (address, peer)) {
GList *prev = iter->prev;
nice_address_free (address);
list = g_list_delete_link (list, iter);
iter = prev;
if (iter)
iter = list;
}
}
......@@ -687,7 +703,7 @@ socket_send_message (NiceSocket *sock, const NiceAddress *to,
req->priv = priv;
stun_message_id (&msg, req->id);
req->source = priv_timeout_add_with_context (priv,
STUN_END_TIMEOUT, priv_forget_send_request, req);
STUN_END_TIMEOUT, FALSE, priv_forget_send_request, req);
g_queue_push_tail (priv->send_requests, req);
}
}
......@@ -822,7 +838,7 @@ priv_binding_expired_timeout (gpointer data)
/* find current binding and destroy it */
for (i = priv->channels ; i; i = i->next) {
ChannelBinding *b = i->data;
if (b->timeout_source == g_source_get_id (source)) {
if (b->timeout_source == source) {
priv->channels = g_list_remove (priv->channels, b);
/* Make sure we don't free a currently being-refreshed binding */
if (priv->current_binding_msg && !priv->current_binding) {
......@@ -882,11 +898,12 @@ priv_binding_timeout (gpointer data)
/* find current binding and mark it for renewal */
for (i = priv->channels ; i; i = i->next) {
ChannelBinding *b = i->data;
if (b->timeout_source == g_source_get_id (source)) {
if (b->timeout_source == source) {
b->renew = TRUE;
/* Install timer to expire the permission */
b->timeout_source = g_timeout_add_seconds (STUN_EXPIRE_TIMEOUT,
priv_binding_expired_timeout, priv);
b->timeout_source = priv_timeout_add_with_context (priv,
STUN_EXPIRE_TIMEOUT, TRUE, priv_binding_expired_timeout, priv);
/* Send renewal */
if (!priv->current_binding_msg)
priv_send_channel_bind (priv, NULL, b->channel, &b->peer);
......@@ -1118,12 +1135,14 @@ nice_turn_socket_parse_recv (NiceSocket *sock, NiceSocket **from_sock,
binding->renew = FALSE;
/* Remove any existing timer */
if (binding->timeout_source)
g_source_remove (binding->timeout_source);
if (binding->timeout_source) {
g_source_destroy (binding->timeout_source);
g_source_unref (binding->timeout_source);
}
/* Install timer to schedule refresh of the permission */
binding->timeout_source =
g_timeout_add_seconds (STUN_BINDING_TIMEOUT,
priv_binding_timeout, priv);
priv_timeout_add_with_context (priv, STUN_BINDING_TIMEOUT,
TRUE, priv_binding_timeout, priv);
}
priv_process_pending_bindings (priv);
}
......@@ -1206,8 +1225,8 @@ nice_turn_socket_parse_recv (NiceSocket *sock, NiceSocket **from_sock,
if (stun_message_get_class (&msg) == STUN_RESPONSE &&
!priv->permission_timeout_source) {
priv->permission_timeout_source =
g_timeout_add_seconds (STUN_PERMISSION_TIMEOUT,
priv_permission_timeout, priv);
priv_timeout_add_with_context (priv, STUN_PERMISSION_TIMEOUT,
TRUE, priv_permission_timeout, priv);
}
/* send enqued data */
......@@ -1532,7 +1551,7 @@ priv_schedule_tick (TurnPriv *priv)
guint timeout = stun_timer_remainder (&priv->current_binding_msg->timer);
if (timeout > 0) {
priv->tick_source_channel_bind =
priv_timeout_add_with_context (priv, timeout,
priv_timeout_add_with_context (priv, timeout, FALSE,
priv_retransmissions_tick, priv);
} else {
priv_retransmissions_tick_unlocked (priv);
......@@ -1548,8 +1567,12 @@ priv_schedule_tick (TurnPriv *priv)
timeout = stun_timer_remainder (&current_create_permission_msg->timer);
if (timeout > 0) {
if (priv->tick_source_create_permission) {
g_source_destroy (priv->tick_source_create_permission);
g_source_unref (priv->tick_source_create_permission);
}
priv->tick_source_create_permission =
priv_timeout_add_with_context (priv,
priv_timeout_add_with_context (priv, FALSE,
timeout,
priv_retransmissions_create_permission_tick,
priv);
......
......@@ -57,6 +57,7 @@
#ifndef _WIN32_COMMON_H
#define _WIN32_COMMON_H
#include "config.h"
#include <sys/types.h>
/* 7.18.1.1 Exact-width integer types */
......@@ -69,8 +70,10 @@ typedef unsigned uint32_t;
typedef long long int64_t;
typedef unsigned long long uint64_t;
#ifndef _SSIZE_T_
#ifndef HAVE_SIZE_T
typedef unsigned int size_t;
#endif
#ifndef HAVE_SSIZE_T
typedef unsigned long ssize_t;
#endif
......
......@@ -155,10 +155,15 @@ static gboolean timer_cb (gpointer pointer)
return FALSE;
}
static void cb_writable (NiceAgent*agent, guint stream_id, guint component_id)
static void cb_writable (NiceAgent*agent, guint stream_id, guint component_id,
gpointer user_data)
{
guint *ls_id = user_data;
if (stream_id == *ls_id && component_id == 1) {
g_debug ("Transport is now writable, stopping mainloop");
g_main_loop_quit (global_mainloop);
*ls_id = 0;
}
}
static void cb_nice_recv (NiceAgent *agent, guint stream_id, guint component_id, guint len, gchar *buf, gpointer user_data)
......@@ -176,6 +181,9 @@ static void cb_nice_recv (NiceAgent *agent, guint stream_id, guint component_id,
if (strncmp ("12345678", buf, 8))
return;
if (component_id == 2)
return;
if (GPOINTER_TO_UINT (user_data) == 2) {
g_debug ("right agent received %d bytes, stopping mainloop", len);
global_ragent_read = len;
......@@ -370,7 +378,7 @@ static int run_full_test (NiceAgent *lagent, NiceAgent *ragent, NiceAddress *bas
nice_agent_set_port_range (ragent, rs_id, 2, 10000, 10002);
g_assert (nice_agent_gather_candidates (ragent, rs_id) == TRUE);
#ifdef USE_LOOPBACK
#if USE_LOOPBACK
{
GSList *cands = NULL, *i;
NiceCandidate *cand = NULL;
......@@ -466,10 +474,13 @@ static int run_full_test (NiceAgent *lagent, NiceAgent *ragent, NiceAddress *bas
g_debug ("Sending data returned -1 in %s mode", reliable?"Reliable":"Non-reliable");
if (reliable) {
gulong signal_handler;
guint ls_id_copy = ls_id;
signal_handler = g_signal_connect (G_OBJECT (lagent),
"reliable-transport-writable", G_CALLBACK (cb_writable), NULL);
"reliable-transport-writable", G_CALLBACK (cb_writable), &ls_id_copy);
g_debug ("Running mainloop until transport is writable");
g_main_loop_run (global_mainloop);
while (ls_id_copy == ls_id)
g_main_context_iteration (NULL, TRUE);
g_signal_handler_disconnect(G_OBJECT (lagent), signal_handler);
ret = nice_agent_send (lagent, ls_id, 1, 16, "1234567812345678");
......@@ -477,7 +488,8 @@ static int run_full_test (NiceAgent *lagent, NiceAgent *ragent, NiceAddress *bas
}
g_debug ("Sent %d bytes", ret);
g_assert (ret == 16);
g_main_loop_run (global_mainloop);
while (global_ragent_read != 16)
g_main_context_iteration (NULL, TRUE);
g_assert (global_ragent_read == 16);
g_debug ("test-fullmode: Ran mainloop, removing streams...");
......@@ -589,14 +601,18 @@ static int run_full_test_delayed_answer (NiceAgent *lagent, NiceAgent *ragent, N
/* note: test payload send and receive */
global_ragent_read = 0;
ret = nice_agent_send (lagent, ls_id, 1, 16, "1234567812345678");
{
if (ret == -1) {
gboolean reliable = FALSE;
g_object_get (G_OBJECT (lagent), "reliable", &reliable, NULL);
if (reliable) {
gulong signal_handler;
guint ls_id_copy = ls_id;
signal_handler = g_signal_connect (G_OBJECT (lagent),
"reliable-transport-writable", G_CALLBACK (cb_writable), NULL);
g_main_loop_run (global_mainloop);
"reliable-transport-writable", G_CALLBACK (cb_writable), &ls_id_copy);
g_debug ("Running mainloop until transport is writable");
while (ls_id_copy == ls_id)
g_main_context_iteration (NULL, TRUE);
g_signal_handler_disconnect(G_OBJECT (lagent), signal_handler);
ret = nice_agent_send (lagent, ls_id, 1, 16, "1234567812345678");
......