Commit 7524f806 authored by Thomas Vander Stichele's avatar Thomas Vander Stichele
Browse files

gst/tcp/: make multifdsink properly deal with streamheader:

Original commit message from CVS:
* gst/tcp/README:
* gst/tcp/gstmultifdsink.c: (gst_multi_fd_sink_init),
(gst_multi_fd_sink_remove_client_link),
(gst_multi_fd_sink_client_queue_caps),
(gst_multi_fd_sink_client_queue_buffer),
(gst_multi_fd_sink_handle_client_write),
(gst_multi_fd_sink_render):
* gst/tcp/gstmultifdsink.h:
make multifdsink properly deal with streamheader:
- streamheader is taken from caps
- buffers marked with IN_CAPS are not sent
- streamheaders are sent, on connection, from the caps of the
buffer where the client gets positioned to
- further streamheader changes are done every time the client
will receive a buffer with different caps
* tests/check/elements/multifdsink.c: (GST_START_TEST),
(gst_multifdsink_create_streamheader):
add tests for this
parent 61b592b7
2006-06-02 Thomas Vander Stichele <thomas at apestaart dot org>
* gst/tcp/README:
* gst/tcp/gstmultifdsink.c: (gst_multi_fd_sink_init),
(gst_multi_fd_sink_remove_client_link),
(gst_multi_fd_sink_client_queue_caps),
(gst_multi_fd_sink_client_queue_buffer),
(gst_multi_fd_sink_handle_client_write),
(gst_multi_fd_sink_render):
* gst/tcp/gstmultifdsink.h:
make multifdsink properly deal with streamheader:
- streamheader is taken from caps
- buffers marked with IN_CAPS are not sent
- streamheaders are sent, on connection, from the caps of the
buffer where the client gets positioned to
- further streamheader changes are done every time the client
will receive a buffer with different caps
* tests/check/elements/multifdsink.c: (GST_START_TEST),
(gst_multifdsink_create_streamheader):
add tests for this
2006-06-02 Michael Smith <msmith@fluendo.com>
 
* ext/vorbis/vorbisdec.c: (vorbis_handle_identification_packet):
......
......@@ -29,3 +29,22 @@ TODO
----
- implement DNS resolution
multifdsink
-----------
- operation:
- client fd gets added when "add" signal gets emitted on multifdsink
- signal handler creates a GstTCPClient structure, adds it to ->clients,
and adds the fd to ->fd_hash, then emits client-added
- client
- when a buffer comes in:
- the _render vmethod puts the buffer on the global queue
- and increases bytes_to_serve
- (currently it sets streamheaders, but since this is treated globally
this is wrong - clients can be at different positions in the stream)
- when a client issues a write (ie requests data):
- when using GDP, if no caps sent yet, send caps first, then set caps_sent
- if streamheader buffers, and we haven't sent yet to this client,
send current streamheader buffers, then set streamheader_sent
- send out buffers
......@@ -537,6 +537,8 @@ gst_multi_fd_sink_init (GstMultiFdSink * this, GstMultiFdSinkClass * klass)
this->timeout = DEFAULT_TIMEOUT;
this->sync_method = DEFAULT_SYNC_METHOD;
this->header_flags = 0;
}
static void
......@@ -792,6 +794,10 @@ gst_multi_fd_sink_remove_client_link (GstMultiFdSink * sink, GList * link)
g_slist_free (client->sending);
client->sending = NULL;
if (client->caps)
gst_caps_unref (client->caps);
client->caps = NULL;
/* unlock the mutex before signaling because the signal handler
* might query some properties */
CLIENTS_UNLOCK (sink);
......@@ -936,7 +942,8 @@ gst_multi_fd_sink_client_queue_caps (GstMultiFdSink * sink,
client->fd.fd, string);
g_free (string);
if (!gst_dp_packet_from_caps (caps, 0, &length, &header, &payload)) {
if (!gst_dp_packet_from_caps (caps, sink->header_flags, &length, &header,
&payload)) {
GST_DEBUG_OBJECT (sink, "Could not create GDP packet from caps");
return FALSE;
}
......@@ -965,11 +972,118 @@ static gboolean
gst_multi_fd_sink_client_queue_buffer (GstMultiFdSink * sink,
GstTCPClient * client, GstBuffer * buffer)
{
GstCaps *caps;
/* TRUE: send them if the new caps have them */
gboolean send_streamheader = FALSE;
GstStructure *s;
/* before we queue the buffer, we check if we need to queue streamheader
* buffers (because it's a new client, or because they changed) */
caps = gst_buffer_get_caps (buffer); /* cleaned up after streamheader */
if (!client->caps) {
GST_LOG_OBJECT (sink,
"[fd %5d] no previous caps for this client, send streamheader",
client->fd.fd);
send_streamheader = TRUE;
client->caps = gst_caps_ref (caps);
} else {
/* there were previous caps recorded, so compare */
if (!gst_caps_is_equal (caps, client->caps)) {
const GValue *sh1, *sh2;
/* caps are not equal, but could still have the same streamheader */
s = gst_caps_get_structure (caps, 0);
if (!gst_structure_has_field (s, "streamheader")) {
/* no new streamheader, so nothing new to send */
GST_LOG_OBJECT (sink,
"[fd %5d] new caps do not have streamheader, not sending",
client->fd.fd);
} else {
/* there is a new streamheader */
s = gst_caps_get_structure (client->caps, 0);
if (!gst_structure_has_field (s, "streamheader")) {
/* no previous streamheader, so send the new one */
GST_LOG_OBJECT (sink,
"[fd %5d] previous caps did not have streamheader, sending",
client->fd.fd);
send_streamheader = TRUE;
} else {
/* both old and new caps have streamheader set */
sh1 = gst_structure_get_value (s, "streamheader");
s = gst_caps_get_structure (caps, 0);
sh2 = gst_structure_get_value (s, "streamheader");
if (gst_value_compare (sh1, sh2) != GST_VALUE_EQUAL) {
GST_LOG_OBJECT (sink,
"[fd %5d] new streamheader different from old, sending",
client->fd.fd);
send_streamheader = TRUE;
}
}
}
}
}
if (G_UNLIKELY (send_streamheader)) {
const GValue *sh;
GArray *buffers;
int i;
GST_LOG_OBJECT (sink,
"[fd %5d] sending streamheader from caps %" GST_PTR_FORMAT,
client->fd.fd, caps);
s = gst_caps_get_structure (caps, 0);
if (!gst_structure_has_field (s, "streamheader")) {
GST_LOG_OBJECT (sink,
"[fd %5d] no new streamheader, so nothing to send", client->fd.fd);
} else {
GST_LOG_OBJECT (sink,
"[fd %5d] sending streamheader from caps %" GST_PTR_FORMAT,
client->fd.fd, caps);
sh = gst_structure_get_value (s, "streamheader");
g_assert (G_VALUE_TYPE (sh) == GST_TYPE_ARRAY);
buffers = g_value_peek_pointer (sh);
for (i = 0; i < buffers->len; ++i) {
GValue *bufval;
GstBuffer *buffer;
bufval = &g_array_index (buffers, GValue, i);
g_assert (G_VALUE_TYPE (bufval) == GST_TYPE_BUFFER);
buffer = g_value_peek_pointer (bufval);
GST_LOG_OBJECT (sink,
"[fd %5d] queueing streamheader buffer of length %d",
client->fd.fd, GST_BUFFER_SIZE (buffer));
gst_buffer_ref (buffer);
if (sink->protocol == GST_TCP_PROTOCOL_GDP) {
guint8 *header;
guint len;
if (!gst_dp_header_from_buffer (buffer, sink->header_flags, &len,
&header)) {
GST_DEBUG_OBJECT (sink,
"[fd %5d] could not create header, removing client",
client->fd.fd);
return FALSE;
}
gst_multi_fd_sink_client_queue_data (sink, client, (gchar *) header,
len);
}
client->sending = g_slist_append (client->sending, buffer);
}
}
}
gst_caps_unref (caps);
caps = NULL;
/* now we can send the buffer, possibly sending a GDP header first */
if (sink->protocol == GST_TCP_PROTOCOL_GDP) {
guint8 *header;
guint len;
if (!gst_dp_header_from_buffer (buffer, 0, &len, &header)) {
if (!gst_dp_header_from_buffer (buffer, sink->header_flags, &len, &header)) {
GST_DEBUG_OBJECT (sink,
"[fd %5d] could not create header, removing client", client->fd.fd);
return FALSE;
......@@ -1143,28 +1257,6 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink,
client->caps_sent = TRUE;
}
}
/* if we have streamheader buffers, and haven't sent them to this client
* yet, send them out one by one */
if (!client->streamheader_sent) {
GST_DEBUG_OBJECT (sink, "[fd %5d] Sending streamheader, %d buffers", fd,
g_slist_length (sink->streamheader));
if (sink->streamheader) {
GSList *l;
for (l = sink->streamheader; l; l = l->next) {
/* queue stream headers for sending */
res =
gst_multi_fd_sink_client_queue_buffer (sink, client,
GST_BUFFER (l->data));
if (!res) {
GST_DEBUG_OBJECT (sink,
"Failed queueing streamheader, removing client");
return FALSE;
}
}
}
client->streamheader_sent = TRUE;
}
more = TRUE;
do {
......@@ -1645,9 +1737,32 @@ static GstFlowReturn
gst_multi_fd_sink_render (GstBaseSink * bsink, GstBuffer * buf)
{
GstMultiFdSink *sink;
GstCaps *bufcaps, *padcaps;
sink = GST_MULTI_FD_SINK (bsink);
/* since we check every buffer for streamheader caps, we need to make
* sure every buffer has caps set */
bufcaps = gst_buffer_get_caps (buf);
padcaps = GST_PAD_CAPS (GST_BASE_SINK_PAD (bsink));
/* make sure we have caps on the pad */
if (!padcaps) {
if (!bufcaps) {
GST_ELEMENT_ERROR (sink, CORE, NEGOTIATION, (NULL),
("Received first buffer without caps set"));
return GST_FLOW_NOT_NEGOTIATED;
}
}
/* stamp the buffer with previous caps if no caps set */
if (!bufcaps) {
buf = gst_buffer_make_writable (buf);
gst_buffer_set_caps (buf, padcaps);
} else {
gst_caps_unref (bufcaps);
}
/* since we keep this buffer out of the scope of this method */
gst_buffer_ref (buf);
......@@ -1670,9 +1785,11 @@ gst_multi_fd_sink_render (GstBaseSink * bsink, GstBuffer * buf)
/* if the incoming buffer is marked as IN CAPS, then we assume for now
* it's a streamheader that needs to be sent to each new client, so we
* put it on our internal list of streamheader buffers.
* After that we return, since we only send these out when we get
* non IN_CAPS buffers so we properly keep track of clients that got
* streamheaders. */
* FIXME: we could check if the buffer's contents are in fact part of the
* current streamheader.
*
* We don't send the buffer to the client, since streamheaders are sent
* separately when necessary. */
if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_IN_CAPS)) {
sink->previous_buffer_in_caps = TRUE;
GST_DEBUG_OBJECT (sink,
......
......@@ -142,9 +142,10 @@ typedef struct {
GstTCPProtocol protocol;
gboolean caps_sent;
gboolean streamheader_sent;
gboolean new_connection;
GstCaps *caps; /* caps of last queued buffer */
/* stats */
guint64 bytes_sent;
guint64 connect_time;
......@@ -152,7 +153,6 @@ typedef struct {
guint64 last_activity_time;
guint64 dropped_buffers;
guint64 avg_queue_size;
} GstTCPClient;
#define CLIENTS_LOCK_INIT(fdsink) (g_static_rec_mutex_init(&fdsink->clientslock))
......@@ -203,6 +203,8 @@ struct _GstMultiFdSink {
gint buffers_queued; /* number of queued buffers */
gint bytes_queued; /* number of queued bytes */
gint time_queued; /* number of queued time */
guint8 header_flags;
};
struct _GstMultiFdSinkClass {
......
......@@ -31,7 +31,7 @@ GstPad *mysrcpad;
static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
GST_PAD_SRC,
GST_PAD_ALWAYS,
GST_STATIC_CAPS ("application/x-gdp")
GST_STATIC_CAPS ("application/x-gst-check")
);
GstElement *
......@@ -80,12 +80,16 @@ GST_START_TEST (test_no_clients)
{
GstElement *sink;
GstBuffer *buffer;
GstCaps *caps;
sink = setup_multifdsink ();
ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
caps = gst_caps_from_string ("application/x-gst-check");
buffer = gst_buffer_new_and_alloc (4);
gst_buffer_set_caps (buffer, caps);
gst_caps_unref (caps);
fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
GST_DEBUG ("cleaning up multifdsink");
......@@ -99,6 +103,7 @@ GST_START_TEST (test_add_client)
{
GstElement *sink;
GstBuffer *buffer;
GstCaps *caps;
int pfd[2];
gchar data[4];
guint64 bytes_served;
......@@ -112,7 +117,11 @@ GST_START_TEST (test_add_client)
/* add the client */
g_signal_emit_by_name (sink, "add", pfd[1]);
caps = gst_caps_from_string ("application/x-gst-check");
GST_DEBUG ("Created test caps %p %" GST_PTR_FORMAT, caps, caps);
buffer = gst_buffer_new_and_alloc (4);
gst_buffer_set_caps (buffer, caps);
ASSERT_CAPS_REFCOUNT (caps, "caps", 2);
memcpy (GST_BUFFER_DATA (buffer), "dead", 4);
fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
......@@ -123,6 +132,9 @@ GST_START_TEST (test_add_client)
GST_DEBUG ("cleaning up multifdsink");
ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
cleanup_multifdsink (sink);
ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
gst_caps_unref (caps);
}
GST_END_TEST;
......@@ -143,12 +155,13 @@ G_STMT_START { \
/* from the given two data buffers, create two streamheader buffers and
* some caps that match it, and store them in the given pointers
* returns buffers and caps with a refcount of 1 */
* returns one ref to each of the buffers and the caps */
static void
gst_multifdsink_create_streamheader (const gchar * data1,
const gchar * data2, GstBuffer ** hbuf1, GstBuffer ** hbuf2,
GstCaps ** caps)
{
GstBuffer *buf;
GValue array = { 0 };
GValue value = { 0 };
GstStructure *structure;
......@@ -174,12 +187,19 @@ gst_multifdsink_create_streamheader (const gchar * data1,
g_value_init (&array, GST_TYPE_ARRAY);
g_value_init (&value, GST_TYPE_BUFFER);
gst_value_set_buffer (&value, *hbuf1);
/* we take a copy, set it on the array (which refs it), then unref our copy */
buf = gst_buffer_copy (*hbuf1);
gst_value_set_buffer (&value, buf);
ASSERT_BUFFER_REFCOUNT (buf, "copied buffer", 2);
gst_buffer_unref (buf);
gst_value_array_append_value (&array, &value);
g_value_unset (&value);
g_value_init (&value, GST_TYPE_BUFFER);
gst_value_set_buffer (&value, *hbuf2);
buf = gst_buffer_copy (*hbuf2);
gst_value_set_buffer (&value, buf);
ASSERT_BUFFER_REFCOUNT (buf, "copied buffer", 2);
gst_buffer_unref (buf);
gst_value_array_append_value (&array, &value);
g_value_unset (&value);
......@@ -188,6 +208,14 @@ gst_multifdsink_create_streamheader (const gchar * data1,
gst_structure_set_value (structure, "streamheader", &array);
g_value_unset (&array);
ASSERT_CAPS_REFCOUNT (*caps, "streamheader caps", 1);
/* set our streamheadery caps on the buffers */
gst_buffer_set_caps (*hbuf1, *caps);
gst_buffer_set_caps (*hbuf2, *caps);
ASSERT_CAPS_REFCOUNT (*caps, "streamheader caps", 3);
GST_DEBUG ("created streamheader caps %p %" GST_PTR_FORMAT, *caps, *caps);
}
......@@ -196,7 +224,8 @@ gst_multifdsink_create_streamheader (const gchar * data1,
* - sets streamheader caps on the pad
* - pushes the IN_CAPS buffers
* - pushes a buffer
* - verifies that the client received all the data correctly
* - verifies that the client received all the data correctly, and did not
* get multiple copies of the streamheader
* - adds a second client
* - verifies that this second client receives the streamheader caps too, plus
* - the new buffer
......@@ -227,7 +256,8 @@ GST_START_TEST (test_streamheader)
gst_multifdsink_create_streamheader ("babe", "deadbeef", &hbuf1, &hbuf2,
&caps);
fail_unless (gst_pad_set_caps (mysrcpad, caps));
gst_caps_unref (caps);
/* one is ours, two on the buffers, and one now on the pad */
ASSERT_CAPS_REFCOUNT (caps, "caps", 4);
fail_unless (gst_pad_push (mysrcpad, hbuf1) == GST_FLOW_OK);
fail_unless (gst_pad_push (mysrcpad, hbuf2) == GST_FLOW_OK);
......@@ -265,11 +295,21 @@ GST_START_TEST (test_streamheader)
fail_unless_read ("second client", pfd2[0], 4, "deaf");
wait_bytes_served (sink, 36);
gst_buffer_unref (hbuf1);
gst_buffer_unref (hbuf2);
GST_DEBUG ("cleaning up multifdsink");
g_signal_emit_by_name (sink, "remove", pfd1[1]);
g_signal_emit_by_name (sink, "remove", pfd2[1]);
ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
cleanup_multifdsink (sink);
ASSERT_BUFFER_REFCOUNT (hbuf1, "hbuf1", 1);
ASSERT_BUFFER_REFCOUNT (hbuf2, "hbuf2", 1);
gst_buffer_unref (hbuf1);
gst_buffer_unref (hbuf2);
ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
gst_caps_unref (caps);
}
GST_END_TEST;
......@@ -306,10 +346,15 @@ GST_START_TEST (test_change_streamheader)
/* create caps with streamheader, set the caps, and push the IN_CAPS
* buffers */
gst_multifdsink_create_streamheader ("babe", "deadbeef", &hbuf1, &hbuf2,
gst_multifdsink_create_streamheader ("first", "header", &hbuf1, &hbuf2,
&caps);
fail_unless (gst_pad_set_caps (mysrcpad, caps));
gst_caps_unref (caps);
/* one is ours, two on the buffers, and one now on the pad */
ASSERT_CAPS_REFCOUNT (caps, "caps", 4);
/* one to hold for the test and one to give away */
ASSERT_BUFFER_REFCOUNT (hbuf1, "hbuf1", 2);
ASSERT_BUFFER_REFCOUNT (hbuf2, "hbuf2", 2);
fail_unless (gst_pad_push (mysrcpad, hbuf1) == GST_FLOW_OK);
fail_unless (gst_pad_push (mysrcpad, hbuf2) == GST_FLOW_OK);
......@@ -327,22 +372,32 @@ GST_START_TEST (test_change_streamheader)
memcpy (GST_BUFFER_DATA (buf), "f00d", 4);
gst_pad_push (mysrcpad, buf);
fail_unless_read ("change: first client", pfd1[0], 4, "babe");
fail_unless_read ("change: first client", pfd1[0], 8, "deadbeef");
fail_unless_read ("change: first client", pfd1[0], 5, "first");
fail_unless_read ("change: first client", pfd1[0], 6, "header");
fail_unless_read ("change: first client", pfd1[0], 4, "f00d");
wait_bytes_served (sink, 16);
//wait_bytes_served (sink, 16);
/* now add the second client */
g_signal_emit_by_name (sink, "add", pfd2[1]);
fail_if_can_read ("second client, no buffer", pfd2[0]);
/* change the streamheader */
/* before we change, multifdsink still has a list of the old streamheaders */
ASSERT_BUFFER_REFCOUNT (hbuf1, "hbuf1", 2);
ASSERT_BUFFER_REFCOUNT (hbuf2, "hbuf2", 2);
gst_buffer_unref (hbuf1);
gst_buffer_unref (hbuf2);
gst_multifdsink_create_streamheader ("beef", "deadbabe", &hbuf1, &hbuf2,
/* drop our ref to the previous caps */
gst_caps_unref (caps);
gst_multifdsink_create_streamheader ("second", "header", &hbuf1, &hbuf2,
&caps);
fail_unless (gst_pad_set_caps (mysrcpad, caps));
gst_caps_unref (caps);
/* one to hold for the test and one to give away */
ASSERT_BUFFER_REFCOUNT (hbuf1, "hbuf1", 2);
ASSERT_BUFFER_REFCOUNT (hbuf2, "hbuf2", 2);
fail_unless (gst_pad_push (mysrcpad, hbuf1) == GST_FLOW_OK);
fail_unless (gst_pad_push (mysrcpad, hbuf2) == GST_FLOW_OK);
......@@ -353,25 +408,35 @@ GST_START_TEST (test_change_streamheader)
/* now push another buffer, which will trigger streamheader for second
* client, but should also send new streamheaders to first client */
buf = gst_buffer_new_and_alloc (4);
memcpy (GST_BUFFER_DATA (buf), "deaf", 4);
buf = gst_buffer_new_and_alloc (8);
memcpy (GST_BUFFER_DATA (buf), "deadbabe", 8);
gst_pad_push (mysrcpad, buf);
/* FIXME: here's a bug - the first client does not get new streamheaders */
fail_unless_read ("first client", pfd1[0], 4, "deaf");
fail_unless_read ("first client", pfd1[0], 6, "second");
fail_unless_read ("first client", pfd1[0], 6, "header");
fail_unless_read ("first client", pfd1[0], 8, "deadbabe");
/* new streamheader data */
fail_unless_read ("second client", pfd2[0], 4, "beef");
fail_unless_read ("second client", pfd2[0], 8, "deadbabe");
fail_unless_read ("second client", pfd2[0], 6, "second");
fail_unless_read ("second client", pfd2[0], 6, "header");
/* we missed the f00d buffer */
fail_unless_read ("second client", pfd2[0], 4, "deaf");
wait_bytes_served (sink, 36);
fail_unless_read ("second client", pfd2[0], 8, "deadbabe");
//wait_bytes_served (sink, 36);
gst_buffer_unref (hbuf1);
gst_buffer_unref (hbuf2);
GST_DEBUG ("cleaning up multifdsink");
g_signal_emit_by_name (sink, "remove", pfd1[1]);
g_signal_emit_by_name (sink, "remove", pfd2[1]);
ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
/* setting to NULL should have cleared the streamheader */
ASSERT_BUFFER_REFCOUNT (hbuf1, "hbuf1", 1);
ASSERT_BUFFER_REFCOUNT (hbuf2, "hbuf2", 1);
gst_buffer_unref (hbuf1);
gst_buffer_unref (hbuf2);
cleanup_multifdsink (sink);
ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
gst_caps_unref (caps);
}
GST_END_TEST;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment