Commit dbf6691e authored by Olivier Crête's avatar Olivier Crête
Browse files

pseudotcp-iostream: Implement better blocking operation

parent ca96bbdc
......@@ -217,6 +217,8 @@ nice_pseudo_tcp_input_stream_read (GInputStream *stream, void *buffer,
NicePseudoTcpInputStreamPrivate *priv;
PseudoTcpSocket *socket = NULL;
gssize len;
GMainContext *ctx = NULL;
GSource *src = NULL;
self = NICE_PSEUDO_TCP_INPUT_STREAM (stream);
priv = nice_pseudo_tcp_input_stream_get_instance_private (self);
......@@ -232,11 +234,24 @@ nice_pseudo_tcp_input_stream_read (GInputStream *stream, void *buffer,
if (len < 0) {
if (pseudo_tcp_socket_get_error (socket) == EWOULDBLOCK) {
/* Block until data is available. */
/* FIXME: Use something like g_socket_condition_wait(). */
while (!g_pollable_input_stream_is_readable (G_POLLABLE_INPUT_STREAM (stream)))
g_main_context_iteration (NULL, TRUE);
if (ctx == NULL)
ctx = g_main_context_new ();
if (src == NULL) {
src = pseudo_tcp_socket_create_source (socket, G_IO_IN, cancellable);
g_source_set_dummy_callback (src);
g_source_attach (src, ctx);
}
/* Block until data is available. */
while (!g_cancellable_is_cancelled (cancellable) &&
!nice_pseudo_tcp_input_stream_is_readable (G_POLLABLE_INPUT_STREAM (
stream)))
g_main_context_iteration (ctx, TRUE);
if (g_cancellable_is_cancelled (cancellable)) {
if (len > 0)
g_cancellable_set_error_if_cancelled (cancellable, error);
break;
}
continue;
} else {
/* Fatal error (ENOTCONN). */
......@@ -247,6 +262,13 @@ nice_pseudo_tcp_input_stream_read (GInputStream *stream, void *buffer,
}
} while (len < 0);
if (src) {
g_source_destroy (src);
g_source_unref (src);
}
if (ctx)
g_main_context_unref (ctx);
g_object_unref (socket);
return len;
......
......@@ -217,6 +217,8 @@ nice_pseudo_tcp_output_stream_write (GOutputStream *stream, const void *buffer,
NicePseudoTcpOutputStreamPrivate *priv;
PseudoTcpSocket *socket = NULL;
gssize len;
GMainContext *ctx = NULL;
GSource *src = NULL;
self = NICE_PSEUDO_TCP_OUTPUT_STREAM (stream);
priv = nice_pseudo_tcp_output_stream_get_instance_private (self);
......@@ -232,11 +234,25 @@ nice_pseudo_tcp_output_stream_write (GOutputStream *stream, const void *buffer,
if (len < 0) {
if (pseudo_tcp_socket_get_error (socket) == EWOULDBLOCK) {
/* Block until data is available. */
/* FIXME: Use something like g_socket_condition_wait(). */
while (!g_pollable_output_stream_is_writable (G_POLLABLE_OUTPUT_STREAM (stream)))
g_main_context_iteration (NULL, TRUE);
if (ctx == NULL)
ctx = g_main_context_new ();
if (src == NULL) {
src = pseudo_tcp_socket_create_source (socket, G_IO_OUT,
cancellable);
g_source_set_dummy_callback (src);
g_source_attach (src, ctx);
}
/* Block until data is available. */
while (!g_cancellable_is_cancelled (cancellable) &&
!nice_pseudo_tcp_output_stream_is_writable (
G_POLLABLE_OUTPUT_STREAM (stream)))
g_main_context_iteration (ctx, TRUE);
if (g_cancellable_is_cancelled (cancellable)) {
if (len > 0)
g_cancellable_set_error_if_cancelled (cancellable, error);
break;
}
continue;
} else {
/* Fatal error (ENOTCONN). */
......@@ -247,6 +263,13 @@ nice_pseudo_tcp_output_stream_write (GOutputStream *stream, const void *buffer,
}
} while (len < 0);
if (src) {
g_source_destroy (src);
g_source_unref (src);
}
if (ctx)
g_main_context_unref (ctx);
g_object_unref (socket);
return len;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment