/*
 * This file is part of the Nice GLib ICE library.
 *
 * (C) 2010, 2014, 2015 Collabora Ltd.
 *  Contact: Philip Withnall
 *
 * The contents of this file are subject to the Mozilla Public License Version
 * 1.1 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 * http://www.mozilla.org/MPL/
 *
 * Software distributed under the License is distributed on an "AS IS" basis,
 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
 * for the specific language governing rights and limitations under the
 * License.
 *
 * The Original Code is the Nice GLib ICE library.
 *
 * The Initial Developers of the Original Code are Collabora Ltd and Nokia
 * Corporation. All Rights Reserved.
 *
 * Contributors:
 *   Youness Alaoui, Collabora Ltd.
 *   Philip Withnall, Collabora Ltd.
 *
 * Alternatively, the contents of this file may be used under the terms of the
 * the GNU Lesser General Public License Version 2.1 (the "LGPL"), in which
 * case the provisions of LGPL are applicable instead of those above. If you
 * wish to allow use of your version of this file only under the terms of the
 * LGPL and not to allow others to use your version of this file under the
 * MPL, indicate your decision by deleting the provisions above and replace
 * them with the notice and other provisions required by the LGPL. If you do
 * not delete the provisions above, a recipient may use your version of this
 * file under either the MPL or the LGPL.
 */

/* Reproducing license from libjingle for copied code */

/*
 * libjingle
 * Copyright 2004--2005, Google Inc.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *  1. Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *  2. Redistributions in binary form must reproduce the above copyright notice,
 *     this list of conditions and the following disclaimer in the documentation
 *     and/or other materials provided with the distribution.
 *  3. The name of the author may not be used to endorse or promote products
 *     derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include <stdlib.h>
#include <errno.h>
#include <string.h>

#include <glib.h>
72 73

#ifndef G_OS_WIN32
74 75
#  include <arpa/inet.h>
#endif
76 77

#include "pseudotcp.h"
78
#include "agent-priv.h"
79 80
#include "pseudo-tcp-input-stream.h"
#include "pseudo-tcp-output-stream.h"
81

82
struct _PseudoTcpSocketClass {
83
    GIOStreamClass parent_class;
84 85 86 87
};

typedef struct _PseudoTcpSocketPrivate PseudoTcpSocketPrivate;

88

89
struct _PseudoTcpSocket {
90
    GIOStream parent;
91 92
    PseudoTcpSocketPrivate *priv;
};
93
G_DEFINE_TYPE (PseudoTcpSocket, pseudo_tcp_socket, G_TYPE_IO_STREAM);
94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121

//////////////////////////////////////////////////////////////////////
// Network Constants
//////////////////////////////////////////////////////////////////////

// Standard MTUs, listed from largest to smallest and terminated by a 0
// end-of-list marker. Entries that are commented out are not part of the
// table.
const guint16 PACKET_MAXIMUMS[] = {
  65535,    // Theoretical maximum, Hyperchannel
  32000,    // Nothing
  17914,    // 16Mb IBM Token Ring
  8166,   // IEEE 802.4
  //4464,   // IEEE 802.5 (4Mb max)
  4352,   // FDDI
  //2048,   // Wideband Network
  2002,   // IEEE 802.5 (4Mb recommended)
  //1536,   // Experimental Ethernet Networks
  //1500,   // Ethernet, Point-to-Point (default)
  1492,   // IEEE 802.3
  1006,   // SLIP, ARPANET
  //576,    // X.25 Networks
  //544,    // DEC IP Portal
  //512,    // NETBIOS
  508,    // IEEE 802/Source-Rt Bridge, ARCNET
  296,    // Point-to-Point (low delay)
  //68,     // Official minimum
  0,      // End of list marker
};

122 123 124
// Packet size limits, in bytes.
// FIXME: This is a reasonable MTU, but we should get it from the lower layer
#define DEF_MTU 1400
#define MAX_PACKET 65532
// Note: we removed lowest level because packet overhead was larger!
#define MIN_PACKET 296

// Sizes, in bytes, of the headers that wrap a pseudo-TCP segment.
// (+ up to 40 bytes of options?)
#define IP_HEADER_SIZE 20
#define ICMP_HEADER_SIZE 8
#define UDP_HEADER_SIZE 8
// TODO: Make JINGLE_HEADER_SIZE transparent to this code?
// when relay framing is in use
#define JINGLE_HEADER_SIZE 64

//////////////////////////////////////////////////////////////////////
// Global Constants and Functions
//////////////////////////////////////////////////////////////////////
//
//    0                   1                   2                   3
//    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
//    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
//  0 |                      Conversation Number                      |
//    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
//  4 |                        Sequence Number                        |
//    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
//  8 |                     Acknowledgment Number                     |
//    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
//    |               |   |U|A|P|R|S|F|                               |
// 12 |    Control    |   |R|C|S|S|Y|I|            Window             |
//    |               |   |G|K|H|T|N|N|                               |
//    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// 16 |                       Timestamp sending                       |
//    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// 20 |                      Timestamp receiving                      |
//    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// 24 |                             data                              |
//    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
//
//////////////////////////////////////////////////////////////////////

/* Largest value of a 32-bit sequence number. */
#define MAX_SEQ 0xFFFFFFFF
/* Size in bytes of the pseudo-TCP header drawn in the diagram above (payload
 * data starts at offset 24). */
#define HEADER_SIZE 24

/* Total per-packet overhead in bytes: pseudo-TCP header plus the UDP, IP and
 * Jingle framing headers. */
#define PACKET_OVERHEAD (HEADER_SIZE + UDP_HEADER_SIZE + \
      IP_HEADER_SIZE + JINGLE_HEADER_SIZE)

168 169
/* Retransmission timeout (RTO) bounds and delayed-ACK defaults, all in
 * milliseconds. */
// MIN_RTO = 1 second (RFC6298, Sec 2.4)
#define MIN_RTO     1000
#define DEF_RTO     1000 /* 1 second (RFC 6298 sect 2.1) */
#define MAX_RTO    60000 /* 60 seconds */
#define DEFAULT_ACK_DELAY    100 /* 100 milliseconds */
#define DEFAULT_NO_DELAY     TRUE

/* Buffer sizes, in bytes. */
#define DEFAULT_RCV_BUF_SIZE (60 * 1024)
#define DEFAULT_SND_BUF_SIZE (90 * 1024)
#define MAX_BUFFER (128 * 1024)

/* Retransmission limits for a single segment. RFC 1122, §4.2.3.5. */
#define DEFAULT_RETRIES1 10  /* SHOULD correspond to at least 3 */
#define DEFAULT_RETRIES2 100  /* SHOULD correspond to at least 100 seconds … at the current RTO */
#define DEFAULT_SYN_RETRIES 180  /* MUST … provide retransmission … for at least 3 minutes */
#define DEFAULT_SYN_ACK_RETRIES DEFAULT_SYN_RETRIES  /* no RFC guidance provided */

185 186
/* Option kinds carried in control segments.
 * NOTE: This must fit in 8 bits. This is used on the wire. */
typedef enum {
  /* Google-provided options: */
  TCP_OPT_EOL = 0,  /* end of list */
  TCP_OPT_NOOP = 1,  /* no-op */
  TCP_OPT_MSS = 2,  /* maximum segment size */
  TCP_OPT_WND_SCALE = 3,  /* window scale factor */
  /* libnice extensions: */
  TCP_OPT_FIN_ACK = 254,  /* FIN-ACK support */
} TcpOption;
195 196


197 198 199 200 201
/*
#define FLAG_SYN 0x02
#define FLAG_ACK 0x10
*/

202 203 204
/* Segment flag bits; values may be OR-ed together.
 * NOTE: This must fit in 5 bits. This is used on the wire. */
typedef enum {
  FLAG_NONE = 0,
  FLAG_FIN = 1 << 0,
  FLAG_CTL = 1 << 1,
  FLAG_RST = 1 << 2,
} TcpFlags;
209 210 211 212 213 214 215 216

/* Control codes. */
#define CTL_CONNECT  0
//#define CTL_REDIRECT  1
#define CTL_EXTRA 255


#define CTRL_BOUND 0x80000000

/* Maximum segment lifetime (1 minute).
 * RFC 793, §3.3 specifies 2 minutes; but Linux uses 1 minute, so let’s go with
 * that. */
#define TCP_MSL (60 * 1000)

// If there are no pending clocks, wake up every 4 seconds
#define DEFAULT_TIMEOUT 4000
// If the connection is closed, once per minute
#define CLOSED_TIMEOUT (60 * 1000)
/* Timeout after reaching the TIME_WAIT state, in milliseconds.
 * See: RFC 1122, §4.2.2.13.
 *
 * XXX: Since we can control the underlying layer’s channel ID, we can guarantee
 * delayed segments won’t affect subsequent connections, so can radically
 * shorten the TIME-WAIT timeout (to the extent that it basically doesn’t
 * exist). It would normally be (2 * TCP_MSL). */
#define TIME_WAIT_TIMEOUT 1
234 235 236 237

//////////////////////////////////////////////////////////////////////
// Helper Functions
//////////////////////////////////////////////////////////////////////
238 239 240 241
/* Two-argument minimum/maximum macros. Each macro may evaluate an argument
 * twice, so never pass an expression with side effects.
 * NOTE(review): nothing is defined when G_OS_WIN32 is set — presumably the
 * platform headers already provide min()/max() there; confirm. */
#ifndef G_OS_WIN32
#  define min(first, second) ((first) < (second) ? (first) : (second))
#  define max(first, second) ((first) > (second) ? (first) : (second))
#endif
242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278

/* Clamp @middle to the range [@lower, @upper]. The upper limit is applied
 * last, so it wins if @lower is greater than @upper. */
static guint32
bound(guint32 lower, guint32 middle, guint32 upper)
{
  guint32 clamped = middle;

  if (lower > clamped)
    clamped = lower;
  if (clamped >= upper)
    clamped = upper;

  return clamped;
}

/* Return whether @middle lies in the inclusive interval from @earlier to
 * @later on a wrapping 32-bit clock. When @later is numerically smaller than
 * @earlier, the interval is taken to wrap around through zero. */
static gboolean
time_is_between(guint32 later, guint32 middle, guint32 earlier)
{
  if (later < earlier) {
    /* Wrapped interval: everything except the open gap (later, earlier). */
    return (middle <= later) || (middle >= earlier);
  }

  return (middle >= earlier) && (middle <= later);
}

/* Signed distance from @earlier to @later on a wrapping 32-bit clock.
 * The result is positive when @later is at most half the clock range ahead
 * of @earlier, and negative otherwise. */
static gint32
time_diff(guint32 later, guint32 earlier)
{
  const guint32 half_range = 0x80000000;
  /* Modular distances in each direction; unsigned arithmetic wraps. */
  const guint32 ahead = later - earlier;
  const guint32 behind = earlier - later;

  if (!time_is_between(earlier + half_range, later, earlier))
    return -(long) behind;

  return (long) ahead;
}


////////////////////////////////////////////////////////
// PseudoTcpFifo works exactly like FifoBuffer in libjingle
////////////////////////////////////////////////////////


/* Circular byte buffer. Buffered data starts at read_position and wraps
 * around the end of the storage back to index 0. */
typedef struct {
  guint8 *buffer;        /* backing storage, allocated with g_slice_alloc() */
  gsize buffer_length;   /* capacity of buffer, in bytes */
  gsize data_length;     /* number of bytes currently buffered */
  gsize read_position;   /* index in buffer of the oldest buffered byte */
} PseudoTcpFifo;


/* Initialise @b as an empty FIFO with a capacity of @size bytes.
 *
 * Every field is set explicitly, so the result does not depend on the caller
 * having zeroed the structure beforehand. */
static void
pseudo_tcp_fifo_init (PseudoTcpFifo *b, gsize size)
{
  b->buffer = g_slice_alloc (size);
  b->buffer_length = size;
  b->data_length = 0;
  b->read_position = 0;
}

/* Release the storage owned by @b and leave it as an empty, zero-capacity
 * FIFO.
 *
 * The data counters are reset along with the capacity so the structure stays
 * self-consistent (data_length never exceeds buffer_length). */
static void
pseudo_tcp_fifo_clear (PseudoTcpFifo *b)
{
  if (b->buffer)
    g_slice_free1 (b->buffer_length, b->buffer);
  b->buffer = NULL;
  b->buffer_length = 0;
  b->data_length = 0;
  b->read_position = 0;
}

/* Number of bytes currently stored in @b and not yet consumed. */
static gsize
pseudo_tcp_fifo_get_buffered (PseudoTcpFifo *b)
{
  const gsize buffered = b->data_length;

  return buffered;
}

/* Change the capacity of @b to @size bytes, preserving buffered data.
 *
 * Returns FALSE (leaving @b untouched) if the buffered data would not fit in
 * @size bytes, TRUE otherwise. After a successful resize the buffered data is
 * linearised at the start of the new storage. */
static gboolean
pseudo_tcp_fifo_set_capacity (PseudoTcpFifo *b, gsize size)
{
  if (b->data_length > size)
    return FALSE;

  /* Compare against the current capacity, not the amount of buffered data:
   * reallocation is needed exactly when the capacity changes. */
  if (size != b->buffer_length) {
    guint8 *buffer = g_slice_alloc (size);
    gsize copy = b->data_length;
    gsize tail_copy = min (copy, b->buffer_length - b->read_position);

    if (copy > 0) {
      /* Bytes from read_position up to the end of the old storage, then the
       * part that wrapped around to the start. */
      memcpy (buffer, &b->buffer[b->read_position], tail_copy);
      memcpy (buffer + tail_copy, &b->buffer[0], copy - tail_copy);
    }
    g_slice_free1 (b->buffer_length, b->buffer);
    b->buffer = buffer;
    b->buffer_length = size;
    b->read_position = 0;
  }

  return TRUE;
}

/* Discard the oldest @size bytes of buffered data from @b.
 * @size must not exceed the amount of buffered data. */
static void
pseudo_tcp_fifo_consume_read_data (PseudoTcpFifo *b, gsize size)
{
  gsize advanced;

  g_assert (b->data_length >= size);

  advanced = b->read_position + size;
  b->data_length = b->data_length - size;
  b->read_position = advanced % b->buffer_length;
}

/* Mark @size bytes of free space, directly after the buffered data, as
 * holding valid data. @size must not exceed the free space in @b. */
static void
pseudo_tcp_fifo_consume_write_buffer (PseudoTcpFifo *b, gsize size)
{
  g_assert (b->buffer_length - b->data_length >= size);

  b->data_length = b->data_length + size;
}

/* Free space left in @b, in bytes: capacity minus buffered data. */
static gsize
pseudo_tcp_fifo_get_write_remaining (PseudoTcpFifo *b)
{
  const gsize capacity = b->buffer_length;
  const gsize used = b->data_length;

  return capacity - used;
}

/* Copy up to @bytes bytes of buffered data into @buffer, starting @offset
 * bytes past the oldest buffered byte, without consuming anything.
 *
 * Returns the number of bytes copied, or 0 if @offset is at or beyond the end
 * of the buffered data. */
static gsize
pseudo_tcp_fifo_read_offset (PseudoTcpFifo *b, guint8 *buffer, gsize bytes,
    gsize offset)
{
  gsize available;
  gsize read_position;
  gsize copy;
  gsize tail_copy;

  /* EOS. Checked first so the subtraction below cannot wrap, and so the
   * modulo is never evaluated for an empty (possibly zero-capacity) FIFO. */
  if (offset >= b->data_length)
    return 0;

  available = b->data_length - offset;
  read_position = (b->read_position + offset) % b->buffer_length;
  copy = min (bytes, available);
  /* The data may wrap: take what fits before the end of the storage, then
   * the rest from its start. */
  tail_copy = min (copy, b->buffer_length - read_position);

  memcpy (buffer, &b->buffer[read_position], tail_copy);
  memcpy (buffer + tail_copy, &b->buffer[0], copy - tail_copy);

  return copy;
}

/* Copy up to @bytes bytes from @buffer into the free space of @b, starting
 * @offset bytes past the end of the buffered data. The amount of buffered
 * data is not changed.
 *
 * Returns the number of bytes copied, or 0 if there is no free space at
 * @offset. */
static gsize
pseudo_tcp_fifo_write_offset (PseudoTcpFifo *b, const guint8 *buffer,
    gsize bytes, gsize offset)
{
  gsize available;
  gsize write_position;
  gsize copy;
  gsize tail_copy;

  /* No room at this offset. Checked first so the subtraction below cannot
   * wrap, and so the modulo is never evaluated for a zero-capacity FIFO. */
  if (b->data_length + offset >= b->buffer_length) {
    return 0;
  }

  available = b->buffer_length - b->data_length - offset;
  write_position = (b->read_position + b->data_length + offset)
      % b->buffer_length;
  copy = min (bytes, available);
  /* The free space may wrap: fill up to the end of the storage, then
   * continue from its start. */
  tail_copy = min (copy, b->buffer_length - write_position);

  memcpy (&b->buffer[write_position], buffer, tail_copy);
  memcpy (&b->buffer[0], buffer + tail_copy, copy - tail_copy);

  return copy;
}

/* Read and consume up to @bytes bytes from the front of @b into @buffer.
 * Returns the number of bytes actually read. */
static gsize
pseudo_tcp_fifo_read (PseudoTcpFifo *b, guint8 *buffer, gsize bytes)
{
  const gsize n_read = pseudo_tcp_fifo_read_offset (b, buffer, bytes, 0);

  /* Drop the bytes just copied out. */
  b->data_length -= n_read;
  b->read_position = (n_read + b->read_position) % b->buffer_length;

  return n_read;
}

/* Append up to @bytes bytes from @buffer to the data buffered in @b.
 * Returns the number of bytes actually stored. */
static gsize
pseudo_tcp_fifo_write (PseudoTcpFifo *b, const guint8 *buffer, gsize bytes)
{
  const gsize n_written = pseudo_tcp_fifo_write_offset (b, buffer, bytes, 0);

  /* Account for the newly stored bytes. */
  b->data_length = b->data_length + n_written;

  return n_written;
}


423 424 425 426
//////////////////////////////////////////////////////////////////////
// PseudoTcp
//////////////////////////////////////////////////////////////////////

427
/* Shutdown mode of the socket.
 * Only used if FIN-ACK support is disabled. */
typedef enum {
  SD_NONE,
  SD_GRACEFUL,
  SD_FORCEFUL
} Shutdown;

/* Values accepted by the sflags argument of attempt_send(). */
typedef enum {
  sfNone,
  sfDelayedAck,
  sfImmediateAck,
  sfFin,
  sfRst,
  sfDuplicateAck,
} SendFlags;

typedef struct {
  guint32 conv, seq, ack;
445
  TcpFlags flags;
446 447 448 449 450 451 452 453
  guint16 wnd;
  const gchar * data;
  guint32 len;
  guint32 tsval, tsecr;
} Segment;

typedef struct {
  guint32 seq, len;
454
  guint8 xmit;  /* retransmission count */
455
  TcpFlags flags;
456 457 458 459 460 461
} SSegment;

/* A (sequence number, length) pair.
 * NOTE(review): presumably describes a received segment tracked on the
 * receive side — confirm against its users. */
typedef struct {
  guint32 seq, len;
} RSegment;

462 463 464 465 466 467 468 469
/**
 * ClosedownSource:
 * @CLOSEDOWN_LOCAL: Error detected locally, or connection forcefully closed
 * locally.
 * @CLOSEDOWN_REMOTE: RST segment received from the peer.
 *
 * Reasons for calling closedown().
 *
 * Since: 0.1.8
 */
typedef enum {
  CLOSEDOWN_LOCAL,
  CLOSEDOWN_REMOTE,
} ClosedownSource;

477
typedef struct _PseudoTcpSource PseudoTcpSource;
478

479 480
static GPrivate pseudotcp_error;

481
struct _PseudoTcpSocketPrivate {
Olivier Crête's avatar
Olivier Crête committed
482
  PseudoTcpCallbacks callbacks;
483 484 485
  GDatagramBased *base_socket;  /* owned */
  GMainContext *main_context;  /* owned */

486 487
  GRecMutex mutex;

488
  GTask *connect_task;
489
  GTask *close_task;
490

491
  GPtrArray/*<owned PseudoTcpSource>*/ *sources;  /* owned */
492
  GSource *base_source;  /* owned */
493

494
  Shutdown shutdown;  /* only used if !support_fin_ack */
495
  gboolean shutdown_reads;
496

497 498 499
  GInputStream *input_stream;  /* owned; nullable */
  GOutputStream *output_stream;  /* owned; nullable */

500 501 502
  // TCB data
  PseudoTcpState state;
  guint32 conv;
Olivier Crête's avatar
Olivier Crête committed
503
  gboolean bReadEnable, bWriteEnable;
504 505 506 507
  guint32 last_traffic;

  // Incoming data
  GList *rlist;
508 509 510
  guint32 rbuf_len, rcv_nxt, rcv_wnd, lastrecv;
  guint8 rwnd_scale; // Window scale factor
  PseudoTcpFifo rbuf;
511
  guint32 rcv_fin;  /* sequence number of the received FIN octet, or 0 */
512 513

  // Outgoing data
514
  GQueue slist;
515
  GQueue unsent_slist;
516 517
  guint32 sbuf_len, snd_nxt, snd_wnd, lastsend;
  guint32 snd_una;  /* oldest unacknowledged sequence number */
518 519 520
  guint8 swnd_scale; // Window scale factor
  PseudoTcpFifo sbuf;

521 522 523 524 525 526 527 528 529 530 531 532 533 534 535
  // Maximum segment size, estimated protocol level, largest segment sent
  guint32 mss, msslevel, largest, mtu_advise;
  // Retransmit timer
  guint32 rto_base;

  // Timestamp tracking
  guint32 ts_recent, ts_lastack;

  // Round-trip calculation
  guint32 rx_rttvar, rx_srtt, rx_rto;

  // Congestion avoidance, Fast retransmit/recovery, Delayed ACKs
  guint32 ssthresh, cwnd;
  guint8 dup_acks;
  guint32 recover;
536
  gboolean fast_recovery;
537
  guint32 t_ack;  /* time a delayed ack was scheduled; 0 if no acks scheduled */
538
  guint32 last_acked_ts;
539

540 541
  gboolean use_nagling;
  guint32 ack_delay;
542 543 544 545

  // This is used by unit tests to test backward compatibility of
  // PseudoTcp implementations that don't support window scaling.
  gboolean support_wnd_scale;
546 547 548 549

  /* Current time. Typically only used for testing, when non-zero. When zero,
   * the system monotonic clock is used. Units: monotonic milliseconds. */
  guint32 current_time;
550

551 552 553 554
  /* TCP clock data. */
  GSource *tcp_clock_source;  /* owned */
  guint64 last_clock_timeout;

555 556 557 558
  /* This is used by compatible implementations (with the TCP_OPT_FIN_ACK
   * option) to enable correct FIN-ACK connection termination. Defaults to
   * TRUE unless no compatible option is received. */
  gboolean support_fin_ack;
559 560 561 562 563 564 565 566 567 568

  /* R1 and R2 from RFC 1122, §4.2.3.5.
   * 0 to retry indefinitely. */
  guint retries1;
  guint retries2;

  /* tcp_syn_retries and tcp_synack_retries from tcp(7).
   * 0 to retry indefinitely. */
  guint syn_retries;
  guint syn_ack_retries;
569 570
};

571 572
/* TODO: MAX_TCP_MTU */

573 574 575 576
/* Wrap-around-aware comparisons of 32-bit values. The difference is taken in
 * unsigned arithmetic and compared against half the 32-bit range, so @a counts
 * as larger than @b when it is ahead of it by less than that amount. Arguments
 * are expected to be 32-bit unsigned values. */
#define LARGER(a,b) (((a) - (b) - 1) < (G_MAXUINT32 >> 1))
#define LARGER_OR_EQUAL(a,b) (((a) - (b)) < (G_MAXUINT32 >> 1))
#define SMALLER(a,b) LARGER ((b),(a))
#define SMALLER_OR_EQUAL(a,b) LARGER_OR_EQUAL ((b),(a))
577 578 579 580 581

/* GObject property IDs, used when installing properties and when dispatching
 * in the property getter and setter. IDs start at 1. */
enum
{
  PROP_CONVERSATION = 1,
  PROP_CALLBACKS,
  PROP_BASE_SOCKET,
  PROP_MAIN_CONTEXT,
  PROP_STATE,
  PROP_ACK_DELAY,
  PROP_NO_DELAY,
  PROP_RCV_BUF,
  PROP_SND_BUF,
  PROP_SUPPORT_FIN_ACK,
  PROP_RETRIES1,
  PROP_RETRIES2,
  PROP_SYN_RETRIES,
  PROP_SYN_ACK_RETRIES,
};

597
static void pseudo_tcp_socket_constructed (GObject *obj);
598 599 600 601
static void pseudo_tcp_socket_get_property (GObject *object, guint property_id,
    GValue *value,  GParamSpec *pspec);
static void pseudo_tcp_socket_set_property (GObject *object, guint property_id,
    const GValue *value, GParamSpec *pspec);
602
static void pseudo_tcp_socket_finalize (GObject *object);
603

604
static void queue_connect_message (PseudoTcpSocket *self);
605
static guint32 queue (PseudoTcpSocket *self, const gchar *data,
606
    guint32 len, TcpFlags flags);
607
static PseudoTcpWriteResult packet(PseudoTcpSocket *self, guint32 seq,
608
    TcpFlags flags, guint32 offset, guint32 len, guint32 now);
609 610 611
static gboolean parse (PseudoTcpSocket *self,
    const guint8 *_header_buf, gsize header_buf_len,
    const guint8 *data_buf, gsize data_buf_len);
612
static gboolean process(PseudoTcpSocket *self, Segment *seg);
613
static int transmit(PseudoTcpSocket *self, SSegment *sseg, guint32 now);
614
static void attempt_send(PseudoTcpSocket *self, SendFlags sflags);
615 616
static void closedown (PseudoTcpSocket *self, guint32 err,
    ClosedownSource source);
617
static void adjustMTU(PseudoTcpSocket *self);
618 619 620 621
static void parse_options (PseudoTcpSocket *self, const guint8 *data,
    guint32 len);
static void resize_send_buffer (PseudoTcpSocket *self, guint32 new_size);
static void resize_receive_buffer (PseudoTcpSocket *self, guint32 new_size);
622
static void set_state (PseudoTcpSocket *self, PseudoTcpState new_state);
623 624
static void set_state_established (PseudoTcpSocket *self);
static void set_state_closed (PseudoTcpSocket *self, guint32 err);
625

626
static const gchar *pseudo_tcp_state_get_name (PseudoTcpState state);
627 628
static gboolean pseudo_tcp_state_has_sent_fin (PseudoTcpState state);
static gboolean pseudo_tcp_state_has_received_fin (PseudoTcpState state);
629
static gboolean pseudo_tcp_state_has_received_fin_ack (PseudoTcpState state);
630 631 632
static gboolean pseudo_tcp_socket_is_closed_locked (PseudoTcpSocket *self);
static gint pseudo_tcp_socket_get_available_bytes_locked (
    PseudoTcpSocket *self);
633

634 635 636 637
static PseudoTcpSource *
pseudo_tcp_source_new (PseudoTcpSocket *socket, GIOCondition condition,
    GCancellable *cancellable);
static void
638
pseudo_tcp_source_update (PseudoTcpSocket *self,
639
    PseudoTcpSource *pseudo_tcp_source, gboolean locked);
640 641 642 643 644 645 646

static gboolean
notify_pseudo_tcp_socket_base (GDatagramBased *datagram_based,
    GIOCondition condition, gpointer user_data);

static void
adjust_tcp_clock (PseudoTcpSocket *self);
647 648
static void
update_sources (PseudoTcpSocket *self);
649 650 651 652 653 654 655 656

static GInputStream *
pseudo_tcp_socket_get_input_stream (GIOStream *stream);
static GOutputStream *
pseudo_tcp_socket_get_output_stream (GIOStream *stream);
static gboolean
pseudo_tcp_socket_io_stream_close (GIOStream *stream, GCancellable *cancellable,
    GError **error);
657 658 659 660 661 662 663 664
static void
pseudo_tcp_socket_io_stream_close_async (GIOStream *stream, int io_priority,
    GCancellable *cancellable, GAsyncReadyCallback callback,
    gpointer user_data);
static gboolean
pseudo_tcp_socket_io_stream_close_finish (GIOStream *stream, GAsyncResult *res,
    GError **error);

665

666 667 668
// The following logging is for detailed (packet-level) pseudotcp analysis only.
static PseudoTcpDebugLevel debug_level = PSEUDO_TCP_DEBUG_NONE;

669 670
/* Log a debug message if the global debug level is at least @level.
 *
 * Requires a variable named `self` (a PseudoTcpSocket pointer) in scope at the
 * call site; the socket pointer and its state name are prepended to the
 * message. The log domain depends on whether @level is
 * PSEUDO_TCP_DEBUG_NORMAL.
 *
 * Wrapped in do { } while (0) so the macro behaves as a single statement and
 * cannot capture an `else` belonging to the caller. Invoke it with a trailing
 * semicolon. */
#define DEBUG(level, fmt, ...)                                          \
  do {                                                                  \
    if (debug_level >= (level))                                         \
      g_log ((level) == PSEUDO_TCP_DEBUG_NORMAL ? "libnice-pseudotcp" : "libnice-pseudotcp-verbose", \
          G_LOG_LEVEL_DEBUG, "PseudoTcpSocket %p %s: " fmt,             \
          self, pseudo_tcp_state_get_name (self->priv->state), ## __VA_ARGS__); \
  } while (0)
673 674 675 676 677 678

/* Set the process-wide debug level consulted by the DEBUG() macro.
 * The value is stored in a file-scope variable shared by all sockets. */
void
pseudo_tcp_set_debug_level (PseudoTcpDebugLevel level)
{
  const PseudoTcpDebugLevel requested = level;

  debug_level = requested;
}
679

680 681 682 683 684 685 686 687 688 689 690 691
/* Current time in milliseconds for @socket.
 *
 * If a non-zero time has been set on the socket (see
 * pseudo_tcp_socket_set_time()), that value is returned. Otherwise the GLib
 * monotonic clock is used, converted from microseconds to milliseconds and
 * narrowed to 32 bits. */
static guint32
get_current_time (PseudoTcpSocket *socket)
{
  const guint32 fixed_time = socket->priv->current_time;

  if (G_LIKELY (fixed_time == 0))
    return g_get_monotonic_time () / 1000;

  return fixed_time;
}

/* Override the time reported to @self. A non-zero @current_time (in
 * milliseconds) is used instead of the monotonic clock; zero restores use of
 * the monotonic clock. The update is made while holding the socket mutex. */
void
pseudo_tcp_socket_set_time (PseudoTcpSocket *self, guint32 current_time)
{
  g_rec_mutex_lock (&self->priv->mutex);
  self->priv->current_time = current_time;
  g_rec_mutex_unlock (&self->priv->mutex);
}

697 698 699 700
/* Class initialiser: hooks up the GObject and GIOStream virtual methods and
 * installs every property of PseudoTcpSocket. */
static void
pseudo_tcp_socket_class_init (PseudoTcpSocketClass *cls)
{
  GObjectClass *object_class = G_OBJECT_CLASS (cls);
  GIOStreamClass *io_stream_class = G_IO_STREAM_CLASS (cls);

  object_class->constructed = pseudo_tcp_socket_constructed;
  object_class->get_property = pseudo_tcp_socket_get_property;
  object_class->set_property = pseudo_tcp_socket_set_property;
  object_class->finalize = pseudo_tcp_socket_finalize;

  io_stream_class->get_input_stream = pseudo_tcp_socket_get_input_stream;
  io_stream_class->get_output_stream = pseudo_tcp_socket_get_output_stream;
  io_stream_class->close_fn = pseudo_tcp_socket_io_stream_close;
  io_stream_class->close_async = pseudo_tcp_socket_io_stream_close_async;
  io_stream_class->close_finish = pseudo_tcp_socket_io_stream_close_finish;

  g_object_class_install_property (object_class, PROP_CONVERSATION,
      g_param_spec_uint ("conversation", "TCP Conversation ID",
          "The TCP Conversation ID",
          0, G_MAXUINT32, 0,
          G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  /* Write-only: there is no matching case in the property getter. */
  g_object_class_install_property (object_class, PROP_CALLBACKS,
      g_param_spec_pointer ("callbacks", "PseudoTcp socket callbacks",
          "Structure with the callbacks to call when PseudoTcp events happen",
          G_PARAM_WRITABLE | G_PARAM_STATIC_STRINGS));

  g_object_class_install_property (object_class, PROP_BASE_SOCKET,
      g_param_spec_object ("base-socket", "Base UDP Socket",
          "Underlying UDP socket to use as a transport",
          G_TYPE_DATAGRAM_BASED,
          G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  g_object_class_install_property (object_class, PROP_MAIN_CONTEXT,
      g_param_spec_boxed ("main-context", "Main Context",
          "Main context to attach timers to",
          G_TYPE_MAIN_CONTEXT,
          G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  g_object_class_install_property (object_class, PROP_STATE,
      g_param_spec_uint ("state", "PseudoTcp State",
          "The current state (enum PseudoTcpState) of the PseudoTcp socket",
          TCP_LISTEN, TCP_CLOSED, TCP_LISTEN,
          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));

  g_object_class_install_property (object_class, PROP_ACK_DELAY,
      g_param_spec_uint ("ack-delay", "ACK Delay",
          "Delayed ACK timeout (in milliseconds)",
          0, G_MAXUINT, DEFAULT_ACK_DELAY,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  g_object_class_install_property (object_class, PROP_NO_DELAY,
      g_param_spec_boolean ("no-delay", "No Delay",
          "Disable the Nagle algorithm (like the TCP_NODELAY option)",
          DEFAULT_NO_DELAY,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  g_object_class_install_property (object_class, PROP_RCV_BUF,
      g_param_spec_uint ("rcv-buf", "Receive Buffer",
          "Receive Buffer size",
          1, G_MAXUINT, DEFAULT_RCV_BUF_SIZE,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  g_object_class_install_property (object_class, PROP_SND_BUF,
      g_param_spec_uint ("snd-buf", "Send Buffer",
          "Send Buffer size",
          1, G_MAXUINT, DEFAULT_SND_BUF_SIZE,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  /**
   * PseudoTcpSocket:support-fin-ack:
   *
   * Whether to support the FIN–ACK extension to the pseudo-TCP protocol for
   * this socket. The extension is only compatible with other libnice pseudo-TCP
   * stacks, and not with Jingle pseudo-TCP stacks. If enabled, support is
   * negotiated on connection setup, so it is safe for a #PseudoTcpSocket with
   * support enabled to be used with one with it disabled, or with a Jingle
   * pseudo-TCP socket which doesn’t support it at all.
   *
   * Support is enabled by default.
   *
   * Since: 0.1.8
   */
  g_object_class_install_property (object_class, PROP_SUPPORT_FIN_ACK,
      g_param_spec_boolean ("support-fin-ack", "Support FIN–ACK",
          "Whether to enable the optional FIN–ACK support.",
          TRUE,
          G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));

  /* FIXME: Might want to implement the equivalent of TCP_USER_TIMEOUT to handle
   * connection and retransmission timeouts. See tcp(7). However, best to make
   * it a per-operation timeout rather than a per-socket option. */

  /**
   * PseudoTcpSocket:retries1:
   *
   * The number of times #PseudoTcpSocket will attempt to retransmit a packet on
   * an established connection normally. Once this number of retransmits is
   * exceeded, the socket might have the underlying network layer update its
   * route before each new retransmit, if possible.
   *
   * Set to 0 to retry indefinitely.
   *
   * Since: UNRELEASED
   */
  g_object_class_install_property (object_class, PROP_RETRIES1,
      g_param_spec_uint ("retries1", "Established retries 1",
          "Number of retransmission attempts on an established connection.",
          0, G_MAXUINT, DEFAULT_RETRIES1,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  /**
   * PseudoTcpSocket:retries2:
   *
   * The maximum number of times #PseudoTcpSocket will attempt to retransmit a
   * packet on an established connection before giving up and returning
   * %G_IO_ERROR_TIMED_OUT.
   *
   * Set to 0 to retry indefinitely.
   *
   * Since: UNRELEASED
   */
  g_object_class_install_property (object_class, PROP_RETRIES2,
      g_param_spec_uint ("retries2", "Established retries 2",
          "Maximum number of retransmission attempts on an established "
          "connection.",
          0, G_MAXUINT, DEFAULT_RETRIES2,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  /**
   * PseudoTcpSocket:syn-retries:
   *
   * The maximum number of times #PseudoTcpSocket will attempt to retransmit an
   * initial SYN packet when attempting to connect, before giving up and
   * returning %G_IO_ERROR_TIMED_OUT.
   *
   * Set to 0 to retry indefinitely.
   *
   * Since: UNRELEASED
   */
  g_object_class_install_property (object_class, PROP_SYN_RETRIES,
      g_param_spec_uint ("syn-retries", "SYN retries",
          "Maximum number of retransmission attempts for SYN packets.",
          0, G_MAXUINT, DEFAULT_SYN_RETRIES,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  /**
   * PseudoTcpSocket:syn-ack-retries:
   *
   * The maximum number of times #PseudoTcpSocket will attempt to retransmit a
   * SYN–ACK packet when attempting to connect, before giving up and returning
   * %G_IO_ERROR_TIMED_OUT.
   *
   * Set to 0 to retry indefinitely.
   *
   * Since: UNRELEASED
   */
  g_object_class_install_property (object_class, PROP_SYN_ACK_RETRIES,
      g_param_spec_uint ("syn-ack-retries", "SYN–ACK retries",
          "Maximum number of retransmission attempts for SYN–ACK packets.",
          0, G_MAXUINT, DEFAULT_SYN_ACK_RETRIES,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
}

/*
 * GObject get_property implementation.
 *
 * Copies the current value of the property identified by @property_id out of
 * the socket's private data and into @value. The "no-delay" property is the
 * logical inverse of the internal use_nagling flag. Any property ID without a
 * case below is reported via G_OBJECT_WARN_INVALID_PROPERTY_ID().
 */
static void
pseudo_tcp_socket_get_property (GObject *object,
                                  guint property_id,
                                  GValue *value,
                                  GParamSpec *pspec)
{
  PseudoTcpSocketPrivate *priv = PSEUDO_TCP_SOCKET (object)->priv;

  switch (property_id) {
    case PROP_CONVERSATION:
      g_value_set_uint (value, priv->conv);
      break;
    case PROP_BASE_SOCKET:
      g_value_set_object (value, priv->base_socket);
      break;
    case PROP_MAIN_CONTEXT:
      g_value_set_boxed (value, priv->main_context);
      break;
    case PROP_STATE:
      g_value_set_uint (value, priv->state);
      break;
    case PROP_ACK_DELAY:
      g_value_set_uint (value, priv->ack_delay);
      break;
    case PROP_NO_DELAY:
      g_value_set_boolean (value, !priv->use_nagling);
      break;
    case PROP_RCV_BUF:
      g_value_set_uint (value, priv->rbuf_len);
      break;
    case PROP_SND_BUF:
      g_value_set_uint (value, priv->sbuf_len);
      break;
    case PROP_SUPPORT_FIN_ACK:
      g_value_set_boolean (value, priv->support_fin_ack);
      break;
    case PROP_RETRIES1:
      g_value_set_uint (value, priv->retries1);
      break;
    case PROP_RETRIES2:
      g_value_set_uint (value, priv->retries2);
      break;
    case PROP_SYN_RETRIES:
      g_value_set_uint (value, priv->syn_retries);
      break;
    case PROP_SYN_ACK_RETRIES:
      g_value_set_uint (value, priv->syn_ack_retries);
      break;
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
      break;
  }
}

/*
 * GObject set_property implementation.
 *
 * Stores @value into the socket's private data for the property identified
 * by @property_id.
 *
 * For properties that are not flagged G_PARAM_CONSTRUCT or
 * G_PARAM_CONSTRUCT_ONLY the socket's recursive mutex is held for the whole
 * update. Every path through the switch must therefore fall through to the
 * unlock at the bottom; no case may return directly, or the mutex would be
 * left locked.
 *
 * The receive and send buffer sizes may only be changed while the socket is
 * still in TCP_LISTEN; otherwise a critical warning is logged and the value
 * is left untouched.
 */
static void
pseudo_tcp_socket_set_property (GObject *object,
                                  guint property_id,
                                  const GValue *value,
                                  GParamSpec *pspec)
{
  PseudoTcpSocket *self = PSEUDO_TCP_SOCKET (object);
  const gboolean needs_lock =
      !(pspec->flags & (G_PARAM_CONSTRUCT | G_PARAM_CONSTRUCT_ONLY));

  if (needs_lock)
    g_rec_mutex_lock (&self->priv->mutex);

  switch (property_id) {
    case PROP_CONVERSATION:
      self->priv->conv = g_value_get_uint (value);
      break;
    case PROP_CALLBACKS:
      {
        const PseudoTcpCallbacks *c = g_value_get_pointer (value);

        /* The callback table is copied by value. A NULL pointer carries
         * nothing to copy, so keep the current table instead of
         * dereferencing it. */
        if (c != NULL)
          self->priv->callbacks = *c;
      }
      break;
    case PROP_BASE_SOCKET:
      g_assert (self->priv->base_socket == NULL);
      self->priv->base_socket = g_value_dup_object (value);
      break;
    case PROP_MAIN_CONTEXT:
      g_assert (self->priv->main_context == NULL);
      self->priv->main_context = g_value_dup_boxed (value);
      break;
    case PROP_ACK_DELAY:
      self->priv->ack_delay = g_value_get_uint (value);
      break;
    case PROP_NO_DELAY:
      self->priv->use_nagling = !g_value_get_boolean (value);
      break;
    case PROP_RCV_BUF:
      /* Not g_return_if_fail(): that would return with the mutex held. */
      if (self->priv->state != TCP_LISTEN) {
        g_critical ("%s: assertion '%s' failed", G_STRFUNC,
            "self->priv->state == TCP_LISTEN");
        break;
      }
      resize_receive_buffer (self, g_value_get_uint (value));
      break;
    case PROP_SND_BUF:
      /* Not g_return_if_fail(): that would return with the mutex held. */
      if (self->priv->state != TCP_LISTEN) {
        g_critical ("%s: assertion '%s' failed", G_STRFUNC,
            "self->priv->state == TCP_LISTEN");
        break;
      }
      resize_send_buffer (self, g_value_get_uint (value));
      break;
    case PROP_SUPPORT_FIN_ACK:
      self->priv->support_fin_ack = g_value_get_boolean (value);
      break;
    case PROP_RETRIES1:
      self->priv->retries1 = g_value_get_uint (value);
      break;
    case PROP_RETRIES2:
      self->priv->retries2 = g_value_get_uint (value);
      break;
    case PROP_SYN_RETRIES:
      self->priv->syn_retries = g_value_get_uint (value);
      break;
    case PROP_SYN_ACK_RETRIES:
      self->priv->syn_ack_retries = g_value_get_uint (value);
      break;
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
      break;
  }

  if (needs_lock)
    g_rec_mutex_unlock (&self->priv->mutex);
}

static void
984
pseudo_tcp_socket_finalize (GObject *object)
985 986 987
{
  PseudoTcpSocket *self = PSEUDO_TCP_SOCKET (object);
  PseudoTcpSocketPrivate *priv = self->priv;
988
  GList *i;
989
  SSegment *sseg;
990 991 992 993

  if (priv == NULL)
    return;

994
  g_clear_object (&priv->connect_task);
995 996 997
  g_clear_object (&priv->input_stream);
  g_clear_object (&priv->output_stream);

998
  while ((sseg = g_queue_pop_head (&priv->slist)))
999
    g_slice_free (SSegment, sseg);
1000
  g_queue_clear (&priv->unsent_slist);
1001 1002 1003 1004 1005 1006
  for (i = priv->rlist; i; i = i->next) {
    RSegment *rseg = i->data;
    g_slice_free (RSegment, rseg);
  }
  g_list_free (priv->rlist);
  priv->rlist = NULL;
1007

1008 1009 1010
  pseudo_tcp_fifo_clear (&priv->rbuf);
  pseudo_tcp_fifo_clear (&priv->sbuf);

1011 1012 1013 1014 1015 1016 1017 1018 1019 1020
  if (priv->tcp_clock_source != NULL) {
    g_source_destroy (priv->tcp_clock_source);
    g_clear_pointer (&priv->tcp_clock_source, (GDestroyNotify) g_source_unref);
  }

  if (priv->base_source != NULL) {
    g_source_destroy (priv->base_source);
    g_clear_pointer (&priv->base_source, (GDestroyNotify) g_source_unref);
  }

1021
  g_clear_pointer (&priv->sources, (GDestroyNotify) g_ptr_array_unref);
1022 1023 1024 1025 1026 1027 1028

  g_clear_object (&priv->base_socket);
  if (priv->main_context != NULL) {
    g_main_context_unref (priv->main_context);
    priv->main_context = NULL;
  }

1029 1030
  g_rec_mutex_clear (&priv->mutex);

1031 1032 1033
  g_free (priv);
  self->priv = NULL;

1034 1035
  if (G_OBJECT_CLASS (pseudo_tcp_socket_parent_class)->finalize)
    G_OBJECT_CLASS (pseudo_tcp_socket_parent_class)->finalize (object);
1036 1037
}

/*
 * GObject constructed implementation.
 *
 * Runs once the construction properties have been applied. Fills in the
 * thread-default main context when none was supplied, and asserts that the
 * socket has a way to send packets: either a WritePacket callback or a base
 * socket.
 */
static void
pseudo_tcp_socket_constructed (GObject *obj)
{
  PseudoTcpSocketPrivate *priv = PSEUDO_TCP_SOCKET (obj)->priv;

  /* Pin a main context now: the thread-default context could be different
   * later on, while the socket is running. */
  if (!priv->main_context) {
    priv->main_context = g_main_context_ref_thread_default ();
  }

  g_assert(priv->callbacks.WritePacket || priv->base_socket);

  /* Chain up. */
  G_OBJECT_CLASS (pseudo_tcp_socket_parent_class)->constructed (obj);
}

/*
 * GObject instance initialiser.
 *
 * Allocates the private structure and gives every field its starting value.
 * A new socket starts in TCP_LISTEN with conversation ID 0, reading enabled
 * and writing disabled.
 */
static void
pseudo_tcp_socket_init (PseudoTcpSocket *obj)
{
  PseudoTcpSocketPrivate *priv;

  /* The private data is large (150KB+), too big for the GObject private-data
   * mechanism, which allocates through GSlice. It is therefore allocated
   * zero-filled with g_new0 and managed by hand. */
  priv = g_new0 (PseudoTcpSocketPrivate, 1);
  obj->priv = priv;

  g_rec_mutex_init (&priv->mutex);

  priv->shutdown = SD_NONE;

  /* Receive and send FIFOs, sized from the default buffer lengths. */
  priv->rbuf_len = DEFAULT_RCV_BUF_SIZE;
  pseudo_tcp_fifo_init (&priv->rbuf, priv->rbuf_len);
  priv->sbuf_len = DEFAULT_SND_BUF_SIZE;
  pseudo_tcp_fifo_init (&priv->sbuf, priv->sbuf_len);

  priv->state = TCP_LISTEN;
  priv->conv = 0;

  g_queue_init (&priv->slist);
  g_queue_init (&priv->unsent_slist);

  /* The receive window starts out equal to the receive buffer length. */
  priv->rcv_wnd = priv->rbuf_len;
  priv->swnd_scale = 0;
  priv->rwnd_scale = 0;
  priv->snd_nxt = 0;
  priv->snd_wnd = 1;
  priv->rcv_nxt = 0;
  priv->snd_una = 0;

  priv->bReadEnable = TRUE;
  priv->bWriteEnable = FALSE;

  priv->rcv_fin = 0;
  priv->t_ack = 0;

  priv->msslevel = 0;
  priv->largest = 0;
  priv->mss = MIN_PACKET - PACKET_OVERHEAD;
  priv->mtu_advise = DEF_MTU;

  priv->rto_base = 0;

  /* Both values below derive from fields assigned above (mss, rbuf_len). */
  priv->cwnd = 2 * priv->mss;
  priv->ssthresh = priv->rbuf_len;

  priv->last_traffic = 0;
  priv->lastsend = 0;
  priv->lastrecv = 0;

  priv->dup_acks = 0;
  priv->recover = 0;
  priv->last_acked_ts = 0;

  priv->ts_lastack = 0;
  priv->ts_recent = 0;

  priv->rx_rto = DEF_RTO;
  priv->rx_rttvar = 0;
  priv->rx_srtt = 0;

  priv->ack_delay = DEFAULT_ACK_DELAY;
  priv->use_nagling = !DEFAULT_NO_DELAY;

  priv->support_wnd_scale = TRUE;
  priv->support_fin_ack = TRUE;

  priv->sources = g_ptr_array_new ();

  priv->retries1 = DEFAULT_RETRIES1;
  priv->retries2 = DEFAULT_RETRIES2;
  priv->syn_retries = DEFAULT_SYN_RETRIES;
  priv->syn_ack_retries = DEFAULT_SYN_ACK_RETRIES;
}

/*
 * GIOStream get_input_stream implementation.
 *
 * Returns the socket's input stream wrapper, creating it on first use and
 * caching it in the private data. The returned pointer is the cached one; no
 * additional reference is taken here.
 */
static GInputStream *
pseudo_tcp_socket_get_input_stream (GIOStream *stream)
{
  PseudoTcpSocket *sock = PSEUDO_TCP_SOCKET (stream);
  PseudoTcpSocketPrivate *priv = sock->priv;

  if (priv->input_stream != NULL) {
    return priv->input_stream;
  }

  priv->input_stream = nice_pseudo_tcp_input_stream_new (sock);

  return priv->input_stream;
}

/*
 * GIOStream get_output_stream implementation.
 *
 * Returns the socket's output stream wrapper, creating it on first use and
 * caching it in the private data. The returned pointer is the cached one; no
 * additional reference is taken here.
 */
static GOutputStream *
pseudo_tcp_socket_get_output_stream (GIOStream *stream)
{
  PseudoTcpSocket *sock = PSEUDO_TCP_SOCKET (stream);
  PseudoTcpSocketPrivate *priv = sock->priv;

  if (priv->output_stream != NULL) {
    return priv->output_stream;
  }

  priv->output_stream = nice_pseudo_tcp_output_stream_new (sock);

  return priv->output_stream;
}

static gboolean
1159 1160
pseudo_tcp_socket_io_stream_close (GIOStream *stream, GCancellable *cancellable,
    GError **error)
1161 1162 1163
{
  PseudoTcpSocket *self;

1164
  self = PSEUDO_TCP_SOCKET (stream);
1165 1166 1167 1168 1169

  if (g_cancellable_set_error_if_cancelled (cancellable, error)) {
    return FALSE;
  }

1170
  pseudo_tcp_socket_shutdown (self, PSEUDO_TCP_SHUTDOWN_RDWR);
1171 1172 1173 1174

  return TRUE;
}

/*
 * GIOStream close_async implementation.
 *
 * Forwards to pseudo_tcp_socket_close_async(). @io_priority is accepted to
 * match the vfunc signature but is not passed on.
 */
static void
pseudo_tcp_socket_io_stream_close_async (GIOStream *stream, int io_priority,
    GCancellable *cancellable, GAsyncReadyCallback callback,
    gpointer user_data)
{
  pseudo_tcp_socket_close_async (PSEUDO_TCP_SOCKET (stream), cancellable,
      callback, user_data);
}

/*
 * GIOStream close_finish implementation.
 *
 * Forwards to pseudo_tcp_socket_close_finish() and returns its result.
 */
static gboolean
pseudo_tcp_socket_io_stream_close_finish (GIOStream *stream, GAsyncResult *res,
    GError **error)
{
  return pseudo_tcp_socket_close_finish (PSEUDO_TCP_SOCKET (stream), res,
      error);
}

GSource *
pseudo_tcp_socket_create_source (PseudoTcpSocket *self,
1200 1201 1202
    GIOCondition condition, GCancellable *cancellable)
{
  PseudoTcpSocketPrivate *priv;
1203
  GSource *source = NULL;
1204 1205 1206

  priv = self->priv;

1207

1208 1209 1210 1211
  source = (GSource *) pseudo_tcp_source_new (self, condition, cancellable);
  DEBUG (PSEUDO_TCP_DEBUG_NORMAL,
      "Created pseudo-TCP source %p with condition %u.",
      source, condition);
1212
  g_rec_mutex_lock (&self->priv->mutex);
1213
  g_ptr_array_add (priv->sources, source);
1214
  g_rec_mutex_unlock (&self->priv->mutex);
1215

1216
  return source;
1217 1218
}

PseudoTcpSocket *pseudo_tcp_socket_new (guint32 conversation,
Olivier Crête's avatar
Olivier Crête committed
1220 1221 1222 1223 1224 1225 1226 1227 1228
    PseudoTcpCallbacks callbacks)
{
  return g_object_new (PSEUDO_TCP_SOCKET_TYPE,
      "conversation", conversation,
      "callbacks", callbacks,
      NULL);
}

PseudoTcpSocket *pseudo_tcp_socket_new_from_socket (guint32 conversation,
1229
    GDatagramBased *base_socket, GMainContext *main_context)
1230 1231 1232
{
  return g_object_new (PSEUDO_TCP_SOCKET_TYPE,
      "conversation", conversation,
1233 1234
      "base-socket", base_socket,
      "main-context", main_context,
1235 1236 1237
      NULL);
}

static void
queue_connect_message (PseudoTcpSocket *self)
{
  PseudoTcpSocketPrivate *priv = self->priv;