...
 
Commits (2)
......@@ -136,7 +136,7 @@ static guint signals[N_SIGNALS];
#if GLIB_CHECK_VERSION(2,31,8)
static GMutex agent_mutex; /* Mutex used for thread-safe lib */
#else
static GStaticMutex agent_mutex = G_STATIC_MUTEX_INIT;
static GMutex *agent_mutex;
#endif
static void priv_free_upnp (NiceAgent *agent);
......@@ -170,12 +170,12 @@ void agent_unlock (void)
#else
void agent_lock(void)
{
g_static_mutex_lock (&agent_mutex);
g_mutex_lock (agent_mutex);
}
void agent_unlock(void)
{
g_static_mutex_unlock (&agent_mutex);
g_mutex_unlock (agent_mutex);
}
#endif
......@@ -1011,6 +1011,19 @@ static void priv_generate_tie_breaker (NiceAgent *agent)
static void
nice_agent_init (NiceAgent *agent)
{
#if !GLIB_CHECK_VERSION(2,31,8)
static gsize mutex_init_flag = 0;
if (g_once_init_enter (&mutex_init_flag))
{
gsize setup_value = 42;
mutex = g_mutex_new ();
g_once_init_leave (&mutex_init_flag, setup_value);
}
#endif
agent->next_candidate_id = 1;
agent->next_stream_id = 1;
......@@ -3914,13 +3927,23 @@ nice_agent_recv_messages_blocking_or_nonblocking (NiceAgent *agent,
goto done;
}
/* Disallow re-entrant reads. */
while (stream && component && (component->n_recv_messages != 0 ||
component->recv_messages != NULL)) {
g_cond_wait (&component->read_cond, &agent_mutex);
if (!agent_find_component (agent, stream_id, component_id,
&stream, &component)) {
g_set_error (&child_error, G_IO_ERROR, G_IO_ERROR_BROKEN_PIPE,
"Invalid stream/component.");
goto done;
}
}
nice_debug ("%s: %p: (%s):", G_STRFUNC, agent,
blocking ? "blocking" : "non-blocking");
nice_debug_input_message_composition (messages, n_messages);
/* Disallow re-entrant reads. */
g_assert (component->n_recv_messages == 0 &&
component->recv_messages == NULL);
/* Set the component’s receive buffer. */
context = component_dup_io_context (component);
......@@ -4034,6 +4057,8 @@ nice_agent_recv_messages_blocking_or_nonblocking (NiceAgent *agent,
component_set_io_callback (component, NULL, NULL, NULL, 0, NULL);
recv_error:
g_cond_broadcast (&component->read_cond);
/* Tidy up. Below this point, @component may be %NULL. */
if (cancellable_source != NULL) {
g_source_destroy (cancellable_source);
......
......@@ -125,6 +125,7 @@ component_new (guint id, NiceAgent *agent, Stream *stream)
g_mutex_init (&component->io_mutex);
g_queue_init (&component->pending_io_messages);
g_cond_init (&component->read_cond);
component->io_callback_id = 0;
component->own_ctx = g_main_context_new ();
......@@ -283,6 +284,7 @@ component_free (Component *cmp)
g_clear_object (&cmp->stop_cancellable);
g_clear_object (&cmp->iostream);
g_mutex_clear (&cmp->io_mutex);
g_cond_clear (&cmp->read_cond);
if (cmp->ctx != NULL) {
g_main_context_unref (cmp->ctx);
......
......@@ -203,6 +203,9 @@ struct _Component
* ACKs on. The messages are dequeued to the pseudo-TCP socket once a selected
* UDP socket is available. This is only used for reliable Components. */
GQueue queued_tcp_packets;
/* Condition to prevent multiple concurrent reads */
GCond read_cond;
};
Component *
......