Commit 3886a83f authored by Wim Taymans's avatar Wim Taymans Committed by Wim Taymans
Browse files

queue2: fix leak and improve buffering

Keep track of the max requested position and compare this to the write position
in the temp file to get the current amount of buffered data.
Fix memleak of all incomming buffers.

Fixes #588551
parent 5366b61b
......@@ -219,6 +219,7 @@ struct _GstQueue
FILE *temp_file;
guint64 writing_pos;
guint64 reading_pos;
guint64 max_reading_pos;
/* we need this to send the first new segment event of the stream
* because we can't save it on the file */
gboolean segment_event_received;
......@@ -244,7 +245,7 @@ struct _GstQueueClass
queue->cur_level.time, \
queue->max_level.time, \
(guint64) (QUEUE_IS_USING_TEMP_FILE(queue) ? \
queue->writing_pos - queue->reading_pos : \
queue->writing_pos - queue->max_reading_pos : \
queue->queue->length))
#define GST_QUEUE_MUTEX_LOCK(q) G_STMT_START { \
......@@ -871,6 +872,11 @@ gst_queue_write_buffer_to_file (GstQueue * queue, GstBuffer * buffer)
GST_ERROR_OBJECT (queue, "fwrite returned error");
}
queue->writing_pos += size;
if (queue->writing_pos > queue->max_reading_pos)
queue->cur_level.bytes = queue->writing_pos - queue->max_reading_pos;
else
queue->cur_level.bytes = 0;
}
/* see if there is enough data in the file to read a full buffer */
......@@ -938,6 +944,12 @@ gst_queue_create_read (GstQueue * queue, guint64 offset, guint length,
*buffer = buf;
queue->reading_pos = offset + length;
queue->max_reading_pos = MAX (queue->max_reading_pos, queue->reading_pos);
if (queue->writing_pos > queue->max_reading_pos)
queue->cur_level.bytes = queue->writing_pos - queue->max_reading_pos;
else
queue->cur_level.bytes = 0;
return GST_FLOW_OK;
......@@ -1042,6 +1054,7 @@ gst_queue_open_temp_location_file (GstQueue * queue)
queue->writing_pos = 0;
queue->reading_pos = 0;
queue->max_reading_pos = 0;
return TRUE;
......@@ -1100,6 +1113,7 @@ gst_queue_flush_temp_file (GstQueue * queue)
queue->writing_pos = 0;
queue->reading_pos = 0;
queue->max_reading_pos = 0;
}
static void
......@@ -1143,6 +1157,7 @@ gst_queue_locked_enqueue (GstQueue * queue, gpointer item)
queue->cur_level.buffers++;
queue->cur_level.bytes += size;
queue->bytes_in += size;
/* apply new buffer to segment stats */
apply_buffer (queue, buffer, &queue->sink_segment);
/* update the byterate stats */
......@@ -1172,7 +1187,10 @@ gst_queue_locked_enqueue (GstQueue * queue, gpointer item)
goto unexpected_event;
queue->segment_event_received = TRUE;
if (queue->starting_segment != NULL)
gst_event_unref (queue->starting_segment);
queue->starting_segment = event;
item = NULL;
}
/* a new segment allows us to accept more buffers if we got UNEXPECTED
* from downstream */
......@@ -1196,6 +1214,9 @@ gst_queue_locked_enqueue (GstQueue * queue, gpointer item)
if (!QUEUE_IS_USING_TEMP_FILE (queue))
g_queue_push_tail (queue->queue, item);
else
gst_mini_object_unref (GST_MINI_OBJECT_CAST (item));
GST_QUEUE_SIGNAL_ADD (queue);
}
......@@ -1370,7 +1391,7 @@ gst_queue_is_empty (GstQueue * queue)
return FALSE;
if (QUEUE_IS_USING_TEMP_FILE (queue)) {
return queue->writing_pos == queue->reading_pos;
return queue->writing_pos == queue->max_reading_pos;
} else {
if (queue->queue->length == 0)
return TRUE;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment