Skip to content
Snippets Groups Projects
Commit da827048 authored by acolwell@chromium.org's avatar acolwell@chromium.org
Browse files

Revert 70267 - Refactor PipelineImpl to use CompositeFilter to manage Filter state transitions.

BUG=54110
TEST=media_unittests CompositeFilterTest.*

Review URL: http://codereview.chromium.org/5744002

TBR=acolwell@chromium.org
Review URL: http://codereview.chromium.org/6026013

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@70275 0039d316-1c4b-4281-b951-d872f2087c98
parent 7c8c869c
No related merge requests found
......@@ -65,7 +65,7 @@ class MediaTest : public UITest {
}
};
#if defined(OS_LINUX) || defined(OS_WIN)
#if defined(OS_LINUX)
// Test appears to be fine on linux, but let's first change to flaky and
// see how that goes.
// http://crbug.com/56364
......
// Copyright (c) 2010 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "media/base/composite_filter.h"
#include "base/stl_util-inl.h"
#include "media/base/callback.h"
namespace media {
class CompositeFilter::FilterHostImpl : public FilterHost {
public:
FilterHostImpl(CompositeFilter* parent, FilterHost* host);
FilterHost* host();
// media::FilterHost methods.
virtual void SetError(PipelineError error);
virtual base::TimeDelta GetTime() const;
virtual base::TimeDelta GetDuration() const;
virtual void SetTime(base::TimeDelta time);
virtual void SetDuration(base::TimeDelta duration);
virtual void SetBufferedTime(base::TimeDelta buffered_time);
virtual void SetTotalBytes(int64 total_bytes);
virtual void SetBufferedBytes(int64 buffered_bytes);
virtual void SetVideoSize(size_t width, size_t height);
virtual void SetStreaming(bool streaming);
virtual void NotifyEnded();
virtual void SetLoaded(bool loaded);
virtual void SetNetworkActivity(bool network_activity);
virtual void DisableAudioRenderer();
virtual void SetCurrentReadPosition(int64 offset);
virtual int64 GetCurrentReadPosition();
private:
CompositeFilter* parent_;
FilterHost* host_;
DISALLOW_COPY_AND_ASSIGN(FilterHostImpl);
};
CompositeFilter::CompositeFilter(MessageLoop* message_loop) {
Init(message_loop, NULL);
}
CompositeFilter::CompositeFilter(MessageLoop* message_loop,
ThreadFactoryFunction thread_factory) {
DCHECK(thread_factory);
Init(message_loop, thread_factory);
}
void CompositeFilter::Init(MessageLoop* message_loop,
ThreadFactoryFunction thread_factory) {
DCHECK(message_loop);
message_loop_ = message_loop;
thread_factory_ = thread_factory;
if (!thread_factory_) {
thread_factory_ = &CompositeFilter::DefaultThreadFactory;
}
state_ = kCreated;
sequence_index_ = 0;
error_ = PIPELINE_OK;
}
CompositeFilter::~CompositeFilter() {
DCHECK_EQ(message_loop_, MessageLoop::current());
DCHECK(state_ == kCreated || state_ == kStopped);
// Stop every running filter thread.
for (FilterThreadVector::iterator iter = filter_threads_.begin();
iter != filter_threads_.end();
++iter) {
(*iter)->Stop();
}
// Reset the pipeline, which will decrement a reference to this object.
// We will get destroyed as soon as the remaining tasks finish executing.
// To be safe, we'll set our pipeline reference to NULL.
filters_.clear();
STLDeleteElements(&filter_threads_);
}
bool CompositeFilter::AddFilter(scoped_refptr<Filter> filter) {
DCHECK_EQ(message_loop_, MessageLoop::current());
if (!filter.get() || state_ != kCreated || !host())
return false;
// Create a dedicated thread for this filter if applicable.
if (filter->requires_message_loop()) {
scoped_ptr<base::Thread> thread(
thread_factory_(filter->message_loop_name()));
if (!thread.get() || !thread->Start()) {
return false;
}
filter->set_message_loop(thread->message_loop());
filter_threads_.push_back(thread.release());
}
// Register ourselves as the filter's host.
filter->set_host(host_impl_.get());
filters_.push_back(make_scoped_refptr(filter.get()));
return true;
}
const char* CompositeFilter::major_mime_type() const {
return "";
}
void CompositeFilter::set_host(FilterHost* host) {
DCHECK_EQ(message_loop_, MessageLoop::current());
DCHECK(host);
DCHECK(!host_impl_.get());
host_impl_.reset(new FilterHostImpl(this, host));
}
FilterHost* CompositeFilter::host() {
return host_impl_.get() ? host_impl_->host() : NULL;
}
bool CompositeFilter::requires_message_loop() const {
return false;
}
const char* CompositeFilter::message_loop_name() const {
return "CompositeFilter";
}
void CompositeFilter::set_message_loop(MessageLoop* message_loop) {
NOTREACHED() << "Message loop should not be set.";
}
MessageLoop* CompositeFilter::message_loop() {
return NULL;
}
void CompositeFilter::Play(FilterCallback* play_callback) {
DCHECK_EQ(message_loop_, MessageLoop::current());
scoped_ptr<FilterCallback> callback(play_callback);
if (callback_.get()) {
SendErrorToHost(PIPELINE_ERROR_OPERATION_PENDING);
callback->Run();
return;
} else if (state_ == kPlaying) {
callback->Run();
return;
} else if (!host() || (state_ != kPaused && state_ != kCreated)) {
SendErrorToHost(PIPELINE_ERROR_INVALID_STATE);
callback->Run();
return;
}
ChangeState(kPlayPending);
callback_.reset(callback.release());
StartSerialCallSequence();
}
void CompositeFilter::Pause(FilterCallback* pause_callback) {
DCHECK_EQ(message_loop_, MessageLoop::current());
scoped_ptr<FilterCallback> callback(pause_callback);
if (callback_.get()) {
SendErrorToHost(PIPELINE_ERROR_OPERATION_PENDING);
callback->Run();
return;
} else if (state_ == kPaused) {
callback->Run();
return;
} else if (!host() || state_ != kPlaying) {
SendErrorToHost(PIPELINE_ERROR_INVALID_STATE);
callback->Run();
return;
}
ChangeState(kPausePending);
callback_.reset(callback.release());
StartSerialCallSequence();
}
void CompositeFilter::Flush(FilterCallback* flush_callback) {
DCHECK_EQ(message_loop_, MessageLoop::current());
scoped_ptr<FilterCallback> callback(flush_callback);
if (callback_.get()) {
SendErrorToHost(PIPELINE_ERROR_OPERATION_PENDING);
callback->Run();
return;
} else if (!host() || (state_ != kCreated && state_ != kPaused)) {
SendErrorToHost(PIPELINE_ERROR_INVALID_STATE);
callback->Run();
return;
}
ChangeState(kFlushPending);
callback_.reset(callback.release());
StartParallelCallSequence();
}
void CompositeFilter::Stop(FilterCallback* stop_callback) {
DCHECK_EQ(message_loop_, MessageLoop::current());
scoped_ptr<FilterCallback> callback(stop_callback);
if (!host()) {
SendErrorToHost(PIPELINE_ERROR_INVALID_STATE);
callback->Run();
return;
} else if (state_ == kStopped) {
callback->Run();
return;
}
switch(state_) {
case kError:
case kCreated:
case kPaused:
case kPlaying:
ChangeState(kStopPending);
break;
case kPlayPending:
ChangeState(kStopWhilePlayPending);
break;
case kPausePending:
ChangeState(kStopWhilePausePending);
break;
case kFlushPending:
ChangeState(kStopWhileFlushPending);
break;
case kSeekPending:
ChangeState(kStopWhileSeekPending);
break;
default:
SendErrorToHost(PIPELINE_ERROR_INVALID_STATE);
callback->Run();
return;
}
callback_.reset(callback.release());
if (state_ == kStopPending) {
StartSerialCallSequence();
}
}
void CompositeFilter::SetPlaybackRate(float playback_rate) {
DCHECK_EQ(message_loop_, MessageLoop::current());
for (FilterVector::iterator iter = filters_.begin();
iter != filters_.end();
++iter) {
(*iter)->SetPlaybackRate(playback_rate);
}
}
void CompositeFilter::Seek(base::TimeDelta time,
FilterCallback* seek_callback) {
DCHECK_EQ(message_loop_, MessageLoop::current());
scoped_ptr<FilterCallback> callback(seek_callback);
if (callback_.get()) {
SendErrorToHost(PIPELINE_ERROR_OPERATION_PENDING);
callback->Run();
return;
} else if (!host() || (state_ != kPaused && state_ != kCreated)) {
SendErrorToHost(PIPELINE_ERROR_INVALID_STATE);
callback->Run();
return;
}
ChangeState(kSeekPending);
callback_.reset(callback.release());
pending_seek_time_ = time;
StartSerialCallSequence();
}
void CompositeFilter::OnAudioRendererDisabled() {
DCHECK_EQ(message_loop_, MessageLoop::current());
for (FilterVector::iterator iter = filters_.begin();
iter != filters_.end();
++iter) {
(*iter)->OnAudioRendererDisabled();
}
}
base::Thread* CompositeFilter::DefaultThreadFactory(
const char* thread_name) {
return new base::Thread(thread_name);
}
void CompositeFilter::ChangeState(State new_state) {
DCHECK_EQ(message_loop_, MessageLoop::current());
state_ = new_state;
}
void CompositeFilter::StartSerialCallSequence() {
DCHECK_EQ(message_loop_, MessageLoop::current());
error_ = PIPELINE_OK;
if (filters_.size() > 0) {
sequence_index_ = 0;
CallFilter(filters_[sequence_index_],
NewThreadSafeCallback(&CompositeFilter::SerialCallback));
} else {
sequence_index_ = 0;
SerialCallback();
}
}
void CompositeFilter::StartParallelCallSequence() {
DCHECK_EQ(message_loop_, MessageLoop::current());
error_ = PIPELINE_OK;
if (filters_.size() > 0) {
sequence_index_ = 0;
for (size_t i = 0; i < filters_.size(); i++) {
CallFilter(filters_[i],
NewThreadSafeCallback(&CompositeFilter::ParallelCallback));
}
} else {
sequence_index_ = 0;
ParallelCallback();
}
}
void CompositeFilter::CallFilter(scoped_refptr<Filter>& filter,
FilterCallback* callback) {
switch(state_) {
case kPlayPending:
filter->Play(callback);
break;
case kPausePending:
filter->Pause(callback);
break;
case kFlushPending:
filter->Flush(callback);
break;
case kStopPending:
filter->Stop(callback);
break;
case kSeekPending:
filter->Seek(pending_seek_time_, callback);
break;
default:
delete callback;
ChangeState(kError);
HandleError(PIPELINE_ERROR_INVALID_STATE);
}
}
void CompositeFilter::DispatchPendingCallback() {
if (callback_.get()) {
scoped_ptr<FilterCallback> callback(callback_.release());
callback->Run();
}
}
CompositeFilter::State CompositeFilter::GetNextState(State state) const {
State ret = kInvalid;
switch (state) {
case kPlayPending:
ret = kPlaying;
break;
case kPausePending:
ret = kPaused;
case kFlushPending:
ret = kPaused;
break;
case kStopPending:
ret = kStopped;
break;
case kSeekPending:
ret = kPaused;
break;
case kStopWhilePlayPending:
case kStopWhilePausePending:
case kStopWhileFlushPending:
case kStopWhileSeekPending:
ret = kStopPending;
break;
case kInvalid:
case kCreated:
case kPlaying:
case kPaused:
case kStopped:
case kError:
ret = kInvalid;
break;
// default: intentionally left out to catch missing states.
}
return ret;
}
void CompositeFilter::SerialCallback() {
DCHECK_EQ(message_loop_, MessageLoop::current());
if (error_ != PIPELINE_OK) {
// We encountered an error. Terminate the sequence now.
ChangeState(kError);
HandleError(error_);
return;
}
if (filters_.size() > 0)
sequence_index_++;
if (sequence_index_ == filters_.size()) {
// All filters have been successfully called without error.
OnCallSequenceDone();
} else if (GetNextState(state_) == kStopPending) {
// Abort sequence early and start issuing Stop() calls.
ChangeState(kStopPending);
StartSerialCallSequence();
} else {
// We aren't done with the sequence. Call the next filter.
CallFilter(filters_[sequence_index_],
NewThreadSafeCallback(&CompositeFilter::SerialCallback));
}
}
void CompositeFilter::ParallelCallback() {
DCHECK_EQ(message_loop_, MessageLoop::current());
if (filters_.size() > 0)
sequence_index_++;
if (sequence_index_ == filters_.size()) {
if (error_ != PIPELINE_OK) {
// We encountered an error.
ChangeState(kError);
HandleError(error_);
return;
}
OnCallSequenceDone();
}
}
void CompositeFilter::OnCallSequenceDone() {
State next_state = GetNextState(state_);
if (next_state == kInvalid) {
// We somehow got into an unexpected state.
ChangeState(kError);
HandleError(PIPELINE_ERROR_INVALID_STATE);
}
ChangeState(next_state);
if (state_ == kStopPending) {
// Handle a deferred Stop().
StartSerialCallSequence();
} else {
// Call the callback to indicate that the operation has completed.
DispatchPendingCallback();
}
}
void CompositeFilter::SendErrorToHost(PipelineError error) {
if (host_impl_.get())
host_impl_.get()->host()->SetError(error);
}
void CompositeFilter::HandleError(PipelineError error) {
if (error != PIPELINE_OK) {
SendErrorToHost(error);
}
DispatchPendingCallback();
}
FilterCallback* CompositeFilter::NewThreadSafeCallback(
void (CompositeFilter::*method)()) {
return TaskToCallbackAdapter::NewCallback(
NewRunnableMethod(this,
&CompositeFilter::OnCallback,
message_loop_,
method));
}
void CompositeFilter::OnCallback(MessageLoop* message_loop,
void (CompositeFilter::*method)()) {
if (MessageLoop::current() != message_loop) {
// Posting callback to the proper thread.
message_loop->PostTask(FROM_HERE, NewRunnableMethod(this, method));
return;
}
(this->*method)();
}
bool CompositeFilter::CanForwardError() {
return (state_ == kCreated) || (state_ == kPlaying) || (state_ == kPaused);
}
void CompositeFilter::SetError(PipelineError error) {
// TODO(acolwell): Temporary hack to handle errors that occur
// during filter initialization. In this case we just forward
// the error to the host even if it is on the wrong thread. We
// have to do this because if we defer the call, we can't be
// sure the host will get the error before the "init done" callback
// is executed. This will be cleaned up when filter init is refactored.
if (state_ == kCreated) {
SendErrorToHost(error);
return;
}
if (message_loop_ != MessageLoop::current()) {
message_loop_->PostTask(FROM_HERE,
NewRunnableMethod(this, &CompositeFilter::SetError, error));
return;
}
DCHECK_EQ(message_loop_, MessageLoop::current());
// Drop errors recieved while stopping or stopped.
// This shields the owner of this object from having
// to deal with errors it can't do anything about.
if (state_ == kStopPending || state_ == kStopped)
return;
error_ = error;
if (CanForwardError())
SendErrorToHost(error);
}
CompositeFilter::FilterHostImpl::FilterHostImpl(CompositeFilter* parent,
FilterHost* host) :
parent_(parent),
host_(host) {
}
FilterHost* CompositeFilter::FilterHostImpl::host() {
return host_;
}
// media::FilterHost methods.
void CompositeFilter::FilterHostImpl::SetError(PipelineError error) {
parent_->SetError(error);
}
base::TimeDelta CompositeFilter::FilterHostImpl::GetTime() const {
return host_->GetTime();
}
base::TimeDelta CompositeFilter::FilterHostImpl::GetDuration() const {
return host_->GetDuration();
}
void CompositeFilter::FilterHostImpl::SetTime(base::TimeDelta time) {
host_->SetTime(time);
}
void CompositeFilter::FilterHostImpl::SetDuration(base::TimeDelta duration) {
host_->SetDuration(duration);
}
void CompositeFilter::FilterHostImpl::SetBufferedTime(
base::TimeDelta buffered_time) {
host_->SetBufferedTime(buffered_time);
}
void CompositeFilter::FilterHostImpl::SetTotalBytes(int64 total_bytes) {
host_->SetTotalBytes(total_bytes);
}
void CompositeFilter::FilterHostImpl::SetBufferedBytes(int64 buffered_bytes) {
host_->SetBufferedBytes(buffered_bytes);
}
void CompositeFilter::FilterHostImpl::SetVideoSize(size_t width,
size_t height) {
host_->SetVideoSize(width, height);
}
void CompositeFilter::FilterHostImpl::SetStreaming(bool streaming) {
host_->SetStreaming(streaming);
}
void CompositeFilter::FilterHostImpl::NotifyEnded() {
host_->NotifyEnded();
}
void CompositeFilter::FilterHostImpl::SetLoaded(bool loaded) {
host_->SetLoaded(loaded);
}
void CompositeFilter::FilterHostImpl::SetNetworkActivity(
bool network_activity) {
host_->SetNetworkActivity(network_activity);
}
void CompositeFilter::FilterHostImpl::DisableAudioRenderer() {
host_->DisableAudioRenderer();
}
void CompositeFilter::FilterHostImpl::SetCurrentReadPosition(int64 offset) {
host_->SetCurrentReadPosition(offset);
}
int64 CompositeFilter::FilterHostImpl::GetCurrentReadPosition() {
return host_->GetCurrentReadPosition();
}
} // namespace media
// Copyright (c) 2010 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef MEDIA_BASE_COMPOSITE_FILTER_H_
#define MEDIA_BASE_COMPOSITE_FILTER_H_
#include "base/thread.h"
#include "media/base/filter_host.h"
#include "media/base/filters.h"
namespace media {
class CompositeFilter : public Filter {
public:
typedef base::Thread* (*ThreadFactoryFunction)(const char* thread_name);
CompositeFilter(MessageLoop* message_loop);
// Constructor that allows the default thread creation strategy to be
// overridden.
CompositeFilter(MessageLoop* message_loop,
ThreadFactoryFunction thread_factory);
// Adds a filter to the composite. This is only allowed after set_host()
// is called and before the first state changing operation such as Play(),
// Flush(), Stop(), or Seek(). True is returned if the filter was successfully
// added to the composite. False is returned if the filter couldn't be added
// because the composite is in the wrong state or the filter needed a thread
// and the composite was unable to create one.
bool AddFilter(scoped_refptr<Filter> filter);
// media::Filter methods.
virtual const char* major_mime_type() const;
virtual void set_host(FilterHost* host);
virtual FilterHost* host();
virtual bool requires_message_loop() const;
virtual const char* message_loop_name() const;
virtual void set_message_loop(MessageLoop* message_loop);
virtual MessageLoop* message_loop();
virtual void Play(FilterCallback* play_callback);
virtual void Pause(FilterCallback* pause_callback);
virtual void Flush(FilterCallback* flush_callback);
virtual void Stop(FilterCallback* stop_callback);
virtual void SetPlaybackRate(float playback_rate);
virtual void Seek(base::TimeDelta time, FilterCallback* seek_callback);
virtual void OnAudioRendererDisabled();
protected:
virtual ~CompositeFilter();
/// Default thread factory strategy.
static base::Thread* DefaultThreadFactory(const char* thread_name);
void SetError(PipelineError error);
private:
class FilterHostImpl;
enum State {
kInvalid,
kCreated,
kPaused,
kPlayPending,
kStopWhilePlayPending,
kPlaying,
kPausePending,
kStopWhilePausePending,
kFlushPending,
kStopWhileFlushPending,
kSeekPending,
kStopWhileSeekPending,
kStopPending,
kStopped,
kError
};
// Initialization method called by constructors.
void Init(MessageLoop* message_loop, ThreadFactoryFunction thread_factory);
// Transition to a new state.
void ChangeState(State new_state);
// Start calling filters in a sequence.
void StartSerialCallSequence();
// Call filters in parallel.
void StartParallelCallSequence();
// Call the filter based on the current value of state_.
void CallFilter(scoped_refptr<Filter>& filter, FilterCallback* callback);
// Calls |callback_| and then clears the reference.
void DispatchPendingCallback();
// Gets the state to transition to given |state|.
State GetNextState(State state) const;
// Filter callback for a serial sequence.
void SerialCallback();
// Filter callback for a parallel sequence.
void ParallelCallback();
// Called when a parallel or serial call sequence completes.
void OnCallSequenceDone();
// Helper function for sending an error to the FilterHost.
void SendErrorToHost(PipelineError error);
// Helper function for handling errors during call sequences.
void HandleError(PipelineError error);
// Creates a callback that can be called from any thread, but is guaranteed
// to call the specified method on the thread associated with this filter.
FilterCallback* NewThreadSafeCallback(void (CompositeFilter::*method)());
// Helper function used by NewThreadSafeCallback() to make sure the
// method gets called on the right thread.
void OnCallback(MessageLoop* message_loop,
void (CompositeFilter::*method)());
// Helper function that indicates whether SetError() calls can be forwarded
// to the host of this filter.
bool CanForwardError();
// Vector of threads owned by the composite and used by filters in |filters_|.
typedef std::vector<base::Thread*> FilterThreadVector;
FilterThreadVector filter_threads_;
// Vector of the filters added to the composite.
typedef std::vector<scoped_refptr<Filter> > FilterVector;
FilterVector filters_;
// Factory function used to create filter threads.
ThreadFactoryFunction thread_factory_;
// Callback for the pending request.
scoped_ptr<FilterCallback> callback_;
// Time parameter for the pending Seek() request.
base::TimeDelta pending_seek_time_;
// Current state of this filter.
State state_;
// The index of the filter currently processing a request.
unsigned int sequence_index_;
// Message loop passed into the constructor.
MessageLoop* message_loop_;
// FilterHost implementation passed to Filters owned by this
// object.
scoped_ptr<FilterHostImpl> host_impl_;
// Error passed in the last SetError() call.
PipelineError error_;
DISALLOW_COPY_AND_ASSIGN(CompositeFilter);
};
} // namespace media
#endif // MEDIA_BASE_COMPOSITE_FILTER_H_
This diff is collapsed.
......@@ -20,23 +20,4 @@ void RunStopFilterCallback(FilterCallback* callback) {
delete callback;
}
MockFilter::MockFilter() :
requires_message_loop_(false) {
}
MockFilter::MockFilter(bool requires_message_loop) :
requires_message_loop_(requires_message_loop) {
}
MockFilter::~MockFilter() {}
bool MockFilter::requires_message_loop() const {
return requires_message_loop_;
}
const char* MockFilter::message_loop_name() const {
return "MockFilter";
}
} // namespace media
......@@ -50,12 +50,7 @@ class Destroyable : public MockClass {
// gmock will track the number of times the methods are executed.
class MockFilterCallback {
public:
MockFilterCallback() : run_destroy_callback_(true) {
}
MockFilterCallback(bool run_destroy_callback) :
run_destroy_callback_(run_destroy_callback) {
}
MockFilterCallback() {}
virtual ~MockFilterCallback() {}
MOCK_METHOD0(OnCallbackDestroyed, void());
......@@ -66,7 +61,7 @@ class MockFilterCallback {
// destroyed. Clients should use NiceMock<> or StrictMock<> depending on the
// test.
FilterCallback* NewCallback() {
return new CallbackImpl(this, run_destroy_callback_);
return new CallbackImpl(this);
}
private:
......@@ -74,15 +69,12 @@ class MockFilterCallback {
// MockFilterCallback.
class CallbackImpl : public CallbackRunner<Tuple0> {
public:
explicit CallbackImpl(MockFilterCallback* mock_callback,
bool run_destroy_callback)
: mock_callback_(mock_callback),
run_destroy_callback_(run_destroy_callback) {
explicit CallbackImpl(MockFilterCallback* mock_callback)
: mock_callback_(mock_callback) {
}
virtual ~CallbackImpl() {
if (run_destroy_callback_)
mock_callback_->OnCallbackDestroyed();
mock_callback_->OnCallbackDestroyed();
}
virtual void RunWithParams(const Tuple0& params) {
......@@ -91,41 +83,13 @@ class MockFilterCallback {
private:
MockFilterCallback* mock_callback_;
bool run_destroy_callback_;
DISALLOW_COPY_AND_ASSIGN(CallbackImpl);
};
bool run_destroy_callback_;
DISALLOW_COPY_AND_ASSIGN(MockFilterCallback);
};
class MockFilter : public Filter {
public:
MockFilter();
MockFilter(bool requires_message_loop);
// Filter implementation.
virtual bool requires_message_loop() const;
virtual const char* message_loop_name() const;
MOCK_METHOD1(Play, void(FilterCallback* callback));
MOCK_METHOD1(Pause, void(FilterCallback* callback));
MOCK_METHOD1(Flush, void(FilterCallback* callback));
MOCK_METHOD1(Stop, void(FilterCallback* callback));
MOCK_METHOD1(SetPlaybackRate, void(float playback_rate));
MOCK_METHOD2(Seek, void(base::TimeDelta time, FilterCallback* callback));
MOCK_METHOD0(OnAudioRendererDisabled, void());
protected:
virtual ~MockFilter();
private:
bool requires_message_loop_;
DISALLOW_COPY_AND_ASSIGN(MockFilter);
};
class MockDataSource : public DataSource {
public:
MockDataSource() {}
......
......@@ -33,8 +33,6 @@ enum PipelineError {
PIPELINE_ERROR_COULD_NOT_RENDER,
PIPELINE_ERROR_READ,
PIPELINE_ERROR_AUDIO_HARDWARE,
PIPELINE_ERROR_OPERATION_PENDING,
PIPELINE_ERROR_INVALID_STATE,
// Demuxer related errors.
DEMUXER_ERROR_COULD_NOT_OPEN,
DEMUXER_ERROR_COULD_NOT_PARSE,
......
......@@ -22,7 +22,6 @@ class PipelineImpl::PipelineInitState {
scoped_refptr<Demuxer> demuxer_;
scoped_refptr<AudioDecoder> audio_decoder_;
scoped_refptr<VideoDecoder> video_decoder_;
scoped_refptr<CompositeFilter> composite_;
};
PipelineImpl::PipelineImpl(MessageLoop* message_loop)
......@@ -30,6 +29,7 @@ PipelineImpl::PipelineImpl(MessageLoop* message_loop)
clock_(new ClockImpl(&base::Time::Now)),
waiting_for_clock_update_(false),
state_(kCreated),
remaining_transitions_(0),
current_bytes_(0) {
ResetState();
}
......@@ -317,10 +317,6 @@ void PipelineImpl::ResetState() {
rendered_mime_types_.clear();
}
void PipelineImpl::set_state(State next_state) {
state_ = next_state;
}
bool PipelineImpl::IsPipelineOk() {
return PIPELINE_OK == GetError();
}
......@@ -576,25 +572,22 @@ void PipelineImpl::InitializeTask() {
// Just created, create data source.
if (state_ == kCreated) {
set_state(kInitDataSource);
state_ = kInitDataSource;
pipeline_init_state_.reset(new PipelineInitState());
pipeline_init_state_->composite_ = new CompositeFilter(message_loop_);
pipeline_init_state_->composite_->set_host(this);
InitializeDataSource();
return;
}
// Data source created, create demuxer.
if (state_ == kInitDataSource) {
set_state(kInitDemuxer);
state_ = kInitDemuxer;
InitializeDemuxer(pipeline_init_state_->data_source_);
return;
}
// Demuxer created, create audio decoder.
if (state_ == kInitDemuxer) {
set_state(kInitAudioDecoder);
state_ = kInitAudioDecoder;
// If this method returns false, then there's no audio stream.
if (InitializeAudioDecoder(pipeline_init_state_->demuxer_))
return;
......@@ -602,7 +595,7 @@ void PipelineImpl::InitializeTask() {
// Assuming audio decoder was created, create audio renderer.
if (state_ == kInitAudioDecoder) {
set_state(kInitAudioRenderer);
state_ = kInitAudioRenderer;
// Returns false if there's no audio stream.
if (InitializeAudioRenderer(pipeline_init_state_->audio_decoder_)) {
InsertRenderedMimeType(mime_type::kMajorTypeAudio);
......@@ -613,14 +606,14 @@ void PipelineImpl::InitializeTask() {
// Assuming audio renderer was created, create video decoder.
if (state_ == kInitAudioRenderer) {
// Then perform the stage of initialization, i.e. initialize video decoder.
set_state(kInitVideoDecoder);
state_ = kInitVideoDecoder;
if (InitializeVideoDecoder(pipeline_init_state_->demuxer_))
return;
}
// Assuming video decoder was created, create video renderer.
if (state_ == kInitVideoDecoder) {
set_state(kInitVideoRenderer);
state_ = kInitVideoRenderer;
if (InitializeVideoRenderer(pipeline_init_state_->video_decoder_)) {
InsertRenderedMimeType(mime_type::kMajorTypeVideo);
return;
......@@ -636,8 +629,6 @@ void PipelineImpl::InitializeTask() {
// Clear the collection of filters.
filter_collection_->Clear();
pipeline_filter_ = pipeline_init_state_->composite_;
// Clear init state since we're done initializing.
pipeline_init_state_.reset();
......@@ -646,11 +637,12 @@ void PipelineImpl::InitializeTask() {
PlaybackRateChangedTask(GetPlaybackRate());
VolumeChangedTask(GetVolume());
// Fire the seek request to get the filters to preroll.
// Fire the initial seek request to get the filters to preroll.
seek_pending_ = true;
set_state(kSeeking);
state_ = kSeeking;
remaining_transitions_ = filters_.size();
seek_timestamp_ = base::TimeDelta();
pipeline_filter_->Seek(seek_timestamp_,
filters_.front()->Seek(seek_timestamp_,
NewCallback(this, &PipelineImpl::OnFilterStateTransition));
}
}
......@@ -713,7 +705,11 @@ void PipelineImpl::PlaybackRateChangedTask(float playback_rate) {
AutoLock auto_lock(lock_);
clock_->SetPlaybackRate(playback_rate);
}
pipeline_filter_->SetPlaybackRate(playback_rate);
for (FilterVector::iterator iter = filters_.begin();
iter != filters_.end();
++iter) {
(*iter)->SetPlaybackRate(playback_rate);
}
}
void PipelineImpl::VolumeChangedTask(float volume) {
......@@ -749,9 +745,10 @@ void PipelineImpl::SeekTask(base::TimeDelta time,
// kSeeking (for each filter)
// kStarting (for each filter)
// kStarted
set_state(kPausing);
state_ = kPausing;
seek_timestamp_ = time;
seek_callback_.reset(seek_callback);
remaining_transitions_ = filters_.size();
// Kick off seeking!
{
......@@ -760,7 +757,7 @@ void PipelineImpl::SeekTask(base::TimeDelta time,
if (!waiting_for_clock_update_)
clock_->Pause();
}
pipeline_filter_->Pause(
filters_.front()->Pause(
NewCallback(this, &PipelineImpl::OnFilterStateTransition));
}
......@@ -793,7 +790,7 @@ void PipelineImpl::NotifyEndedTask() {
}
// Transition to ended, executing the callback if present.
set_state(kEnded);
state_ = kEnded;
if (ended_callback_.get()) {
ended_callback_->Run();
}
......@@ -817,7 +814,11 @@ void PipelineImpl::DisableAudioRendererTask() {
audio_disabled_ = true;
// Notify all filters of disabled audio renderer.
pipeline_filter_->OnAudioRendererDisabled();
for (FilterVector::iterator iter = filters_.begin();
iter != filters_.end();
++iter) {
(*iter)->OnAudioRendererDisabled();
}
}
void PipelineImpl::FilterStateTransitionTask() {
......@@ -836,31 +837,42 @@ void PipelineImpl::FilterStateTransitionTask() {
// Decrement the number of remaining transitions, making sure to transition
// to the next state if needed.
set_state(FindNextState(state_));
if (state_ == kSeeking) {
AutoLock auto_lock(lock_);
clock_->SetTime(seek_timestamp_);
DCHECK(remaining_transitions_ <= filters_.size());
DCHECK(remaining_transitions_ > 0u);
if (--remaining_transitions_ == 0) {
state_ = FindNextState(state_);
if (state_ == kSeeking) {
AutoLock auto_lock(lock_);
clock_->SetTime(seek_timestamp_);
}
if (TransientState(state_)) {
remaining_transitions_ = filters_.size();
}
}
// Carry out the action for the current state.
if (TransientState(state_)) {
Filter* filter = filters_[filters_.size() - remaining_transitions_];
if (state_ == kPausing) {
pipeline_filter_->Pause(
NewCallback(this, &PipelineImpl::OnFilterStateTransition));
filter->Pause(NewCallback(this, &PipelineImpl::OnFilterStateTransition));
} else if (state_ == kFlushing) {
pipeline_filter_->Flush(
NewCallback(this, &PipelineImpl::OnFilterStateTransition));
// We had to use parallel flushing all filters.
if (remaining_transitions_ == filters_.size()) {
for (size_t i = 0; i < filters_.size(); i++) {
filters_[i]->Flush(
NewCallback(this, &PipelineImpl::OnFilterStateTransition));
}
}
} else if (state_ == kSeeking) {
pipeline_filter_->Seek(seek_timestamp_,
filter->Seek(seek_timestamp_,
NewCallback(this, &PipelineImpl::OnFilterStateTransition));
} else if (state_ == kStarting) {
pipeline_filter_->Play(
NewCallback(this,&PipelineImpl::OnFilterStateTransition));
filter->Play(NewCallback(this, &PipelineImpl::OnFilterStateTransition));
} else if (state_ == kStopping) {
pipeline_filter_->Stop(
NewCallback(this, &PipelineImpl::OnFilterStateTransition));
filter->Stop(NewCallback(this, &PipelineImpl::OnFilterStateTransition));
} else {
NOTREACHED() << "Unexpected state: " << state_;
NOTREACHED();
}
} else if (state_ == kStarted) {
FinishInitialization();
......@@ -885,7 +897,7 @@ void PipelineImpl::FilterStateTransitionTask() {
} else if (IsPipelineStopped()) {
FinishDestroyingFiltersTask();
} else {
NOTREACHED() << "Unexpected state: " << state_;
NOTREACHED();
}
}
......@@ -893,11 +905,24 @@ void PipelineImpl::FinishDestroyingFiltersTask() {
DCHECK_EQ(MessageLoop::current(), message_loop_);
DCHECK(IsPipelineStopped());
// Stop every running filter thread.
//
// TODO(scherkus): can we watchdog this section to detect wedged threads?
for (FilterThreadVector::iterator iter = filter_threads_.begin();
iter != filter_threads_.end();
++iter) {
(*iter)->Stop();
}
// Clear renderer references.
audio_renderer_ = NULL;
video_renderer_ = NULL;
pipeline_filter_ = NULL;
// Reset the pipeline, which will decrement a reference to this object.
// We will get destroyed as soon as the remaining tasks finish executing.
// To be safe, we'll set our pipeline reference to NULL.
filters_.clear();
STLDeleteElements(&filter_threads_);
stop_pending_ = false;
tearing_down_ = false;
......@@ -913,7 +938,7 @@ void PipelineImpl::FinishDestroyingFiltersTask() {
}
} else {
// Destroying filters due to SetError().
set_state(kError);
state_ = kError;
// If our owner has requested to be notified of an error.
if (error_callback_.get()) {
error_callback_->Run();
......@@ -922,12 +947,28 @@ void PipelineImpl::FinishDestroyingFiltersTask() {
}
bool PipelineImpl::PrepareFilter(scoped_refptr<Filter> filter) {
bool ret = pipeline_init_state_->composite_->AddFilter(filter.get());
DCHECK_EQ(MessageLoop::current(), message_loop_);
DCHECK(IsPipelineOk());
// Create a dedicated thread for this filter if applicable.
if (filter->requires_message_loop()) {
scoped_ptr<base::Thread> thread(
new base::Thread(filter->message_loop_name()));
if (!thread.get() || !thread->Start()) {
NOTREACHED() << "Could not start filter thread";
SetError(PIPELINE_ERROR_INITIALIZATION_FAILED);
return false;
}
if (!ret) {
SetError(PIPELINE_ERROR_INITIALIZATION_FAILED);
filter->set_message_loop(thread->message_loop());
filter_threads_.push_back(thread.release());
}
return ret;
// Register ourselves as the filter's host.
DCHECK(IsPipelineOk());
filter->set_host(this);
filters_.push_back(make_scoped_refptr(filter.get()));
return true;
}
void PipelineImpl::InitializeDataSource() {
......@@ -1104,21 +1145,23 @@ void PipelineImpl::TearDownPipeline() {
tearing_down_ = true;
if (IsPipelineInitializing()) {
// Make it look like initialization was successful.
pipeline_filter_ = pipeline_init_state_->composite_;
pipeline_init_state_.reset();
set_state(kStopping);
pipeline_filter_->Stop(NewCallback(
this, &PipelineImpl::OnFilterStateTransition));
// Notify the client that starting did not complete, if necessary.
FinishInitialization();
} else if (pipeline_filter_.get()) {
set_state(kPausing);
pipeline_filter_->Pause(NewCallback(
this, &PipelineImpl::OnFilterStateTransition));
}
remaining_transitions_ = filters_.size();
if (remaining_transitions_ > 0) {
if (IsPipelineInitializing()) {
state_ = kStopping;
filters_.front()->Stop(NewCallback(
this, &PipelineImpl::OnFilterStateTransition));
} else {
state_ = kPausing;
filters_.front()->Pause(NewCallback(
this, &PipelineImpl::OnFilterStateTransition));
}
} else {
set_state(kStopped);
state_ = kStopped;
message_loop_->PostTask(FROM_HERE,
NewRunnableMethod(this, &PipelineImpl::FinishDestroyingFiltersTask));
}
......
......@@ -18,7 +18,6 @@
#include "base/thread.h"
#include "base/time.h"
#include "media/base/clock.h"
#include "media/base/composite_filter.h"
#include "media/base/filter_host.h"
#include "media/base/pipeline.h"
......@@ -120,9 +119,6 @@ class PipelineImpl : public Pipeline, public FilterHost {
// is used by the constructor, and the Stop() method.
void ResetState();
// Updates |state_|. All state transitions should use this call.
void set_state(State next_state);
// Simple method used to make sure the pipeline is running normally.
bool IsPipelineOk();
......@@ -357,6 +353,12 @@ class PipelineImpl : public Pipeline, public FilterHost {
// Member that tracks the current state.
State state_;
// For kPausing, kSeeking and kStarting, we need to track how many filters
// have completed transitioning to the destination state. When
// |remaining_transitions_| reaches 0 the pipeline can transition out
// of the current state.
size_t remaining_transitions_;
// For kSeeking we need to remember where we're seeking between filter
// replies.
base::TimeDelta seek_timestamp_;
......@@ -386,14 +388,20 @@ class PipelineImpl : public Pipeline, public FilterHost {
scoped_ptr<PipelineCallback> error_callback_;
scoped_ptr<PipelineCallback> network_callback_;
// Reference to the filter(s) that constitute the pipeline.
scoped_refptr<Filter> pipeline_filter_;
// Vector of our filters and map maintaining the relationship between the
// FilterType and the filter itself.
typedef std::vector<scoped_refptr<Filter> > FilterVector;
FilterVector filters_;
// Renderer references used for setting the volume and determining
// when playback has finished.
scoped_refptr<AudioRenderer> audio_renderer_;
scoped_refptr<VideoRenderer> video_renderer_;
// Vector of threads owned by the pipeline and being used by filters.
typedef std::vector<base::Thread*> FilterThreadVector;
FilterThreadVector filter_threads_;
// Helper class that stores filter references during pipeline
// initialization.
class PipelineInitState;
......
......@@ -76,8 +76,6 @@
'base/clock.h',
'base/clock_impl.cc',
'base/clock_impl.h',
'base/composite_filter.cc',
'base/composite_filter.h',
'base/data_buffer.cc',
'base/data_buffer.h',
'base/djb2.cc',
......@@ -271,7 +269,6 @@
'audio/mac/audio_output_mac_unittest.cc',
'audio/simple_sources_unittest.cc',
'audio/win/audio_output_win_unittest.cc',
'base/composite_filter_unittest.cc',
'base/clock_impl_unittest.cc',
'base/data_buffer_unittest.cc',
'base/djb2_unittest.cc',
......
......@@ -737,8 +737,6 @@ void WebMediaPlayerImpl::OnPipelineError() {
case media::PIPELINE_ERROR_ABORT:
case media::PIPELINE_ERROR_OUT_OF_MEMORY:
case media::PIPELINE_ERROR_AUDIO_HARDWARE:
case media::PIPELINE_ERROR_OPERATION_PENDING:
case media::PIPELINE_ERROR_INVALID_STATE:
// Decode error.
SetNetworkState(WebMediaPlayer::DecodeError);
break;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment