Commit a19497ab authored by Sebastian Dröge's avatar Sebastian Dröge

appsrc/sink: Fix optimization for only signalling waiters if someone is actually waiting

It is possible that both application and the stream are waiting
currently, if for example the following happens:
  1) app is waiting because no buffer in appsink
  2) appsink providing a buffer and waking up app
  3) appsink getting another buffer and waiting because it's full now
  4) app thread getting back control

Previously step 4 would overwrite that the appsink is currently waiting,
so it would never be signalled again.

https://bugzilla.gnome.org/show_bug.cgi?id=795551
parent 8600fc31
......@@ -75,9 +75,9 @@
typedef enum
{
NOONE_WAITING,
STREAM_WAITING, /* streaming thread is waiting for application thread */
APP_WAITING, /* application thread is waiting for streaming thread */
NOONE_WAITING = 0,
STREAM_WAITING = 1 << 0, /* streaming thread is waiting for application thread */
APP_WAITING = 1 << 1, /* application thread is waiting for streaming thread */
} GstAppSinkWaitStatus;
struct _GstAppSinkPrivate
......@@ -731,9 +731,9 @@ gst_app_sink_event (GstBaseSink * sink, GstEvent * event)
* consumed, which is a bit confusing for the application
*/
while (priv->num_buffers > 0 && !priv->flushing && priv->wait_on_eos) {
priv->wait_status = STREAM_WAITING;
priv->wait_status |= STREAM_WAITING;
g_cond_wait (&priv->cond, &priv->mutex);
priv->wait_status = NOONE_WAITING;
priv->wait_status &= ~STREAM_WAITING;
}
if (priv->flushing)
emit = FALSE;
......@@ -781,7 +781,7 @@ gst_app_sink_preroll (GstBaseSink * psink, GstBuffer * buffer)
GST_DEBUG_OBJECT (appsink, "setting preroll buffer %p", buffer);
gst_buffer_replace (&priv->preroll_buffer, buffer);
if (priv->wait_status == APP_WAITING)
if ((priv->wait_status & APP_WAITING))
g_cond_signal (&priv->cond);
emit = priv->emit_signals;
......@@ -902,9 +902,9 @@ restart:
}
/* wait for a buffer to be removed or flush */
priv->wait_status = STREAM_WAITING;
priv->wait_status |= STREAM_WAITING;
g_cond_wait (&priv->cond, &priv->mutex);
priv->wait_status = NOONE_WAITING;
priv->wait_status &= ~STREAM_WAITING;
if (priv->flushing)
goto flushing;
......@@ -914,7 +914,7 @@ restart:
gst_queue_array_push_tail (priv->queue, gst_mini_object_ref (data));
priv->num_buffers++;
if (priv->wait_status == APP_WAITING)
if ((priv->wait_status & APP_WAITING))
g_cond_signal (&priv->cond);
emit = priv->emit_signals;
......@@ -1012,9 +1012,9 @@ gst_app_sink_query (GstBaseSink * bsink, GstQuery * query)
g_mutex_lock (&priv->mutex);
GST_DEBUG_OBJECT (appsink, "waiting buffers to be consumed");
while (priv->num_buffers > 0 || priv->preroll_buffer) {
priv->wait_status = STREAM_WAITING;
priv->wait_status |= STREAM_WAITING;
g_cond_wait (&priv->cond, &priv->mutex);
priv->wait_status = NOONE_WAITING;
priv->wait_status &= ~STREAM_WAITING;
}
g_mutex_unlock (&priv->mutex);
ret = GST_BASE_SINK_CLASS (parent_class)->query (bsink, query);
......@@ -1528,14 +1528,14 @@ gst_app_sink_try_pull_preroll (GstAppSink * appsink, GstClockTime timeout)
/* nothing to return, wait */
GST_DEBUG_OBJECT (appsink, "waiting for the preroll buffer");
priv->wait_status = APP_WAITING;
priv->wait_status |= APP_WAITING;
if (timeout_valid) {
if (!g_cond_wait_until (&priv->cond, &priv->mutex, end_time))
goto expired;
} else {
g_cond_wait (&priv->cond, &priv->mutex);
}
priv->wait_status = NOONE_WAITING;
priv->wait_status &= ~APP_WAITING;
}
sample =
gst_sample_new (priv->preroll_buffer, priv->preroll_caps,
......@@ -1550,7 +1550,7 @@ gst_app_sink_try_pull_preroll (GstAppSink * appsink, GstClockTime timeout)
expired:
{
GST_DEBUG_OBJECT (appsink, "timeout expired, return NULL");
priv->wait_status = NOONE_WAITING;
priv->wait_status &= ~APP_WAITING;
g_mutex_unlock (&priv->mutex);
return NULL;
}
......@@ -1626,14 +1626,14 @@ gst_app_sink_try_pull_sample (GstAppSink * appsink, GstClockTime timeout)
/* nothing to return, wait */
GST_DEBUG_OBJECT (appsink, "waiting for a buffer");
priv->wait_status = APP_WAITING;
priv->wait_status |= APP_WAITING;
if (timeout_valid) {
if (!g_cond_wait_until (&priv->cond, &priv->mutex, end_time))
goto expired;
} else {
g_cond_wait (&priv->cond, &priv->mutex);
}
priv->wait_status = NOONE_WAITING;
priv->wait_status &= ~APP_WAITING;
}
obj = dequeue_buffer (appsink);
......@@ -1652,7 +1652,7 @@ gst_app_sink_try_pull_sample (GstAppSink * appsink, GstClockTime timeout)
}
gst_mini_object_unref (obj);
if (priv->wait_status == STREAM_WAITING)
if ((priv->wait_status & STREAM_WAITING))
g_cond_signal (&priv->cond);
g_mutex_unlock (&priv->mutex);
......@@ -1663,7 +1663,7 @@ gst_app_sink_try_pull_sample (GstAppSink * appsink, GstClockTime timeout)
expired:
{
GST_DEBUG_OBJECT (appsink, "timeout expired, return NULL");
priv->wait_status = NOONE_WAITING;
priv->wait_status &= ~APP_WAITING;
g_mutex_unlock (&priv->mutex);
return NULL;
}
......
......@@ -103,9 +103,9 @@
typedef enum
{
NOONE_WAITING,
STREAM_WAITING, /* streaming thread is waiting for application thread */
APP_WAITING, /* application thread is waiting for streaming thread */
NOONE_WAITING = 0,
STREAM_WAITING = 1 << 0, /* streaming thread is waiting for application thread */
APP_WAITING = 1 << 1, /* application thread is waiting for streaming thread */
} GstAppSrcWaitStatus;
struct _GstAppSrcPrivate
......@@ -1242,7 +1242,7 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
priv->offset += buf_size;
/* signal that we removed an item */
if (priv->wait_status == APP_WAITING)
if ((priv->wait_status & APP_WAITING))
g_cond_broadcast (&priv->cond);
/* see if we go lower than the empty-percent */
......@@ -1277,9 +1277,9 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
goto eos;
/* nothing to return, wait a while for new data or flushing. */
priv->wait_status = STREAM_WAITING;
priv->wait_status |= STREAM_WAITING;
g_cond_wait (&priv->cond, &priv->mutex);
priv->wait_status = NOONE_WAITING;
priv->wait_status &= ~STREAM_WAITING;
}
g_mutex_unlock (&priv->mutex);
return ret;
......@@ -1840,9 +1840,9 @@ gst_app_src_push_internal (GstAppSrc * appsrc, GstBuffer * buffer,
GST_DEBUG_OBJECT (appsrc, "waiting for free space");
/* we are filled, wait until a buffer gets popped or when we
* flush. */
priv->wait_status = APP_WAITING;
priv->wait_status |= APP_WAITING;
g_cond_wait (&priv->cond, &priv->mutex);
priv->wait_status = NOONE_WAITING;
priv->wait_status &= ~APP_WAITING;
} else {
/* no need to wait for free space, we just pump more data into the
* queue hoping that the caller reacts to the enough-data signal and
......@@ -1867,7 +1867,7 @@ gst_app_src_push_internal (GstAppSrc * appsrc, GstBuffer * buffer,
priv->queued_bytes += gst_buffer_get_size (buffer);
}
if (priv->wait_status == STREAM_WAITING)
if ((priv->wait_status & STREAM_WAITING))
g_cond_broadcast (&priv->cond);
g_mutex_unlock (&priv->mutex);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment