Commit fc8c5cba authored by Tim-Philipp Müller's avatar Tim-Philipp Müller
Browse files

rtspconnection: make gst_rtsp_watch_queue_message() thread-safe

People might queue messages from a thread other than the thread in which
the main context which this watch is attached is iterated from, so use
a GAsyncQueue instead of a GList, so g_list_append() doesn't trample
over list nodes just freed in the other thread. This just fixes issues
I've had with gst-rtsp-server. We might need more locking in various
places here.
parent dfe96ce6
......@@ -2646,7 +2646,7 @@ struct _GstRTSPWatch
gboolean write_added;
/* queued message for transmission */
GList *messages;
GAsyncQueue *messages;
guint8 *write_data;
guint write_off;
guint write_len;
......@@ -2733,18 +2733,17 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback,
if (watch->write_data == NULL) {
GstRTSPRec *data;
if (!watch->messages)
/* get a new message from the queue */
data = g_async_queue_try_pop (watch->messages);
if (data == NULL)
goto done;
/* no data, get a new message from the queue */
data = watch->messages->data;
watch->messages = g_list_delete_link (watch->messages, watch->messages);
watch->write_off = 0;
watch->write_len = data->str->len;
watch->write_data = (guint8 *) g_string_free (data->str, FALSE);
watch->write_cseq = data->cseq;
data->str = NULL;
g_slice_free (GstRTSPRec, data);
}
......@@ -2759,7 +2758,7 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback,
watch->funcs.message_sent (watch, watch->write_cseq, watch->user_data);
done:
if (watch->messages == NULL && watch->write_added) {
if (g_async_queue_length (watch->messages) == 0 && watch->write_added) {
g_source_remove_poll ((GSource *) watch, &watch->writefd);
watch->write_added = FALSE;
watch->writefd.revents = 0;
......@@ -2786,22 +2785,25 @@ error:
}
}
static void
gst_rtsp_rec_free (gpointer data)
{
GstRTSPRec *rec = data;
g_string_free (rec->str, TRUE);
rec->str = NULL;
g_slice_free (GstRTSPRec, rec);
}
static void
gst_rtsp_source_finalize (GSource * source)
{
GstRTSPWatch *watch = (GstRTSPWatch *) source;
GList *walk;
build_reset (&watch->builder);
for (walk = watch->messages; walk; walk = g_list_next (walk)) {
GstRTSPRec *data = walk->data;
g_string_free (data->str, TRUE);
g_slice_free (GstRTSPRec, data);
}
g_list_free (watch->messages);
g_free (watch->write_data);
g_async_queue_unref (watch->messages);
watch->messages = NULL;
if (watch->notify)
watch->notify (watch->user_data);
......@@ -2851,6 +2853,8 @@ gst_rtsp_watch_new (GstRTSPConnection * conn,
result->conn = conn;
result->builder.state = STATE_START;
result->messages = g_async_queue_new_full (gst_rtsp_rec_free);
result->readfd.fd = -1;
result->writefd.fd = -1;
......@@ -2943,8 +2947,10 @@ queue_response (GstRTSPWatch * watch, GString * str, guint cseq)
data->cseq = cseq;
/* add the record to a queue. FIXME we would like to have an upper limit here */
watch->messages = g_list_append (watch->messages, data);
g_async_queue_push (watch->messages, data);
/* FIXME: does the following need to be made thread-safe? (queue_response
* might be called from a streaming thread, like appsink's render function) */
/* make sure the main context will now also check for writability on the
* socket */
if (!watch->write_added) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment