Commit 5b4667bc authored by sergeyu's avatar sergeyu Committed by Commit bot

Update ICE protocol to handle closed channel

An ICE/PseudoTcp channel may be closed by the peer. Read() may returns 0
in that case. MessageReader wasn't expecting 0 and was DCHECK'ing in that
case. Also update StreamMessagePipeAdapter to handle this case as closed
MessagePipe.

BUG=700167

Review-Url: https://codereview.chromium.org/2757723002
Cr-Commit-Position: refs/heads/master@{#457842}
parent af737fca
......@@ -260,44 +260,74 @@ void DatagramConnectionTester::HandleReadResult(int result) {
}
}
class MessagePipeConnectionTester::MessageSender
: public MessagePipe::EventHandler {
public:
MessageSender(MessagePipe* pipe, int message_size, int message_count)
: pipe_(pipe),
message_size_(message_size),
message_count_(message_count) {}
void Start() { pipe_->Start(this); }
const std::vector<std::unique_ptr<VideoPacket>>& sent_messages() {
return sent_messages_;
}
// MessagePipe::EventHandler interface.
void OnMessagePipeOpen() override {
for (int i = 0; i < message_count_; ++i) {
std::unique_ptr<VideoPacket> message(new VideoPacket());
message->mutable_data()->resize(message_size_);
for (int p = 0; p < message_size_; ++p) {
message->mutable_data()[0] = static_cast<char>(i + p);
}
pipe_->Send(message.get(), base::Closure());
sent_messages_.push_back(std::move(message));
}
}
void OnMessageReceived(std::unique_ptr<CompoundBuffer> message) override {
NOTREACHED();
}
void OnMessagePipeClosed() override { NOTREACHED(); }
private:
MessagePipe* pipe_;
int message_size_;
int message_count_;
std::vector<std::unique_ptr<VideoPacket>> sent_messages_;
};
MessagePipeConnectionTester::MessagePipeConnectionTester(
MessagePipe* client_pipe,
MessagePipe* host_pipe,
MessagePipe* client_pipe,
int message_size,
int message_count)
: host_pipe_(host_pipe),
client_pipe_(client_pipe),
message_size_(message_size),
message_count_(message_count) {}
: client_pipe_(client_pipe),
sender_(new MessageSender(host_pipe, message_size, message_count)) {}
MessagePipeConnectionTester::~MessagePipeConnectionTester() {}
void MessagePipeConnectionTester::RunAndCheckResults() {
host_pipe_->Start(this);
}
void MessagePipeConnectionTester::OnMessagePipeOpen() {
for (int i = 0; i < message_count_; ++i) {
std::unique_ptr<VideoPacket> message(new VideoPacket());
message->mutable_data()->resize(message_size_);
for (int p = 0; p < message_size_; ++p) {
message->mutable_data()[0] = static_cast<char>(i + p);
}
client_pipe_->Send(message.get(), base::Closure());
sent_messages_.push_back(std::move(message));
}
sender_->Start();
client_pipe_->Start(this);
run_loop_.Run();
ASSERT_EQ(sent_messages_.size(), received_messages_.size());
for (size_t i = 0; i < sent_messages_.size(); ++i) {
EXPECT_TRUE(sent_messages_[i]->data() == received_messages_[i]->data());
ASSERT_EQ(sender_->sent_messages().size(), received_messages_.size());
for (size_t i = 0; i < sender_->sent_messages().size(); ++i) {
EXPECT_TRUE(sender_->sent_messages()[i]->data() ==
received_messages_[i]->data());
}
}
void MessagePipeConnectionTester::OnMessagePipeOpen() {}
void MessagePipeConnectionTester::OnMessageReceived(
std::unique_ptr<CompoundBuffer> message) {
received_messages_.push_back(ParseMessage<VideoPacket>(message.get()));
if (received_messages_.size() >= sent_messages_.size()) {
if (received_messages_.size() >= sender_->sent_messages().size()) {
run_loop_.Quit();
}
}
......
......@@ -111,8 +111,8 @@ class DatagramConnectionTester {
class MessagePipeConnectionTester : public MessagePipe::EventHandler {
public:
MessagePipeConnectionTester(MessagePipe* client_pipe,
MessagePipe* host_pipe,
MessagePipeConnectionTester(MessagePipe* host_pipe,
MessagePipe* client_pipe,
int message_size,
int message_count);
~MessagePipeConnectionTester() override;
......@@ -126,13 +126,13 @@ class MessagePipeConnectionTester : public MessagePipe::EventHandler {
void OnMessagePipeClosed() override;
private:
class MessageSender;
base::RunLoop run_loop_;
MessagePipe* host_pipe_;
MessagePipe* client_pipe_;
int message_size_;
int message_count_;
std::vector<std::unique_ptr<VideoPacket>> sent_messages_;
std::unique_ptr<MessageSender> sender_;
std::vector<std::unique_ptr<VideoPacket>> received_messages_;
};
......
......@@ -77,11 +77,18 @@ void FakeSession::Close(ErrorCode error) {
error_ = error;
event_handler_->OnSessionStateChange(CLOSED);
FakeSession* peer = peer_.get();
base::WeakPtr<FakeSession> peer = peer_;
if (peer) {
peer->peer_.reset();
peer_.reset();
peer->Close(error);
if (signaling_delay_.is_zero()) {
peer->Close(error);
} else {
base::ThreadTaskRunnerHandle::Get()->PostDelayedTask(
FROM_HERE, base::Bind(&FakeSession::Close, peer, error),
signaling_delay_);
}
}
}
......
......@@ -26,7 +26,7 @@ FakeStreamSocket::~FakeStreamSocket() {
EXPECT_TRUE(task_runner_->BelongsToCurrentThread());
if (peer_socket_) {
task_runner_->PostTask(
FROM_HERE, base::Bind(&FakeStreamSocket::AppendReadError, peer_socket_,
FROM_HERE, base::Bind(&FakeStreamSocket::SetReadError, peer_socket_,
net::ERR_CONNECTION_CLOSED));
}
}
......@@ -48,7 +48,7 @@ void FakeStreamSocket::AppendInputData(const std::string& data) {
}
}
void FakeStreamSocket::AppendReadError(int error) {
void FakeStreamSocket::SetReadError(int error) {
EXPECT_TRUE(task_runner_->BelongsToCurrentThread());
// Complete pending read if any.
if (!read_callback_.is_null()) {
......@@ -79,9 +79,9 @@ int FakeStreamSocket::Read(const scoped_refptr<net::IOBuffer>& buf,
memcpy(buf->data(), &(*input_data_.begin()) + input_pos_, result);
input_pos_ += result;
return result;
} else if (next_read_error_ != net::OK) {
int r = next_read_error_;
next_read_error_ = net::OK;
} else if (next_read_error_.has_value()) {
int r = next_read_error_.value();
next_read_error_.reset();
return r;
} else {
read_buffer_ = buf;
......
......@@ -11,6 +11,7 @@
#include "base/macros.h"
#include "base/memory/weak_ptr.h"
#include "base/optional.h"
#include "net/base/completion_callback.h"
#include "remoting/protocol/p2p_stream_socket.h"
#include "remoting/protocol/stream_channel_factory.h"
......@@ -55,7 +56,7 @@ class FakeStreamSocket : public P2PStreamSocket {
// Causes Read() to fail with |error| once the read buffer is exhausted. If
// there is a currently pending Read, it is interrupted.
void AppendReadError(int error);
void SetReadError(int error);
// Pairs the socket with |peer_socket|. Deleting either of the paired sockets
// unpairs them.
......@@ -85,7 +86,7 @@ class FakeStreamSocket : public P2PStreamSocket {
int write_limit_ = 0;
int next_write_error_ = 0;
int next_read_error_ = 0;
base::Optional<int> next_read_error_;
scoped_refptr<net::IOBuffer> read_buffer_;
int read_buffer_size_ = 0;
net::CompletionCallback read_callback_;
......
......@@ -189,7 +189,6 @@ void IceConnectionToClient::OnIceTransportRouteChange(
void IceConnectionToClient::OnIceTransportError(ErrorCode error) {
DCHECK(thread_checker_.CalledOnValidThread());
Disconnect(error);
}
......@@ -202,8 +201,8 @@ void IceConnectionToClient::OnChannelInitialized(
void IceConnectionToClient::OnChannelClosed(
ChannelDispatcherBase* channel_dispatcher) {
// ICE transport doesn't close channels dynamically.
NOTREACHED();
DCHECK(thread_checker_.CalledOnValidThread());
Disconnect(OK);
}
void IceConnectionToClient::NotifyIfChannelsReady() {
......
......@@ -175,8 +175,7 @@ void IceConnectionToHost::OnChannelInitialized(
void IceConnectionToHost::OnChannelClosed(
ChannelDispatcherBase* channel_dispatcher) {
// ICE transport doesn't close channels dynamically.
NOTREACHED();
session_->Close(OK);
}
void IceConnectionToHost::OnVideoChannelStatus(bool active) {
......
......@@ -85,8 +85,6 @@ void MessageReader::HandleReadResult(int result, bool* read_succeeded) {
} else if (result == net::ERR_IO_PENDING) {
read_pending_ = true;
} else {
DCHECK_LT(result, 0);
// Stop reading after any error.
closed_ = true;
*read_succeeded = false;
......
......@@ -149,7 +149,7 @@ TEST_F(MessageReaderTest, TwoMessages_Separately) {
// Read() returns error.
TEST_F(MessageReaderTest, ReadError) {
socket_.AppendReadError(net::ERR_FAILED);
socket_.SetReadError(net::ERR_FAILED);
EXPECT_CALL(callback_, OnMessage()).Times(0);
......@@ -159,6 +159,18 @@ TEST_F(MessageReaderTest, ReadError) {
EXPECT_FALSE(reader_);
}
// Read() returns 0 (end of stream).
TEST_F(MessageReaderTest, EndOfStream) {
socket_.SetReadError(0);
EXPECT_CALL(callback_, OnMessage()).Times(0);
InitReader();
EXPECT_EQ(0, read_error_);
EXPECT_FALSE(reader_);
}
// Verify that we the OnMessage callback is not reentered.
TEST_F(MessageReaderTest, ReadFromCallback) {
AddMessage(kTestMessage1);
......
......@@ -22,27 +22,31 @@ namespace protocol {
StreamMessagePipeAdapter::StreamMessagePipeAdapter(
std::unique_ptr<P2PStreamSocket> socket,
const ErrorCallback& error_callback)
: socket_(std::move(socket)),
error_callback_(error_callback),
writer_(new BufferedSocketWriter()) {
: socket_(std::move(socket)), error_callback_(error_callback) {
DCHECK(socket_);
DCHECK(!error_callback_.is_null());
DCHECK(error_callback_);
}
StreamMessagePipeAdapter::~StreamMessagePipeAdapter() {}
void StreamMessagePipeAdapter::Start(EventHandler* event_handler) {
DCHECK(event_handler);
event_handler_ = event_handler;
writer_ = base::MakeUnique<BufferedSocketWriter>();
writer_->Start(
base::Bind(&P2PStreamSocket::Write, base::Unretained(socket_.get())),
base::Bind(&StreamMessagePipeAdapter::CloseOnError,
base::Unretained(this)));
}
StreamMessagePipeAdapter::~StreamMessagePipeAdapter() {}
reader_ = base::MakeUnique<MessageReader>();
reader_->StartReading(socket_.get(),
base::Bind(&EventHandler::OnMessageReceived,
base::Unretained(event_handler_)),
base::Bind(&StreamMessagePipeAdapter::CloseOnError,
base::Unretained(this)));
void StreamMessagePipeAdapter::Start(EventHandler* event_handler) {
reader_.StartReading(socket_.get(),
base::Bind(&EventHandler::OnMessageReceived,
base::Unretained(event_handler)),
base::Bind(&StreamMessagePipeAdapter::CloseOnError,
base::Unretained(this)));
event_handler->OnMessagePipeOpen();
event_handler_->OnMessagePipeOpen();
}
void StreamMessagePipeAdapter::Send(google::protobuf::MessageLite* message,
......@@ -52,11 +56,15 @@ void StreamMessagePipeAdapter::Send(google::protobuf::MessageLite* message,
}
void StreamMessagePipeAdapter::CloseOnError(int error) {
// Stop writing on error.
// Stop reading and writing on error.
writer_.reset();
reader_.reset();
if (!error_callback_.is_null())
if (error == 0) {
event_handler_->OnMessagePipeClosed();
} else if (error_callback_) {
base::ResetAndReturn(&error_callback_).Run(error);
}
}
StreamMessageChannelFactoryAdapter::StreamMessageChannelFactoryAdapter(
......
......@@ -19,7 +19,7 @@ class P2PStreamSocket;
class StreamChannelFactory;
// MessagePipe implementation that sends and receives messages over a
// P2PStreamChannel.
// P2PStreamSocket.
class StreamMessagePipeAdapter : public MessagePipe {
public:
typedef base::Callback<void(int)> ErrorCallback;
......@@ -36,10 +36,12 @@ class StreamMessagePipeAdapter : public MessagePipe {
private:
void CloseOnError(int error);
EventHandler* event_handler_ = nullptr;
std::unique_ptr<P2PStreamSocket> socket_;
ErrorCallback error_callback_;
MessageReader reader_;
std::unique_ptr<MessageReader> reader_;
std::unique_ptr<BufferedSocketWriter> writer_;
DISALLOW_COPY_AND_ASSIGN(StreamMessagePipeAdapter);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment