Commit c3497fcd authored by sergeyu's avatar sergeyu Committed by Commit bot

Fix BufferedSocketWriter to buffer everything before it starts writing.

Previously BufferedSocketWriter was ignoring Write() calls before
Init(). Fixed it to buffer all data in that scenario. Also renamed
Init() to Start().

Review URL: https://codereview.chromium.org/1582583003

Cr-Commit-Position: refs/heads/master@{#369355}
parent 960e27ee
......@@ -5,6 +5,7 @@
#include "remoting/base/buffered_socket_writer.h"
#include "base/bind.h"
#include "base/callback_helpers.h"
#include "base/stl_util.h"
#include "net/base/io_buffer.h"
#include "net/base/net_errors.h"
......@@ -39,7 +40,7 @@ scoped_ptr<BufferedSocketWriter> BufferedSocketWriter::CreateForSocket(
net::Socket* socket,
const WriteFailedCallback& write_failed_callback) {
scoped_ptr<BufferedSocketWriter> result(new BufferedSocketWriter());
result->Init(base::Bind(&WriteNetSocket, socket), write_failed_callback);
result->Start(base::Bind(&WriteNetSocket, socket), write_failed_callback);
return result;
}
......@@ -49,11 +50,12 @@ BufferedSocketWriter::~BufferedSocketWriter() {
STLDeleteElements(&queue_);
}
void BufferedSocketWriter::Init(
void BufferedSocketWriter::Start(
const WriteCallback& write_callback,
const WriteFailedCallback& write_failed_callback) {
write_callback_ = write_callback;
write_failed_callback_ = write_failed_callback;
DoWrite();
}
void BufferedSocketWriter::Write(
......@@ -63,7 +65,7 @@ void BufferedSocketWriter::Write(
DCHECK(data.get());
// Don't write after error.
if (is_closed())
if (closed_)
return;
queue_.push_back(new PendingPacket(
......@@ -72,15 +74,12 @@ void BufferedSocketWriter::Write(
DoWrite();
}
bool BufferedSocketWriter::is_closed() {
return write_callback_.is_null();
}
void BufferedSocketWriter::DoWrite() {
DCHECK(thread_checker_.CalledOnValidThread());
base::WeakPtr<BufferedSocketWriter> self = weak_factory_.GetWeakPtr();
while (self && !write_pending_ && !is_closed() && !queue_.empty()) {
while (self && !write_pending_ && !write_callback_.is_null() &&
!queue_.empty()) {
int result = write_callback_.Run(
queue_.front()->data.get(), queue_.front()->data->BytesRemaining(),
base::Bind(&BufferedSocketWriter::OnWritten,
......@@ -94,11 +93,10 @@ void BufferedSocketWriter::HandleWriteResult(int result) {
if (result == net::ERR_IO_PENDING) {
write_pending_ = true;
} else {
closed_ = true;
write_callback_.Reset();
if (!write_failed_callback_.is_null()) {
WriteFailedCallback callback = write_failed_callback_;
callback.Run(result);
}
if (!write_failed_callback_.is_null())
base::ResetAndReturn(&write_failed_callback_).Run(result);
}
return;
}
......
......@@ -36,13 +36,14 @@ class BufferedSocketWriter {
BufferedSocketWriter();
virtual ~BufferedSocketWriter();
// Initializes the writer. |write_callback| is called to write data to the
// Starts the writer. |write_callback| is called to write data to the
// socket. |write_failed_callback| is called when write operation fails.
// Writing stops after the first failed write.
void Init(const WriteCallback& write_callback,
const WriteFailedCallback& write_failed_callback);
void Start(const WriteCallback& write_callback,
const WriteFailedCallback& write_failed_callback);
// Puts a new data chunk in the buffer.
// Puts a new data chunk in the buffer. If called before Start() then all data
// is buffered until Start().
void Write(const scoped_refptr<net::IOBufferWithSize>& buffer,
const base::Closure& done_task);
......@@ -53,9 +54,6 @@ class BufferedSocketWriter {
struct PendingPacket;
typedef std::list<PendingPacket*> DataQueue;
// Returns true if the writer is closed due to an error.
bool is_closed();
void DoWrite();
void HandleWriteResult(int result);
void OnWritten(int result);
......@@ -65,6 +63,8 @@ class BufferedSocketWriter {
WriteCallback write_callback_;
WriteFailedCallback write_failed_callback_;
bool closed_ = false;
DataQueue queue_;
bool write_pending_ = false;
......
......@@ -23,6 +23,13 @@ namespace {
const int kTestBufferSize = 10000;
const size_t kWriteChunkSize = 1024U;
int WriteNetSocket(net::Socket* socket,
const scoped_refptr<net::IOBuffer>& buf,
int buf_len,
const net::CompletionCallback& callback) {
return socket->Write(buf.get(), buf_len, callback);
}
class SocketDataProvider: public net::SocketDataProvider {
public:
SocketDataProvider()
......@@ -95,25 +102,26 @@ class BufferedSocketWriterTest : public testing::Test {
net::MockConnect(net::SYNCHRONOUS, net::OK));
EXPECT_EQ(net::OK, socket_->Connect(net::CompletionCallback()));
writer_ = BufferedSocketWriter::CreateForSocket(
socket_.get(), base::Bind(&BufferedSocketWriterTest::OnWriteFailed,
base::Unretained(this)));
writer_.reset(new BufferedSocketWriter());
test_buffer_ = new net::IOBufferWithSize(kTestBufferSize);
test_buffer_2_ = new net::IOBufferWithSize(kTestBufferSize);
for (int i = 0; i< kTestBufferSize; ++i) {
for (int i = 0; i < kTestBufferSize; ++i) {
test_buffer_->data()[i] = rand() % 256;
test_buffer_2_->data()[i] = rand() % 256;
}
}
void StartWriter() {
writer_->Start(base::Bind(&WriteNetSocket, socket_.get()),
base::Bind(&BufferedSocketWriterTest::OnWriteFailed,
base::Unretained(this)));
};
void OnWriteFailed(int error) {
write_error_ = error;
}
void TestWrite() {
writer_->Write(test_buffer_, base::Closure());
writer_->Write(test_buffer_2_, base::Closure());
base::RunLoop().RunUntilIdle();
void VerifyWrittenData() {
ASSERT_EQ(static_cast<size_t>(test_buffer_->size() +
test_buffer_2_->size()),
socket_data_provider_.written_data().size());
......@@ -126,22 +134,20 @@ class BufferedSocketWriterTest : public testing::Test {
test_buffer_2_->size()));
}
void TestWrite() {
writer_->Write(test_buffer_, base::Closure());
writer_->Write(test_buffer_2_, base::Closure());
base::RunLoop().RunUntilIdle();
VerifyWrittenData();
}
void TestAppendInCallback() {
writer_->Write(test_buffer_, base::Bind(
base::IgnoreResult(&BufferedSocketWriter::Write),
base::Unretained(writer_.get()), test_buffer_2_,
base::Closure()));
base::RunLoop().RunUntilIdle();
ASSERT_EQ(static_cast<size_t>(test_buffer_->size() +
test_buffer_2_->size()),
socket_data_provider_.written_data().size());
EXPECT_EQ(0, memcmp(test_buffer_->data(),
socket_data_provider_.written_data().data(),
test_buffer_->size()));
EXPECT_EQ(0, memcmp(test_buffer_2_->data(),
socket_data_provider_.written_data().data() +
test_buffer_->size(),
test_buffer_2_->size()));
VerifyWrittenData();
}
base::MessageLoop message_loop_;
......@@ -156,17 +162,20 @@ class BufferedSocketWriterTest : public testing::Test {
// Test synchronous write.
TEST_F(BufferedSocketWriterTest, WriteFull) {
StartWriter();
TestWrite();
}
// Test synchronous write in 1k chunks.
TEST_F(BufferedSocketWriterTest, WriteChunks) {
StartWriter();
socket_data_provider_.set_write_limit(kWriteChunkSize);
TestWrite();
}
// Test asynchronous write.
TEST_F(BufferedSocketWriterTest, WriteAsync) {
StartWriter();
socket_data_provider_.set_async_write(true);
socket_data_provider_.set_write_limit(kWriteChunkSize);
TestWrite();
......@@ -174,11 +183,13 @@ TEST_F(BufferedSocketWriterTest, WriteAsync) {
// Make sure we can call Write() from the done callback.
TEST_F(BufferedSocketWriterTest, AppendInCallbackSync) {
StartWriter();
TestAppendInCallback();
}
// Make sure we can call Write() from the done callback.
TEST_F(BufferedSocketWriterTest, AppendInCallbackAsync) {
StartWriter();
socket_data_provider_.set_async_write(true);
socket_data_provider_.set_write_limit(kWriteChunkSize);
TestAppendInCallback();
......@@ -186,6 +197,7 @@ TEST_F(BufferedSocketWriterTest, AppendInCallbackAsync) {
// Test that the writer can be destroyed from callback.
TEST_F(BufferedSocketWriterTest, DestroyFromCallback) {
StartWriter();
socket_data_provider_.set_async_write(true);
writer_->Write(test_buffer_, base::Bind(
&BufferedSocketWriterTest::DestroyWriter,
......@@ -204,6 +216,7 @@ TEST_F(BufferedSocketWriterTest, DestroyFromCallback) {
// Verify that it stops writing after the first error.
TEST_F(BufferedSocketWriterTest, TestWriteErrorSync) {
StartWriter();
socket_data_provider_.set_write_limit(kWriteChunkSize);
writer_->Write(test_buffer_, base::Closure());
socket_data_provider_.set_async_write(true);
......@@ -220,6 +233,7 @@ TEST_F(BufferedSocketWriterTest, TestWriteErrorSync) {
// Verify that it stops writing after the first error.
TEST_F(BufferedSocketWriterTest, TestWriteErrorAsync) {
StartWriter();
socket_data_provider_.set_write_limit(kWriteChunkSize);
writer_->Write(test_buffer_, base::Closure());
socket_data_provider_.set_async_write(true);
......@@ -233,4 +247,14 @@ TEST_F(BufferedSocketWriterTest, TestWriteErrorAsync) {
socket_data_provider_.written_data().size());
}
TEST_F(BufferedSocketWriterTest, WriteBeforeStart) {
writer_->Write(test_buffer_, base::Closure());
writer_->Write(test_buffer_2_, base::Closure());
StartWriter();
base::RunLoop().RunUntilIdle();
VerifyWrittenData();
}
} // namespace remoting
......@@ -42,7 +42,7 @@ void ChannelDispatcherBase::OnChannelReady(
channel_factory_ = nullptr;
channel_ = std::move(socket);
writer_.Init(
writer_.Start(
base::Bind(&P2PStreamSocket::Write, base::Unretained(channel_.get())),
base::Bind(&ChannelDispatcherBase::OnReadWriteFailed,
base::Unretained(this)));
......
......@@ -372,10 +372,10 @@ void ChannelMultiplexer::OnBaseChannelReady(
reader_.StartReading(base_channel_.get(),
base::Bind(&ChannelMultiplexer::OnBaseChannelError,
base::Unretained(this)));
writer_.Init(base::Bind(&P2PStreamSocket::Write,
base::Unretained(base_channel_.get())),
base::Bind(&ChannelMultiplexer::OnBaseChannelError,
base::Unretained(this)));
writer_.Start(base::Bind(&P2PStreamSocket::Write,
base::Unretained(base_channel_.get())),
base::Bind(&ChannelMultiplexer::OnBaseChannelError,
base::Unretained(this)));
}
DoCreatePendingChannels();
......
......@@ -73,7 +73,7 @@ ClientVideoDispatcherTest::ClientVideoDispatcherTest()
reader_.StartReading(&host_socket_,
base::Bind(&ClientVideoDispatcherTest::OnReadError,
base::Unretained(this)));
writer_.Init(
writer_.Start(
base::Bind(&P2PStreamSocket::Write, base::Unretained(&host_socket_)),
BufferedSocketWriter::WriteFailedCallback());
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment