Commit e61b68d9 authored by sergeyu's avatar sergeyu Committed by Commit bot

Remove done notifications from incoming message handlers.

Previously MessageReader and ProtobufMessageParse were passing done
callbacks to the messages handlers. These callbacks were necessary to
pace the reader, particularly when video renderer is slow and cannot
keep up with the rate of the incoming messages. It's no longer
necessary because we have explicit ACK messages for video packets.

Review URL: https://codereview.chromium.org/1655433002

Cr-Commit-Position: refs/heads/master@{#372509}
parent 507b0ad3
......@@ -15,12 +15,15 @@ namespace protocol {
AudioReader::AudioReader(AudioStub* audio_stub)
: ChannelDispatcherBase(kAudioChannelName),
parser_(base::Bind(&AudioStub::ProcessAudioPacket,
base::Unretained(audio_stub)),
reader()) {
}
audio_stub_(audio_stub),
parser_(base::Bind(&AudioReader::OnAudioPacket, base::Unretained(this)),
reader()) {}
AudioReader::~AudioReader() {}
AudioReader::~AudioReader() {
void AudioReader::OnAudioPacket(scoped_ptr<AudioPacket> audio_packet) {
audio_stub_->ProcessAudioPacket(std::move(audio_packet),
base::Bind(&base::DoNothing));
}
} // namespace protocol
......
......@@ -21,6 +21,9 @@ class AudioReader : public ChannelDispatcherBase {
~AudioReader() override;
private:
void OnAudioPacket(scoped_ptr<AudioPacket> audio_packet);
AudioStub* audio_stub_;
ProtobufMessageParser<AudioPacket> parser_;
DISALLOW_COPY_AND_ASSIGN(AudioReader);
......
......@@ -30,15 +30,9 @@ const int kMaxPacketSize = 1024;
class PendingPacket {
public:
PendingPacket(scoped_ptr<MultiplexPacket> packet,
const base::Closure& done_task)
: packet(std::move(packet)),
done_task(done_task),
pos(0U) {
}
~PendingPacket() {
done_task.Run();
}
PendingPacket(scoped_ptr<MultiplexPacket> packet)
: packet(std::move(packet)) {}
~PendingPacket() {}
bool is_empty() { return pos >= packet->data().size(); }
......@@ -51,8 +45,7 @@ class PendingPacket {
private:
scoped_ptr<MultiplexPacket> packet;
base::Closure done_task;
size_t pos;
size_t pos = 0U;
DISALLOW_COPY_AND_ASSIGN(PendingPacket);
};
......@@ -82,8 +75,7 @@ class ChannelMultiplexer::MuxChannel {
// Called by ChannelMultiplexer.
scoped_ptr<P2PStreamSocket> CreateSocket();
void OnIncomingPacket(scoped_ptr<MultiplexPacket> packet,
const base::Closure& done_task);
void OnIncomingPacket(scoped_ptr<MultiplexPacket> packet);
void OnBaseChannelError(int error);
// Called by MuxSocket.
......@@ -164,11 +156,10 @@ scoped_ptr<P2PStreamSocket> ChannelMultiplexer::MuxChannel::CreateSocket() {
}
void ChannelMultiplexer::MuxChannel::OnIncomingPacket(
scoped_ptr<MultiplexPacket> packet,
const base::Closure& done_task) {
scoped_ptr<MultiplexPacket> packet) {
DCHECK_EQ(packet->channel_id(), receive_id_);
if (packet->data().size() > 0) {
pending_packets_.push_back(new PendingPacket(std::move(packet), done_task));
pending_packets_.push_back(new PendingPacket(std::move(packet)));
if (socket_) {
// Notify the socket that we have more data.
socket_->OnPacketReceived();
......@@ -433,12 +424,10 @@ void ChannelMultiplexer::NotifyBaseChannelError(const std::string& name,
it->second->OnBaseChannelError(error);
}
void ChannelMultiplexer::OnIncomingPacket(scoped_ptr<MultiplexPacket> packet,
const base::Closure& done_task) {
void ChannelMultiplexer::OnIncomingPacket(scoped_ptr<MultiplexPacket> packet) {
DCHECK(packet->has_channel_id());
if (!packet->has_channel_id()) {
LOG(ERROR) << "Received packet without channel_id.";
done_task.Run();
return;
}
......@@ -453,7 +442,6 @@ void ChannelMultiplexer::OnIncomingPacket(scoped_ptr<MultiplexPacket> packet,
if (!packet->has_channel_name()) {
LOG(ERROR) << "Received packet with unknown channel_id and "
"without channel_name.";
done_task.Run();
return;
}
channel = GetOrCreateChannel(packet->channel_name());
......@@ -461,7 +449,7 @@ void ChannelMultiplexer::OnIncomingPacket(scoped_ptr<MultiplexPacket> packet,
channels_by_receive_id_[receive_id] = channel;
}
channel->OnIncomingPacket(std::move(packet), done_task);
channel->OnIncomingPacket(std::move(packet));
}
void ChannelMultiplexer::DoWrite(scoped_ptr<MultiplexPacket> packet,
......
......@@ -53,8 +53,7 @@ class ChannelMultiplexer : public StreamChannelFactory {
void NotifyBaseChannelError(const std::string& name, int error);
// Callback for |reader_;
void OnIncomingPacket(scoped_ptr<MultiplexPacket> packet,
const base::Closure& done_task);
void OnIncomingPacket(scoped_ptr<MultiplexPacket> packet);
// Called by MuxChannel.
void DoWrite(scoped_ptr<MultiplexPacket> packet,
......
......@@ -8,7 +8,6 @@
#include "base/bind_helpers.h"
#include "base/callback.h"
#include "base/callback_helpers.h"
#include "net/socket/stream_socket.h"
#include "remoting/base/constants.h"
#include "remoting/proto/control.pb.h"
......@@ -65,11 +64,9 @@ ClientControlDispatcher::ClientControlDispatcher()
clipboard_stub_(nullptr),
parser_(base::Bind(&ClientControlDispatcher::OnMessageReceived,
base::Unretained(this)),
reader()) {
}
reader()) {}
ClientControlDispatcher::~ClientControlDispatcher() {
}
ClientControlDispatcher::~ClientControlDispatcher() {}
void ClientControlDispatcher::InjectClipboardEvent(
const ClipboardEvent& event) {
......@@ -119,11 +116,9 @@ void ClientControlDispatcher::DeliverClientMessage(
}
void ClientControlDispatcher::OnMessageReceived(
scoped_ptr<ControlMessage> message,
const base::Closure& done_task) {
scoped_ptr<ControlMessage> message) {
DCHECK(client_stub_);
DCHECK(clipboard_stub_);
base::ScopedClosureRunner done_runner(done_task);
if (message->has_clipboard_event()) {
clipboard_stub_->InjectClipboardEvent(message->clipboard_event());
......
......@@ -52,8 +52,7 @@ class ClientControlDispatcher : public ChannelDispatcherBase,
}
private:
void OnMessageReceived(scoped_ptr<ControlMessage> message,
const base::Closure& done_task);
void OnMessageReceived(scoped_ptr<ControlMessage> message);
ClientStub* client_stub_;
ClipboardStub* clipboard_stub_;
......
......@@ -31,22 +31,17 @@ ClientVideoDispatcher::ClientVideoDispatcher(VideoStub* video_stub)
parser_(base::Bind(&ClientVideoDispatcher::ProcessVideoPacket,
base::Unretained(this)),
reader()),
weak_factory_(this) {
}
weak_factory_(this) {}
ClientVideoDispatcher::~ClientVideoDispatcher() {
}
ClientVideoDispatcher::~ClientVideoDispatcher() {}
void ClientVideoDispatcher::ProcessVideoPacket(
scoped_ptr<VideoPacket> video_packet,
const base::Closure& done) {
base::ScopedClosureRunner done_runner(done);
scoped_ptr<VideoPacket> video_packet) {
int frame_id = video_packet->frame_id();
if (!video_packet->has_frame_id()) {
video_stub_->ProcessVideoPacket(std::move(video_packet),
done_runner.Release());
base::Bind(&base::DoNothing));
return;
}
......
......@@ -26,8 +26,7 @@ class ClientVideoDispatcher : public ChannelDispatcherBase {
struct PendingFrame;
typedef std::list<PendingFrame> PendingFramesList;
void ProcessVideoPacket(scoped_ptr<VideoPacket> video_packet,
const base::Closure& done);
void ProcessVideoPacket(scoped_ptr<VideoPacket> video_packet);
// Callback for VideoStub::ProcessVideoPacket().
void OnPacketDone(PendingFramesList::iterator pending_frame);
......
......@@ -35,7 +35,7 @@ class ClientVideoDispatcherTest : public testing::Test,
ErrorCode error) override;
protected:
void OnVideoAck(scoped_ptr<VideoAck> ack, const base::Closure& done);
void OnVideoAck(scoped_ptr<VideoAck> ack);
void OnReadError(int error);
base::MessageLoop message_loop_;
......@@ -97,10 +97,8 @@ void ClientVideoDispatcherTest::OnChannelError(
FAIL();
}
void ClientVideoDispatcherTest::OnVideoAck(scoped_ptr<VideoAck> ack,
const base::Closure& done) {
void ClientVideoDispatcherTest::OnVideoAck(scoped_ptr<VideoAck> ack) {
ack_messages_.push_back(ack.release());
done.Run();
}
void ClientVideoDispatcherTest::OnReadError(int error) {
......
......@@ -63,12 +63,10 @@ void HostControlDispatcher::SetCursorShape(
}
void HostControlDispatcher::OnMessageReceived(
scoped_ptr<ControlMessage> message, const base::Closure& done_task) {
scoped_ptr<ControlMessage> message) {
DCHECK(clipboard_stub_);
DCHECK(host_stub_);
base::ScopedClosureRunner done_runner(done_task);
if (message->has_clipboard_event()) {
clipboard_stub_->InjectClipboardEvent(message->clipboard_event());
} else if (message->has_client_resolution()) {
......
......@@ -55,8 +55,7 @@ class HostControlDispatcher : public ChannelDispatcherBase,
void set_host_stub(HostStub* host_stub) { host_stub_ = host_stub; }
private:
void OnMessageReceived(scoped_ptr<ControlMessage> message,
const base::Closure& done_task);
void OnMessageReceived(scoped_ptr<ControlMessage> message);
ClipboardStub* clipboard_stub_;
HostStub* host_stub_;
......
......@@ -4,7 +4,6 @@
#include "remoting/protocol/host_event_dispatcher.h"
#include "base/callback_helpers.h"
#include "net/socket/stream_socket.h"
#include "remoting/base/constants.h"
#include "remoting/proto/event.pb.h"
......@@ -23,12 +22,9 @@ HostEventDispatcher::HostEventDispatcher()
HostEventDispatcher::~HostEventDispatcher() {}
void HostEventDispatcher::OnMessageReceived(scoped_ptr<EventMessage> message,
const base::Closure& done_task) {
void HostEventDispatcher::OnMessageReceived(scoped_ptr<EventMessage> message) {
DCHECK(input_stub_);
base::ScopedClosureRunner done_runner(done_task);
if (!on_input_event_callback_.is_null())
on_input_event_callback_.Run(message->timestamp());
......
......@@ -38,8 +38,7 @@ class HostEventDispatcher : public ChannelDispatcherBase {
}
private:
void OnMessageReceived(scoped_ptr<EventMessage> message,
const base::Closure& done_task);
void OnMessageReceived(scoped_ptr<EventMessage> message);
InputStub* input_stub_;
OnInputEventCallback on_input_event_callback_;
......
......@@ -20,23 +20,18 @@ HostVideoDispatcher::HostVideoDispatcher()
parser_(
base::Bind(&HostVideoDispatcher::OnVideoAck, base::Unretained(this)),
reader()),
video_feedback_stub_(nullptr) {
}
video_feedback_stub_(nullptr) {}
HostVideoDispatcher::~HostVideoDispatcher() {
}
HostVideoDispatcher::~HostVideoDispatcher() {}
void HostVideoDispatcher::ProcessVideoPacket(scoped_ptr<VideoPacket> packet,
const base::Closure& done) {
writer()->Write(SerializeAndFrameMessage(*packet), done);
}
void HostVideoDispatcher::OnVideoAck(scoped_ptr<VideoAck> ack,
const base::Closure& done) {
void HostVideoDispatcher::OnVideoAck(scoped_ptr<VideoAck> ack) {
if (video_feedback_stub_)
video_feedback_stub_->ProcessVideoAck(std::move(ack));
done.Run();
}
} // namespace protocol
......
......@@ -31,7 +31,7 @@ class HostVideoDispatcher : public ChannelDispatcherBase, public VideoStub {
const base::Closure& done) override;
private:
void OnVideoAck(scoped_ptr<VideoAck> ack, const base::Closure& done);
void OnVideoAck(scoped_ptr<VideoAck> ack);
ProtobufMessageParser<VideoAck> parser_;
......
......@@ -23,16 +23,8 @@ namespace protocol {
static const int kReadBufferSize = 4096;
MessageReader::MessageReader()
: socket_(nullptr),
read_pending_(false),
pending_messages_(0),
closed_(false),
weak_factory_(this) {
}
MessageReader::~MessageReader() {
}
MessageReader::MessageReader() : weak_factory_(this) {}
MessageReader::~MessageReader() {}
void MessageReader::SetMessageReceivedCallback(
const MessageReceivedCallback& callback) {
......@@ -57,8 +49,7 @@ void MessageReader::DoRead() {
// Don't try to read again if there is another read pending or we
// have messages that we haven't finished processing yet.
bool read_succeeded = true;
while (read_succeeded && !closed_ && !read_pending_ &&
pending_messages_ == 0) {
while (read_succeeded && !closed_ && !read_pending_) {
read_buffer_ = new net::IOBuffer(kReadBufferSize);
int result = socket_->Read(
read_buffer_.get(),
......@@ -116,30 +107,16 @@ void MessageReader::OnDataReceived(net::IOBuffer* data, int data_size) {
CompoundBuffer* buffer = message_decoder_.GetNextMessage();
if (!buffer)
break;
pending_messages_++;
base::ThreadTaskRunnerHandle::Get()->PostTask(
FROM_HERE,
base::Bind(&MessageReader::RunCallback,
weak_factory_.GetWeakPtr(),
base::Bind(&MessageReader::RunCallback, weak_factory_.GetWeakPtr(),
base::Passed(make_scoped_ptr(buffer))));
}
}
void MessageReader::RunCallback(scoped_ptr<CompoundBuffer> message) {
if (!message_received_callback_.is_null()){
message_received_callback_.Run(
std::move(message),
base::Bind(&MessageReader::OnMessageDone, weak_factory_.GetWeakPtr()));
}
}
void MessageReader::OnMessageDone() {
DCHECK(CalledOnValidThread());
pending_messages_--;
DCHECK_GE(pending_messages_, 0);
// Start next read if necessary.
DoRead();
if (!message_received_callback_.is_null())
message_received_callback_.Run(std::move(message));
}
} // namespace protocol
......
......@@ -35,7 +35,7 @@ class P2PStreamSocket;
// e.g. when we the sender sends multiple messages in one TCP packet.
class MessageReader : public base::NonThreadSafe {
public:
typedef base::Callback<void(scoped_ptr<CompoundBuffer>, const base::Closure&)>
typedef base::Callback<void(scoped_ptr<CompoundBuffer> message)>
MessageReceivedCallback;
typedef base::Callback<void(int)> ReadFailedCallback;
......@@ -55,22 +55,16 @@ class MessageReader : public base::NonThreadSafe {
void HandleReadResult(int result, bool* read_succeeded);
void OnDataReceived(net::IOBuffer* data, int data_size);
void RunCallback(scoped_ptr<CompoundBuffer> message);
void OnMessageDone();
ReadFailedCallback read_failed_callback_;
P2PStreamSocket* socket_;
P2PStreamSocket* socket_ = nullptr;
// Set to true, when we have a socket read pending, and expecting
// OnRead() to be called when new data is received.
bool read_pending_;
bool read_pending_ = false;
// Number of messages that we received, but haven't finished
// processing yet, i.e. |done_task| hasn't been called for these
// messages.
int pending_messages_;
bool closed_;
bool closed_ = false;
scoped_refptr<net::IOBuffer> read_buffer_;
MessageDecoder message_decoder_;
......
......@@ -30,41 +30,20 @@ namespace protocol {
namespace {
const char kTestMessage1[] = "Message1";
const char kTestMessage2[] = "Message2";
ACTION(CallDoneTask) {
arg0.Run();
}
} // namespace
class MockMessageReceivedCallback {
public:
MOCK_METHOD1(OnMessage, void(const base::Closure&));
MOCK_METHOD0(OnMessage, void());
};
class MessageReaderTest : public testing::Test {
public:
MessageReaderTest()
: in_callback_(false) {
}
// Following two methods are used by the ReadFromCallback test.
void AddSecondMessage(const base::Closure& task) {
AddMessage(kTestMessage2);
in_callback_ = true;
task.Run();
in_callback_ = false;
}
void OnSecondMessage(const base::Closure& task) {
EXPECT_FALSE(in_callback_);
task.Run();
}
void AddSecondMessage() { AddMessage(kTestMessage2); }
// Used by the DeleteFromCallback() test.
void DeleteReader(const base::Closure& task) {
reader_.reset();
task.Run();
}
void DeleteReader() { reader_.reset(); }
protected:
void SetUp() override {
......@@ -98,10 +77,9 @@ class MessageReaderTest : public testing::Test {
reader_.reset();
}
void OnMessage(scoped_ptr<CompoundBuffer> buffer,
const base::Closure& done_callback) {
void OnMessage(scoped_ptr<CompoundBuffer> buffer) {
messages_.push_back(buffer.release());
callback_.OnMessage(done_callback);
callback_.OnMessage();
}
base::MessageLoop message_loop_;
......@@ -110,43 +88,13 @@ class MessageReaderTest : public testing::Test {
MockMessageReceivedCallback callback_;
int read_error_ = 0;
std::vector<CompoundBuffer*> messages_;
bool in_callback_;
};
// Receive one message and process it with delay
TEST_F(MessageReaderTest, OneMessage_Delay) {
base::Closure done_task;
AddMessage(kTestMessage1);
EXPECT_CALL(callback_, OnMessage(_))
.Times(1)
.WillOnce(SaveArg<0>(&done_task));
InitReader();
base::RunLoop().RunUntilIdle();
Mock::VerifyAndClearExpectations(&callback_);
Mock::VerifyAndClearExpectations(&socket_);
EXPECT_TRUE(CompareResult(messages_[0], kTestMessage1));
// Verify that the reader starts reading again only after we've
// finished processing the previous message.
EXPECT_FALSE(socket_.read_pending());
done_task.Run();
EXPECT_TRUE(socket_.read_pending());
}
// Receive one message and process it instantly.
TEST_F(MessageReaderTest, OneMessage_Instant) {
// Receive one message.
TEST_F(MessageReaderTest, OneMessage) {
AddMessage(kTestMessage1);
EXPECT_CALL(callback_, OnMessage(_))
.Times(1)
.WillOnce(CallDoneTask());
EXPECT_CALL(callback_, OnMessage()).Times(1);
InitReader();
base::RunLoop().RunUntilIdle();
......@@ -157,16 +105,10 @@ TEST_F(MessageReaderTest, OneMessage_Instant) {
// Receive two messages in one packet.
TEST_F(MessageReaderTest, TwoMessages_Together) {
base::Closure done_task1;
base::Closure done_task2;
AddMessage(kTestMessage1);
AddMessage(kTestMessage2);
EXPECT_CALL(callback_, OnMessage(_))
.Times(2)
.WillOnce(SaveArg<0>(&done_task1))
.WillOnce(SaveArg<0>(&done_task2));
EXPECT_CALL(callback_, OnMessage()).Times(2);
InitReader();
base::RunLoop().RunUntilIdle();
......@@ -177,77 +119,15 @@ TEST_F(MessageReaderTest, TwoMessages_Together) {
EXPECT_TRUE(CompareResult(messages_[0], kTestMessage1));
EXPECT_TRUE(CompareResult(messages_[1], kTestMessage2));
// Verify that the reader starts reading again only after we've
// finished processing the previous message.
EXPECT_FALSE(socket_.read_pending());
done_task1.Run();
base::RunLoop().RunUntilIdle();
EXPECT_FALSE(socket_.read_pending());
done_task2.Run();
base::RunLoop().RunUntilIdle();
EXPECT_TRUE(socket_.read_pending());
}
// Receive two messages in one packet, and process the first one
// instantly.
TEST_F(MessageReaderTest, TwoMessages_Instant) {
base::Closure done_task2;
AddMessage(kTestMessage1);
AddMessage(kTestMessage2);
EXPECT_CALL(callback_, OnMessage(_))
.Times(2)
.WillOnce(CallDoneTask())
.WillOnce(SaveArg<0>(&done_task2));
InitReader();
base::RunLoop().RunUntilIdle();
Mock::VerifyAndClearExpectations(&callback_);
Mock::VerifyAndClearExpectations(&socket_);
EXPECT_TRUE(CompareResult(messages_[1], kTestMessage2));
// Verify that the reader starts reading again only after we've
// finished processing the second message.
EXPECT_FALSE(socket_.read_pending());
done_task2.Run();
EXPECT_TRUE(socket_.read_pending());
}
// Receive two messages in one packet, and process both of them
// instantly.
TEST_F(MessageReaderTest, TwoMessages_Instant2) {
AddMessage(kTestMessage1);
AddMessage(kTestMessage2);
EXPECT_CALL(callback_, OnMessage(_))
.Times(2)
.WillOnce(CallDoneTask())
.WillOnce(CallDoneTask());
InitReader();
base::RunLoop().RunUntilIdle();
EXPECT_TRUE(socket_.read_pending());
}
// Receive two messages in separate packets.
TEST_F(MessageReaderTest, TwoMessages_Separately) {
base::Closure done_task;
AddMessage(kTestMessage1);
EXPECT_CALL(callback_, OnMessage(_))
.Times(1)
.WillOnce(SaveArg<0>(&done_task));
EXPECT_CALL(callback_, OnMessage())
.Times(1);
InitReader();
base::RunLoop().RunUntilIdle();
......@@ -257,30 +137,16 @@ TEST_F(MessageReaderTest, TwoMessages_Separately) {
EXPECT_TRUE(CompareResult(messages_[0], kTestMessage1));
// Verify that the reader starts reading again only after we've
// finished processing the previous message.
EXPECT_FALSE(socket_.read_pending());
done_task.Run();
base::RunLoop().RunUntilIdle();
EXPECT_TRUE(socket_.read_pending());
// Write another message and verify that we receive it.
EXPECT_CALL(callback_, OnMessage(_))
.Times(1)
.WillOnce(SaveArg<0>(&done_task));
EXPECT_CALL(callback_, OnMessage())
.Times(1);
AddMessage(kTestMessage2);
base::RunLoop().RunUntilIdle();
EXPECT_TRUE(CompareResult(messages_[1], kTestMessage2));
// Verify that the reader starts reading again only after we've
// finished processing the previous message.
EXPECT_FALSE(socket_.read_pending());
done_task.Run();
EXPECT_TRUE(socket_.read_pending());
}
......@@ -288,7 +154,7 @@ TEST_F(MessageReaderTest, TwoMessages_Separately) {
TEST_F(MessageReaderTest, ReadError) {
socket_.AppendReadError(net::ERR_FAILED);
EXPECT_CALL(callback_, OnMessage(_)).Times(0);
EXPECT_CALL(callback_, OnMessage()).Times(0);
InitReader();
......@@ -300,10 +166,9 @@ TEST_F(MessageReaderTest, ReadError) {
TEST_F(MessageReaderTest, ReadFromCallback) {
AddMessage(kTestMessage1);
EXPECT_CALL(callback_, OnMessage(_))
EXPECT_CALL(callback_, OnMessage())
.Times(2)
.WillOnce(Invoke(this, &MessageReaderTest::AddSecondMessage))
.WillOnce(Invoke(this, &MessageReaderTest::OnSecondMessage));
.WillOnce(Invoke(this, &MessageReaderTest::AddSecondMessage));