Commit da41258a authored by Juan Navarro's avatar Juan Navarro Committed by Olivier Crête

Use per-agent locks and GWeakRefs in callbacks from timeout sources

Work on libnice's bug #1 in Gitlab. This work is composed of multiple
merged parts:

- "Global lock contention removed"
Phabricator D1900: https://phabricator.freedesktop.org/D1900
By @nifigase
Opened in GitLab as Merge Request !12

- "agent: properly handle NiceAgent ref in callbacks from timeout
sources"
Phabricator D1898: https://phabricator.freedesktop.org/D1898
By @mparis
This patch was itself based upon a previous version of the work done in
D1900. After the switch of hosting, it got lost.

On top of these, additions to follow some review comments from @ocrete:
- https://phabricator.freedesktop.org/D1900#40412
- https://phabricator.freedesktop.org/D1898#39332
parent 78bdcfad
......@@ -138,6 +138,8 @@ struct _NiceAgent
{
GObject parent; /* gobject pointer */
GMutex agent_mutex; /* Mutex used for thread-safe lib */
gboolean full_mode; /* property: full-mode */
gchar *stun_server_ip; /* property: STUN server IP */
guint stun_server_port; /* property: STUN server port */
......@@ -208,8 +210,8 @@ NiceStream *agent_find_stream (NiceAgent *agent, guint stream_id);
void agent_gathering_done (NiceAgent *agent);
void agent_signal_gathering_done (NiceAgent *agent);
void agent_lock (void);
void agent_unlock (void);
void agent_lock (NiceAgent *agent);
void agent_unlock (NiceAgent *agent);
void agent_unlock_and_emit (NiceAgent *agent);
void agent_signal_new_selected_pair (
......@@ -235,8 +237,14 @@ void agent_signal_initial_binding_request_received (NiceAgent *agent, NiceStream
guint64 agent_candidate_pair_priority (NiceAgent *agent, NiceCandidate *local, NiceCandidate *remote);
typedef gboolean (*NiceTimeoutLockedCallback)(NiceAgent *agent,
gpointer user_data);
void agent_timeout_add_with_context (NiceAgent *agent, GSource **out,
const gchar *name, guint interval, GSourceFunc function, gpointer data);
const gchar *name, guint interval, NiceTimeoutLockedCallback function,
gpointer data);
void agent_timeout_add_seconds_with_context (NiceAgent *agent, GSource **out,
const gchar *name, guint interval, NiceTimeoutLockedCallback function,
gpointer data);
StunUsageIceCompatibility agent_to_ice_compatibility (NiceAgent *agent);
StunUsageTurnCompatibility agent_to_turn_compatibility (NiceAgent *agent);
......
This diff is collapsed.
......@@ -894,7 +894,7 @@ nice_component_emit_io_callback (NiceComponent *component,
agent_unlock_and_emit (agent);
io_callback (agent, stream_id,
component_id, buf_len, (gchar *) buf, io_user_data);
agent_lock ();
agent_lock (agent);
} else {
IOCallbackData *data;
......@@ -1210,7 +1210,7 @@ component_source_prepare (GSource *source, gint *timeout_)
return FALSE;
/* Needed due to accessing the Component. */
agent_lock ();
agent_lock (agent);
if (!agent_find_component (agent,
component_source->stream_id, component_source->component_id, NULL,
......
This diff is collapsed.
......@@ -83,7 +83,6 @@ struct _StunTransaction
struct _CandidateCheckPair
{
NiceAgent *agent; /* back pointer to owner */
guint stream_id;
guint component_id;
NiceCandidate *local;
......
......@@ -998,10 +998,9 @@ NiceCandidate *discovery_learn_remote_peer_reflexive_candidate (
*
* @return will return FALSE when no more pending timers.
*/
static gboolean priv_discovery_tick_unlocked (gpointer pointer)
static gboolean priv_discovery_tick_unlocked (NiceAgent *agent)
{
CandidateDiscovery *cand;
NiceAgent *agent = pointer;
GSList *i;
int not_done = 0; /* note: track whether to continue timer */
size_t buffer_len = 0;
......@@ -1183,20 +1182,12 @@ static gboolean priv_discovery_tick_unlocked (gpointer pointer)
return TRUE;
}
static gboolean priv_discovery_tick (gpointer pointer)
static gboolean priv_discovery_tick_agent_locked (NiceAgent *agent,
gpointer pointer)
{
NiceAgent *agent = pointer;
gboolean ret;
agent_lock();
if (g_source_is_destroyed (g_main_current_source ())) {
nice_debug ("Source was destroyed. "
"Avoided race condition in priv_discovery_tick");
agent_unlock ();
return FALSE;
}
ret = priv_discovery_tick_unlocked (pointer);
ret = priv_discovery_tick_unlocked (agent);
if (ret == FALSE) {
if (agent->discovery_timer_source != NULL) {
g_source_destroy (agent->discovery_timer_source);
......@@ -1204,7 +1195,6 @@ static gboolean priv_discovery_tick (gpointer pointer)
agent->discovery_timer_source = NULL;
}
}
agent_unlock_and_emit (agent);
return ret;
}
......@@ -1227,7 +1217,7 @@ void discovery_schedule (NiceAgent *agent)
if (res == TRUE) {
agent_timeout_add_with_context (agent, &agent->discovery_timer_source,
"Candidate discovery tick", agent->timer_ta,
priv_discovery_tick, agent);
priv_discovery_tick_agent_locked, NULL);
}
}
}
......
......@@ -341,7 +341,7 @@ nice_input_stream_close (GInputStream *stream, GCancellable *cancellable,
if (agent == NULL)
return TRUE;
agent_lock ();
agent_lock (agent);
/* Shut down the read side of the pseudo-TCP stream, if it still exists. */
if (agent_find_component (agent, priv->stream_id, priv->component_id,
......@@ -350,7 +350,7 @@ nice_input_stream_close (GInputStream *stream, GCancellable *cancellable,
pseudo_tcp_socket_shutdown (component->tcp, PSEUDO_TCP_SHUTDOWN_RD);
}
agent_unlock ();
agent_unlock (agent);
g_object_unref (agent);
......@@ -376,7 +376,7 @@ nice_input_stream_is_readable (GPollableInputStream *stream)
if (agent == NULL)
return FALSE;
agent_lock ();
agent_lock (agent);
if (!agent_find_component (agent, priv->stream_id, priv->component_id,
&_stream, &component)) {
......@@ -405,7 +405,7 @@ nice_input_stream_is_readable (GPollableInputStream *stream)
}
done:
agent_unlock ();
agent_unlock (agent);
g_object_unref (agent);
......
......@@ -485,7 +485,7 @@ nice_output_stream_close (GOutputStream *stream, GCancellable *cancellable,
if (agent == NULL)
return TRUE;
agent_lock ();
agent_lock (agent);
/* Shut down the write side of the pseudo-TCP stream. */
if (agent_find_component (agent, priv->stream_id, priv->component_id,
......@@ -494,7 +494,7 @@ nice_output_stream_close (GOutputStream *stream, GCancellable *cancellable,
pseudo_tcp_socket_shutdown (component->tcp, PSEUDO_TCP_SHUTDOWN_WR);
}
agent_unlock ();
agent_unlock (agent);
g_object_unref (agent);
......@@ -519,7 +519,7 @@ nice_output_stream_is_writable (GPollableOutputStream *stream)
if (agent == NULL)
return FALSE;
agent_lock ();
agent_lock (agent);
if (!agent_find_component (agent, priv->stream_id, priv->component_id,
&_stream, &component)) {
......@@ -540,7 +540,7 @@ nice_output_stream_is_writable (GPollableOutputStream *stream)
}
done:
agent_unlock ();
agent_unlock (agent);
g_object_unref (agent);
......@@ -618,7 +618,7 @@ nice_output_stream_create_source (GPollableOutputStream *stream,
if (agent == NULL)
return component_source;
agent_lock ();
agent_lock (agent);
/* Grab the socket for this component. */
if (!agent_find_component (agent, priv->stream_id, priv->component_id,
......@@ -638,7 +638,7 @@ nice_output_stream_create_source (GPollableOutputStream *stream,
}
done:
agent_unlock ();
agent_unlock (agent);
g_object_unref (agent);
......
......@@ -60,6 +60,7 @@
#define TCP_NODELAY 1
typedef struct {
GMutex mutex;
NiceAddress remote_addr;
GQueue send_queue;
GMainContext *context;
......@@ -101,6 +102,7 @@ nice_tcp_bsd_socket_new_from_gsock (GMainContext *ctx, GSocket *gsock,
if (ctx == NULL)
ctx = g_main_context_default ();
g_mutex_init (&priv->mutex);
priv->context = g_main_context_ref (ctx);
priv->remote_addr = *remote_addr;
priv->error = FALSE;
......@@ -227,6 +229,8 @@ socket_close (NiceSocket *sock)
if (priv->context)
g_main_context_unref (priv->context);
g_mutex_clear (&priv->mutex);
g_slice_free(TcpPriv, sock->priv);
}
......@@ -424,12 +428,12 @@ socket_send_more (
NiceSocket *sock = (NiceSocket *) data;
TcpPriv *priv = sock->priv;
agent_lock ();
g_mutex_lock (&priv->mutex);
if (g_source_is_destroyed (g_main_current_source ())) {
nice_debug ("Source was destroyed. "
"Avoided race condition in tcp-bsd.c:socket_send_more");
agent_unlock ();
g_mutex_unlock (&priv->mutex);
return FALSE;
}
......@@ -441,7 +445,7 @@ socket_send_more (
g_source_unref (priv->io_source);
priv->io_source = NULL;
agent_unlock ();
g_mutex_unlock (&priv->mutex);
if (priv->writable_cb)
priv->writable_cb (sock, priv->writable_data);
......@@ -449,6 +453,6 @@ socket_send_more (
return FALSE;
}
agent_unlock ();
g_mutex_unlock (&priv->mutex);
return TRUE;
}
......@@ -71,6 +71,7 @@ typedef struct {
} ChannelBinding;
typedef struct {
GMutex mutex;
GMainContext *ctx;
StunAgent agent;
GList *channels;
......@@ -144,7 +145,7 @@ static gboolean priv_send_channel_bind (UdpTurnPriv *priv,
const NiceAddress *peer);
static gboolean priv_add_channel_binding (UdpTurnPriv *priv,
const NiceAddress *peer);
static gboolean priv_forget_send_request (gpointer pointer);
static gboolean priv_forget_send_request_agent_locked (gpointer pointer);
static void priv_clear_permissions (UdpTurnPriv *priv);
static guint
......@@ -209,6 +210,7 @@ nice_udp_turn_socket_new (GMainContext *ctx, NiceAddress *addr,
STUN_AGENT_USAGE_NO_ALIGNED_ATTRIBUTES);
}
g_mutex_init (&priv->mutex);
priv->channels = NULL;
priv->current_binding = NULL;
priv->base_socket = base_socket;
......@@ -420,18 +422,16 @@ socket_recv_messages (NiceSocket *sock,
return i;
}
/* interval is given in milliseconds */
static GSource *
priv_timeout_add_with_context (UdpTurnPriv *priv, guint interval,
gboolean seconds, GSourceFunc function, gpointer data)
GSourceFunc function, gpointer data)
{
GSource *source;
GSource *source = NULL;
g_return_val_if_fail (function != NULL, NULL);
if (seconds)
source = g_timeout_source_new_seconds (interval);
else
source = g_timeout_source_new (interval);
source = g_timeout_source_new (interval);
g_source_set_callback (source, function, data, NULL);
g_source_attach (source, priv->ctx);
......@@ -825,7 +825,7 @@ socket_send_message (NiceSocket *sock, const NiceAddress *to,
req->priv = priv;
stun_message_id (&msg, req->id);
req->source = priv_timeout_add_with_context (priv,
STUN_END_TIMEOUT, FALSE, priv_forget_send_request, req);
STUN_END_TIMEOUT, priv_forget_send_request_agent_locked, req);
g_queue_push_tail (priv->send_requests, req);
}
}
......@@ -962,19 +962,10 @@ socket_is_based_on (NiceSocket *sock, NiceSocket *other)
}
static gboolean
priv_forget_send_request (gpointer pointer)
priv_forget_send_request_agent_locked (gpointer pointer)
{
SendRequest *req = pointer;
agent_lock ();
if (g_source_is_destroyed (g_main_current_source ())) {
nice_debug ("Source was destroyed. "
"Avoided race condition in turn.c:priv_forget_send_request");
agent_unlock ();
return FALSE;
}
stun_agent_forget_transaction (&req->priv->agent, req->id);
g_queue_remove (req->priv->send_requests, req);
......@@ -983,8 +974,6 @@ priv_forget_send_request (gpointer pointer)
g_source_unref (req->source);
req->source = NULL;
agent_unlock ();
g_slice_free (SendRequest, req);
return FALSE;
......@@ -997,11 +986,11 @@ priv_permission_timeout (gpointer data)
nice_debug ("Permission is about to timeout, schedule renewal");
agent_lock ();
g_mutex_lock (&priv->mutex);
/* remove all permissions for this agent (the permission for the peer
we are sending to will be renewed) */
priv_clear_permissions (priv);
agent_unlock ();
g_mutex_unlock (&priv->mutex);
return TRUE;
}
......@@ -1015,16 +1004,6 @@ priv_binding_expired_timeout (gpointer data)
nice_debug ("Permission expired, refresh failed");
agent_lock ();
source = g_main_current_source ();
if (g_source_is_destroyed (source)) {
nice_debug ("Source was destroyed. "
"Avoided race condition in turn.c:priv_binding_expired_timeout");
agent_unlock ();
return FALSE;
}
/* find current binding and destroy it */
for (i = priv->channels ; i; i = i->next) {
ChannelBinding *b = i->data;
......@@ -1061,8 +1040,6 @@ priv_binding_expired_timeout (gpointer data)
}
}
agent_unlock ();
return FALSE;
}
......@@ -1075,16 +1052,6 @@ priv_binding_timeout (gpointer data)
nice_debug ("Permission is about to timeout, sending binding renewal");
agent_lock ();
source = g_main_current_source ();
if (g_source_is_destroyed (source)) {
nice_debug ("Source was destroyed. "
"Avoided race condition in turn.c:priv_binding_timeout");
agent_unlock ();
return FALSE;
}
/* find current binding and mark it for renewal */
for (i = priv->channels ; i; i = i->next) {
ChannelBinding *b = i->data;
......@@ -1099,7 +1066,7 @@ priv_binding_timeout (gpointer data)
/* Install timer to expire the permission */
b->timeout_source = priv_timeout_add_with_context (priv,
STUN_EXPIRE_TIMEOUT, TRUE, priv_binding_expired_timeout, priv);
STUN_EXPIRE_TIMEOUT * 1000, priv_binding_expired_timeout, priv);
/* Send renewal */
if (!priv->current_binding_msg)
......@@ -1108,8 +1075,6 @@ priv_binding_timeout (gpointer data)
}
}
agent_unlock ();
return FALSE;
}
......@@ -1372,8 +1337,8 @@ nice_udp_turn_socket_parse_recv (NiceSocket *sock, NiceSocket **from_sock,
}
/* Install timer to schedule refresh of the permission */
binding->timeout_source =
priv_timeout_add_with_context (priv, STUN_BINDING_TIMEOUT,
TRUE, priv_binding_timeout, priv);
priv_timeout_add_with_context (priv,
STUN_BINDING_TIMEOUT * 1000, priv_binding_timeout, priv);
}
priv_process_pending_bindings (priv);
}
......@@ -1463,8 +1428,9 @@ nice_udp_turn_socket_parse_recv (NiceSocket *sock, NiceSocket **from_sock,
if (stun_message_get_class (&msg) == STUN_RESPONSE &&
!priv->permission_timeout_source) {
priv->permission_timeout_source =
priv_timeout_add_with_context (priv, STUN_PERMISSION_TIMEOUT,
TRUE, priv_permission_timeout, priv);
priv_timeout_add_with_context (priv,
STUN_PERMISSION_TIMEOUT * 1000, priv_permission_timeout,
priv);
}
/* send enqued data */
......@@ -1721,14 +1687,6 @@ priv_retransmissions_tick (gpointer pointer)
{
UdpTurnPriv *priv = pointer;
agent_lock ();
if (g_source_is_destroyed (g_main_current_source ())) {
nice_debug ("Source was destroyed. "
"Avoided race condition in turn.c:priv_retransmissions_tick");
agent_unlock ();
return FALSE;
}
if (priv_retransmissions_tick_unlocked (priv) == FALSE) {
if (priv->tick_source_channel_bind != NULL) {
g_source_destroy (priv->tick_source_channel_bind);
......@@ -1736,7 +1694,6 @@ priv_retransmissions_tick (gpointer pointer)
priv->tick_source_channel_bind = NULL;
}
}
agent_unlock ();
return FALSE;
}
......@@ -1746,21 +1703,11 @@ priv_retransmissions_create_permission_tick (gpointer pointer)
{
UdpTurnPriv *priv = pointer;
agent_lock ();
if (g_source_is_destroyed (g_main_current_source ())) {
nice_debug ("Source was destroyed. Avoided race condition in "
"turn.c:priv_retransmissions_create_permission_tick");
agent_unlock ();
return FALSE;
}
/* This will call priv_retransmissions_create_permission_tick_unlocked() for
* every pending permission with an expired timer and will create a new timer
* if there are pending permissions that require it */
priv_schedule_tick (priv);
agent_unlock ();
return FALSE;
}
......@@ -1781,7 +1728,7 @@ priv_schedule_tick (UdpTurnPriv *priv)
guint timeout = stun_timer_remainder (&priv->current_binding_msg->timer);
if (timeout > 0) {
priv->tick_source_channel_bind =
priv_timeout_add_with_context (priv, timeout, FALSE,
priv_timeout_add_with_context (priv, timeout,
priv_retransmissions_tick, priv);
} else {
priv_retransmissions_tick_unlocked (priv);
......@@ -1819,8 +1766,7 @@ priv_schedule_tick (UdpTurnPriv *priv)
/* We create one timer for the minimal timeout we need */
if (min_timeout != G_MAXUINT) {
priv->tick_source_create_permission =
priv_timeout_add_with_context (priv, FALSE,
min_timeout,
priv_timeout_add_with_context (priv, min_timeout,
priv_retransmissions_create_permission_tick,
priv);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment