...
 
Commits (3)
......@@ -4795,15 +4795,14 @@ nice_agent_get_io_stream (NiceAgent *agent, guint stream_id,
g_return_val_if_fail (stream_id >= 1, NULL);
g_return_val_if_fail (component_id >= 1, NULL);
g_return_val_if_fail (agent->reliable, NULL);
agent_lock ();
if (!agent_find_component (agent, stream_id, component_id, NULL, &component))
goto done;
if (component->iostream == NULL)
component->iostream = nice_io_stream_new (agent, stream_id, component_id);
component->iostream = nice_io_stream_new (agent, stream_id, component_id,
agent->reliable);
iostream = g_object_ref (component->iostream);
......
......@@ -956,8 +956,8 @@ component_source_prepare (GSource *source, gint *timeout_)
child_socket_source = g_slice_new0 (SocketSource);
child_socket_source->socket = parent_socket_source->socket;
child_socket_source->source =
g_socket_create_source (child_socket_source->socket->fileno, G_IO_IN,
NULL);
g_socket_create_source (child_socket_source->socket->fileno,
component_source->condition, NULL);
g_source_set_dummy_callback (child_socket_source->source);
g_source_add_child_source (source, child_socket_source->source);
g_source_unref (child_socket_source->source);
......@@ -1062,6 +1062,7 @@ static GSourceFuncs component_source_funcs = {
* @component_id: The component's number
* @pollable_stream: a #GPollableInputStream or #GPollableOutputStream to pass
* to dispatched callbacks
* @condition: underlying socket condition to dispatch on
* @cancellable: (allow-none): a #GCancellable, or %NULL
*
* Create a new #ComponentSource, a type of #GSource which proxies poll events
......@@ -1078,21 +1079,24 @@ static GSourceFuncs component_source_funcs = {
* Returns: (transfer full): a new #ComponentSource; unref with g_source_unref()
*/
GSource *
component_input_source_new (NiceAgent *agent, guint stream_id,
guint component_id, GPollableInputStream *pollable_istream,
component_source_new (NiceAgent *agent, guint stream_id,
guint component_id, GObject *pollable_stream,
GIOCondition condition,
GCancellable *cancellable)
{
ComponentSource *component_source;
g_assert (G_IS_POLLABLE_INPUT_STREAM (pollable_istream));
g_assert (G_IS_POLLABLE_INPUT_STREAM (pollable_stream) ||
G_IS_POLLABLE_OUTPUT_STREAM (pollable_stream));
component_source =
(ComponentSource *)
g_source_new (&component_source_funcs, sizeof (ComponentSource));
g_source_set_name ((GSource *) component_source, "ComponentSource");
component_source->condition = condition;
component_source->component_socket_sources_age = 0;
component_source->pollable_stream = g_object_ref (pollable_istream);
component_source->pollable_stream = g_object_ref (pollable_stream);
g_weak_ref_init (&component_source->agent_ref, agent);
component_source->stream_id = stream_id;
component_source->component_id = component_id;
......
......@@ -74,7 +74,7 @@ struct _CandidatePairKeepalive
guint stream_id;
guint component_id;
StunTimer timer;
uint8_t stun_buffer[STUN_MAX_MESSAGE_SIZE];
uint8_t stun_buffer[STUN_MAX_MESSAGE_SIZE_IPV6];
StunMessage stun_message;
};
......@@ -237,9 +237,9 @@ void
component_free_socket_sources (Component *component);
GSource *
component_input_source_new (NiceAgent *agent, guint stream_id,
guint component_id, GPollableInputStream *pollable_istream,
GCancellable *cancellable);
component_source_new (NiceAgent *agent, guint stream_id,
guint component_id, GObject *pollable_istream,
GIOCondition condition, GCancellable *cancellable);
GMainContext *
component_dup_io_context (Component *component);
......
......@@ -666,7 +666,7 @@ static gboolean priv_conn_keepalive_tick_unlocked (NiceAgent *agent)
NiceAddress stun_server;
if (nice_address_set_from_string (&stun_server, agent->stun_server_ip)) {
StunAgent stun_agent;
uint8_t stun_buffer[STUN_MAX_MESSAGE_SIZE];
uint8_t stun_buffer[STUN_MAX_MESSAGE_SIZE_IPV6];
StunMessage stun_message;
size_t buffer_len = 0;
......
......@@ -77,7 +77,7 @@ struct _CandidateCheckPair
guint64 priority;
GTimeVal next_tick; /* next tick timestamp */
StunTimer timer;
uint8_t stun_buffer[STUN_MAX_MESSAGE_SIZE];
uint8_t stun_buffer[STUN_MAX_MESSAGE_SIZE_IPV6];
StunMessage stun_message;
};
......
......@@ -58,7 +58,7 @@ typedef struct
TurnServer *turn;
StunAgent stun_agent;
StunTimer timer;
uint8_t stun_buffer[STUN_MAX_MESSAGE_SIZE];
uint8_t stun_buffer[STUN_MAX_MESSAGE_SIZE_IPV6];
StunMessage stun_message;
uint8_t stun_resp_buffer[STUN_MAX_MESSAGE_SIZE];
StunMessage stun_resp_msg;
......@@ -76,7 +76,7 @@ typedef struct
GSource *timer_source;
GSource *tick_source;
StunTimer timer;
uint8_t stun_buffer[STUN_MAX_MESSAGE_SIZE];
uint8_t stun_buffer[STUN_MAX_MESSAGE_SIZE_IPV6];
StunMessage stun_message;
uint8_t stun_resp_buffer[STUN_MAX_MESSAGE_SIZE];
StunMessage stun_resp_msg;
......
......@@ -424,8 +424,8 @@ nice_input_stream_create_source (GPollableInputStream *stream,
if (agent == NULL)
goto dummy_source;
component_source = component_input_source_new (agent, priv->stream_id,
priv->component_id, stream, cancellable);
component_source = component_source_new (agent, priv->stream_id,
priv->component_id, G_OBJECT (stream), G_IO_IN, cancellable);
g_object_unref (agent);
......
......@@ -80,6 +80,8 @@ struct _NiceIOStreamPrivate
guint stream_id;
guint component_id;
gboolean is_datagram;
GInputStream *input_stream; /* owned */
GOutputStream *output_stream; /* owned */
};
......@@ -94,6 +96,7 @@ static GOutputStream *nice_io_stream_get_output_stream (GIOStream *stream);
static void streams_removed_cb (NiceAgent *agent, guint *stream_ids,
gpointer user_data);
static gboolean nice_io_stream_is_datagram (GIOStream *stream);
static void
nice_io_stream_class_init (NiceIOStreamClass *klass)
......@@ -109,6 +112,7 @@ nice_io_stream_class_init (NiceIOStreamClass *klass)
stream_class->get_input_stream = nice_io_stream_get_input_stream;
stream_class->get_output_stream = nice_io_stream_get_output_stream;
stream_class->is_datagram = nice_io_stream_is_datagram;
/*
* NiceIOStream:agent:
......@@ -277,17 +281,24 @@ nice_io_stream_set_property (GObject *object, guint prop_id,
* Since: 0.1.5
*/
GIOStream *
nice_io_stream_new (NiceAgent *agent, guint stream_id, guint component_id)
nice_io_stream_new (NiceAgent *agent, guint stream_id, guint component_id,
gboolean reliable)
{
GIOStream *stream;
g_return_val_if_fail (NICE_IS_AGENT (agent), NULL);
g_return_val_if_fail (stream_id > 0, NULL);
g_return_val_if_fail (component_id > 0, NULL);
return g_object_new (NICE_TYPE_IO_STREAM,
stream = g_object_new (NICE_TYPE_IO_STREAM,
"agent", agent,
"stream-id", stream_id,
"component-id", component_id,
NULL);
NICE_IO_STREAM (stream)->priv->is_datagram = !reliable;
return stream;
}
static GInputStream *
......@@ -348,3 +359,12 @@ streams_removed_cb (NiceAgent *agent, guint *stream_ids, gpointer user_data)
}
}
}
static gboolean
nice_io_stream_is_datagram (GIOStream *stream)
{
NiceIOStream *self = NICE_IO_STREAM (stream);
return self->priv->is_datagram;
}
......@@ -84,7 +84,7 @@ struct _NiceIOStream
};
GIOStream *nice_io_stream_new (NiceAgent *agent,
guint stream_id, guint component_id);
guint stream_id, guint component_id, gboolean reliable);
G_END_DECLS
......
......@@ -498,6 +498,14 @@ nice_output_stream_is_writable (GPollableOutputStream *stream)
goto done;
}
/* If it's a non-reliable agent, it never blocks, so one can
* always write
*/
if (!agent->reliable) {
retval = TRUE;
goto done;
}
/* Check whether any of the component’s FDs are pollable. */
for (i = component->socket_sources; i != NULL; i = i->next) {
SocketSource *socket_source = i->data;
......@@ -610,6 +618,15 @@ nice_output_stream_create_source (GPollableOutputStream *stream,
g_source_set_dummy_callback (cancellable_source);
g_source_add_child_source (component_source, cancellable_source);
g_source_unref (cancellable_source);
} else if (!agent->reliable) {
/* UDP streams are almost always writeable. */
GSource *child_source;
child_source = component_source_new (agent, priv->stream_id,
priv->component_id, G_OBJECT (stream), G_IO_OUT, cancellable);
g_source_set_dummy_callback (child_source);
g_source_add_child_source (component_source, child_source);
g_source_unref (child_source);
}
done:
......