Commit 4de919a1 authored by Wim Taymans's avatar Wim Taymans
Browse files

jitterbuffer: use separate thread for timeouts

Use a separate thread for scheduling the timeouts instead of using the
downstream streaming thread that might block at any time.
parent b363832c
......@@ -117,17 +117,29 @@ enum
if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK)) \
goto label; \
} G_STMT_END
#define JBUF_UNLOCK(priv) (g_mutex_unlock (&(priv)->jbuf_lock))
#define JBUF_WAIT(priv) (g_cond_wait (&(priv)->jbuf_cond, &(priv)->jbuf_lock))
#define JBUF_WAIT_CHECK(priv,label) G_STMT_START { \
JBUF_WAIT(priv); \
if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK)) \
goto label; \
#define JBUF_WAIT_TIMER(priv) G_STMT_START { \
(priv)->waiting_timer = TRUE; \
g_cond_wait (&(priv)->jbuf_timer, &(priv)->jbuf_lock); \
(priv)->waiting_timer = FALSE; \
} G_STMT_END
#define JBUF_SIGNAL_TIMER(priv) G_STMT_START { \
if (G_UNLIKELY ((priv)->waiting_timer)) \
g_cond_signal (&(priv)->jbuf_timer); \
} G_STMT_END
#define JBUF_SIGNAL(priv) (g_cond_signal (&(priv)->jbuf_cond))
#define JBUF_WAIT_EVENT(priv,label) G_STMT_START { \
(priv)->waiting_event = TRUE; \
g_cond_wait (&(priv)->jbuf_event, &(priv)->jbuf_lock); \
(priv)->waiting_event = FALSE; \
if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK)) \
goto label; \
} G_STMT_END
#define JBUF_SIGNAL_EVENT(priv) G_STMT_START { \
if (G_UNLIKELY ((priv)->waiting_event)) \
g_cond_signal (&(priv)->jbuf_event); \
} G_STMT_END
struct _GstRtpJitterBufferPrivate
{
......@@ -136,13 +148,18 @@ struct _GstRtpJitterBufferPrivate
RTPJitterBuffer *jbuf;
GMutex jbuf_lock;
GCond jbuf_cond;
gboolean waiting;
gboolean waiting_timer;
GCond jbuf_timer;
gboolean waiting_event;
GCond jbuf_event;
gboolean discont;
gboolean ts_discont;
gboolean active;
guint64 out_offset;
gboolean timer_running;
GThread *timer_thread;
/* properties */
guint latency_ms;
guint64 latency_ns;
......@@ -321,6 +338,8 @@ static void do_handle_sync (GstRtpJitterBuffer * jitterbuffer);
static void unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer);
static void remove_all_timers (GstRtpJitterBuffer * jitterbuffer);
static void wait_next_timeout (GstRtpJitterBuffer * jitterbuffer);
static void
gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
{
......@@ -596,7 +615,8 @@ gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer)
priv->timers = g_array_new (FALSE, TRUE, sizeof (TimerData));
priv->jbuf = rtp_jitter_buffer_new ();
g_mutex_init (&priv->jbuf_lock);
g_cond_init (&priv->jbuf_cond);
g_cond_init (&priv->jbuf_timer);
g_cond_init (&priv->jbuf_event);
/* reset skew detection initialy */
rtp_jitter_buffer_reset_skew (priv->jbuf);
......@@ -641,7 +661,8 @@ gst_rtp_jitter_buffer_finalize (GObject * object)
g_array_free (jitterbuffer->priv->timers, TRUE);
g_mutex_clear (&jitterbuffer->priv->jbuf_lock);
g_cond_clear (&jitterbuffer->priv->jbuf_cond);
g_cond_clear (&jitterbuffer->priv->jbuf_timer);
g_cond_clear (&jitterbuffer->priv->jbuf_event);
g_object_unref (jitterbuffer->priv->jbuf);
......@@ -831,7 +852,7 @@ gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jbuf, gboolean active,
GST_DEBUG_OBJECT (jbuf, "out offset %" GST_TIME_FORMAT,
GST_TIME_ARGS (priv->out_offset));
priv->active = active;
JBUF_SIGNAL (priv);
JBUF_SIGNAL_EVENT (priv);
}
if (!active) {
rtp_jitter_buffer_set_buffering (priv->jbuf, TRUE);
......@@ -980,7 +1001,7 @@ gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer)
priv->srcresult = GST_FLOW_FLUSHING;
GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
/* this unblocks any waiting pops on the src pad task */
JBUF_SIGNAL (priv);
JBUF_SIGNAL_EVENT (priv);
/* unlock clock, we just unschedule, the entry will be released by the
* locking streaming thread. */
unschedule_current_timer (jitterbuffer);
......@@ -1077,13 +1098,16 @@ gst_rtp_jitter_buffer_change_state (GstElement * element,
priv->last_pt = -1;
/* block until we go to PLAYING */
priv->blocked = TRUE;
priv->timer_running = TRUE;
priv->timer_thread =
g_thread_new ("timer", (GThreadFunc) wait_next_timeout, jitterbuffer);
JBUF_UNLOCK (priv);
break;
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
JBUF_LOCK (priv);
/* unblock to allow streaming in PLAYING */
priv->blocked = FALSE;
JBUF_SIGNAL (priv);
JBUF_SIGNAL_EVENT (priv);
JBUF_UNLOCK (priv);
break;
default:
......@@ -1108,7 +1132,13 @@ gst_rtp_jitter_buffer_change_state (GstElement * element,
ret = GST_STATE_CHANGE_NO_PREROLL;
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
JBUF_LOCK (priv);
gst_buffer_replace (&priv->last_sr, NULL);
priv->timer_running = FALSE;
JBUF_SIGNAL_TIMER (priv);
JBUF_UNLOCK (priv);
g_thread_join (priv->timer_thread);
priv->timer_thread = NULL;
break;
case GST_STATE_CHANGE_READY_TO_NULL:
break;
......@@ -1229,7 +1259,7 @@ gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent,
if (ret && !priv->eos) {
GST_INFO_OBJECT (jitterbuffer, "queuing EOS");
priv->eos = TRUE;
JBUF_SIGNAL (priv);
JBUF_SIGNAL_EVENT (priv);
} else if (priv->eos) {
GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, we are already EOS");
} else {
......@@ -1479,6 +1509,7 @@ add_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
timer->rtx_retry = 0;
}
recalculate_timer (jitterbuffer, timer);
JBUF_SIGNAL_TIMER (priv);
return timer;
}
......@@ -1599,6 +1630,7 @@ update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum,
continue;
if (gap == 0) {
GST_DEBUG ("found timer for current seqnum");
/* the timer for the current seqnum */
timer = test;
} else if (gap > priv->rtx_delay_reorder) {
......@@ -1624,6 +1656,7 @@ update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum,
/* if we had a timer, remove it, we don't know when to expect the next
* packet. */
remove_timer (jitterbuffer, timer);
JBUF_SIGNAL_EVENT (priv);
}
}
......@@ -1676,8 +1709,9 @@ calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
expected_dts = priv->last_in_dts + duration;
if (priv->do_retransmission) {
expected++;
type = TIMER_TYPE_EXPECTED;
if (find_timer (jitterbuffer, type, expected))
expected++;
} else {
type = TIMER_TYPE_LOST;
}
......@@ -1894,8 +1928,8 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
do_handle_sync (jitterbuffer);
/* signal addition of new buffer when the _loop is waiting. */
if (priv->waiting && priv->active)
JBUF_SIGNAL (priv);
if (priv->active && priv->waiting_timer)
JBUF_SIGNAL_EVENT (priv);
/* let's unschedule and unblock any waiting buffers. We only want to do this
* when the tail buffer changed */
......@@ -2194,7 +2228,7 @@ wait:
}
/* the timeout for when we expected a packet expired */
static GstFlowReturn
static void
do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
GstClockTimeDiff clock_jitter)
{
......@@ -2228,12 +2262,10 @@ do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
}
reschedule_timer (jitterbuffer, timer, timer->seqnum,
timer->rtx_base + timer->rtx_retry);
return priv->srcresult;
}
/* a packet is lost */
static GstFlowReturn
static void
do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
GstClockTimeDiff clock_jitter)
{
......@@ -2286,6 +2318,7 @@ do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
priv->next_seqnum = (timer->seqnum + lost_packets) & 0xffff;
/* remove timer now */
remove_timer (jitterbuffer, timer);
JBUF_SIGNAL_EVENT (priv);
if (priv->do_lost) {
GstEvent *event;
......@@ -2299,25 +2332,31 @@ do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
"late", G_TYPE_BOOLEAN, lost_packets_late, NULL));
JBUF_UNLOCK (priv);
gst_pad_push_event (priv->srcpad, event);
JBUF_LOCK_CHECK (priv, flushing);
}
return GST_FLOW_OK;
/* ERRORS */
flushing:
{
GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
return priv->srcresult;
JBUF_LOCK (priv);
}
}
static GstFlowReturn
static void
do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout");
remove_timer (jitterbuffer, timer);
JBUF_SIGNAL_EVENT (priv);
}
return GST_FLOW_EOS;
static void
do_deadline_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
GstClockTimeDiff clock_jitter)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
GST_INFO_OBJECT (jitterbuffer, "got deadline timeout");
priv->next_seqnum = timer->seqnum;
remove_timer (jitterbuffer, timer);
JBUF_SIGNAL_EVENT (priv);
}
/* called when we need to wait for the next timeout.
......@@ -2327,137 +2366,127 @@ do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
*
* If there are no timers, we wait on a gcond until something new happens.
*/
static GstFlowReturn
static void
wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
GstFlowReturn result = GST_FLOW_OK;
gint i, len;
TimerData *timer = NULL;
GstClockTime timer_timeout = -1;
gint timer_idx;
len = priv->timers->len;
for (i = 0; i < len; i++) {
TimerData *test = &g_array_index (priv->timers, TimerData, i);
GstClockTime test_timeout = get_timeout (jitterbuffer, test);
GST_DEBUG_OBJECT (jitterbuffer, "%d, %d, %" GST_TIME_FORMAT,
i, test->seqnum, GST_TIME_ARGS (test_timeout));
/* find the smallest timeout */
if (timer == NULL || test_timeout == -1 || test_timeout < timer_timeout) {
timer = test;
timer_timeout = test_timeout;
if (timer_timeout == -1)
break;
JBUF_LOCK (priv);
while (priv->timer_running) {
TimerData *timer = NULL;
GstClockTime timer_timeout = -1;
gint i, len;
gint timer_idx;
len = priv->timers->len;
for (i = 0; i < len; i++) {
TimerData *test = &g_array_index (priv->timers, TimerData, i);
GstClockTime test_timeout = get_timeout (jitterbuffer, test);
GST_DEBUG_OBJECT (jitterbuffer, "%d, %d, %d, %" GST_TIME_FORMAT,
i, test->type, test->seqnum, GST_TIME_ARGS (test_timeout));
/* find the smallest timeout */
if (timer == NULL || test_timeout == -1 || test_timeout < timer_timeout) {
timer = test;
timer_timeout = test_timeout;
if (timer_timeout == -1)
break;
}
}
}
if (timer) {
GstClock *clock;
GstClockTime sync_time;
GstClockID id;
GstClockReturn ret;
GstClockTimeDiff clock_jitter;
/* no timestamp, timeout immeditately */
if (timer_timeout == -1)
goto do_timeout;
if (timer) {
GstClock *clock;
GstClockTime sync_time;
GstClockID id;
GstClockReturn ret;
GstClockTimeDiff clock_jitter;
/* no timestamp, timeout immeditately */
if (timer_timeout == -1)
goto do_timeout;
GST_OBJECT_LOCK (jitterbuffer);
clock = GST_ELEMENT_CLOCK (jitterbuffer);
if (!clock) {
GST_OBJECT_UNLOCK (jitterbuffer);
/* let's just push if there is no clock */
GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away");
goto do_timeout;
}
GST_OBJECT_LOCK (jitterbuffer);
clock = GST_ELEMENT_CLOCK (jitterbuffer);
if (!clock) {
/* prepare for sync against clock */
sync_time = timer_timeout + GST_ELEMENT_CAST (jitterbuffer)->base_time;
/* add latency of peer to get input time */
sync_time += priv->peer_latency;
GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT
" with sync time %" GST_TIME_FORMAT,
GST_TIME_ARGS (timer_timeout), GST_TIME_ARGS (sync_time));
/* create an entry for the clock */
id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
priv->unscheduled = FALSE;
priv->timer_timeout = timer_timeout;
priv->timer_seqnum = timer->seqnum;
timer_idx = timer->idx;
GST_OBJECT_UNLOCK (jitterbuffer);
/* let's just push if there is no clock */
GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away");
goto do_timeout;
}
/* prepare for sync against clock */
sync_time = timer_timeout + GST_ELEMENT_CAST (jitterbuffer)->base_time;
/* add latency of peer to get input time */
sync_time += priv->peer_latency;
GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT
" with sync time %" GST_TIME_FORMAT,
GST_TIME_ARGS (timer_timeout), GST_TIME_ARGS (sync_time));
/* create an entry for the clock */
id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
priv->unscheduled = FALSE;
priv->timer_timeout = timer_timeout;
priv->timer_seqnum = timer->seqnum;
timer_idx = timer->idx;
GST_OBJECT_UNLOCK (jitterbuffer);
/* release the lock so that the other end can push stuff or unlock */
JBUF_UNLOCK (priv);
ret = gst_clock_id_wait (id, &clock_jitter);
/* release the lock so that the other end can push stuff or unlock */
JBUF_UNLOCK (priv);
JBUF_LOCK (priv);
GST_DEBUG_OBJECT (jitterbuffer, "sync done, %d, #%d, %" G_GINT64_FORMAT,
ret, priv->timer_seqnum, clock_jitter);
/* and free the entry */
gst_clock_id_unref (id);
priv->clock_id = NULL;
/* at this point, the clock could have been unlocked by a timeout, a new
* tail element was added to the queue or because we are shutting down. Check
* for shutdown first. */
if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))
goto flushing;
if (priv->timers->len <= timer_idx)
goto done;
/* we released the lock, the array might have changed */
timer = &g_array_index (priv->timers, TimerData, timer_idx);
/* if changed to timeout immediately, do so */
if (timer->timeout == -1)
goto do_timeout;
/* if we got unscheduled and we are not flushing, it's because a new tail
* element became available in the queue or we flushed the queue.
* Grab it and try to push or sync. */
if (ret == GST_CLOCK_UNSCHEDULED || priv->unscheduled) {
GST_DEBUG_OBJECT (jitterbuffer, "Wait got unscheduled");
goto done;
}
ret = gst_clock_id_wait (id, &clock_jitter);
do_timeout:
switch (timer->type) {
case TIMER_TYPE_EXPECTED:
result = do_expected_timeout (jitterbuffer, timer, clock_jitter);
break;
case TIMER_TYPE_LOST:
result = do_lost_timeout (jitterbuffer, timer, clock_jitter);
break;
case TIMER_TYPE_DEADLINE:
priv->next_seqnum = timer->seqnum;
remove_timer (jitterbuffer, timer);
break;
case TIMER_TYPE_EOS:
result = do_eos_timeout (jitterbuffer, timer);
JBUF_LOCK (priv);
if (!priv->timer_running)
break;
}
} else {
/* no timers, wait for activity */
GST_DEBUG_OBJECT (jitterbuffer, "waiting");
priv->waiting = TRUE;
JBUF_WAIT_CHECK (priv, flushing);
priv->waiting = FALSE;
GST_DEBUG_OBJECT (jitterbuffer, "waiting done");
}
done:
return result;
GST_DEBUG_OBJECT (jitterbuffer, "sync done, %d, #%d, %" G_GINT64_FORMAT,
ret, priv->timer_seqnum, clock_jitter);
/* and free the entry */
gst_clock_id_unref (id);
priv->clock_id = NULL;
if (priv->timers->len <= timer_idx)
continue;
/* we released the lock, the array might have changed */
timer = &g_array_index (priv->timers, TimerData, timer_idx);
/* if changed to timeout immediately, do so */
if (timer->timeout == -1)
goto do_timeout;
/* if we got unscheduled and we are not flushing, it's because a new tail
* element became available in the queue or we flushed the queue.
* Grab it and try to push or sync. */
if (ret == GST_CLOCK_UNSCHEDULED || priv->unscheduled) {
GST_DEBUG_OBJECT (jitterbuffer, "Wait got unscheduled");
continue;
}
flushing:
{
GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
return priv->srcresult;
do_timeout:
switch (timer->type) {
case TIMER_TYPE_EXPECTED:
do_expected_timeout (jitterbuffer, timer, clock_jitter);
break;
case TIMER_TYPE_LOST:
do_lost_timeout (jitterbuffer, timer, clock_jitter);
break;
case TIMER_TYPE_DEADLINE:
do_deadline_timeout (jitterbuffer, timer, clock_jitter);
break;
case TIMER_TYPE_EOS:
do_eos_timeout (jitterbuffer, timer);
break;
}
} else {
/* no timers, wait for activity */
GST_DEBUG_OBJECT (jitterbuffer, "waiting");
JBUF_WAIT_TIMER (priv);
GST_DEBUG_OBJECT (jitterbuffer, "waiting done");
}
}
GST_DEBUG_OBJECT (jitterbuffer, "we are stopping");
return;
}
/*
......@@ -2477,9 +2506,13 @@ gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
JBUF_LOCK_CHECK (priv, flushing);
do {
result = handle_next_buffer (jitterbuffer);
if (G_LIKELY (result == GST_FLOW_WAIT))
if (G_LIKELY (result == GST_FLOW_WAIT)) {
GST_DEBUG_OBJECT (jitterbuffer, "waiting for event");
/* now wait for the next event */
result = wait_next_timeout (jitterbuffer);
JBUF_WAIT_EVENT (priv, flushing);
GST_DEBUG_OBJECT (jitterbuffer, "waiting for event done");
result = GST_FLOW_OK;
}
}
while (result == GST_FLOW_OK);
JBUF_UNLOCK (priv);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment