From 14fd1a60acdd439f80bdfc0aeb86761ba649db79 Mon Sep 17 00:00:00 2001
From: "sergeyu@chromium.org"
 <sergeyu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>
Date: Wed, 3 Nov 2010 04:17:09 +0000
Subject: [PATCH] Add VideoReader and VideoWriter interfaces.

Implemented VideoReader and VideoWriter for RTP and Protobuf.

BUG=53986
TEST=None

Review URL: http://codereview.chromium.org/4229003

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@64878 0039d316-1c4b-4281-b951-d872f2087c98
---
 chrome/service/service_process.cc          |  6 +-
 remoting/base/decoder_vp8.cc               |  4 +-
 remoting/base/encoder_vp8.cc               |  3 +-
 remoting/client/chromoting_client.cc       | 90 +++++++++++-----------
 remoting/client/chromoting_client.h        | 32 +++++---
 remoting/client/host_connection.h          |  5 +-
 remoting/client/jingle_host_connection.cc  | 19 +++--
 remoting/client/jingle_host_connection.h   | 15 +++-
 remoting/host/chromoting_host.cc           | 34 ++++++--
 remoting/host/chromoting_host.h            |  7 +-
 remoting/host/client_connection.cc         | 16 ++--
 remoting/host/client_connection.h          |  4 +-
 remoting/host/mock_objects.h               |  2 +-
 remoting/host/simple_host_process.cc       | 25 +-----
 remoting/proto/video.proto                 |  6 +-
 remoting/protocol/chromotocol_config.h     |  3 +-
 remoting/protocol/message_decoder.h        |  2 +-
 remoting/protocol/message_reader.h         |  1 +
 remoting/protocol/protobuf_video_reader.cc | 30 ++++++++
 remoting/protocol/protobuf_video_reader.h  | 35 +++++++++
 remoting/protocol/protobuf_video_writer.cc | 36 +++++++++
 remoting/protocol/protobuf_video_writer.h  | 34 ++++++++
 remoting/protocol/rtp_reader.cc            |  2 +-
 remoting/protocol/rtp_reader.h             |  2 +-
 remoting/protocol/rtp_utils.cc             |  9 +--
 remoting/protocol/rtp_video_reader.cc      | 41 ++++++++++
 remoting/protocol/rtp_video_reader.h       | 35 +++++++++
 remoting/protocol/rtp_video_writer.cc      | 35 +++++++++
 remoting/protocol/rtp_video_writer.h       | 32 ++++++++
 remoting/protocol/rtp_writer.cc            | 18 +++--
 remoting/protocol/rtp_writer.h             |  7 +-
 remoting/protocol/stream_writer.cc         |  7 +-
 remoting/protocol/stream_writer.h          | 10 ++-
 remoting/protocol/video_reader.cc          | 26 +++++++
 remoting/protocol/video_reader.h           | 43 +++++++++++
 remoting/protocol/video_stub.h             | 33 ++++++++
 remoting/protocol/video_writer.cc          | 26 +++++++
 remoting/protocol/video_writer.h           | 48 ++++++++++++
 remoting/remoting.gyp                      | 13 ++++
 39 files changed, 648 insertions(+), 148 deletions(-)
 create mode 100644 remoting/protocol/protobuf_video_reader.cc
 create mode 100644 remoting/protocol/protobuf_video_reader.h
 create mode 100644 remoting/protocol/protobuf_video_writer.cc
 create mode 100644 remoting/protocol/protobuf_video_writer.h
 create mode 100644 remoting/protocol/rtp_video_reader.cc
 create mode 100644 remoting/protocol/rtp_video_reader.h
 create mode 100644 remoting/protocol/rtp_video_writer.cc
 create mode 100644 remoting/protocol/rtp_video_writer.h
 create mode 100644 remoting/protocol/video_reader.cc
 create mode 100644 remoting/protocol/video_reader.h
 create mode 100644 remoting/protocol/video_stub.h
 create mode 100644 remoting/protocol/video_writer.cc
 create mode 100644 remoting/protocol/video_writer.h

diff --git a/chrome/service/service_process.cc b/chrome/service/service_process.cc
index 1ec4a83118d51..2a6f069666709 100644
--- a/chrome/service/service_process.cc
+++ b/chrome/service/service_process.cc
@@ -25,7 +25,6 @@
 
 #if defined(ENABLE_REMOTING)
 #include "remoting/base/constants.h"
-#include "remoting/base/encoder_zlib.h"
 #include "remoting/host/chromoting_host.h"
 #include "remoting/host/chromoting_host_context.h"
 #include "remoting/host/json_host_config.h"
@@ -282,10 +281,9 @@ bool ServiceProcess::StartChromotingHost() {
   chromoting_context_.reset(new remoting::ChromotingHostContext());
   chromoting_context_->Start();
 
-  // Create capturer, encoder and executor. The ownership will be transfered
+  // Create capturer and executor. The ownership will be transfered
   // to the chromoting host.
   scoped_ptr<remoting::Capturer> capturer;
-  scoped_ptr<remoting::Encoder> encoder;
   scoped_ptr<remoting::EventExecutor> executor;
 
 #if defined(OS_WIN)
@@ -298,13 +296,11 @@ bool ServiceProcess::StartChromotingHost() {
   capturer.reset(new remoting::CapturerMac());
   executor.reset(new remoting::EventExecutorMac(capturer.get()));
 #endif
-  encoder.reset(new remoting::EncoderZlib());
 
   // Create a chromoting host object.
   chromoting_host_ = new remoting::ChromotingHost(chromoting_context_.get(),
                                                   chromoting_config_,
                                                   capturer.release(),
-                                                  encoder.release(),
                                                   executor.release());
 
   // Then start the chromoting host.
diff --git a/remoting/base/decoder_vp8.cc b/remoting/base/decoder_vp8.cc
index a8d52456f73df..46a6d31ae4e31 100644
--- a/remoting/base/decoder_vp8.cc
+++ b/remoting/base/decoder_vp8.cc
@@ -63,8 +63,6 @@ void DecoderVp8::DecodeBytes(const std::string& encoded_bytes) {
     }
   }
 
-  LOG(WARNING) << "Decoding " <<  encoded_bytes.size();
-
   // Do the actual decoding.
   vpx_codec_err_t ret = vpx_codec_decode(
       codec_, reinterpret_cast<const uint8*>(encoded_bytes.data()),
@@ -73,6 +71,7 @@ void DecoderVp8::DecodeBytes(const std::string& encoded_bytes) {
     LOG(INFO) << "Decoding failed:" << vpx_codec_err_to_string(ret) << "\n"
               << "Details: " << vpx_codec_error(codec_) << "\n"
               << vpx_codec_error_detail(codec_);
+    return;
   }
 
   // Gets the decoded data.
@@ -80,6 +79,7 @@ void DecoderVp8::DecodeBytes(const std::string& encoded_bytes) {
   vpx_image_t* image = vpx_codec_get_frame(codec_, &iter);
   if (!image) {
     LOG(INFO) << "No video frame decoded";
+    return;
   }
 
   // Perform YUV conversion.
diff --git a/remoting/base/encoder_vp8.cc b/remoting/base/encoder_vp8.cc
index 7a58f7b2aeb90..3a0f75be68d1b 100644
--- a/remoting/base/encoder_vp8.cc
+++ b/remoting/base/encoder_vp8.cc
@@ -188,8 +188,7 @@ void EncoderVp8::Encode(scoped_refptr<CaptureData> capture_data,
     switch (packet->kind) {
       case VPX_CODEC_CX_FRAME_PKT:
         got_data = true;
-        message->set_data(
-            packet->data.frame.buf, packet->data.frame.sz);
+        message->set_data(packet->data.frame.buf, packet->data.frame.sz);
         break;
       default:
         break;
diff --git a/remoting/client/chromoting_client.cc b/remoting/client/chromoting_client.cc
index 8b46a9d68309b..c47ecd819226a 100644
--- a/remoting/client/chromoting_client.cc
+++ b/remoting/client/chromoting_client.cc
@@ -30,7 +30,7 @@ ChromotingClient::ChromotingClient(const ClientConfig& config,
       input_handler_(input_handler),
       client_done_(client_done),
       state_(CREATED),
-      message_being_processed_(false) {
+      packet_being_processed_(false) {
 }
 
 ChromotingClient::~ChromotingClient() {
@@ -44,7 +44,7 @@ void ChromotingClient::Start() {
     return;
   }
 
-  connection_->Connect(config_, this);
+  connection_->Connect(config_, this, this);
 
   if (!view_->Initialize()) {
     ClientDone();
@@ -103,51 +103,47 @@ void ChromotingClient::HandleMessage(HostConnection* conn,
     return;
   }
 
-  // Put all messages in the queue.
-  received_messages_.push_back(msg);
-
-  if (!message_being_processed_) {
-    DispatchMessage();
-  }
-}
-
-void ChromotingClient::DispatchMessage() {
-  DCHECK_EQ(message_loop(), MessageLoop::current());
-  CHECK(!message_being_processed_);
-
-  if (received_messages_.empty()) {
-    // Nothing to do!
-    return;
-  }
-
-  ChromotingHostMessage* msg = received_messages_.front();
-  received_messages_.pop_front();
-  message_being_processed_ = true;
-
   // TODO(ajwong): Consider creating a macro similar to the IPC message
   // mappings.  Also reconsider the lifetime of the message object.
   if (msg->has_init_client()) {
     ScopedTracer tracer("Handle Init Client");
-    // TODO(ajwong): Change this to use a done callback.
-    InitClient(msg->init_client(),
-               NewTracedMethod(this, &ChromotingClient::OnMessageDone, msg));
-  } else if (msg->has_video_packet()) {
-    ScopedTracer tracer("Handle Rectangle Update");
-    rectangle_decoder_->DecodePacket(
-        msg->video_packet(),
-        NewTracedMethod(this, &ChromotingClient::OnMessageDone, msg));
+    InitClient(msg->init_client());
+    delete msg;
   } else {
     NOTREACHED() << "Unknown message received";
+  }
+}
 
-    // We have an unknown message. Drop it, and schedule another dispatch.
-    // Call DispatchMessage as a continuation to avoid growing the stack.
-    delete msg;
-    message_being_processed_ = false;
+void ChromotingClient::ProcessVideoPacket(const VideoPacket* packet,
+                                          Task* done) {
+  if (message_loop() != MessageLoop::current()) {
     message_loop()->PostTask(
         FROM_HERE,
-        NewTracedMethod(this, &ChromotingClient::DispatchMessage));
+        NewRunnableMethod(this, &ChromotingClient::ProcessVideoPacket,
+                          packet, done));
+    return;
+  }
+
+  received_packets_.push_back(QueuedVideoPacket(packet, done));
+  if (!packet_being_processed_)
+    DispatchPacket();
+}
+
+void ChromotingClient::DispatchPacket() {
+  DCHECK_EQ(message_loop(), MessageLoop::current());
+  CHECK(!packet_being_processed_);
+
+  if (received_packets_.empty()) {
+    // Nothing to do!
     return;
   }
+
+  const VideoPacket* packet = received_packets_.front().packet;
+  packet_being_processed_ = true;
+
+  ScopedTracer tracer("Handle video packet");
+  rectangle_decoder_->DecodePacket(
+      *packet, NewTracedMethod(this, &ChromotingClient::OnPacketDone));
 }
 
 void ChromotingClient::OnConnectionOpened(HostConnection* conn) {
@@ -185,23 +181,26 @@ void ChromotingClient::SetConnectionState(ConnectionState s) {
   Repaint();
 }
 
-void ChromotingClient::OnMessageDone(ChromotingHostMessage* message) {
+void ChromotingClient::OnPacketDone() {
   if (message_loop() != MessageLoop::current()) {
     message_loop()->PostTask(
         FROM_HERE,
-        NewTracedMethod(this, &ChromotingClient::OnMessageDone, message));
+        NewTracedMethod(this, &ChromotingClient::OnPacketDone));
     return;
   }
 
-  TraceContext::tracer()->PrintString("Message done");
+  TraceContext::tracer()->PrintString("Packet done");
 
-  message_being_processed_ = false;
-  delete message;
-  DispatchMessage();
+  received_packets_.front().done->Run();
+  delete received_packets_.front().done;
+  received_packets_.pop_front();
+
+  packet_being_processed_ = false;
+
+  DispatchPacket();
 }
 
-void ChromotingClient::InitClient(const InitClientMessage& init_client,
-                                  Task* done) {
+void ChromotingClient::InitClient(const InitClientMessage& init_client) {
   DCHECK_EQ(message_loop(), MessageLoop::current());
   TraceContext::tracer()->PrintString("Init received");
 
@@ -217,9 +216,6 @@ void ChromotingClient::InitClient(const InitClientMessage& init_client,
 
   // Schedule the input handler to process the event queue.
   input_handler_->Initialize();
-
-  done->Run();
-  delete done;
 }
 
 }  // namespace remoting
diff --git a/remoting/client/chromoting_client.h b/remoting/client/chromoting_client.h
index e476f318de135..57e799c6ec8cd 100644
--- a/remoting/client/chromoting_client.h
+++ b/remoting/client/chromoting_client.h
@@ -13,6 +13,7 @@
 #include "remoting/client/host_connection.h"
 #include "remoting/client/client_config.h"
 #include "remoting/client/chromoting_view.h"
+#include "remoting/protocol/video_stub.h"
 
 class MessageLoop;
 
@@ -24,7 +25,9 @@ class InitClientMessage;
 class InputHandler;
 class RectangleUpdateDecoder;
 
-class ChromotingClient : public HostConnection::HostEventCallback {
+// TODO(sergeyu): Move VideoStub implementation to RectangleUpdateDecoder.
+class ChromotingClient : public HostConnection::HostEventCallback,
+                         public VideoStub {
  public:
   // Objects passed in are not owned by this class.
   ChromotingClient(const ClientConfig& config,
@@ -58,20 +61,31 @@ class ChromotingClient : public HostConnection::HostEventCallback {
   virtual void OnConnectionClosed(HostConnection* conn);
   virtual void OnConnectionFailed(HostConnection* conn);
 
+  // VideoStub implementation.
+  virtual void ProcessVideoPacket(const VideoPacket* packet, Task* done);
+
  private:
+  struct QueuedVideoPacket {
+    QueuedVideoPacket(const VideoPacket* packet, Task* done)
+        : packet(packet), done(done) {
+    }
+    const VideoPacket* packet;
+    Task* done;
+  };
+
   MessageLoop* message_loop();
 
   // Convenience method for modifying the state on this object's message loop.
   void SetConnectionState(ConnectionState s);
 
-  // If a message is not being processed, dispatches a single message from the
-  // |received_messages_| queue.
-  void DispatchMessage();
+  // If a packet is not being processed, dispatches a single message from the
+  // |received_packets_| queue.
+  void DispatchPacket();
 
-  void OnMessageDone(ChromotingHostMessage* msg);
+  void OnPacketDone();
 
   // Handles for chromotocol messages.
-  void InitClient(const InitClientMessage& msg, Task* done);
+  void InitClient(const InitClientMessage& msg);
 
   // The following are not owned by this class.
   ClientConfig config_;
@@ -86,15 +100,15 @@ class ChromotingClient : public HostConnection::HostEventCallback {
 
   ConnectionState state_;
 
-  // Contains all messages that have been received, but have not yet been
+  // Contains all video packets that have been received, but have not yet been
   // processed.
   //
   // Used to serialize sending of messages to the client.
-  std::list<ChromotingHostMessage*> received_messages_;
+  std::list<QueuedVideoPacket> received_packets_;
 
   // True if a message is being processed. Can be used to determine if it is
   // safe to dispatch another message.
-  bool message_being_processed_;
+  bool packet_being_processed_;
 
   DISALLOW_COPY_AND_ASSIGN(ChromotingClient);
 };
diff --git a/remoting/client/host_connection.h b/remoting/client/host_connection.h
index 71210257cfc6d..3d27ceb1b78b9 100644
--- a/remoting/client/host_connection.h
+++ b/remoting/client/host_connection.h
@@ -12,6 +12,8 @@
 
 namespace remoting {
 
+class VideoStub;
+
 struct ClientConfig;
 
 class HostConnection {
@@ -39,7 +41,8 @@ class HostConnection {
 
   // TODO(ajwong): We need to generalize this API.
   virtual void Connect(const ClientConfig& config,
-                       HostEventCallback* event_callback) = 0;
+                       HostEventCallback* event_callback,
+                       VideoStub* video_stub) = 0;
   virtual void Disconnect() = 0;
 
   // Send an input event to the host.
diff --git a/remoting/client/jingle_host_connection.cc b/remoting/client/jingle_host_connection.cc
index 9a5fe65100165..c48ea61ed4e7e 100644
--- a/remoting/client/jingle_host_connection.cc
+++ b/remoting/client/jingle_host_connection.cc
@@ -11,6 +11,7 @@
 #include "remoting/client/jingle_host_connection.h"
 #include "remoting/jingle_glue/jingle_thread.h"
 #include "remoting/protocol/jingle_chromotocol_server.h"
+#include "remoting/protocol/video_stub.h"
 #include "remoting/protocol/util.h"
 
 namespace remoting {
@@ -24,8 +25,10 @@ JingleHostConnection::~JingleHostConnection() {
 }
 
 void JingleHostConnection::Connect(const ClientConfig& config,
-                                   HostEventCallback* event_callback) {
+                                   HostEventCallback* event_callback,
+                                   VideoStub* video_stub) {
   event_callback_ = event_callback;
+  video_stub_ = video_stub;
 
   // Initialize |jingle_client_|.
   jingle_client_ = new JingleClient(context_->jingle_thread());
@@ -45,8 +48,9 @@ void JingleHostConnection::Disconnect() {
     return;
   }
 
+  control_reader_.Close();
   event_writer_.Close();
-  video_reader_.Close();
+  video_reader_->Close();
 
   if (connection_) {
     connection_->Close(
@@ -56,8 +60,7 @@ void JingleHostConnection::Disconnect() {
   }
 }
 
-void JingleHostConnection::OnVideoMessage(
-    ChromotingHostMessage* msg) {
+void JingleHostConnection::OnControlMessage(ChromotingHostMessage* msg) {
   event_callback_->HandleMessage(this, msg);
 }
 
@@ -150,10 +153,12 @@ void JingleHostConnection::OnConnectionStateChange(
 
     case ChromotocolConnection::CONNECTED:
       // Initialize reader and writer.
+      control_reader_.Init<ChromotingHostMessage>(
+          connection_->control_channel(),
+          NewCallback(this, &JingleHostConnection::OnControlMessage));
       event_writer_.Init(connection_->event_channel());
-      video_reader_.Init<ChromotingHostMessage>(
-          connection_->video_channel(),
-          NewCallback(this, &JingleHostConnection::OnVideoMessage));
+      video_reader_.reset(VideoReader::Create(connection_->config()));
+      video_reader_->Init(connection_, video_stub_);
       event_callback_->OnConnectionOpened(this);
       break;
 
diff --git a/remoting/client/jingle_host_connection.h b/remoting/client/jingle_host_connection.h
index df37702633052..f45c27a7e5107 100644
--- a/remoting/client/jingle_host_connection.h
+++ b/remoting/client/jingle_host_connection.h
@@ -29,12 +29,14 @@
 #include "remoting/protocol/chromotocol_connection.h"
 #include "remoting/protocol/chromotocol_server.h"
 #include "remoting/protocol/stream_writer.h"
+#include "remoting/protocol/video_reader.h"
 
 class MessageLoop;
 
 namespace remoting {
 
 class JingleThread;
+class VideoStub;
 
 struct ClientConfig;
 
@@ -45,7 +47,8 @@ class JingleHostConnection : public HostConnection,
   virtual ~JingleHostConnection();
 
   virtual void Connect(const ClientConfig& config,
-                       HostEventCallback* event_callback);
+                       HostEventCallback* event_callback,
+                       VideoStub* video_stub);
   virtual void Disconnect();
 
   virtual void SendEvent(const ChromotingClientMessage& msg);
@@ -69,9 +72,11 @@ class JingleHostConnection : public HostConnection,
   // P2P connection to the host.
   void InitConnection();
 
+  // Callback for |control_reader_|.
+  void OnControlMessage(ChromotingHostMessage* msg);
+
   // Callback for |video_reader_|.
-  // TODO(sergeyu): This should be replaced with RTP/RTCP handler.
-  void OnVideoMessage(ChromotingHostMessage* msg);
+  void OnVideoPacket(VideoPacket* packet);
 
   // Used by Disconnect() to disconnect chromoting connection, stop chromoting
   // server, and then disconnect XMPP connection.
@@ -84,10 +89,12 @@ class JingleHostConnection : public HostConnection,
   scoped_refptr<ChromotocolServer> chromotocol_server_;
   scoped_refptr<ChromotocolConnection> connection_;
 
+  MessageReader control_reader_;
   EventStreamWriter event_writer_;
-  MessageReader video_reader_;
+  scoped_ptr<VideoReader> video_reader_;
 
   HostEventCallback* event_callback_;
+  VideoStub* video_stub_;
 
   std::string host_jid_;
 
diff --git a/remoting/host/chromoting_host.cc b/remoting/host/chromoting_host.cc
index 21a95be29913d..d77e23c6e80dd 100644
--- a/remoting/host/chromoting_host.cc
+++ b/remoting/host/chromoting_host.cc
@@ -9,24 +9,26 @@
 #include "build/build_config.h"
 #include "remoting/base/constants.h"
 #include "remoting/base/encoder.h"
+#include "remoting/base/encoder_verbatim.h"
+#include "remoting/base/encoder_vp8.h"
+#include "remoting/base/encoder_zlib.h"
 #include "remoting/host/chromoting_host_context.h"
 #include "remoting/host/capturer.h"
 #include "remoting/host/event_executor.h"
 #include "remoting/host/host_config.h"
 #include "remoting/host/session_manager.h"
 #include "remoting/protocol/jingle_chromotocol_server.h"
+#include "remoting/protocol/chromotocol_config.h"
 
 namespace remoting {
 
 ChromotingHost::ChromotingHost(ChromotingHostContext* context,
                                MutableHostConfig* config,
                                Capturer* capturer,
-                               Encoder* encoder,
                                EventExecutor* executor)
     : context_(context),
       config_(config),
       capturer_(capturer),
-      encoder_(encoder),
       executor_(executor),
       state_(kInitial) {
 }
@@ -140,12 +142,14 @@ void ChromotingHost::OnClientConnected(ClientConnection* client) {
     // Then we create a SessionManager passing the message loops that
     // it should run on.
     DCHECK(capturer_.get());
-    DCHECK(encoder_.get());
+
+    Encoder* encoder = CreateEncoder(client->connection()->config());
+
     session_ = new SessionManager(context_->capture_message_loop(),
                                   context_->encode_message_loop(),
                                   context_->main_message_loop(),
-                                  capturer_.get(),
-                                  encoder_.get());
+                                  capturer_.release(),
+                                  encoder);
   }
 
   // Immediately add the client and start the session.
@@ -285,4 +289,24 @@ void ChromotingHost::OnServerClosed() {
   // Don't need to do anything here.
 }
 
+// TODO(sergeyu): Move this to SessionManager?
+Encoder* ChromotingHost::CreateEncoder(const ChromotocolConfig* config) {
+  const ChannelConfig& video_config = config->video_config();
+
+  if (video_config.codec == ChannelConfig::CODEC_VERBATIM) {
+    return new remoting::EncoderVerbatim();
+  } else if (video_config.codec == ChannelConfig::CODEC_ZIP) {
+    return new remoting::EncoderZlib();
+  }
+  // TODO(sergeyu): Enable VP8 on ARM builds.
+#if !defined(ARCH_CPU_ARM_FAMILY)
+  else if (video_config.codec == ChannelConfig::CODEC_VP8) {
+    return new remoting::EncoderVp8();
+  }
+#endif
+
+  return NULL;
+}
+
+
 }  // namespace remoting
diff --git a/remoting/host/chromoting_host.h b/remoting/host/chromoting_host.h
index 158daa41a9561..911a9d3495c32 100644
--- a/remoting/host/chromoting_host.h
+++ b/remoting/host/chromoting_host.h
@@ -24,6 +24,7 @@ namespace remoting {
 
 class Capturer;
 class ChromotingHostContext;
+class ChromotocolConfig;
 class Encoder;
 class EventExecutor;
 class MutableHostConfig;
@@ -59,7 +60,7 @@ class ChromotingHost : public base::RefCountedThreadSafe<ChromotingHost>,
                        public JingleClient::Callback {
  public:
   ChromotingHost(ChromotingHostContext* context, MutableHostConfig* config,
-                 Capturer* capturer, Encoder* encoder, EventExecutor* executor);
+                 Capturer* capturer, EventExecutor* executor);
   virtual ~ChromotingHost();
 
   // Asynchronously start the host process.
@@ -113,6 +114,9 @@ class ChromotingHost : public base::RefCountedThreadSafe<ChromotingHost>,
   // Callback for ChromotocolServer::Close().
   void OnServerClosed();
 
+  // Creates encoder for the specified configuration.
+  Encoder* CreateEncoder(const ChromotocolConfig* config);
+
   // The context that the chromoting host runs on.
   ChromotingHostContext* context_;
 
@@ -122,7 +126,6 @@ class ChromotingHost : public base::RefCountedThreadSafe<ChromotingHost>,
   // constructed this is set to NULL.
   scoped_ptr<Capturer> capturer_;
 
-  // Encoder to be used by the SessionManager. Once the SessionManager is
   // constructed this is set to NULL.
   scoped_ptr<Encoder> encoder_;
 
diff --git a/remoting/host/client_connection.cc b/remoting/host/client_connection.cc
index e325bf762c9ce..8d68d6d1fcb8e 100644
--- a/remoting/host/client_connection.cc
+++ b/remoting/host/client_connection.cc
@@ -52,7 +52,7 @@ void ClientConnection::SendInitClientMessage(int width, int height) {
   msg.mutable_init_client()->set_width(width);
   msg.mutable_init_client()->set_height(height);
   DCHECK(msg.IsInitialized());
-  video_writer_.SendMessage(msg);
+  control_writer_.SendMessage(msg);
 }
 
 void ClientConnection::SendVideoPacket(const VideoPacket& packet) {
@@ -62,18 +62,12 @@ void ClientConnection::SendVideoPacket(const VideoPacket& packet) {
   if (!connection_)
     return;
 
-  ChromotingHostMessage* message = new ChromotingHostMessage();
-  // TODO(sergeyu): avoid memcopy here.
-  *message->mutable_video_packet() = packet;
-
-  video_writer_.SendMessage(*message);
-
-  delete message;
+  video_writer_->SendPacket(packet);
 }
 
 int ClientConnection::GetPendingUpdateStreamMessages() {
   DCHECK_EQ(loop_, MessageLoop::current());
-  return video_writer_.GetPendingMessages();
+  return video_writer_->GetPendingPackets();
 }
 
 void ClientConnection::Disconnect() {
@@ -91,10 +85,12 @@ ClientConnection::ClientConnection() {}
 void ClientConnection::OnConnectionStateChange(
     ChromotocolConnection::State state) {
   if (state == ChromotocolConnection::CONNECTED) {
+    control_writer_.Init(connection_->control_channel());
     event_reader_.Init<ChromotingClientMessage>(
         connection_->event_channel(),
         NewCallback(this, &ClientConnection::OnMessageReceived));
-    video_writer_.Init(connection_->video_channel());
+    video_writer_.reset(VideoWriter::Create(connection_->config()));
+    video_writer_->Init(connection_);
   }
 
   loop_->PostTask(FROM_HERE,
diff --git a/remoting/host/client_connection.h b/remoting/host/client_connection.h
index 38e383cb19397..6f9deca71ad02 100644
--- a/remoting/host/client_connection.h
+++ b/remoting/host/client_connection.h
@@ -15,6 +15,7 @@
 #include "remoting/protocol/chromotocol_connection.h"
 #include "remoting/protocol/message_reader.h"
 #include "remoting/protocol/stream_writer.h"
+#include "remoting/protocol/video_writer.h"
 
 namespace remoting {
 
@@ -100,8 +101,9 @@ class ClientConnection : public base::RefCountedThreadSafe<ClientConnection> {
   // The libjingle channel used to send and receive data from the remote client.
   scoped_refptr<ChromotocolConnection> connection_;
 
+  ControlStreamWriter control_writer_;
   MessageReader event_reader_;
-  VideoStreamWriter video_writer_;
+  scoped_ptr<VideoWriter> video_writer_;
 
   // The message loop that this object runs on.
   MessageLoop* loop_;
diff --git a/remoting/host/mock_objects.h b/remoting/host/mock_objects.h
index 428bd74919680..79b8d6f4be830 100644
--- a/remoting/host/mock_objects.h
+++ b/remoting/host/mock_objects.h
@@ -45,7 +45,7 @@ class MockClientConnection : public ClientConnection {
  public:
   MockClientConnection(){}
 
-  MOCK_METHOD1(Init, void(ChromotingConnection* connection));
+  MOCK_METHOD1(Init, void(ChromotocolConnection* connection));
   MOCK_METHOD2(SendInitClientMessage, void(int width, int height));
   MOCK_METHOD1(SendVideoPacket, void(const VideoPacket& packet));
   MOCK_METHOD0(GetPendingUpdateStreamMessages, int());
diff --git a/remoting/host/simple_host_process.cc b/remoting/host/simple_host_process.cc
index 2733850821b36..98d93036a8ca7 100644
--- a/remoting/host/simple_host_process.cc
+++ b/remoting/host/simple_host_process.cc
@@ -28,9 +28,6 @@
 #include "base/path_service.h"
 #include "base/thread.h"
 #include "media/base/media.h"
-#include "remoting/base/encoder_verbatim.h"
-#include "remoting/base/encoder_vp8.h"
-#include "remoting/base/encoder_zlib.h"
 #include "remoting/base/tracer.h"
 #include "remoting/host/capturer_fake.h"
 #include "remoting/host/chromoting_host.h"
@@ -65,8 +62,6 @@ void ShutdownTask(MessageLoop* message_loop) {
 
 const std::string kFakeSwitchName = "fake";
 const std::string kConfigSwitchName = "config";
-const std::string kVerbatimSwitchName = "verbatim";
-const std::string kVp8SwitchName = "vp8";
 
 int main(int argc, char** argv) {
   // Needed for the Mac, so we don't leak objects when threads are created.
@@ -80,7 +75,6 @@ int main(int argc, char** argv) {
   base::EnsureNSPRInit();
 
   scoped_ptr<remoting::Capturer> capturer;
-  scoped_ptr<remoting::Encoder> encoder;
   scoped_ptr<remoting::EventExecutor> event_handler;
 #if defined(OS_WIN)
   capturer.reset(new remoting::CapturerGdi());
@@ -92,12 +86,9 @@ int main(int argc, char** argv) {
   capturer.reset(new remoting::CapturerMac());
   event_handler.reset(new remoting::EventExecutorMac(capturer.get()));
 #endif
-  encoder.reset(new remoting::EncoderZlib());
 
-  // Check the argument to see if we should use a fake capturer and encoder.
+  // Check the argument to see if we should use a fake capturer.
   bool fake = cmd_line->HasSwitch(kFakeSwitchName);
-  bool verbatim = cmd_line->HasSwitch(kVerbatimSwitchName);
-  bool vp8 = cmd_line->HasSwitch(kVp8SwitchName);
 
 #if defined(OS_WIN)
   std::wstring home_path = GetEnvironmentVar(kHomeDrive);
@@ -117,19 +108,6 @@ int main(int argc, char** argv) {
     capturer.reset(new remoting::CapturerFake());
   }
 
-  if (verbatim) {
-    LOG(INFO) << "Using the verbatim encoder.";
-    encoder.reset(new remoting::EncoderVerbatim());
-  }
-
-  // TODO(sergeyu): Enable VP8 on ARM builds.
-#if !defined(ARCH_CPU_ARM_FAMILY)
-  if (vp8) {
-    LOG(INFO) << "Using the verbatim encoder.";
-    encoder.reset(new remoting::EncoderVp8());
-  }
-#endif
-
   base::Thread file_io_thread("FileIO");
   file_io_thread.Start();
 
@@ -156,7 +134,6 @@ int main(int argc, char** argv) {
       new remoting::ChromotingHost(&context,
                                    config,
                                    capturer.release(),
-                                   encoder.release(),
                                    event_handler.release()));
 
   // Let the chromoting host run until the shutdown task is executed.
diff --git a/remoting/proto/video.proto b/remoting/proto/video.proto
index c91a0a33eab50..963c118164039 100644
--- a/remoting/proto/video.proto
+++ b/remoting/proto/video.proto
@@ -85,9 +85,11 @@ message VideoPacket {
   // The sequence number of the partial data for updating a rectangle.
   optional int32 sequence_number = 2 [default = 0];
 
+  optional int32 timestamp = 3 [default = 0];
+
   // This is provided on the first packet of the rectangle data, when
   // the flags has FIRST_PACKET set.
-  optional VideoPacketFormat format = 3;
+  optional VideoPacketFormat format = 4;
 
-  optional bytes data = 4;
+  optional bytes data = 5;
 }
diff --git a/remoting/protocol/chromotocol_config.h b/remoting/protocol/chromotocol_config.h
index bd0bcbc11e37c..aef2eccf3bd1a 100644
--- a/remoting/protocol/chromotocol_config.h
+++ b/remoting/protocol/chromotocol_config.h
@@ -25,8 +25,9 @@ struct ChannelConfig {
 
   enum Codec {
     CODEC_UNDEFINED,  // Used for event and control channels.
-    CODEC_VP8,
+    CODEC_VERBATIM,
     CODEC_ZIP,
+    CODEC_VP8,
   };
 
   ChannelConfig();
diff --git a/remoting/protocol/message_decoder.h b/remoting/protocol/message_decoder.h
index d28d9e09c0799..207dee5b37820 100644
--- a/remoting/protocol/message_decoder.h
+++ b/remoting/protocol/message_decoder.h
@@ -11,6 +11,7 @@
 #include "base/ref_counted.h"
 #include "base/scoped_ptr.h"
 #include "google/protobuf/message_lite.h"
+#include "remoting/base/multiple_array_input_stream.h"
 
 namespace net {
 class DrainableIOBuffer;
@@ -25,7 +26,6 @@ class ClientControlMessage;
 class ClientEventMessage;
 class HostControlMessage;
 class HostEventMessage;
-class MultipleArrayInputStream;
 
 // MessageDecoder uses MultipleArrayInputStream to decode bytes into
 // protocol buffer messages. This can be used to decode bytes received from
diff --git a/remoting/protocol/message_reader.h b/remoting/protocol/message_reader.h
index d3540bbdac204..44e6a5c899fa5 100644
--- a/remoting/protocol/message_reader.h
+++ b/remoting/protocol/message_reader.h
@@ -10,6 +10,7 @@
 #include "base/scoped_ptr.h"
 #include "base/task.h"
 #include "net/base/completion_callback.h"
+#include "net/base/io_buffer.h"
 #include "remoting/protocol/message_decoder.h"
 
 namespace net {
diff --git a/remoting/protocol/protobuf_video_reader.cc b/remoting/protocol/protobuf_video_reader.cc
new file mode 100644
index 0000000000000..70ac3cf713ebf
--- /dev/null
+++ b/remoting/protocol/protobuf_video_reader.cc
@@ -0,0 +1,30 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "remoting/protocol/protobuf_video_reader.h"
+
+#include "base/task.h"
+#include "remoting/protocol/chromotocol_connection.h"
+
+namespace remoting {
+
+ProtobufVideoReader::ProtobufVideoReader() { }
+ProtobufVideoReader::~ProtobufVideoReader() { }
+
+void ProtobufVideoReader::Init(ChromotocolConnection* connection,
+                               VideoStub* video_stub) {
+  reader_.Init<VideoPacket>(connection->video_channel(),
+                            NewCallback(this, &ProtobufVideoReader::OnNewData));
+  video_stub_ = video_stub;
+}
+
+void ProtobufVideoReader::Close() {
+  reader_.Close();
+}
+
+void ProtobufVideoReader::OnNewData(VideoPacket* packet) {
+  video_stub_->ProcessVideoPacket(packet, new DeleteTask<VideoPacket>(packet));
+}
+
+}  // namespace remoting
diff --git a/remoting/protocol/protobuf_video_reader.h b/remoting/protocol/protobuf_video_reader.h
new file mode 100644
index 0000000000000..cad98c57e5123
--- /dev/null
+++ b/remoting/protocol/protobuf_video_reader.h
@@ -0,0 +1,35 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef REMOTING_PROTOCOL_PROTOBUF_VIDEO_READER_H_
+#define REMOTING_PROTOCOL_PROTOBUF_VIDEO_READER_H_
+
+#include "remoting/protocol/message_reader.h"
+#include "remoting/protocol/video_reader.h"
+
+namespace remoting {
+
+class ProtobufVideoReader : public VideoReader {
+ public:
+  ProtobufVideoReader();
+  virtual ~ProtobufVideoReader();
+
+  // VideoReader interface.
+  virtual void Init(ChromotocolConnection* connection, VideoStub* video_stub);
+  virtual void Close();
+
+ private:
+  void OnNewData(VideoPacket* packet);
+
+  MessageReader reader_;
+
+  // The stub that processes all received packets.
+  VideoStub* video_stub_;
+
+  DISALLOW_COPY_AND_ASSIGN(ProtobufVideoReader);
+};
+
+}  // namespace remoting
+
+#endif  // REMOTING_PROTOCOL_PROTOBUF_VIDEO_READER_H_
diff --git a/remoting/protocol/protobuf_video_writer.cc b/remoting/protocol/protobuf_video_writer.cc
new file mode 100644
index 0000000000000..96b4e34503d89
--- /dev/null
+++ b/remoting/protocol/protobuf_video_writer.cc
@@ -0,0 +1,36 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "remoting/protocol/protobuf_video_writer.h"
+
+#include "remoting/protocol/chromotocol_connection.h"
+#include "remoting/protocol/rtp_writer.h"
+#include "remoting/protocol/util.h"
+
+namespace remoting {
+
+ProtobufVideoWriter::ProtobufVideoWriter() { }
+
+ProtobufVideoWriter::~ProtobufVideoWriter() { }
+
+void ProtobufVideoWriter::Init(ChromotocolConnection* connection) {
+  buffered_writer_ = new BufferedSocketWriter();
+  // TODO(sergeyu): Provide WriteFailedCallback for the buffered writer.
+  buffered_writer_->Init(connection->video_channel(), NULL);
+}
+
+void ProtobufVideoWriter::SendPacket(const VideoPacket& packet) {
+  buffered_writer_->Write(SerializeAndFrameMessage(packet));
+}
+
+int ProtobufVideoWriter::GetPendingPackets() {
+  return buffered_writer_->GetBufferChunks();
+}
+
+
+void ProtobufVideoWriter::Close() {
+  buffered_writer_->Close();
+}
+
+}  // namespace remoting
diff --git a/remoting/protocol/protobuf_video_writer.h b/remoting/protocol/protobuf_video_writer.h
new file mode 100644
index 0000000000000..02d3034ee20e3
--- /dev/null
+++ b/remoting/protocol/protobuf_video_writer.h
@@ -0,0 +1,34 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef REMOTING_PROTOCOL_PROTOBUF_VIDEO_WRITER_H_
+#define REMOTING_PROTOCOL_PROTOBUF_VIDEO_WRITER_H_
+
+#include "base/ref_counted.h"
+#include "remoting/protocol/video_writer.h"
+
+namespace remoting {
+
+class BufferedSocketWriter;
+
+class ProtobufVideoWriter : public VideoWriter {
+ public:
+  ProtobufVideoWriter();
+  virtual ~ProtobufVideoWriter();
+
+  // VideoWriter interface.
+  virtual void Init(ChromotocolConnection* connection);
+  virtual void SendPacket(const VideoPacket& packet);
+  virtual int GetPendingPackets();
+  virtual void Close();
+
+ private:
+  scoped_refptr<BufferedSocketWriter> buffered_writer_;
+
+  DISALLOW_COPY_AND_ASSIGN(ProtobufVideoWriter);
+};
+
+}  // namespace remoting
+
+#endif  // REMOTING_PROTOCOL_PROTOBUF_VIDEO_WRITER_H_
diff --git a/remoting/protocol/rtp_reader.cc b/remoting/protocol/rtp_reader.cc
index a3659fb5fe1c2..31b133e752420 100644
--- a/remoting/protocol/rtp_reader.cc
+++ b/remoting/protocol/rtp_reader.cc
@@ -34,7 +34,7 @@ void RtpReader::OnDataReceived(net::IOBuffer* buffer, int data_size) {
   packet.payload = buffer->data() + header_size;
   packet.payload_size = data_size - header_size;
 
-  on_message_callback_->Run(&packet);
+  on_message_callback_->Run(packet);
 }
 
 }  // namespace remoting
diff --git a/remoting/protocol/rtp_reader.h b/remoting/protocol/rtp_reader.h
index b9a12df50fc5f..3fc3983cd1298 100644
--- a/remoting/protocol/rtp_reader.h
+++ b/remoting/protocol/rtp_reader.h
@@ -28,7 +28,7 @@ class RtpReader : public SocketReaderBase {
 
   // The OnMessageCallback is called whenever a new message is received.
   // Ownership of the message is passed the callback.
-  typedef Callback1<RtpPacket*>::Type OnMessageCallback;
+  typedef Callback1<const RtpPacket&>::Type OnMessageCallback;
 
   // Initialize the reader and start reading. Must be called on the thread
   // |socket| belongs to. The callback will be called when a new message is
diff --git a/remoting/protocol/rtp_utils.cc b/remoting/protocol/rtp_utils.cc
index df10af235c71d..25cbefc9f3686 100644
--- a/remoting/protocol/rtp_utils.cc
+++ b/remoting/protocol/rtp_utils.cc
@@ -48,13 +48,10 @@ void PackRtpHeader(uint8* buffer, int buffer_size,
 }
 
 static inline uint8 ExtractBits(uint8 byte, int bits_count, int shift) {
-  return (byte >> shift) && ((1 << bits_count) - 1);
+  return (byte >> shift) & ((1 << bits_count) - 1);
 }
 
 int UnpackRtpHeader(const uint8* buffer, int buffer_size, RtpHeader* header) {
-  DCHECK_LT(header->sources, 1 << 4);
-  DCHECK_LT(header->payload_type, 1 << 7);
-
   if (buffer_size < kRtpBaseHeaderSize) {
     return -1;
   }
@@ -69,13 +66,13 @@ int UnpackRtpHeader(const uint8* buffer, int buffer_size, RtpHeader* header) {
   header->sources = ExtractBits(buffer[0], 4, 0);
 
   header->marker = ExtractBits(buffer[1], 1, 7) != 0;
-  header->sources = ExtractBits(buffer[1], 7, 0);
+  header->payload_type = ExtractBits(buffer[1], 7, 0);
 
   header->sequence_number = GetBE16(buffer + 2);
   header->timestamp = GetBE32(buffer + 4);
   header->sync_source_id = GetBE32(buffer + 8);
 
-  DCHECK_LE(header->sources, 16);
+  DCHECK_LT(header->sources, 16);
 
   if (buffer_size < GetRtpHeaderSize(header->sources)) {
     return -1;
diff --git a/remoting/protocol/rtp_video_reader.cc b/remoting/protocol/rtp_video_reader.cc
new file mode 100644
index 0000000000000..8de6d9164da0a
--- /dev/null
+++ b/remoting/protocol/rtp_video_reader.cc
@@ -0,0 +1,41 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "remoting/protocol/rtp_video_reader.h"
+
+#include "base/task.h"
+#include "remoting/protocol/chromotocol_connection.h"
+
+namespace remoting {
+
+RtpVideoReader::RtpVideoReader() { }
+RtpVideoReader::~RtpVideoReader() { }
+
+void RtpVideoReader::Init(ChromotocolConnection* connection,
+                          VideoStub* video_stub) {
+  rtp_reader_.Init(connection->video_rtp_channel(),
+                   NewCallback(this, &RtpVideoReader::OnRtpPacket));
+  video_stub_ = video_stub;
+}
+
+void RtpVideoReader::Close() {
+  rtp_reader_.Close();
+}
+
+void RtpVideoReader::OnRtpPacket(const RtpPacket& rtp_packet) {
+  VideoPacket* packet = new VideoPacket();
+  packet->set_data(rtp_packet.payload, rtp_packet.payload_size);
+
+  packet->mutable_format()->set_encoding(VideoPacketFormat::ENCODING_VP8);
+  packet->set_flags(rtp_packet.header.marker ? VideoPacket::LAST_PACKET : 0);
+  packet->mutable_format()->set_pixel_format(PIXEL_FORMAT_RGB32);
+  packet->mutable_format()->set_x(0);
+  packet->mutable_format()->set_y(0);
+  packet->mutable_format()->set_width(800);
+  packet->mutable_format()->set_height(600);
+
+  video_stub_->ProcessVideoPacket(packet, new DeleteTask<VideoPacket>(packet));
+}
+
+}  // namespace remoting
diff --git a/remoting/protocol/rtp_video_reader.h b/remoting/protocol/rtp_video_reader.h
new file mode 100644
index 0000000000000..d7f526eb62947
--- /dev/null
+++ b/remoting/protocol/rtp_video_reader.h
@@ -0,0 +1,35 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef REMOTING_PROTOCOL_RTP_VIDEO_READER_H_
+#define REMOTING_PROTOCOL_RTP_VIDEO_READER_H_
+
+#include "remoting/protocol/rtp_reader.h"
+#include "remoting/protocol/video_reader.h"
+
+namespace remoting {
+
+class RtpVideoReader : public VideoReader {
+ public:
+  RtpVideoReader();
+  virtual ~RtpVideoReader();
+
+  // VideoReader interface.
+  virtual void Init(ChromotocolConnection* connection, VideoStub* video_stub);
+  virtual void Close();
+
+ private:
+  void OnRtpPacket(const RtpPacket& rtp_packet);
+
+  RtpReader rtp_reader_;
+
+  // The stub that processes all received packets.
+  VideoStub* video_stub_;
+
+  DISALLOW_COPY_AND_ASSIGN(RtpVideoReader);
+};
+
+}  // namespace remoting
+
+#endif  // REMOTING_PROTOCOL_RTP_VIDEO_READER_H_
diff --git a/remoting/protocol/rtp_video_writer.cc b/remoting/protocol/rtp_video_writer.cc
new file mode 100644
index 0000000000000..6307c0c8a87c9
--- /dev/null
+++ b/remoting/protocol/rtp_video_writer.cc
@@ -0,0 +1,35 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "remoting/protocol/rtp_video_writer.h"
+
+#include "remoting/protocol/chromotocol_connection.h"
+#include "remoting/protocol/rtp_writer.h"
+
+namespace remoting {
+
+RtpVideoWriter::RtpVideoWriter() { }
+
+RtpVideoWriter::~RtpVideoWriter() { }
+
+void RtpVideoWriter::Init(ChromotocolConnection* connection) {
+  rtp_writer_.Init(connection->video_rtp_channel(),
+                   connection->video_rtcp_channel());
+}
+
+void RtpVideoWriter::SendPacket(const VideoPacket& packet) {
+  rtp_writer_.SendPacket(packet.data().data(), packet.data().length(),
+                         packet.timestamp());
+}
+
+int RtpVideoWriter::GetPendingPackets() {
+  return rtp_writer_.GetPendingPackets();
+}
+
+
+void RtpVideoWriter::Close() {
+  rtp_writer_.Close();
+}
+
+}  // namespace remoting
diff --git a/remoting/protocol/rtp_video_writer.h b/remoting/protocol/rtp_video_writer.h
new file mode 100644
index 0000000000000..4084576f7c636
--- /dev/null
+++ b/remoting/protocol/rtp_video_writer.h
@@ -0,0 +1,32 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef REMOTING_PROTOCOL_RTP_VIDEO_WRITER_H_
+#define REMOTING_PROTOCOL_RTP_VIDEO_WRITER_H_
+
+#include "remoting/protocol/rtp_writer.h"
+#include "remoting/protocol/video_writer.h"
+
+namespace remoting {
+
+class RtpVideoWriter : public VideoWriter {
+ public:
+  RtpVideoWriter();
+  virtual ~RtpVideoWriter();
+
+  // VideoWriter interface.
+  virtual void Init(ChromotocolConnection* connection);
+  virtual void SendPacket(const VideoPacket& packet);
+  virtual int GetPendingPackets();
+  virtual void Close();
+
+ private:
+  RtpWriter rtp_writer_;
+
+  DISALLOW_COPY_AND_ASSIGN(RtpVideoWriter);
+};
+
+}  // namespace remoting
+
+#endif  // REMOTING_PROTOCOL_RTP_VIDEO_WRITER_H_
diff --git a/remoting/protocol/rtp_writer.cc b/remoting/protocol/rtp_writer.cc
index a9df79822aba1..909a572bc2ea9 100644
--- a/remoting/protocol/rtp_writer.cc
+++ b/remoting/protocol/rtp_writer.cc
@@ -32,7 +32,7 @@ void RtpWriter::Init(net::Socket* rtp_socket, net::Socket* rtcp_socket) {
   rtcp_socket_ = rtcp_socket;
 }
 
-void RtpWriter::SendPacket(const char* data, int packet_size,
+void RtpWriter::SendPacket(const char* payload, int payload_size,
                            uint32 timestamp) {
   RtpHeader header;
   header.padding = false;
@@ -49,15 +49,15 @@ void RtpWriter::SendPacket(const char* data, int packet_size,
   // TODO(sergeyu): Add VP8 payload header.
 
   int position = 0;
-  while (position < packet_size) {
+  while (position < payload_size) {
     // Allocate buffer.
-    int size = std::max(kMtu, packet_size - position);
-    int header_size = GetRtpHeaderSize(header.sources) + size;
+    int size = std::min(kMtu, payload_size - position);
+    int header_size = GetRtpHeaderSize(header.sources);
     int total_size = size + header_size;
     net::IOBufferWithSize* buffer = new net::IOBufferWithSize(total_size);
 
     // Set marker if this is the last frame.
-    header.marker = (position + size) == packet_size;
+    header.marker = (position + size) == payload_size;
 
     // TODO(sergeyu): Handle sequence number wrapping.
     header.sequence_number = last_packet_number_;
@@ -68,7 +68,7 @@ void RtpWriter::SendPacket(const char* data, int packet_size,
                   header);
 
     // Copy data to the buffer.
-    memcpy(buffer->data() + header_size, data + position, size);
+    memcpy(buffer->data() + header_size, payload + position, size);
 
     // Send it.
     buffered_rtp_writer_->Write(buffer);
@@ -76,7 +76,11 @@ void RtpWriter::SendPacket(const char* data, int packet_size,
     position += size;
   }
 
-  DCHECK_EQ(position, packet_size);
+  DCHECK_EQ(position, payload_size);
+}
+
+int RtpWriter::GetPendingPackets() {
+  return buffered_rtp_writer_->GetBufferChunks();
 }
 
 // Stop writing and drop pending data. Must be called from the same thread as
diff --git a/remoting/protocol/rtp_writer.h b/remoting/protocol/rtp_writer.h
index 5ba7501a48641..6bcd7ecff387f 100644
--- a/remoting/protocol/rtp_writer.h
+++ b/remoting/protocol/rtp_writer.h
@@ -19,8 +19,11 @@ class RtpWriter {
   // to.
   void Init(net::Socket* rtp_socket, net::Socket* rtcp_socket);
 
-  void SendPacket(const char* buffer, int packet_size,
-                  uint32 timestamp);
+  // Sends next packet.
+  void SendPacket(const char* payload, int payload_size, uint32 timestamp);
+
+  // Returns number of packets queued in the buffer.
+  int GetPendingPackets();
 
   // Stop writing and drop pending data. Must be called from the same thread as
   // Init().
diff --git a/remoting/protocol/stream_writer.cc b/remoting/protocol/stream_writer.cc
index ce916505289f8..9d1f2a71c46d5 100644
--- a/remoting/protocol/stream_writer.cc
+++ b/remoting/protocol/stream_writer.cc
@@ -5,6 +5,7 @@
 #include "remoting/protocol/stream_writer.h"
 
 #include "base/message_loop.h"
+#include "remoting/protocol/buffered_socket_writer.h"
 #include "remoting/protocol/chromotocol_connection.h"
 #include "remoting/protocol/util.h"
 
@@ -34,13 +35,11 @@ void StreamWriterBase::Close() {
   buffered_writer_->Close();
 }
 
-bool EventStreamWriter::SendMessage(
-    const ChromotingClientMessage& message) {
+bool EventStreamWriter::SendMessage(const ChromotingClientMessage& message) {
   return buffered_writer_->Write(SerializeAndFrameMessage(message));
 }
 
-bool VideoStreamWriter::SendMessage(
-    const ChromotingHostMessage& message) {
+bool ControlStreamWriter::SendMessage(const ChromotingHostMessage& message) {
   return buffered_writer_->Write(SerializeAndFrameMessage(message));
 }
 
diff --git a/remoting/protocol/stream_writer.h b/remoting/protocol/stream_writer.h
index 75f61808a3046..f5b018b68a85a 100644
--- a/remoting/protocol/stream_writer.h
+++ b/remoting/protocol/stream_writer.h
@@ -5,12 +5,16 @@
 #ifndef REMOTING_PROTOCOL_STREAM_WRITER_H_
 #define REMOTING_PROTOCOL_STREAM_WRITER_H_
 
+#include "base/ref_counted.h"
 #include "remoting/proto/internal.pb.h"
-#include "remoting/protocol/buffered_socket_writer.h"
+
+namespace net {
+class Socket;
+}  // namespace net
 
 namespace remoting {
 
-class ChromotingConnection;
+class BufferedSocketWriter;
 
 class StreamWriterBase {
  public:
@@ -41,7 +45,7 @@ class EventStreamWriter : public StreamWriterBase {
   bool SendMessage(const ChromotingClientMessage& message);
 };
 
-class VideoStreamWriter : public StreamWriterBase {
+class ControlStreamWriter : public StreamWriterBase {
  public:
   // Sends the |message| or returns false if called before Init().
   // Can be called on any thread.
diff --git a/remoting/protocol/video_reader.cc b/remoting/protocol/video_reader.cc
new file mode 100644
index 0000000000000..63cdae390c819
--- /dev/null
+++ b/remoting/protocol/video_reader.cc
@@ -0,0 +1,26 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "remoting/protocol/video_reader.h"
+
+#include "remoting/protocol/chromotocol_config.h"
+#include "remoting/protocol/protobuf_video_reader.h"
+#include "remoting/protocol/rtp_video_reader.h"
+
+namespace remoting {
+
+VideoReader::~VideoReader() { }
+
+// static
+VideoReader* VideoReader::Create(const ChromotocolConfig* config) {
+  const ChannelConfig& video_config = config->video_config();
+  if (video_config.transport == ChannelConfig::TRANSPORT_SRTP) {
+    return new RtpVideoReader();
+  } else if (video_config.transport == ChannelConfig::TRANSPORT_STREAM) {
+    return new ProtobufVideoReader();
+  }
+  return NULL;
+}
+
+}  // namespace remoting
diff --git a/remoting/protocol/video_reader.h b/remoting/protocol/video_reader.h
new file mode 100644
index 0000000000000..ca39619e2f4d9
--- /dev/null
+++ b/remoting/protocol/video_reader.h
@@ -0,0 +1,43 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+// VideoReader is a generic interface for a video stream reader. RtpVideoReader
+// and ProtobufVideoReader implement this interface for RTP and protobuf video
+// streams. VideoReader is used by ConnectionToHost to read video stream.
+
+#ifndef REMOTING_PROTOCOL_VIDEO_READER_H_
+#define REMOTING_PROTOCOL_VIDEO_READER_H_
+
+#include "base/callback.h"
+#include "remoting/protocol/video_stub.h"
+
+namespace remoting {
+
+class ChromotocolConfig;
+class ChromotocolConnection;
+
+class VideoReader {
+ public:
+  static VideoReader* Create(const ChromotocolConfig* config);
+
+  virtual ~VideoReader();
+
+  // Initializies the reader. Doesn't take ownership of either |connection|
+  // or |video_stub|.
+  virtual void Init(ChromotocolConnection* connection,
+                    VideoStub* video_stub) = 0;
+
+  // Closes the reader. The stub should not be called after Close().
+  virtual void Close() = 0;
+
+ protected:
+  VideoReader() { }
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(VideoReader);
+};
+
+}  // namespace remoting
+
+#endif  // REMOTING_PROTOCOL_VIDEO_READER_H_
diff --git a/remoting/protocol/video_stub.h b/remoting/protocol/video_stub.h
new file mode 100644
index 0000000000000..9f9657fd95603
--- /dev/null
+++ b/remoting/protocol/video_stub.h
@@ -0,0 +1,33 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef REMOTING_PROTOCOL_VIDEO_STUB_H_
+#define REMOTING_PROTOCOL_VIDEO_STUB_H_
+
+#include "remoting/proto/video.pb.h"
+
+class Task;
+
+namespace remoting {
+
+class VideoStub {
+ public:
+  virtual ~VideoStub() { }
+
+  // TODO(sergeyu): VideoPacket is the protobuf message that is used to send
+  // video packets in protobuf stream. It should not be used here. Add another
+  // struct and use it to represent video packets internally.
+  virtual void ProcessVideoPacket(const VideoPacket* video_packet,
+                                  Task* done) = 0;
+
+ protected:
+  VideoStub() { }
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(VideoStub);
+};
+
+}  // namespace remoting
+
+#endif  // REMOTING_PROTOCOL_VIDEO_STUB_H_
diff --git a/remoting/protocol/video_writer.cc b/remoting/protocol/video_writer.cc
new file mode 100644
index 0000000000000..c45101011da84
--- /dev/null
+++ b/remoting/protocol/video_writer.cc
@@ -0,0 +1,26 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "remoting/protocol/video_writer.h"
+
+#include "remoting/protocol/chromotocol_config.h"
+#include "remoting/protocol/protobuf_video_writer.h"
+#include "remoting/protocol/rtp_video_writer.h"
+
+namespace remoting {
+
+VideoWriter::~VideoWriter() { }
+
+// static
+VideoWriter* VideoWriter::Create(const ChromotocolConfig* config) {
+  const ChannelConfig& video_config = config->video_config();
+  if (video_config.transport == ChannelConfig::TRANSPORT_SRTP) {
+    return new RtpVideoWriter();
+  } else if (video_config.transport == ChannelConfig::TRANSPORT_STREAM) {
+    return new ProtobufVideoWriter();
+  }
+  return NULL;
+}
+
+}  // namespace remoting
diff --git a/remoting/protocol/video_writer.h b/remoting/protocol/video_writer.h
new file mode 100644
index 0000000000000..9c98a8ec908f3
--- /dev/null
+++ b/remoting/protocol/video_writer.h
@@ -0,0 +1,48 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+// VideoWriter is a generic interface for a video stream writer. RtpVideoWriter
+// and ProtobufVideoWriter implement this interface for RTP and protobuf video
+// streams. VideoWriter is used by ConnectionToClient to write into the video
+// stream.
+
+#ifndef REMOTING_PROTOCOL_VIDEO_WRITER_H_
+#define REMOTING_PROTOCOL_VIDEO_WRITER_H_
+
+#include "base/basictypes.h"
+#include "remoting/proto/video.pb.h"
+
+namespace remoting {
+
+class ChromotocolConfig;
+class ChromotocolConnection;
+
+// TODO(sergeyu): VideoWriter should implement VideoStub interface.
+class VideoWriter {
+ public:
+  virtual ~VideoWriter();
+
+  static VideoWriter* Create(const ChromotocolConfig* config);
+
+  // Initializes the writer.
+  virtual void Init(ChromotocolConnection* connection) = 0;
+
+  // Sends the |packet|.
+  virtual void SendPacket(const VideoPacket& packet) = 0;
+
+  // Returns number of packets currently pending in the buffer.
+  virtual int GetPendingPackets() = 0;
+
+  virtual void Close() = 0;
+
+ protected:
+  VideoWriter() { }
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(VideoWriter);
+};
+
+}  // namespace remoting
+
+#endif  // REMOTING_PROTOCOL_VIDEO_WRITER_H_
diff --git a/remoting/remoting.gyp b/remoting/remoting.gyp
index dc72d8b1fae22..80d8d5d3eba70 100644
--- a/remoting/remoting.gyp
+++ b/remoting/remoting.gyp
@@ -379,10 +379,18 @@
         'protocol/jingle_chromotocol_connection.h',
         'protocol/jingle_chromotocol_server.cc',
         'protocol/jingle_chromotocol_server.h',
+        'protocol/protobuf_video_reader.cc',
+        'protocol/protobuf_video_reader.h',
+        'protocol/protobuf_video_writer.cc',
+        'protocol/protobuf_video_writer.h',
         'protocol/rtp_reader.cc',
         'protocol/rtp_reader.h',
         'protocol/rtp_utils.cc',
         'protocol/rtp_utils.h',
+        'protocol/rtp_video_reader.cc',
+        'protocol/rtp_video_reader.h',
+        'protocol/rtp_video_writer.cc',
+        'protocol/rtp_video_writer.h',
         'protocol/rtp_writer.cc',
         'protocol/rtp_writer.h',
         'protocol/socket_reader_base.cc',
@@ -391,6 +399,11 @@
         'protocol/stream_writer.h',
         'protocol/util.cc',
         'protocol/util.h',
+        'protocol/video_reader.cc',
+        'protocol/video_reader.h',
+        'protocol/video_stub.h',
+        'protocol/video_writer.cc',
+        'protocol/video_writer.h',
       ],
     },  # end of target 'chromoting_protocol'
 
-- 
GitLab