Commit 50d11e4d authored by Olivier Crête's avatar Olivier Crête

agent: Allow re-entrant read calls, only one is processed at a time

This allows for multiple threads to read from the agent concurrently.
parent 2cfc5454
......@@ -3927,13 +3927,23 @@ nice_agent_recv_messages_blocking_or_nonblocking (NiceAgent *agent,
goto done;
}
/* Disallow re-entrant reads. */
while (stream && component && (component->n_recv_messages != 0 ||
component->recv_messages != NULL)) {
g_cond_wait (&component->read_cond, &agent_mutex);
if (!agent_find_component (agent, stream_id, component_id,
&stream, &component)) {
g_set_error (&child_error, G_IO_ERROR, G_IO_ERROR_BROKEN_PIPE,
"Invalid stream/component.");
goto done;
}
}
nice_debug ("%s: %p: (%s):", G_STRFUNC, agent,
blocking ? "blocking" : "non-blocking");
nice_debug_input_message_composition (messages, n_messages);
/* Disallow re-entrant reads. */
g_assert (component->n_recv_messages == 0 &&
component->recv_messages == NULL);
/* Set the component’s receive buffer. */
context = component_dup_io_context (component);
......@@ -4047,6 +4057,8 @@ nice_agent_recv_messages_blocking_or_nonblocking (NiceAgent *agent,
component_set_io_callback (component, NULL, NULL, NULL, 0, NULL);
recv_error:
g_cond_broadcast (&component->read_cond);
/* Tidy up. Below this point, @component may be %NULL. */
if (cancellable_source != NULL) {
g_source_destroy (cancellable_source);
......
......@@ -125,6 +125,7 @@ component_new (guint id, NiceAgent *agent, Stream *stream)
g_mutex_init (&component->io_mutex);
g_queue_init (&component->pending_io_messages);
g_cond_init (&component->read_cond);
component->io_callback_id = 0;
component->own_ctx = g_main_context_new ();
......@@ -283,6 +284,7 @@ component_free (Component *cmp)
g_clear_object (&cmp->stop_cancellable);
g_clear_object (&cmp->iostream);
g_mutex_clear (&cmp->io_mutex);
g_cond_clear (&cmp->read_cond);
if (cmp->ctx != NULL) {
g_main_context_unref (cmp->ctx);
......
......@@ -203,6 +203,9 @@ struct _Component
* ACKs on. The messages are dequeued to the pseudo-TCP socket once a selected
* UDP socket is available. This is only used for reliable Components. */
GQueue queued_tcp_packets;
/* Condition to prevent multiple concurrent reads */
GCond read_cond;
};
Component *
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment