Commit d8af2cac authored by sergeyu's avatar sergeyu Committed by Commit bot

Simplify message parsing.

Previously ProtobufMessageParser was used to parse incoming messages.
Removed it and replaced with ParseMessage() function. This allows to
simplify MessageReader and makes it possible to remove MessageReader
dependency in ChannelDispatcherBase, which will be done later.

Review URL: https://codereview.chromium.org/1654513003

Cr-Commit-Position: refs/heads/master@{#372526}
parent 82184c38
......@@ -7,6 +7,9 @@
#include "base/bind.h"
#include "net/socket/stream_socket.h"
#include "remoting/base/constants.h"
#include "remoting/proto/audio.pb.h"
#include "remoting/protocol/audio_stub.h"
#include "remoting/protocol/message_serialization.h"
#include "remoting/protocol/session.h"
#include "remoting/protocol/session_config.h"
......@@ -14,16 +17,17 @@ namespace remoting {
namespace protocol {
AudioReader::AudioReader(AudioStub* audio_stub)
: ChannelDispatcherBase(kAudioChannelName),
audio_stub_(audio_stub),
parser_(base::Bind(&AudioReader::OnAudioPacket, base::Unretained(this)),
reader()) {}
: ChannelDispatcherBase(kAudioChannelName), audio_stub_(audio_stub) {}
AudioReader::~AudioReader() {}
void AudioReader::OnAudioPacket(scoped_ptr<AudioPacket> audio_packet) {
audio_stub_->ProcessAudioPacket(std::move(audio_packet),
base::Bind(&base::DoNothing));
void AudioReader::OnIncomingMessage(scoped_ptr<CompoundBuffer> message) {
scoped_ptr<AudioPacket> audio_packet =
ParseMessage<AudioPacket>(message.get());
if (audio_packet) {
audio_stub_->ProcessAudioPacket(std::move(audio_packet),
base::Bind(&base::DoNothing));
}
}
} // namespace protocol
......
......@@ -7,24 +7,22 @@
#include "base/compiler_specific.h"
#include "base/macros.h"
#include "remoting/proto/audio.pb.h"
#include "remoting/protocol/audio_stub.h"
#include "remoting/protocol/channel_dispatcher_base.h"
#include "remoting/protocol/protobuf_message_parser.h"
namespace remoting {
namespace protocol {
class AudioStub;
class AudioReader : public ChannelDispatcherBase {
public:
explicit AudioReader(AudioStub* audio_stub);
~AudioReader() override;
private:
void OnAudioPacket(scoped_ptr<AudioPacket> audio_packet);
void OnIncomingMessage(scoped_ptr<CompoundBuffer> message) override;
AudioStub* audio_stub_;
ProtobufMessageParser<AudioPacket> parser_;
DISALLOW_COPY_AND_ASSIGN(AudioReader);
};
......
......@@ -34,5 +34,9 @@ scoped_ptr<AudioWriter> AudioWriter::Create(const SessionConfig& config) {
return make_scoped_ptr(new AudioWriter());
}
void AudioWriter::OnIncomingMessage(scoped_ptr<CompoundBuffer> message) {
LOG(ERROR) << "Received unexpected message on the audio channel.";
}
} // namespace protocol
} // namespace remoting
......@@ -40,6 +40,8 @@ class AudioWriter : public ChannelDispatcherBase,
private:
AudioWriter();
void OnIncomingMessage(scoped_ptr<CompoundBuffer> message) override;
DISALLOW_COPY_AND_ASSIGN(AudioWriter);
};
......
......@@ -47,6 +47,8 @@ void ChannelDispatcherBase::OnChannelReady(
base::Bind(&ChannelDispatcherBase::OnReadWriteFailed,
base::Unretained(this)));
reader_.StartReading(channel_.get(),
base::Bind(&ChannelDispatcherBase::OnIncomingMessage,
base::Unretained(this)),
base::Bind(&ChannelDispatcherBase::OnReadWriteFailed,
base::Unretained(this)));
......
......@@ -55,7 +55,9 @@ class ChannelDispatcherBase {
explicit ChannelDispatcherBase(const char* channel_name);
BufferedSocketWriter* writer() { return &writer_; }
MessageReader* reader() { return &reader_; }
// Child classes must override this method to handle incoming messages.
virtual void OnIncomingMessage(scoped_ptr<CompoundBuffer> message) = 0;
private:
void OnChannelReady(scoped_ptr<P2PStreamSocket> socket);
......
......@@ -305,11 +305,7 @@ ChannelMultiplexer::ChannelMultiplexer(StreamChannelFactory* factory,
: base_channel_factory_(factory),
base_channel_name_(base_channel_name),
next_channel_id_(0),
parser_(base::Bind(&ChannelMultiplexer::OnIncomingPacket,
base::Unretained(this)),
&reader_),
weak_factory_(this) {
}
weak_factory_(this) {}
ChannelMultiplexer::~ChannelMultiplexer() {
DCHECK(pending_channels_.empty());
......@@ -361,6 +357,8 @@ void ChannelMultiplexer::OnBaseChannelReady(
if (base_channel_.get()) {
// Initialize reader and writer.
reader_.StartReading(base_channel_.get(),
base::Bind(&ChannelMultiplexer::OnIncomingPacket,
base::Unretained(this)),
base::Bind(&ChannelMultiplexer::OnBaseChannelError,
base::Unretained(this)));
writer_.Start(base::Bind(&P2PStreamSocket::Write,
......@@ -424,7 +422,12 @@ void ChannelMultiplexer::NotifyBaseChannelError(const std::string& name,
it->second->OnBaseChannelError(error);
}
void ChannelMultiplexer::OnIncomingPacket(scoped_ptr<MultiplexPacket> packet) {
void ChannelMultiplexer::OnIncomingPacket(scoped_ptr<CompoundBuffer> buffer) {
scoped_ptr<MultiplexPacket> packet =
ParseMessage<MultiplexPacket>(buffer.get());
if (!packet)
return;
DCHECK(packet->has_channel_id());
if (!packet->has_channel_id()) {
LOG(ERROR) << "Received packet without channel_id.";
......
......@@ -10,7 +10,6 @@
#include "remoting/base/buffered_socket_writer.h"
#include "remoting/proto/mux.pb.h"
#include "remoting/protocol/message_reader.h"
#include "remoting/protocol/protobuf_message_parser.h"
#include "remoting/protocol/stream_channel_factory.h"
namespace remoting {
......@@ -53,7 +52,7 @@ class ChannelMultiplexer : public StreamChannelFactory {
void NotifyBaseChannelError(const std::string& name, int error);
// Callback for |reader_;
void OnIncomingPacket(scoped_ptr<MultiplexPacket> packet);
void OnIncomingPacket(scoped_ptr<CompoundBuffer> buffer);
// Called by MuxChannel.
void DoWrite(scoped_ptr<MultiplexPacket> packet,
......@@ -81,7 +80,6 @@ class ChannelMultiplexer : public StreamChannelFactory {
BufferedSocketWriter writer_;
MessageReader reader_;
ProtobufMessageParser<MultiplexPacket> parser_;
base::WeakPtrFactory<ChannelMultiplexer> weak_factory_;
......
......@@ -59,13 +59,7 @@ bool CursorShapeIsValid(const CursorShapeInfo& cursor_shape) {
} // namespace
ClientControlDispatcher::ClientControlDispatcher()
: ChannelDispatcherBase(kControlChannelName),
client_stub_(nullptr),
clipboard_stub_(nullptr),
parser_(base::Bind(&ClientControlDispatcher::OnMessageReceived,
base::Unretained(this)),
reader()) {}
: ChannelDispatcherBase(kControlChannelName) {}
ClientControlDispatcher::~ClientControlDispatcher() {}
void ClientControlDispatcher::InjectClipboardEvent(
......@@ -115,11 +109,16 @@ void ClientControlDispatcher::DeliverClientMessage(
writer()->Write(SerializeAndFrameMessage(control_message), base::Closure());
}
void ClientControlDispatcher::OnMessageReceived(
scoped_ptr<ControlMessage> message) {
void ClientControlDispatcher::OnIncomingMessage(
scoped_ptr<CompoundBuffer> buffer) {
DCHECK(client_stub_);
DCHECK(clipboard_stub_);
scoped_ptr<ControlMessage> message =
ParseMessage<ControlMessage>(buffer.get());
if (!message)
return;
if (message->has_clipboard_event()) {
clipboard_stub_->InjectClipboardEvent(message->clipboard_event());
} else if (message->has_capabilities()) {
......
......@@ -11,7 +11,6 @@
#include "remoting/protocol/clipboard_stub.h"
#include "remoting/protocol/cursor_shape_stub.h"
#include "remoting/protocol/host_stub.h"
#include "remoting/protocol/protobuf_message_parser.h"
namespace remoting {
namespace protocol {
......@@ -52,12 +51,10 @@ class ClientControlDispatcher : public ChannelDispatcherBase,
}
private:
void OnMessageReceived(scoped_ptr<ControlMessage> message);
void OnIncomingMessage(scoped_ptr<CompoundBuffer> message) override;
ClientStub* client_stub_;
ClipboardStub* clipboard_stub_;
ProtobufMessageParser<ControlMessage> parser_;
ClientStub* client_stub_ = nullptr;
ClipboardStub* clipboard_stub_ = nullptr;
DISALLOW_COPY_AND_ASSIGN(ClientControlDispatcher);
};
......
......@@ -52,5 +52,10 @@ void ClientEventDispatcher::InjectTouchEvent(const TouchEvent& event) {
writer()->Write(SerializeAndFrameMessage(message), base::Closure());
}
void ClientEventDispatcher::OnIncomingMessage(
scoped_ptr<CompoundBuffer> message) {
LOG(ERROR) << "Received unexpected message on the event channel.";
}
} // namespace protocol
} // namespace remoting
......@@ -27,6 +27,8 @@ class ClientEventDispatcher : public ChannelDispatcherBase, public InputStub {
void InjectTouchEvent(const TouchEvent& event) override;
private:
void OnIncomingMessage(scoped_ptr<CompoundBuffer> message) override;
DISALLOW_COPY_AND_ASSIGN(ClientEventDispatcher);
};
......
......@@ -28,15 +28,16 @@ struct ClientVideoDispatcher::PendingFrame {
ClientVideoDispatcher::ClientVideoDispatcher(VideoStub* video_stub)
: ChannelDispatcherBase(kVideoChannelName),
video_stub_(video_stub),
parser_(base::Bind(&ClientVideoDispatcher::ProcessVideoPacket,
base::Unretained(this)),
reader()),
weak_factory_(this) {}
ClientVideoDispatcher::~ClientVideoDispatcher() {}
void ClientVideoDispatcher::ProcessVideoPacket(
scoped_ptr<VideoPacket> video_packet) {
void ClientVideoDispatcher::OnIncomingMessage(
scoped_ptr<CompoundBuffer> message) {
scoped_ptr<VideoPacket> video_packet =
ParseMessage<VideoPacket>(message.get());
if (!video_packet)
return;
int frame_id = video_packet->frame_id();
if (!video_packet->has_frame_id()) {
......
......@@ -10,7 +10,6 @@
#include "base/memory/weak_ptr.h"
#include "remoting/proto/video.pb.h"
#include "remoting/protocol/channel_dispatcher_base.h"
#include "remoting/protocol/protobuf_message_parser.h"
namespace remoting {
namespace protocol {
......@@ -26,7 +25,7 @@ class ClientVideoDispatcher : public ChannelDispatcherBase {
struct PendingFrame;
typedef std::list<PendingFrame> PendingFramesList;
void ProcessVideoPacket(scoped_ptr<VideoPacket> video_packet);
void OnIncomingMessage(scoped_ptr<CompoundBuffer> message) override;
// Callback for VideoStub::ProcessVideoPacket().
void OnPacketDone(PendingFramesList::iterator pending_frame);
......@@ -34,7 +33,6 @@ class ClientVideoDispatcher : public ChannelDispatcherBase {
PendingFramesList pending_frames_;
VideoStub* video_stub_;
ProtobufMessageParser<VideoPacket> parser_;
base::WeakPtrFactory<ClientVideoDispatcher> weak_factory_;
......
......@@ -35,7 +35,7 @@ class ClientVideoDispatcherTest : public testing::Test,
ErrorCode error) override;
protected:
void OnVideoAck(scoped_ptr<VideoAck> ack);
void OnMessageReceived(scoped_ptr<CompoundBuffer> buffer);
void OnReadError(int error);
base::MessageLoop message_loop_;
......@@ -50,7 +50,6 @@ class ClientVideoDispatcherTest : public testing::Test,
// Host side.
FakeStreamSocket host_socket_;
MessageReader reader_;
ProtobufMessageParser<VideoAck> parser_;
BufferedSocketWriter writer_;
ScopedVector<VideoPacket> video_packets_;
......@@ -61,16 +60,15 @@ class ClientVideoDispatcherTest : public testing::Test,
ClientVideoDispatcherTest::ClientVideoDispatcherTest()
: initialized_(false),
dispatcher_(this),
parser_(base::Bind(&ClientVideoDispatcherTest::OnVideoAck,
base::Unretained(this)),
&reader_) {
dispatcher_(this) {
dispatcher_.Init(&client_channel_factory_, this);
base::RunLoop().RunUntilIdle();
DCHECK(initialized_);
host_socket_.PairWith(
client_channel_factory_.GetFakeChannel(kVideoChannelName));
reader_.StartReading(&host_socket_,
base::Bind(&ClientVideoDispatcherTest::OnMessageReceived,
base::Unretained(this)),
base::Bind(&ClientVideoDispatcherTest::OnReadError,
base::Unretained(this)));
writer_.Start(
......@@ -97,7 +95,10 @@ void ClientVideoDispatcherTest::OnChannelError(
FAIL();
}
void ClientVideoDispatcherTest::OnVideoAck(scoped_ptr<VideoAck> ack) {
void ClientVideoDispatcherTest::OnMessageReceived(
scoped_ptr<CompoundBuffer> buffer) {
scoped_ptr<VideoAck> ack = ParseMessage<VideoAck>(buffer.get());
EXPECT_TRUE(ack);
ack_messages_.push_back(ack.release());
}
......
......@@ -17,16 +17,8 @@ namespace remoting {
namespace protocol {
HostControlDispatcher::HostControlDispatcher()
: ChannelDispatcherBase(kControlChannelName),
clipboard_stub_(nullptr),
host_stub_(nullptr),
parser_(base::Bind(&HostControlDispatcher::OnMessageReceived,
base::Unretained(this)),
reader()) {
}
HostControlDispatcher::~HostControlDispatcher() {
}
: ChannelDispatcherBase(kControlChannelName) {}
HostControlDispatcher::~HostControlDispatcher() {}
void HostControlDispatcher::SetCapabilities(
const Capabilities& capabilities) {
......@@ -62,11 +54,16 @@ void HostControlDispatcher::SetCursorShape(
writer()->Write(SerializeAndFrameMessage(message), base::Closure());
}
void HostControlDispatcher::OnMessageReceived(
scoped_ptr<ControlMessage> message) {
void HostControlDispatcher::OnIncomingMessage(
scoped_ptr<CompoundBuffer> buffer) {
DCHECK(clipboard_stub_);
DCHECK(host_stub_);
scoped_ptr<ControlMessage> message =
ParseMessage<ControlMessage>(buffer.get());
if (!message)
return;
if (message->has_clipboard_event()) {
clipboard_stub_->InjectClipboardEvent(message->clipboard_event());
} else if (message->has_client_resolution()) {
......
......@@ -10,7 +10,6 @@
#include "remoting/protocol/client_stub.h"
#include "remoting/protocol/clipboard_stub.h"
#include "remoting/protocol/cursor_shape_stub.h"
#include "remoting/protocol/protobuf_message_parser.h"
namespace net {
class StreamSocket;
......@@ -19,7 +18,6 @@ class StreamSocket;
namespace remoting {
namespace protocol {
class ControlMessage;
class HostStub;
class PairingResponse;
class Session;
......@@ -55,12 +53,10 @@ class HostControlDispatcher : public ChannelDispatcherBase,
void set_host_stub(HostStub* host_stub) { host_stub_ = host_stub; }
private:
void OnMessageReceived(scoped_ptr<ControlMessage> message);
void OnIncomingMessage(scoped_ptr<CompoundBuffer> buffer) override;
ClipboardStub* clipboard_stub_;
HostStub* host_stub_;
ProtobufMessageParser<ControlMessage> parser_;
ClipboardStub* clipboard_stub_ = nullptr;
HostStub* host_stub_ = nullptr;
DISALLOW_COPY_AND_ASSIGN(HostControlDispatcher);
};
......
......@@ -9,22 +9,22 @@
#include "remoting/proto/event.pb.h"
#include "remoting/proto/internal.pb.h"
#include "remoting/protocol/input_stub.h"
#include "remoting/protocol/message_serialization.h"
namespace remoting {
namespace protocol {
HostEventDispatcher::HostEventDispatcher()
: ChannelDispatcherBase(kEventChannelName),
input_stub_(nullptr),
parser_(base::Bind(&HostEventDispatcher::OnMessageReceived,
base::Unretained(this)),
reader()) {}
: ChannelDispatcherBase(kEventChannelName) {}
HostEventDispatcher::~HostEventDispatcher() {}
void HostEventDispatcher::OnMessageReceived(scoped_ptr<EventMessage> message) {
void HostEventDispatcher::OnIncomingMessage(scoped_ptr<CompoundBuffer> buffer) {
DCHECK(input_stub_);
scoped_ptr<EventMessage> message = ParseMessage<EventMessage>(buffer.get());
if (!message)
return;
if (!on_input_event_callback_.is_null())
on_input_event_callback_.Run(message->timestamp());
......
......@@ -9,7 +9,6 @@
#include "base/macros.h"
#include "remoting/protocol/channel_dispatcher_base.h"
#include "remoting/protocol/protobuf_message_parser.h"
namespace remoting {
namespace protocol {
......@@ -38,13 +37,11 @@ class HostEventDispatcher : public ChannelDispatcherBase {
}
private:
void OnMessageReceived(scoped_ptr<EventMessage> message);
void OnIncomingMessage(scoped_ptr<CompoundBuffer> buffer) override;
InputStub* input_stub_;
InputStub* input_stub_ = nullptr;
OnInputEventCallback on_input_event_callback_;
ProtobufMessageParser<EventMessage> parser_;
DISALLOW_COPY_AND_ASSIGN(HostEventDispatcher);
};
......
......@@ -9,6 +9,7 @@
#include "base/bind.h"
#include "net/socket/stream_socket.h"
#include "remoting/base/constants.h"
#include "remoting/proto/video.pb.h"
#include "remoting/protocol/message_serialization.h"
#include "remoting/protocol/video_feedback_stub.h"
......@@ -16,12 +17,7 @@ namespace remoting {
namespace protocol {
HostVideoDispatcher::HostVideoDispatcher()
: ChannelDispatcherBase(kVideoChannelName),
parser_(
base::Bind(&HostVideoDispatcher::OnVideoAck, base::Unretained(this)),
reader()),
video_feedback_stub_(nullptr) {}
: ChannelDispatcherBase(kVideoChannelName) {}
HostVideoDispatcher::~HostVideoDispatcher() {}
void HostVideoDispatcher::ProcessVideoPacket(scoped_ptr<VideoPacket> packet,
......@@ -29,7 +25,11 @@ void HostVideoDispatcher::ProcessVideoPacket(scoped_ptr<VideoPacket> packet,
writer()->Write(SerializeAndFrameMessage(*packet), done);
}
void HostVideoDispatcher::OnVideoAck(scoped_ptr<VideoAck> ack) {
void HostVideoDispatcher::OnIncomingMessage(
scoped_ptr<CompoundBuffer> message) {
scoped_ptr<VideoAck> ack = ParseMessage<VideoAck>(message.get());
if (!ack)
return;
if (video_feedback_stub_)
video_feedback_stub_->ProcessVideoAck(std::move(ack));
}
......
......@@ -7,9 +7,7 @@
#include "base/compiler_specific.h"
#include "base/macros.h"
#include "remoting/proto/video.pb.h"
#include "remoting/protocol/channel_dispatcher_base.h"
#include "remoting/protocol/protobuf_message_parser.h"
#include "remoting/protocol/video_stub.h"
namespace remoting {
......@@ -31,11 +29,9 @@ class HostVideoDispatcher : public ChannelDispatcherBase, public VideoStub {
const base::Closure& done) override;
private:
void OnVideoAck(scoped_ptr<VideoAck> ack);
void OnIncomingMessage(scoped_ptr<CompoundBuffer> message) override;
ProtobufMessageParser<VideoAck> parser_;
VideoFeedbackStub* video_feedback_stub_;
VideoFeedbackStub* video_feedback_stub_ = nullptr;
DISALLOW_COPY_AND_ASSIGN(HostVideoDispatcher);
};
......
......@@ -20,7 +20,6 @@
#include "remoting/protocol/errors.h"
#include "remoting/protocol/ice_transport.h"
#include "remoting/protocol/input_filter.h"
#include "remoting/protocol/message_reader.h"
#include "remoting/protocol/monitored_video_stub.h"
#include "remoting/protocol/session.h"
#include "remoting/protocol/session_config.h"
......
......@@ -26,20 +26,18 @@ static const int kReadBufferSize = 4096;
MessageReader::MessageReader() : weak_factory_(this) {}
MessageReader::~MessageReader() {}
void MessageReader::SetMessageReceivedCallback(
const MessageReceivedCallback& callback) {
DCHECK(CalledOnValidThread());
message_received_callback_ = callback;
}
void MessageReader::StartReading(
P2PStreamSocket* socket,
const MessageReceivedCallback& message_received_callback,
const ReadFailedCallback& read_failed_callback) {
DCHECK(CalledOnValidThread());
DCHECK(!socket_);
DCHECK(socket);
DCHECK(!message_received_callback.is_null());
DCHECK(!read_failed_callback.is_null());
socket_ = socket;
message_received_callback_ = message_received_callback;
read_failed_callback_ = read_failed_callback;
DoRead();
}
......
......@@ -42,11 +42,9 @@ class MessageReader : public base::NonThreadSafe {
MessageReader();
virtual ~MessageReader();
// Sets the callback to be called for each incoming message.
void SetMessageReceivedCallback(const MessageReceivedCallback& callback);
// Starts reading from |socket|.
void StartReading(P2PStreamSocket* socket,
const MessageReceivedCallback& message_received_callback,
const ReadFailedCallback& read_failed_callback);
private:
......
......@@ -53,10 +53,10 @@ class MessageReaderTest : public testing::Test {
void TearDown() override { STLDeleteElements(&messages_); }
void InitReader() {
reader_->SetMessageReceivedCallback(
base::Bind(&MessageReaderTest::OnMessage, base::Unretained(this)));
reader_->StartReading(&socket_, base::Bind(&MessageReaderTest::OnReadError,
base::Unretained(this)));
reader_->StartReading(
&socket_,
base::Bind(&MessageReaderTest::OnMessage, base::Unretained(this)),
base::Bind(&MessageReaderTest::OnReadError, base::Unretained(this)));
}
void AddMessage(const std::string& message) {
......
......@@ -9,6 +9,7 @@
#define REMOTING_PROTOCOL_MESSAGE_SERIALIZATION_H_
#include "net/base/io_buffer.h"
#include "remoting/base/compound_buffer.h"
#if defined(USE_SYSTEM_PROTOBUF)
#include <google/protobuf/message_lite.h>
......@@ -19,6 +20,18 @@
namespace remoting {
namespace protocol {
template <class T>
scoped_ptr<T> ParseMessage(CompoundBuffer* buffer) {
scoped_ptr<T> message(new T());
CompoundBufferInputStream stream(buffer);
if (!message->ParseFromZeroCopyStream(&stream)) {
LOG(WARNING) << "Received message that is not a valid protocol buffer.";
return nullptr;
}
DCHECK_EQ(stream.position(), buffer->total_bytes());
return message;
}
// Serialize the Protocol Buffer message and provide sufficient framing for
// sending it over the wire.
// This will provide sufficient prefix and suffix for the receiver side to
......
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef REMOTING_PROTOCOL_PROTOBUF_MESSAGE_PARSER_H_
#define REMOTING_PROTOCOL_PROTOBUF_MESSAGE_PARSER_H_
#include <utility>
#include "base/bind.h"
#include "base/callback.h"
#include "base/memory/scoped_ptr.h"
#include "remoting/base/compound_buffer.h"
#include "remoting/protocol/message_reader.h"
namespace remoting {
namespace protocol {
// Version of MessageReader for protocol buffer messages, that parses
// each incoming message.
template <class T>
class ProtobufMessageParser {
public:
// The callback that is called when a new message is received. |done_task|
// must be called by the callback when it's done processing the |message|.
typedef typename base::Callback<void(scoped_ptr<T> message)>
MessageReceivedCallback;
// |message_reader| must outlive ProtobufMessageParser.
ProtobufMessageParser(const MessageReceivedCallback& callback,
MessageReader* message_reader)
: message_reader_(message_reader),
message_received_callback_(callback) {
message_reader->SetMessageReceivedCallback(base::Bind(
&ProtobufMessageParser<T>::OnNewData, base::Unretained(this)));
}
~ProtobufMessageParser() {
message_reader_->SetMessageReceivedCallback(
MessageReader::MessageReceivedCallback());
}
private:
void OnNewData(scoped_ptr<CompoundBuffer> buffer) {
scoped_ptr<T> message(new T());
CompoundBufferInputStream stream(buffer.get());
bool ret = message->ParseFromZeroCopyStream(&stream);
if (!ret) {
LOG(WARNING) << "Received message that is not a valid protocol buffer.";
} else {
DCHECK_EQ(stream.position(), buffer->total_bytes());
message_received_callback_.Run(std::move(message));
}
}
MessageReader* message_reader_;
MessageReceivedCallback message_received_callback_;
};
} // namespace protocol
} // namespace remoting
#endif // REMOTING_PROTOCOL_PROTOBUF_MESSAGE_PARSER_H_
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment