Commit 203832d4 authored by sergeyu's avatar sergeyu Committed by Commit bot

Cleanup channel dispatchers

Now ChannelDispatcherBase handles initialization of reader and writer,
which makes all dispatcher classes simpler. ChannelDispatcherBase now
can handle writer errors. Also renamed
ProtobufMessageReader -> ProtobufMessageParser.

Review URL: https://codereview.chromium.org/841773005

Cr-Commit-Position: refs/heads/master@{#310993}
parent f4f051fe
......@@ -13,31 +13,15 @@
namespace remoting {
namespace protocol {
AudioReader::AudioReader(AudioPacket::Encoding encoding)
AudioReader::AudioReader(AudioStub* audio_stub)
: ChannelDispatcherBase(kAudioChannelName),
encoding_(encoding),
audio_stub_(nullptr) {
parser_(base::Bind(&AudioStub::ProcessAudioPacket,
base::Unretained(audio_stub)),
reader()) {
}
AudioReader::~AudioReader() {
}
// static
scoped_ptr<AudioReader> AudioReader::Create(const SessionConfig& config) {
if (!config.is_audio_enabled())
return nullptr;
return make_scoped_ptr(new AudioReader(AudioPacket::ENCODING_RAW));
}
void AudioReader::OnInitialized() {
reader_.Init(channel(), base::Bind(&AudioReader::OnNewData,
base::Unretained(this)));
}
void AudioReader::OnNewData(scoped_ptr<AudioPacket> packet,
const base::Closure& done_task) {
audio_stub_->ProcessAudioPacket(packet.Pass(), done_task);
}
} // namespace protocol
} // namespace remoting
......@@ -5,46 +5,22 @@
#ifndef REMOTING_PROTOCOL_AUDIO_READER_H_
#define REMOTING_PROTOCOL_AUDIO_READER_H_
#include "base/callback.h"
#include "base/compiler_specific.h"
#include "remoting/proto/audio.pb.h"
#include "remoting/protocol/audio_stub.h"
#include "remoting/protocol/message_reader.h"
#include "remoting/protocol/channel_dispatcher_base.h"
namespace net {
class StreamSocket;
} // namespace net
#include "remoting/protocol/protobuf_message_parser.h"
namespace remoting {
namespace protocol {
class Session;
class SessionConfig;
class AudioReader : public ChannelDispatcherBase {
public:
static scoped_ptr<AudioReader> Create(const SessionConfig& config);
explicit AudioReader(AudioStub* audio_stub);
~AudioReader() override;
void set_audio_stub(AudioStub* audio_stub) { audio_stub_ = audio_stub; }
protected:
void OnInitialized() override;
private:
explicit AudioReader(AudioPacket::Encoding encoding);
void OnNewData(scoped_ptr<AudioPacket> packet,
const base::Closure& done_task);
AudioPacket::Encoding encoding_;
ProtobufMessageReader<AudioPacket> reader_;
// The stub that processes all received packets.
AudioStub* audio_stub_;
ProtobufMessageParser<AudioPacket> parser_;
DISALLOW_COPY_AND_ASSIGN(AudioReader);
};
......
......@@ -22,15 +22,9 @@ AudioWriter::AudioWriter()
AudioWriter::~AudioWriter() {
}
void AudioWriter::OnInitialized() {
// TODO(sergeyu): Provide a non-null WriteFailedCallback for the writer.
buffered_writer_.Init(
channel(), BufferedSocketWriter::WriteFailedCallback());
}
void AudioWriter::ProcessAudioPacket(scoped_ptr<AudioPacket> packet,
const base::Closure& done) {
buffered_writer_.Write(SerializeAndFrameMessage(*packet), done);
writer()->Write(SerializeAndFrameMessage(*packet), done);
}
// static
......
......@@ -38,14 +38,9 @@ class AudioWriter : public ChannelDispatcherBase,
void ProcessAudioPacket(scoped_ptr<AudioPacket> packet,
const base::Closure& done) override;
protected:
void OnInitialized() override;
private:
AudioWriter();
BufferedSocketWriter buffered_writer_;
DISALLOW_COPY_AND_ASSIGN(AudioWriter);
};
......
......@@ -15,17 +15,19 @@ namespace protocol {
ChannelDispatcherBase::ChannelDispatcherBase(const char* channel_name)
: channel_name_(channel_name),
channel_factory_(nullptr) {
channel_factory_(nullptr),
event_handler_(nullptr) {
}
ChannelDispatcherBase::~ChannelDispatcherBase() {
writer()->Close();
if (channel_factory_)
channel_factory_->CancelChannelCreation(channel_name_);
}
void ChannelDispatcherBase::Init(Session* session,
const ChannelConfig& config,
const InitializedCallback& callback) {
EventHandler* event_handler) {
DCHECK(session);
switch (config.transport) {
case ChannelConfig::TRANSPORT_MUX_STREAM:
......@@ -37,12 +39,10 @@ void ChannelDispatcherBase::Init(Session* session,
break;
default:
NOTREACHED();
callback.Run(false);
return;
LOG(FATAL) << "Unknown transport type: " << config.transport;
}
initialized_callback_ = callback;
event_handler_ = event_handler;
channel_factory_->CreateChannel(channel_name_, base::Bind(
&ChannelDispatcherBase::OnChannelReady, base::Unretained(this)));
......@@ -51,16 +51,21 @@ void ChannelDispatcherBase::Init(Session* session,
void ChannelDispatcherBase::OnChannelReady(
scoped_ptr<net::StreamSocket> socket) {
if (!socket.get()) {
initialized_callback_.Run(false);
event_handler_->OnChannelError(this, CHANNEL_CONNECTION_ERROR);
return;
}
channel_factory_ = nullptr;
channel_ = socket.Pass();
writer_.Init(channel_.get(), base::Bind(&ChannelDispatcherBase::OnWriteFailed,
base::Unretained(this)));
reader_.StartReading(channel_.get());
OnInitialized();
event_handler_->OnChannelInitialized(this);
}
initialized_callback_.Run(true);
void ChannelDispatcherBase::OnWriteFailed(int error) {
event_handler_->OnChannelError(this, CHANNEL_CONNECTION_ERROR);
}
} // namespace protocol
......
......@@ -10,6 +10,9 @@
#include "base/basictypes.h"
#include "base/callback.h"
#include "base/memory/scoped_ptr.h"
#include "remoting/protocol/buffered_socket_writer.h"
#include "remoting/protocol/errors.h"
#include "remoting/protocol/message_reader.h"
namespace net {
class StreamSocket;
......@@ -28,6 +31,17 @@ class Session;
// messages.
class ChannelDispatcherBase {
public:
class EventHandler {
public:
EventHandler() {}
virtual ~EventHandler() {}
virtual void OnChannelInitialized(
ChannelDispatcherBase* channel_dispatcher) = 0;
virtual void OnChannelError(ChannelDispatcherBase* channel_dispatcher,
ErrorCode error) = 0;
};
// The callback is called when initialization is finished. The
// parameter is set to true on success.
typedef base::Callback<void(bool)> InitializedCallback;
......@@ -38,28 +52,31 @@ class ChannelDispatcherBase {
// |session|. Caller retains ownership of the Session.
void Init(Session* session,
const ChannelConfig& config,
const InitializedCallback& callback);
EventHandler* event_handler);
const std::string& channel_name() { return channel_name_; }
// Returns true if the channel is currently connected.
bool is_connected() { return channel() != nullptr; }
bool is_connected() { return channel_ != nullptr; }
protected:
explicit ChannelDispatcherBase(const char* channel_name);
net::StreamSocket* channel() { return channel_.get(); }
// Called when channel is initialized. Must be overriden in the
// child classes. Should not delete the dispatcher.
virtual void OnInitialized() = 0;
BufferedSocketWriter* writer() { return &writer_; }
MessageReader* reader() { return &reader_; }
private:
void OnChannelReady(scoped_ptr<net::StreamSocket> socket);
void OnWriteFailed(int error);
std::string channel_name_;
StreamChannelFactory* channel_factory_;
InitializedCallback initialized_callback_;
EventHandler* event_handler_;
scoped_ptr<net::StreamSocket> channel_;
BufferedSocketWriter writer_;
MessageReader reader_;
DISALLOW_COPY_AND_ASSIGN(ChannelDispatcherBase);
};
......
......@@ -348,6 +348,9 @@ ChannelMultiplexer::ChannelMultiplexer(StreamChannelFactory* factory,
: base_channel_factory_(factory),
base_channel_name_(base_channel_name),
next_channel_id_(0),
parser_(base::Bind(&ChannelMultiplexer::OnIncomingPacket,
base::Unretained(this)),
&reader_),
weak_factory_(this) {
}
......@@ -400,9 +403,7 @@ void ChannelMultiplexer::OnBaseChannelReady(
if (base_channel_.get()) {
// Initialize reader and writer.
reader_.Init(base_channel_.get(),
base::Bind(&ChannelMultiplexer::OnIncomingPacket,
base::Unretained(this)));
reader_.StartReading(base_channel_.get());
writer_.Init(base_channel_.get(),
base::Bind(&ChannelMultiplexer::OnWriteFailed,
base::Unretained(this)));
......
......@@ -9,6 +9,7 @@
#include "remoting/proto/mux.pb.h"
#include "remoting/protocol/buffered_socket_writer.h"
#include "remoting/protocol/message_reader.h"
#include "remoting/protocol/protobuf_message_parser.h"
#include "remoting/protocol/stream_channel_factory.h"
namespace remoting {
......@@ -78,7 +79,8 @@ class ChannelMultiplexer : public StreamChannelFactory {
std::map<int, MuxChannel*> channels_by_receive_id_;
BufferedSocketWriter writer_;
ProtobufMessageReader<MultiplexPacket> reader_;
MessageReader reader_;
ProtobufMessageParser<MultiplexPacket> parser_;
base::WeakPtrFactory<ChannelMultiplexer> weak_factory_;
......
......@@ -61,69 +61,65 @@ bool CursorShapeIsValid(const CursorShapeInfo& cursor_shape) {
ClientControlDispatcher::ClientControlDispatcher()
: ChannelDispatcherBase(kControlChannelName),
client_stub_(nullptr),
clipboard_stub_(nullptr) {
clipboard_stub_(nullptr),
parser_(base::Bind(&ClientControlDispatcher::OnMessageReceived,
base::Unretained(this)),
reader()) {
}
ClientControlDispatcher::~ClientControlDispatcher() {
writer_.Close();
}
void ClientControlDispatcher::OnInitialized() {
// TODO(garykac): Set write failed callback.
writer_.Init(channel(), BufferedSocketWriter::WriteFailedCallback());
reader_.Init(channel(), base::Bind(
&ClientControlDispatcher::OnMessageReceived, base::Unretained(this)));
}
void ClientControlDispatcher::InjectClipboardEvent(
const ClipboardEvent& event) {
ControlMessage message;
message.mutable_clipboard_event()->CopyFrom(event);
writer_.Write(SerializeAndFrameMessage(message), base::Closure());
writer()->Write(SerializeAndFrameMessage(message), base::Closure());
}
void ClientControlDispatcher::NotifyClientResolution(
const ClientResolution& resolution) {
ControlMessage message;
message.mutable_client_resolution()->CopyFrom(resolution);
writer_.Write(SerializeAndFrameMessage(message), base::Closure());
writer()->Write(SerializeAndFrameMessage(message), base::Closure());
}
void ClientControlDispatcher::ControlVideo(const VideoControl& video_control) {
ControlMessage message;
message.mutable_video_control()->CopyFrom(video_control);
writer_.Write(SerializeAndFrameMessage(message), base::Closure());
writer()->Write(SerializeAndFrameMessage(message), base::Closure());
}
void ClientControlDispatcher::ControlAudio(const AudioControl& audio_control) {
ControlMessage message;
message.mutable_audio_control()->CopyFrom(audio_control);
writer_.Write(SerializeAndFrameMessage(message), base::Closure());
writer()->Write(SerializeAndFrameMessage(message), base::Closure());
}
void ClientControlDispatcher::SetCapabilities(
const Capabilities& capabilities) {
ControlMessage message;
message.mutable_capabilities()->CopyFrom(capabilities);
writer_.Write(SerializeAndFrameMessage(message), base::Closure());
writer()->Write(SerializeAndFrameMessage(message), base::Closure());
}
void ClientControlDispatcher::RequestPairing(
const PairingRequest& pairing_request) {
ControlMessage message;
message.mutable_pairing_request()->CopyFrom(pairing_request);
writer_.Write(SerializeAndFrameMessage(message), base::Closure());
writer()->Write(SerializeAndFrameMessage(message), base::Closure());
}
void ClientControlDispatcher::DeliverClientMessage(
const ExtensionMessage& message) {
ControlMessage control_message;
control_message.mutable_extension_message()->CopyFrom(message);
writer_.Write(SerializeAndFrameMessage(control_message), base::Closure());
writer()->Write(SerializeAndFrameMessage(control_message), base::Closure());
}
void ClientControlDispatcher::OnMessageReceived(
scoped_ptr<ControlMessage> message, const base::Closure& done_task) {
scoped_ptr<ControlMessage> message,
const base::Closure& done_task) {
DCHECK(client_stub_);
DCHECK(clipboard_stub_);
base::ScopedClosureRunner done_runner(done_task);
......
......@@ -11,7 +11,7 @@
#include "remoting/protocol/clipboard_stub.h"
#include "remoting/protocol/cursor_shape_stub.h"
#include "remoting/protocol/host_stub.h"
#include "remoting/protocol/message_reader.h"
#include "remoting/protocol/protobuf_message_parser.h"
namespace remoting {
namespace protocol {
......@@ -51,10 +51,6 @@ class ClientControlDispatcher : public ChannelDispatcherBase,
clipboard_stub_ = clipboard_stub;
}
protected:
// ChannelDispatcherBase overrides.
void OnInitialized() override;
private:
void OnMessageReceived(scoped_ptr<ControlMessage> message,
const base::Closure& done_task);
......@@ -62,8 +58,7 @@ class ClientControlDispatcher : public ChannelDispatcherBase,
ClientStub* client_stub_;
ClipboardStub* clipboard_stub_;
ProtobufMessageReader<ControlMessage> reader_;
BufferedSocketWriter writer_;
ProtobufMessageParser<ControlMessage> parser_;
DISALLOW_COPY_AND_ASSIGN(ClientControlDispatcher);
};
......
......@@ -20,13 +20,6 @@ ClientEventDispatcher::ClientEventDispatcher()
}
ClientEventDispatcher::~ClientEventDispatcher() {
writer_.Close();
}
void ClientEventDispatcher::OnInitialized() {
// TODO(garykac): Set write failed callback.
writer_.Init(channel(),
BufferedSocketWriter::WriteFailedCallback());
}
void ClientEventDispatcher::InjectKeyEvent(const KeyEvent& event) {
......@@ -35,7 +28,7 @@ void ClientEventDispatcher::InjectKeyEvent(const KeyEvent& event) {
EventMessage message;
message.set_timestamp(base::Time::Now().ToInternalValue());
message.mutable_key_event()->CopyFrom(event);
writer_.Write(SerializeAndFrameMessage(message), base::Closure());
writer()->Write(SerializeAndFrameMessage(message), base::Closure());
}
void ClientEventDispatcher::InjectTextEvent(const TextEvent& event) {
......@@ -43,14 +36,14 @@ void ClientEventDispatcher::InjectTextEvent(const TextEvent& event) {
EventMessage message;
message.set_timestamp(base::Time::Now().ToInternalValue());
message.mutable_text_event()->CopyFrom(event);
writer_.Write(SerializeAndFrameMessage(message), base::Closure());
writer()->Write(SerializeAndFrameMessage(message), base::Closure());
}
void ClientEventDispatcher::InjectMouseEvent(const MouseEvent& event) {
EventMessage message;
message.set_timestamp(base::Time::Now().ToInternalValue());
message.mutable_mouse_event()->CopyFrom(event);
writer_.Write(SerializeAndFrameMessage(message), base::Closure());
writer()->Write(SerializeAndFrameMessage(message), base::Closure());
}
} // namespace protocol
......
......@@ -25,13 +25,7 @@ class ClientEventDispatcher : public ChannelDispatcherBase, public InputStub {
void InjectTextEvent(const TextEvent& event) override;
void InjectMouseEvent(const MouseEvent& event) override;
protected:
// ChannelDispatcherBase overrides.
void OnInitialized() override;
private:
BufferedSocketWriter writer_;
DISALLOW_COPY_AND_ASSIGN(ClientEventDispatcher);
};
......
......@@ -15,16 +15,13 @@ namespace protocol {
ClientVideoDispatcher::ClientVideoDispatcher(VideoStub* video_stub)
: ChannelDispatcherBase(kVideoChannelName),
video_stub_(video_stub) {
parser_(base::Bind(&VideoStub::ProcessVideoPacket,
base::Unretained(video_stub)),
reader()) {
}
ClientVideoDispatcher::~ClientVideoDispatcher() {
}
void ClientVideoDispatcher::OnInitialized() {
reader_.Init(channel(), base::Bind(&VideoStub::ProcessVideoPacket,
base::Unretained(video_stub_)));
}
} // namespace protocol
} // namespace remoting
......@@ -8,7 +8,7 @@
#include "base/compiler_specific.h"
#include "remoting/proto/video.pb.h"
#include "remoting/protocol/channel_dispatcher_base.h"
#include "remoting/protocol/message_reader.h"
#include "remoting/protocol/protobuf_message_parser.h"
namespace remoting {
namespace protocol {
......@@ -20,15 +20,8 @@ class ClientVideoDispatcher : public ChannelDispatcherBase {
explicit ClientVideoDispatcher(VideoStub* video_stub);
~ClientVideoDispatcher() override;
protected:
// ChannelDispatcherBase overrides.
void OnInitialized() override;
private:
ProtobufMessageReader<VideoPacket> reader_;
// The stub to which VideoPackets are passed for processing.
VideoStub* video_stub_;
ProtobufMessageParser<VideoPacket> parser_;
DISALLOW_COPY_AND_ASSIGN(ClientVideoDispatcher);
};
......
......@@ -119,34 +119,26 @@ void ConnectionToClient::OnSessionStateChange(Session::State state) {
case Session::AUTHENTICATED:
// Initialize channels.
control_dispatcher_.reset(new HostControlDispatcher());
control_dispatcher_->Init(
session_.get(), session_->config().control_config(),
base::Bind(&ConnectionToClient::OnChannelInitialized,
base::Unretained(this)));
control_dispatcher_->Init(session_.get(),
session_->config().control_config(), this);
control_dispatcher_->set_clipboard_stub(clipboard_stub_);
control_dispatcher_->set_host_stub(host_stub_);
event_dispatcher_.reset(new HostEventDispatcher());
event_dispatcher_->Init(
session_.get(), session_->config().event_config(),
base::Bind(&ConnectionToClient::OnChannelInitialized,
base::Unretained(this)));
event_dispatcher_->Init(session_.get(), session_->config().event_config(),
this);
event_dispatcher_->set_input_stub(input_stub_);
event_dispatcher_->set_event_timestamp_callback(base::Bind(
&ConnectionToClient::OnEventTimestamp, base::Unretained(this)));
video_dispatcher_.reset(new HostVideoDispatcher());
video_dispatcher_->Init(
session_.get(), session_->config().video_config(),
base::Bind(&ConnectionToClient::OnChannelInitialized,
base::Unretained(this)));
video_dispatcher_->Init(session_.get(), session_->config().video_config(),
this);
audio_writer_ = AudioWriter::Create(session_->config());
if (audio_writer_.get()) {
audio_writer_->Init(
session_.get(), session_->config().audio_config(),
base::Bind(&ConnectionToClient::OnChannelInitialized,
base::Unretained(this)));
audio_writer_->Init(session_.get(), session_->config().audio_config(),
this);
}
// Notify the handler after initializing the channels, so that
......@@ -170,28 +162,33 @@ void ConnectionToClient::OnSessionRouteChange(
handler_->OnRouteChange(this, channel_name, route);
}
void ConnectionToClient::OnChannelInitialized(bool successful) {
void ConnectionToClient::OnChannelInitialized(
ChannelDispatcherBase* channel_dispatcher) {
DCHECK(CalledOnValidThread());
if (!successful) {
LOG(ERROR) << "Failed to connect a channel";
Close(CHANNEL_CONNECTION_ERROR);
return;
}
NotifyIfChannelsReady();
}
void ConnectionToClient::OnChannelError(
ChannelDispatcherBase* channel_dispatcher,
ErrorCode error) {
DCHECK(CalledOnValidThread());
LOG(ERROR) << "Failed to connect channel "
<< channel_dispatcher->channel_name();
Close(CHANNEL_CONNECTION_ERROR);
}
void ConnectionToClient::NotifyIfChannelsReady() {
DCHECK(CalledOnValidThread());
if (!control_dispatcher_.get() || !control_dispatcher_->is_connected())
if (!control_dispatcher_ || !control_dispatcher_->is_connected())
return;
if (!event_dispatcher_.get() || !event_dispatcher_->is_connected())
if (!event_dispatcher_ || !event_dispatcher_->is_connected())
return;
if (!video_dispatcher_.get() || !video_dispatcher_->is_connected())
if (!video_dispatcher_ || !video_dispatcher_->is_connected())
return;
if ((!audio_writer_.get() || !audio_writer_->is_connected()) &&
if ((!audio_writer_ || !audio_writer_->is_connected()) &&
session_->config().is_audio_enabled()) {
return;
}
......
......@@ -31,7 +31,8 @@ class VideoStub;
// host. It sets up all protocol channels and connects them to the
// stubs.
class ConnectionToClient : public base::NonThreadSafe,
public Session::EventHandler {
public Session::EventHandler,
public ChannelDispatcherBase::EventHandler {
public:
class EventHandler {
public:
......@@ -105,10 +106,12 @@ class ConnectionToClient : public base::NonThreadSafe,
void OnSessionRouteChange(const std::string& channel_name,
const TransportRoute& route) override;
private:
// Callback for channel initialization.
void OnChannelInitialized(bool successful);
// ChannelDispatcherBase::EventHandler interface.
void OnChannelInitialized(ChannelDispatcherBase* channel_dispatcher) override;
void OnChannelError(ChannelDispatcherBase* channel_dispatcher,
ErrorCode error) override;
private:
void NotifyIfChannelsReady();
void Close(ErrorCode error);
......
......@@ -184,31 +184,24 @@ void ConnectionToHost::OnSessionStateChange(
SetState(AUTHENTICATED, OK);
control_dispatcher_.reset(new ClientControlDispatcher());
control_dispatcher_->Init(
session_.get(), session_->config().control_config(),
base::Bind(&ConnectionToHost::OnChannelInitialized,
base::Unretained(this)));
control_dispatcher_->Init(session_.get(),
session_->config().control_config(), this);
control_dispatcher_->set_client_stub(client_stub_);
control_dispatcher_->set_clipboard_stub(clipboard_stub_);