Commit 335c9f28 authored by Jan Schmidt's avatar Jan Schmidt
Browse files

splitmux: Rewrite buffer collection and scheduling

Majorly change the way that splitmuxsink collects
incoming data and sends it to the output, so that it
makes all decisions about when / where to split files
on the input side.

Use separate queues for each stream, so they can be
grown individually and kept as small as possible.

This removes raciness I observed where sometimes
some data would end up put in a different output file
over multiple runs with the same input.

Also fixes hangs with input queues getting full
and causing muxing to stall out.
parent f7009eb5
This diff is collapsed.
......@@ -24,26 +24,39 @@
#include <gst/pbutils/pbutils.h>
G_BEGIN_DECLS
#define GST_TYPE_SPLITMUX_SINK (gst_splitmux_sink_get_type())
#define GST_SPLITMUX_SINK(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_SPLITMUX_SINK,GstSplitMuxSink))
#define GST_SPLITMUX_SINK_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_SPLITMUX_SINK,GstSplitMuxSinkClass))
#define GST_IS_SPLITMUX_SINK(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_SPLITMUX_SINK))
#define GST_IS_SPLITMUX_SINK_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_SPLITMUX_SINK))
typedef struct _GstSplitMuxSink GstSplitMuxSink;
typedef struct _GstSplitMuxSinkClass GstSplitMuxSinkClass;
GType gst_splitmux_sink_get_type(void);
GType gst_splitmux_sink_get_type (void);
gboolean register_splitmuxsink (GstPlugin * plugin);
typedef enum _SplitMuxState {
SPLITMUX_STATE_STOPPED,
SPLITMUX_STATE_COLLECTING_GOP_START,
SPLITMUX_STATE_WAITING_GOP_COMPLETE,
SPLITMUX_STATE_ENDING_FILE,
SPLITMUX_STATE_START_NEXT_FRAGMENT,
} SplitMuxState;
typedef enum _SplitMuxInputState
{
SPLITMUX_INPUT_STATE_STOPPED,
SPLITMUX_INPUT_STATE_COLLECTING_GOP_START, /* Waiting for the next ref ctx keyframe */
SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT, /* Waiting for all streams to collect GOP */
SPLITMUX_INPUT_STATE_FINISHING_UP /* Got EOS from reference ctx, send everything */
} SplitMuxInputState;
typedef enum _SplitMuxOutputState
{
SPLITMUX_OUTPUT_STATE_STOPPED,
SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND, /* Waiting first command packet from input */
SPLITMUX_OUTPUT_STATE_OUTPUT_GOP, /* Outputting a collected GOP */
SPLITMUX_OUTPUT_STATE_ENDING_FILE, /* Finishing the current fragment */
SPLITMUX_OUTPUT_STATE_START_NEXT_FILE /* Restarting after ENDING_FILE */
} SplitMuxOutputState;
typedef struct _SplitMuxOutputCommand
{
gboolean start_new_fragment; /* Whether to start a new fragment before advancing output ts */
GstClockTimeDiff max_output_ts; /* Set the limit to stop GOP output */
} SplitMuxOutputCommand;
typedef struct _MqStreamBuf
{
......@@ -59,6 +72,7 @@ typedef struct _MqStreamCtx
GstSplitMuxSink *splitmux;
guint q_overrun_id;
guint sink_pad_block_id;
guint src_pad_block_id;
......@@ -67,6 +81,7 @@ typedef struct _MqStreamCtx
gboolean flushing;
gboolean in_eos;
gboolean out_eos;
gboolean need_unblock;
GstSegment in_segment;
GstSegment out_segment;
......@@ -74,26 +89,24 @@ typedef struct _MqStreamCtx
GstClockTimeDiff in_running_time;
GstClockTimeDiff out_running_time;
guint64 in_bytes;
GstElement *q;
GQueue queued_bufs;
GstPad *sinkpad;
GstPad *srcpad;
gboolean out_blocked;
GstBuffer *cur_buffer;
GstEvent *pending_gap;
} MqStreamCtx;
struct _GstSplitMuxSink {
struct _GstSplitMuxSink
{
GstBin parent;
GMutex lock;
GCond data_cond;
GCond input_cond;
GCond output_cond;
SplitMuxState state;
gdouble mux_overhead;
GstClockTime threshold_time;
......@@ -101,9 +114,6 @@ struct _GstSplitMuxSink {
guint max_files;
gboolean send_keyframe_requests;
guint mq_max_buffers;
GstElement *mq;
GstElement *muxer;
GstElement *sink;
......@@ -112,25 +122,39 @@ struct _GstSplitMuxSink {
GstElement *provided_sink;
GstElement *active_sink;
gboolean ready_for_output;
gchar *location;
guint fragment_id;
GList *contexts;
MqStreamCtx *reference_ctx;
guint queued_gops;
SplitMuxInputState input_state;
GstClockTimeDiff max_in_running_time;
/* Number of bytes sent to the
* current fragment */
guint64 fragment_total_bytes;
/* Number of bytes we've collected into
* the GOP that's being collected */
guint64 gop_total_bytes;
/* Start time of the current fragment */
GstClockTimeDiff fragment_start_time;
/* Start time of the current GOP */
GstClockTimeDiff gop_start_time;
GQueue out_cmd_q; /* Queue of commands for output thread */
SplitMuxOutputState output_state;
GstClockTimeDiff max_out_running_time;
GstClockTimeDiff next_max_out_running_time;
GstClockTimeDiff muxed_out_time;
guint64 muxed_out_bytes;
gboolean have_muxed_something;
gboolean update_mux_start_time;
GstClockTimeDiff mux_start_time;
guint64 mux_start_bytes;
MqStreamCtx *reference_ctx;
/* Count of queued keyframes in the reference ctx */
guint queued_keyframes;
gboolean opening_first_fragment;
gboolean switching_fragment;
gboolean have_video;
......@@ -139,10 +163,10 @@ struct _GstSplitMuxSink {
gboolean async_pending;
};
struct _GstSplitMuxSinkClass {
struct _GstSplitMuxSinkClass
{
GstBinClass parent_class;
};
G_END_DECLS
#endif /* __GST_SPLITMUXSINK_H__ */
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment