Commit 0556ec49 authored by Jakub Adam's avatar Jakub Adam Committed by Olivier Crête
Browse files

discovery: Asynchronous removal of TURN refreshes

When deleting a turn refresh, ensure the assocaited port allocations on
the TURN server are properly removed. This means wait for a response for
our 0-lifetime refresh request and implement retransmissions. Only
delete the refresh after TURN port removal has been confirmed or all
our retransmissions timed out.
parent f7319520
......@@ -3278,6 +3278,25 @@ static void priv_remove_keepalive_timer (NiceAgent *agent)
}
}
static gboolean
on_stream_refreshes_pruned (NiceAgent *agent, NiceStream *stream)
{
// This is called from a timeout cb with agent lock held
nice_stream_close (agent, stream);
agent_unlock (agent);
/* Actually free the stream. This should be done with the lock released, as
* it could end up disposing of a NiceIOStream, which tries to take the
* agent lock itself. */
g_object_unref (stream);
agent_lock (agent);
return G_SOURCE_REMOVE;
}
NICEAPI_EXPORT void
nice_agent_remove_stream (
NiceAgent *agent,
......@@ -3303,11 +3322,11 @@ nice_agent_remove_stream (
/* note: remove items with matching stream_ids from both lists */
conn_check_prune_stream (agent, stream);
discovery_prune_stream (agent, stream_id);
refresh_prune_stream (agent, stream_id);
refresh_prune_stream_async (agent, stream,
(NiceTimeoutLockedCallback) on_stream_refreshes_pruned);
/* Remove the stream and signal its removal. */
agent->streams = g_slist_remove (agent->streams, stream);
nice_stream_close (agent, stream);
if (!agent->streams)
priv_remove_keepalive_timer (agent);
......@@ -3316,13 +3335,6 @@ nice_agent_remove_stream (
g_memdup (stream_ids, sizeof(stream_ids)));
agent_unlock_and_emit (agent);
/* Actually free the stream. This should be done with the lock released, as
* it could end up disposing of a NiceIOStream, which tries to take the
* agent lock itself. */
g_object_unref (stream);
return;
}
NICEAPI_EXPORT void
......@@ -5143,8 +5155,6 @@ nice_agent_dispose (GObject *object)
/* step: free resources for the binding discovery timers */
discovery_free (agent);
g_assert (agent->discovery_list == NULL);
refresh_free (agent);
g_assert (agent->refresh_list == NULL);
/* step: free resources for the connectivity check timers */
conn_check_free (agent);
......
......@@ -208,6 +208,21 @@ nice_component_remove_socket (NiceAgent *agent, NiceComponent *cmp,
nice_component_detach_socket (cmp, nsocket);
}
static gboolean
on_candidate_refreshes_pruned (NiceAgent *agent, NiceCandidate *candidate)
{
NiceComponent *component;
if (agent_find_component (agent, candidate->stream_id,
candidate->component_id, NULL, &component)) {
nice_component_detach_socket (component, candidate->sockptr);
}
nice_candidate_free (candidate);
return G_SOURCE_REMOVE;
}
void
nice_component_clean_turn_servers (NiceAgent *agent, NiceComponent *cmp)
{
......@@ -258,13 +273,13 @@ nice_component_clean_turn_servers (NiceAgent *agent, NiceComponent *cmp)
for (i = relay_candidates; i; i = i->next) {
NiceCandidate * candidate = i->data;
refresh_prune_candidate (agent, candidate);
discovery_prune_socket (agent, candidate->sockptr);
if (stream) {
conn_check_prune_socket (agent, stream, cmp, candidate->sockptr);
}
nice_component_detach_socket (cmp, candidate->sockptr);
nice_candidate_free (candidate);
refresh_prune_candidate_async (agent, candidate,
(NiceTimeoutLockedCallback) on_candidate_refreshes_pruned);
}
}
......@@ -444,14 +459,13 @@ nice_component_update_selected_pair (NiceAgent *agent, NiceComponent *component,
if (component->selected_pair.local &&
component->selected_pair.local == component->turn_candidate) {
refresh_prune_candidate (agent, component->turn_candidate);
discovery_prune_socket (agent,
component->turn_candidate->sockptr);
if (stream)
conn_check_prune_socket (agent, stream, component,
component->turn_candidate->sockptr);
nice_component_detach_socket (component, component->turn_candidate->sockptr);
nice_candidate_free (component->turn_candidate);
refresh_prune_candidate_async (agent, component->turn_candidate,
(NiceTimeoutLockedCallback) on_candidate_refreshes_pruned);
component->turn_candidate = NULL;
}
......
......@@ -1492,7 +1492,7 @@ static gboolean priv_turn_allocate_refresh_retransmissions_tick_agent_locked (
stun_message_id (&cand->stun_message, id);
stun_agent_forget_transaction (&cand->stun_agent, id);
refresh_cancel (agent, cand);
refresh_free (agent, cand);
break;
}
case STUN_USAGE_TIMER_RETURN_RETRANSMIT:
......@@ -3737,7 +3737,7 @@ static gboolean priv_map_reply_to_relay_refresh (NiceAgent *agent, StunMessage *
for (i = agent->refresh_list; i && trans_found != TRUE; i = i->next) {
CandidateRefresh *cand = i->data;
if (cand->stun_message.buffer) {
if (!cand->disposing && cand->stun_message.buffer) {
stun_message_id (&cand->stun_message, refresh_id);
if (memcmp (refresh_id, response_id, sizeof(StunTransactionId)) == 0) {
......@@ -3787,11 +3787,11 @@ static gboolean priv_map_reply_to_relay_refresh (NiceAgent *agent, StunMessage *
priv_turn_allocate_refresh_tick_unlocked (agent, cand);
} else {
/* case: a real unauthorized error */
refresh_cancel (agent, cand);
refresh_free (agent, cand);
}
} else {
/* case: STUN error, the check STUN context was freed */
refresh_cancel (agent, cand);
refresh_free (agent, cand);
}
trans_found = TRUE;
}
......@@ -3802,6 +3802,42 @@ static gboolean priv_map_reply_to_relay_refresh (NiceAgent *agent, StunMessage *
return trans_found;
}
static gboolean priv_map_reply_to_relay_remove (NiceAgent *agent,
StunMessage *resp)
{
StunTransactionId response_id;
GSList *i;
stun_message_id (resp, response_id);
for (i = agent->refresh_list; i; i = i->next) {
CandidateRefresh *cand = i->data;
StunTransactionId request_id;
StunUsageTurnReturn res;
uint32_t lifetime;
if (!cand->disposing || !cand->stun_message.buffer) {
continue;
}
stun_message_id (&cand->stun_message, request_id);
if (memcmp (request_id, response_id, sizeof(StunTransactionId)) == 0) {
res = stun_usage_turn_refresh_process (resp, &lifetime,
agent_to_turn_compatibility (agent));
nice_debug ("Agent %p : priv_map_reply_to_relay_remove for %p res %d "
"with lifetime %u.", agent, cand, res, lifetime);
if (res != STUN_USAGE_TURN_RETURN_INVALID) {
refresh_free (agent, cand);
return TRUE;
}
}
}
return FALSE;
}
static gboolean priv_map_reply_to_keepalive_conncheck (NiceAgent *agent,
NiceComponent *component, StunMessage *resp)
......@@ -4324,6 +4360,9 @@ gboolean conn_check_handle_inbound_stun (NiceAgent *agent, NiceStream *stream,
if (trans_found != TRUE)
trans_found = priv_map_reply_to_relay_refresh (agent, &req);
if (trans_found != TRUE)
trans_found = priv_map_reply_to_relay_remove (agent, &req);
/* step: let's try to match the response to an existing keepalive conncheck */
if (trans_found != TRUE)
trans_found = priv_map_reply_to_keepalive_conncheck (agent, component,
......
......@@ -151,12 +151,74 @@ void discovery_prune_socket (NiceAgent *agent, NiceSocket *sock)
}
}
/*
* Frees a CandidateRefresh and calls destroy callback if it has been set.
*/
void refresh_free (NiceAgent *agent, CandidateRefresh *cand)
{
nice_debug ("Freeing candidate refresh %p", cand);
agent->refresh_list = g_slist_remove (agent->refresh_list, cand);
if (cand->timer_source != NULL) {
g_source_destroy (cand->timer_source);
g_clear_pointer (&cand->timer_source, g_source_unref);
}
if (cand->tick_source) {
g_source_destroy (cand->tick_source);
g_clear_pointer (&cand->tick_source, g_source_unref);
}
if (cand->destroy_cb) {
cand->destroy_cb (cand->destroy_cb_data);
}
g_slice_free (CandidateRefresh, cand);
}
static gboolean on_refresh_remove_timeout (NiceAgent *agent,
CandidateRefresh *cand)
{
switch (stun_timer_refresh (&cand->timer)) {
case STUN_USAGE_TIMER_RETURN_TIMEOUT:
{
StunTransactionId id;
nice_debug ("TURN deallocate for refresh %p timed out", cand);
stun_message_id (&cand->stun_message, id);
stun_agent_forget_transaction (&cand->stun_agent, id);
refresh_free (agent, cand);
break;
}
case STUN_USAGE_TIMER_RETURN_RETRANSMIT:
nice_debug ("Retransmitting TURN deallocate for refresh %p", cand);
agent_socket_send (cand->nicesock, &cand->server,
stun_message_length (&cand->stun_message), (gchar *)cand->stun_buffer);
G_GNUC_FALLTHROUGH;
case STUN_USAGE_TIMER_RETURN_SUCCESS:
agent_timeout_add_with_context (agent, &cand->tick_source,
"TURN deallocate retransmission", stun_timer_remainder (&cand->timer),
(NiceTimeoutLockedCallback) on_refresh_remove_timeout, cand);
break;
default:
break;
}
return G_SOURCE_REMOVE;
}
/*
* Frees the CandidateDiscovery structure pointed to
* by 'user data'. Compatible with g_slist_free_full().
* Closes the port associated with the candidate refresh on the TURN server by
* sending a refresh request that has zero lifetime. After a response is
* received or the request times out, 'cand' gets freed and 'cb' is called.
*/
static void refresh_free_item (NiceAgent *agent, CandidateRefresh *cand)
static gboolean refresh_remove_async (NiceAgent *agent, CandidateRefresh *cand,
GDestroyNotify cb, gpointer cb_data)
{
uint8_t *username;
gsize username_len;
......@@ -165,18 +227,19 @@ static void refresh_free_item (NiceAgent *agent, CandidateRefresh *cand)
size_t buffer_len = 0;
StunUsageTurnCompatibility turn_compat = agent_to_turn_compatibility (agent);
agent->refresh_list = g_slist_remove (agent->refresh_list, cand);
if (cand->disposing) {
return FALSE;
}
nice_debug ("Sending request to remove TURN allocation for refresh %p", cand);
cand->disposing = TRUE;
if (cand->timer_source != NULL) {
g_source_destroy (cand->timer_source);
g_source_unref (cand->timer_source);
cand->timer_source = NULL;
}
if (cand->tick_source != NULL) {
g_source_destroy (cand->tick_source);
g_source_unref (cand->tick_source);
cand->tick_source = NULL;
}
username = (uint8_t *)cand->candidate->turn->username;
username_len = (size_t) strlen (cand->candidate->turn->username);
......@@ -197,21 +260,15 @@ static void refresh_free_item (NiceAgent *agent, CandidateRefresh *cand)
agent_to_turn_compatibility (agent));
if (buffer_len > 0) {
StunTransactionId id;
agent_socket_send (cand->nicesock, &cand->server, buffer_len,
(gchar *)cand->stun_buffer);
/* forget the transaction since we don't care about the result and
* we don't implement retransmissions/timeout */
stun_message_id (&cand->stun_message, id);
stun_agent_forget_transaction (&cand->stun_agent, id);
/* send the refresh twice since we won't do retransmissions */
agent_socket_send (cand->nicesock, &cand->server,
buffer_len, (gchar *)cand->stun_buffer);
if (!nice_socket_is_reliable (cand->nicesock)) {
agent_socket_send (cand->nicesock, &cand->server,
buffer_len, (gchar *)cand->stun_buffer);
}
stun_timer_start (&cand->timer, agent->stun_initial_timeout,
agent->stun_max_retransmissions);
agent_timeout_add_with_context (agent, &cand->tick_source,
"TURN deallocate retransmission", stun_timer_remainder (&cand->timer),
(NiceTimeoutLockedCallback) on_refresh_remove_timeout, cand);
}
if (turn_compat == STUN_USAGE_TURN_COMPATIBILITY_MSN ||
......@@ -220,44 +277,92 @@ static void refresh_free_item (NiceAgent *agent, CandidateRefresh *cand)
g_free (password);
}
g_slice_free (CandidateRefresh, cand);
cand->destroy_cb = cb;
cand->destroy_cb_data = cb_data;
return TRUE;
}
/*
* Frees all discovery related resources for the agent.
*/
void refresh_free (NiceAgent *agent)
typedef struct {
NiceAgent *agent;
gpointer user_data;
guint items_to_free;
NiceTimeoutLockedCallback cb;
} RefreshPruneAsyncData;
static void on_refresh_removed (RefreshPruneAsyncData *data)
{
while (agent->refresh_list)
refresh_free_item (agent, agent->refresh_list->data);
if (data->items_to_free == 0 || --(data->items_to_free) == 0) {
GSource *timeout_source = NULL;
agent_timeout_add_with_context (data->agent, &timeout_source,
"Async refresh prune", 0, data->cb, data->user_data);
g_free (data);
}
}
static void refresh_prune_async (NiceAgent *agent, GSList *refreshes,
NiceTimeoutLockedCallback function, gpointer user_data)
{
RefreshPruneAsyncData *data = g_new0 (RefreshPruneAsyncData, 1);
GSList *it;
data->agent = agent;
data->user_data = user_data;
data->cb = function;
for (it = refreshes; it; it = it->next) {
if (refresh_remove_async (agent, it->data,
(GDestroyNotify) on_refresh_removed, data)) {
++data->items_to_free;
}
}
if (data->items_to_free == 0) {
/* Stream doesn't have any refreshes to remove. Invoke our callback once to
* schedule client's callback function. */
on_refresh_removed (data);
}
}
void refresh_prune_agent_async (NiceAgent *agent,
NiceTimeoutLockedCallback function)
{
refresh_prune_async (agent, agent->refresh_list, function, NULL);
}
/*
* Prunes the list of discovery processes for items related
* to stream 'stream_id'.
*
* @return TRUE on success, FALSE on a fatal error
* Removes the candidate refreshes related to 'stream' and asynchronously
* closes the associated port allocations on TURN server. Invokes 'function'
* when the process finishes.
*/
void refresh_prune_stream (NiceAgent *agent, guint stream_id)
void refresh_prune_stream_async (NiceAgent *agent, NiceStream *stream,
NiceTimeoutLockedCallback function)
{
GSList *refreshes = NULL;
GSList *i;
for (i = agent->refresh_list; i ;) {
for (i = agent->refresh_list; i ; i = i->next) {
CandidateRefresh *cand = i->data;
GSList *next = i->next;
/* Don't free the candidate refresh to the currently selected local candidate
* unless the whole pair is being destroyed.
*/
if (cand->stream_id == stream_id) {
refresh_free_item (agent, cand);
if (cand->stream_id == stream->id) {
refreshes = g_slist_append (refreshes, cand);
}
i = next;
}
refresh_prune_async (agent, refreshes, function, stream);
g_slist_free (refreshes);
}
/*
* Removes the candidate refreshes related to 'candidate'. The function does not
* close any associated port allocations on TURN server. Its purpose is in
* situations when an error is detected in socket communication that prevents
* sending more requests to the server.
*/
void refresh_prune_candidate (NiceAgent *agent, NiceCandidate *candidate)
{
GSList *i;
......@@ -267,18 +372,35 @@ void refresh_prune_candidate (NiceAgent *agent, NiceCandidate *candidate)
CandidateRefresh *refresh = i->data;
if (refresh->candidate == candidate) {
refresh_free_item (agent, refresh);
refresh_free(agent, refresh);
}
i = next;
}
}
void refresh_cancel (NiceAgent *agent, CandidateRefresh *refresh)
/*
* Removes the candidate refreshes related to 'candidate' and asynchronously
* closes the associated port allocations on TURN server. Invokes 'function'
* when the process finishes.
*/
void refresh_prune_candidate_async (NiceAgent *agent, NiceCandidate *candidate,
NiceTimeoutLockedCallback function)
{
refresh_free_item (agent, refresh);
}
GSList *refreshes = NULL;
GSList *i;
for (i = agent->refresh_list; i; i = i->next) {
CandidateRefresh *refresh = i->data;
if (refresh->candidate == candidate) {
refreshes = g_slist_append (refreshes, refresh);
}
}
refresh_prune_async (agent, refreshes, function, candidate);
g_slist_free (refreshes);
}
/*
* Adds a new local candidate. Implements the candidate pruning
......
......@@ -78,12 +78,20 @@ typedef struct
StunMessage stun_message;
uint8_t stun_resp_buffer[STUN_MAX_MESSAGE_SIZE];
StunMessage stun_resp_msg;
gboolean disposing;
GDestroyNotify destroy_cb;
gpointer destroy_cb_data;
} CandidateRefresh;
void refresh_free (NiceAgent *agent);
void refresh_prune_stream (NiceAgent *agent, guint stream_id);
void refresh_free (NiceAgent *agent, CandidateRefresh *refresh);
void refresh_prune_agent_async (NiceAgent *agent,
NiceTimeoutLockedCallback function);
void refresh_prune_stream_async (NiceAgent *agent, NiceStream *stream,
NiceTimeoutLockedCallback function);
void refresh_prune_candidate (NiceAgent *agent, NiceCandidate *candidate);
void refresh_cancel (NiceAgent *agent, CandidateRefresh *refresh);
void refresh_prune_candidate_async (NiceAgent *agent, NiceCandidate *candidate,
NiceTimeoutLockedCallback function);
void discovery_free (NiceAgent *agent);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment