Commit b8f8b048 authored by Michael Smith's avatar Michael Smith
Browse files

gst/tcp/gstmultifdsink.*: Make using the remove or clear signals threadsafe.

Original commit message from CVS:
* gst/tcp/gstmultifdsink.c: (gst_multi_fd_sink_add_full),
(gst_multi_fd_sink_remove), (gst_multi_fd_sink_clear),
(gst_multi_fd_sink_get_stats),
(gst_multi_fd_sink_remove_client_link),
(gst_multi_fd_sink_queue_buffer),
(gst_multi_fd_sink_handle_clients):
* gst/tcp/gstmultifdsink.h:
Make using the remove or clear signals threadsafe.
Make calling get-stats with an invalid fd not segfault.
Fixes 368273.
parent 351622d0
2006-10-31 Michael Smith <msmith@fluendo.com>
* gst/tcp/gstmultifdsink.c: (gst_multi_fd_sink_add_full),
(gst_multi_fd_sink_remove), (gst_multi_fd_sink_clear),
(gst_multi_fd_sink_get_stats),
(gst_multi_fd_sink_remove_client_link),
(gst_multi_fd_sink_queue_buffer),
(gst_multi_fd_sink_handle_clients):
* gst/tcp/gstmultifdsink.h:
Make using the remove or clear signals threadsafe.
Make calling get-stats with an invalid fd not segfault.
Fixes 368273.
2006-10-31 Wim Taymans <wim@fluendo.com> 2006-10-31 Wim Taymans <wim@fluendo.com>
   
* gst-libs/gst/rtp/Makefile.am: * gst-libs/gst/rtp/Makefile.am:
...@@ -688,6 +688,7 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd, ...@@ -688,6 +688,7 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd,
client->burst_max_unit = max_unit; client->burst_max_unit = max_unit;
client->burst_max_value = max_value; client->burst_max_value = max_value;
client->sync_method = sync_method; client->sync_method = sync_method;
client->currently_removing = FALSE;
/* update start time */ /* update start time */
g_get_current_time (&now); g_get_current_time (&now);
...@@ -706,6 +707,7 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd, ...@@ -706,6 +707,7 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd,
/* we can add the fd now */ /* we can add the fd now */
clink = sink->clients = g_list_prepend (sink->clients, client); clink = sink->clients = g_list_prepend (sink->clients, client);
g_hash_table_insert (sink->fd_hash, &client->fd.fd, clink); g_hash_table_insert (sink->fd_hash, &client->fd.fd, clink);
sink->clients_cookie++;
/* set the socket to non blocking */ /* set the socket to non blocking */
res = fcntl (fd, F_SETFL, O_NONBLOCK); res = fcntl (fd, F_SETFL, O_NONBLOCK);
...@@ -775,28 +777,45 @@ gst_multi_fd_sink_remove (GstMultiFdSink * sink, int fd) ...@@ -775,28 +777,45 @@ gst_multi_fd_sink_remove (GstMultiFdSink * sink, int fd)
if (clink != NULL) { if (clink != NULL) {
GstTCPClient *client = (GstTCPClient *) clink->data; GstTCPClient *client = (GstTCPClient *) clink->data;
if (client->status != GST_CLIENT_STATUS_OK) {
GST_INFO_OBJECT (sink,
"[fd %5d] Client already disconnecting with status %d",
fd, client->status);
goto done;
}
client->status = GST_CLIENT_STATUS_REMOVED; client->status = GST_CLIENT_STATUS_REMOVED;
gst_multi_fd_sink_remove_client_link (sink, clink); gst_multi_fd_sink_remove_client_link (sink, clink);
SEND_COMMAND (sink, CONTROL_RESTART); SEND_COMMAND (sink, CONTROL_RESTART);
} else { } else {
GST_WARNING_OBJECT (sink, "[fd %5d] no client with this fd found!", fd); GST_WARNING_OBJECT (sink, "[fd %5d] no client with this fd found!", fd);
} }
done:
CLIENTS_UNLOCK (sink); CLIENTS_UNLOCK (sink);
} }
/* can be called both through the signal (ie from any thread) or when stopping, /* can be called both through the signal (i.e. from any thread) or when
* after the writing thread has shut down */ * stopping, after the writing thread has shut down */
void void
gst_multi_fd_sink_clear (GstMultiFdSink * sink) gst_multi_fd_sink_clear (GstMultiFdSink * sink)
{ {
GList *clients, *next; GList *clients, *next;
guint32 cookie;
GST_DEBUG_OBJECT (sink, "clearing all clients"); GST_DEBUG_OBJECT (sink, "clearing all clients");
CLIENTS_LOCK (sink); CLIENTS_LOCK (sink);
restart:
cookie = sink->clients_cookie;
for (clients = sink->clients; clients; clients = next) { for (clients = sink->clients; clients; clients = next) {
GstTCPClient *client; GstTCPClient *client;
if (cookie != sink->clients_cookie) {
GST_DEBUG_OBJECT (sink, "cookie changed while removing all clients");
goto restart;
}
client = (GstTCPClient *) clients->data; client = (GstTCPClient *) clients->data;
next = g_list_next (clients); next = g_list_next (clients);
...@@ -826,6 +845,9 @@ gst_multi_fd_sink_get_stats (GstMultiFdSink * sink, int fd) ...@@ -826,6 +845,9 @@ gst_multi_fd_sink_get_stats (GstMultiFdSink * sink, int fd)
CLIENTS_LOCK (sink); CLIENTS_LOCK (sink);
clink = g_hash_table_lookup (sink->fd_hash, &fd); clink = g_hash_table_lookup (sink->fd_hash, &fd);
if (clink == NULL)
goto noclient;
client = (GstTCPClient *) clink->data; client = (GstTCPClient *) clink->data;
if (client != NULL) { if (client != NULL) {
GValue value = { 0 }; GValue value = { 0 };
...@@ -866,6 +888,8 @@ gst_multi_fd_sink_get_stats (GstMultiFdSink * sink, int fd) ...@@ -866,6 +888,8 @@ gst_multi_fd_sink_get_stats (GstMultiFdSink * sink, int fd)
g_value_set_uint64 (&value, client->dropped_buffers); g_value_set_uint64 (&value, client->dropped_buffers);
result = g_value_array_append (result, &value); result = g_value_array_append (result, &value);
} }
noclient:
CLIENTS_UNLOCK (sink); CLIENTS_UNLOCK (sink);
/* python doesn't like a NULL pointer yet */ /* python doesn't like a NULL pointer yet */
...@@ -879,7 +903,7 @@ gst_multi_fd_sink_get_stats (GstMultiFdSink * sink, int fd) ...@@ -879,7 +903,7 @@ gst_multi_fd_sink_get_stats (GstMultiFdSink * sink, int fd)
/* should be called with the clientslock helt. /* should be called with the clientslock helt.
* Note that we don't close the fd as we didn't open it in the first * Note that we don't close the fd as we didn't open it in the first
* place. An application should connect to the client-removed signal and * place. An application should connect to the client-fd-removed signal and
* close the fd itself. * close the fd itself.
*/ */
static void static void
...@@ -894,6 +918,13 @@ gst_multi_fd_sink_remove_client_link (GstMultiFdSink * sink, GList * link) ...@@ -894,6 +918,13 @@ gst_multi_fd_sink_remove_client_link (GstMultiFdSink * sink, GList * link)
fd = client->fd.fd; fd = client->fd.fd;
if (client->currently_removing) {
GST_WARNING_OBJECT (sink, "[fd %5d] client is already being removed", fd);
return;
} else {
client->currently_removing = TRUE;
}
/* FIXME: if we keep track of ip we can log it here and signal */ /* FIXME: if we keep track of ip we can log it here and signal */
switch (client->status) { switch (client->status) {
case GST_CLIENT_STATUS_OK: case GST_CLIENT_STATUS_OK:
...@@ -958,6 +989,7 @@ gst_multi_fd_sink_remove_client_link (GstMultiFdSink * sink, GList * link) ...@@ -958,6 +989,7 @@ gst_multi_fd_sink_remove_client_link (GstMultiFdSink * sink, GList * link)
* and take a shortcut when it did not change between unlocking and locking * and take a shortcut when it did not change between unlocking and locking
* our mutex. For now we just walk the list again. */ * our mutex. For now we just walk the list again. */
sink->clients = g_list_remove (sink->clients, client); sink->clients = g_list_remove (sink->clients, client);
sink->clients_cookie++;
if (fclass->removed) if (fclass->removed)
fclass->removed (sink, client->fd.fd); fclass->removed (sink, client->fd.fd);
...@@ -1972,6 +2004,7 @@ gst_multi_fd_sink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf) ...@@ -1972,6 +2004,7 @@ gst_multi_fd_sink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
GTimeVal nowtv; GTimeVal nowtv;
GstClockTime now; GstClockTime now;
gint max_buffers, soft_max_buffers; gint max_buffers, soft_max_buffers;
guint cookie;
g_get_current_time (&nowtv); g_get_current_time (&nowtv);
now = GST_TIMEVAL_TO_TIME (nowtv); now = GST_TIMEVAL_TO_TIME (nowtv);
...@@ -1995,9 +2028,17 @@ gst_multi_fd_sink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf) ...@@ -1995,9 +2028,17 @@ gst_multi_fd_sink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
/* then loop over the clients and update the positions */ /* then loop over the clients and update the positions */
max_buffer_usage = 0; max_buffer_usage = 0;
restart:
cookie = sink->clients_cookie;
for (clients = sink->clients; clients; clients = next) { for (clients = sink->clients; clients; clients = next) {
GstTCPClient *client; GstTCPClient *client;
if (cookie != sink->clients_cookie) {
GST_DEBUG_OBJECT (sink, "Clients cookie outdated, restarting");
goto restart;
}
client = (GstTCPClient *) clients->data; client = (GstTCPClient *) clients->data;
next = g_list_next (clients); next = g_list_next (clients);
...@@ -2031,10 +2072,11 @@ gst_multi_fd_sink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf) ...@@ -2031,10 +2072,11 @@ gst_multi_fd_sink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
/* remove the client, the fd set will be cleared and the select thread /* remove the client, the fd set will be cleared and the select thread
* will be signaled */ * will be signaled */
client->status = GST_CLIENT_STATUS_SLOW; client->status = GST_CLIENT_STATUS_SLOW;
gst_multi_fd_sink_remove_client_link (sink, clients);
/* set client to invalid position while being removed */ /* set client to invalid position while being removed */
client->bufpos = -1; client->bufpos = -1;
gst_multi_fd_sink_remove_client_link (sink, clients);
need_signal = TRUE; need_signal = TRUE;
continue;
} else if (client->bufpos == 0 || client->new_connection) { } else if (client->bufpos == 0 || client->new_connection) {
/* can send data to this client now. need to signal the select thread that /* can send data to this client now. need to signal the select thread that
* the fd_set changed */ * the fd_set changed */
...@@ -2134,6 +2176,7 @@ gst_multi_fd_sink_handle_clients (GstMultiFdSink * sink) ...@@ -2134,6 +2176,7 @@ gst_multi_fd_sink_handle_clients (GstMultiFdSink * sink)
GList *clients, *next; GList *clients, *next;
gboolean try_again; gboolean try_again;
GstMultiFdSinkClass *fclass; GstMultiFdSinkClass *fclass;
guint cookie;
fclass = GST_MULTI_FD_SINK_GET_CLASS (sink); fclass = GST_MULTI_FD_SINK_GET_CLASS (sink);
...@@ -2157,12 +2200,19 @@ gst_multi_fd_sink_handle_clients (GstMultiFdSink * sink) ...@@ -2157,12 +2200,19 @@ gst_multi_fd_sink_handle_clients (GstMultiFdSink * sink)
/* ok, so one or more of the fds is invalid. We loop over them to find /* ok, so one or more of the fds is invalid. We loop over them to find
* the ones that give an error to the F_GETFL fcntl. */ * the ones that give an error to the F_GETFL fcntl. */
CLIENTS_LOCK (sink); CLIENTS_LOCK (sink);
restart:
cookie = sink->clients_cookie;
for (clients = sink->clients; clients; clients = next) { for (clients = sink->clients; clients; clients = next) {
GstTCPClient *client; GstTCPClient *client;
int fd; int fd;
long flags; long flags;
int res; int res;
if (cookie != sink->clients_cookie) {
GST_DEBUG_OBJECT (sink, "Cookie changed finding bad fd");
goto restart;
}
client = (GstTCPClient *) clients->data; client = (GstTCPClient *) clients->data;
next = g_list_next (clients); next = g_list_next (clients);
...@@ -2242,9 +2292,17 @@ gst_multi_fd_sink_handle_clients (GstMultiFdSink * sink) ...@@ -2242,9 +2292,17 @@ gst_multi_fd_sink_handle_clients (GstMultiFdSink * sink)
/* Check the clients */ /* Check the clients */
CLIENTS_LOCK (sink); CLIENTS_LOCK (sink);
restart2:
cookie = sink->clients_cookie;
for (clients = sink->clients; clients; clients = next) { for (clients = sink->clients; clients; clients = next) {
GstTCPClient *client; GstTCPClient *client;
if (sink->clients_cookie != cookie) {
GST_DEBUG_OBJECT (sink, "Restarting loop, cookie out of date");
goto restart2;
}
client = (GstTCPClient *) clients->data; client = (GstTCPClient *) clients->data;
next = g_list_next (clients); next = g_list_next (clients);
......
...@@ -155,6 +155,8 @@ typedef struct { ...@@ -155,6 +155,8 @@ typedef struct {
gboolean caps_sent; gboolean caps_sent;
gboolean new_connection; gboolean new_connection;
gboolean currently_removing;
/* method to sync client when connecting */ /* method to sync client when connecting */
GstSyncMethod sync_method; GstSyncMethod sync_method;
GstUnitType burst_min_unit; GstUnitType burst_min_unit;
...@@ -193,6 +195,7 @@ struct _GstMultiFdSink { ...@@ -193,6 +195,7 @@ struct _GstMultiFdSink {
GStaticRecMutex clientslock; /* lock to protect the clients list */ GStaticRecMutex clientslock; /* lock to protect the clients list */
GList *clients; /* list of clients we are serving */ GList *clients; /* list of clients we are serving */
GHashTable *fd_hash; /* index on fd to client */ GHashTable *fd_hash; /* index on fd to client */
guint clients_cookie; /* Cookie to detect changes to the clients list */
GstFDSetMode mode; GstFDSetMode mode;
GstFDSet *fdset; GstFDSet *fdset;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment