From c2821c1efc8afbf0ebf4fe64310e2de4899632a3 Mon Sep 17 00:00:00 2001
From: "zork@chromium.org"
 <zork@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>
Date: Fri, 11 Jun 2010 02:08:19 +0000
Subject: [PATCH] Replace changes_channel with an observer list.

BUG=none
TEST=Run unit tests

Review URL: http://codereview.chromium.org/2075012

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@49490 0039d316-1c4b-4281-b951-d872f2087c98
---
 chrome/browser/sync/engine/all_status.cc      |   7 +-
 chrome/browser/sync/engine/all_status.h       |   7 +-
 chrome/browser/sync/engine/syncapi.cc         |  29 ++--
 chrome/browser/sync/engine/syncer.cc          |  13 +-
 chrome/browser/sync/engine/syncer.h           |   2 +-
 chrome/browser/sync/engine/syncer_command.cc  |   4 +-
 .../browser/sync/engine/syncer_end_command.cc |   2 +-
 chrome/browser/sync/engine/syncer_thread.cc   |  18 +--
 chrome/browser/sync/engine/syncer_thread.h    |   7 +-
 .../sync/engine/syncer_thread_unittest.cc     |  39 +++--
 chrome/browser/sync/engine/syncer_types.h     |   9 +-
 chrome/browser/sync/engine/syncer_unittest.cc |  10 +-
 .../sync/sessions/sync_session_unittest.cc    |   4 +-
 chrome/browser/sync/syncable/syncable.cc      |  12 +-
 chrome/browser/sync/syncable/syncable.h       |  13 +-
 chrome/browser/sync/util/channel.h            | 140 ++++++++++++++++++
 chrome/browser/sync/util/channel_unittest.cc  |  32 ++++
 chrome/chrome.gyp                             |   1 +
 chrome/chrome_tests.gypi                      |   1 +
 19 files changed, 268 insertions(+), 82 deletions(-)
 create mode 100644 chrome/browser/sync/util/channel.h
 create mode 100644 chrome/browser/sync/util/channel_unittest.cc

diff --git a/chrome/browser/sync/engine/all_status.cc b/chrome/browser/sync/engine/all_status.cc
index 64ea9a71769d7..39509f4ea821b 100644
--- a/chrome/browser/sync/engine/all_status.cc
+++ b/chrome/browser/sync/engine/all_status.cc
@@ -50,6 +50,7 @@ AllStatus::AllStatus() : status_(init_status),
 }
 
 AllStatus::~AllStatus() {
+  syncer_thread_hookup_.reset();
   delete channel_;
 }
 
@@ -60,8 +61,7 @@ void AllStatus::WatchConnectionManager(ServerConnectionManager* conn_mgr) {
 
 void AllStatus::WatchSyncerThread(SyncerThread* syncer_thread) {
   syncer_thread_hookup_.reset(
-      NewEventListenerHookup(syncer_thread->relay_channel(), this,
-                             &AllStatus::HandleSyncerEvent));
+      syncer_thread->relay_channel()->AddObserver(this));
 }
 
 AllStatus::Status AllStatus::CreateBlankStatus() const {
@@ -187,7 +187,7 @@ void AllStatus::HandleAuthWatcherEvent(const AuthWatcherEvent& auth_event) {
   }
 }
 
-void AllStatus::HandleSyncerEvent(const SyncerEvent& event) {
+void AllStatus::HandleChannelEvent(const SyncerEvent& event) {
   ScopedStatusLockWithNotify lock(this);
   switch (event.what_happened) {
     case SyncerEvent::COMMITS_SUCCEEDED:
@@ -200,6 +200,7 @@ void AllStatus::HandleSyncerEvent(const SyncerEvent& event) {
       // We're safe to use this value here because we don't call into the syncer
       // or block on any processes.
       lock.set_notify_plan(DONT_NOTIFY);
+      syncer_thread_hookup_.reset();
       break;
     case SyncerEvent::OVER_QUOTA:
       LOG(WARNING) << "User has gone over quota.";
diff --git a/chrome/browser/sync/engine/all_status.h b/chrome/browser/sync/engine/all_status.h
index d34a15a8f4d0d..63edd17805588 100644
--- a/chrome/browser/sync/engine/all_status.h
+++ b/chrome/browser/sync/engine/all_status.h
@@ -13,6 +13,7 @@
 #include "base/atomicops.h"
 #include "base/lock.h"
 #include "base/scoped_ptr.h"
+#include "chrome/browser/sync/util/channel.h"
 #include "chrome/common/deprecated/event_sys.h"
 
 namespace browser_sync {
@@ -26,7 +27,7 @@ struct AuthWatcherEvent;
 struct ServerConnectionEvent;
 struct SyncerEvent;
 
-class AllStatus {
+class AllStatus : public ChannelEventHandler<SyncerEvent> {
   friend class ScopedStatusLockWithNotify;
  public:
   typedef EventChannel<AllStatusEvent, Lock> Channel;
@@ -97,7 +98,7 @@ class AllStatus {
   void HandleAuthWatcherEvent(const AuthWatcherEvent& event);
 
   void WatchSyncerThread(SyncerThread* syncer_thread);
-  void HandleSyncerEvent(const SyncerEvent& event);
+  void HandleChannelEvent(const SyncerEvent& event);
 
   // Returns a string description of the SyncStatus (currently just the ascii
   // version of the enum). Will LOG(FATAL) if the status us out of range.
@@ -134,7 +135,7 @@ class AllStatus {
   Status status_;
   Channel* const channel_;
   scoped_ptr<EventListenerHookup> conn_mgr_hookup_;
-  scoped_ptr<EventListenerHookup> syncer_thread_hookup_;
+  scoped_ptr<ChannelHookup<SyncerEvent> > syncer_thread_hookup_;
   scoped_ptr<EventListenerHookup> diskfull_hookup_;
   scoped_ptr<EventListenerHookup> talk_mediator_hookup_;
 
diff --git a/chrome/browser/sync/engine/syncapi.cc b/chrome/browser/sync/engine/syncapi.cc
index d4867e6103325..ad6d43801876e 100644
--- a/chrome/browser/sync/engine/syncapi.cc
+++ b/chrome/browser/sync/engine/syncapi.cc
@@ -796,7 +796,9 @@ class BridgedGaiaAuthenticator : public gaia::GaiaAuthenticator {
 // SyncManager's implementation: SyncManager::SyncInternal
 class SyncManager::SyncInternal
     : public net::NetworkChangeNotifier::Observer,
-      public TalkMediator::Delegate {
+      public TalkMediator::Delegate,
+      public browser_sync::ChannelEventHandler<syncable::DirectoryChangeEvent>,
+      public browser_sync::ChannelEventHandler<SyncerEvent>{
   static const int kDefaultNudgeDelayMilliseconds;
   static const int kPreferencesNudgeDelayMilliseconds;
  public:
@@ -864,7 +866,7 @@ class SyncManager::SyncInternal
   // This listener is called upon completion of a syncable transaction, and
   // builds the list of sync-engine initiated changes that will be forwarded to
   // the SyncManager's Observers.
-  void HandleChangeEvent(const syncable::DirectoryChangeEvent& event);
+  virtual void HandleChannelEvent(const syncable::DirectoryChangeEvent& event);
   void HandleTransactionCompleteChangeEvent(
       const syncable::DirectoryChangeEvent& event);
   void HandleCalculateChangesChangeEventFromSyncApi(
@@ -873,7 +875,7 @@ class SyncManager::SyncInternal
       const syncable::DirectoryChangeEvent& event);
 
   // This listener is called by the syncer channel for all syncer events.
-  void HandleSyncerEvent(const SyncerEvent& event);
+  virtual void HandleChannelEvent(const SyncerEvent& event);
 
   // We have a direct hookup to the authwatcher to be notified for auth failures
   // on startup, to serve our UI needs.
@@ -1087,10 +1089,11 @@ class SyncManager::SyncInternal
   ChangeReorderBuffer change_buffers_[syncable::MODEL_TYPE_COUNT];
 
   // The event listener hookup that is registered for HandleChangeEvent.
-  scoped_ptr<EventListenerHookup> dir_change_hookup_;
+  scoped_ptr<browser_sync::ChannelHookup<syncable::DirectoryChangeEvent> >
+      dir_change_hookup_;
 
   // The event listener hookup registered for HandleSyncerEvent.
-  scoped_ptr<EventListenerHookup> syncer_event_;
+  scoped_ptr<browser_sync::ChannelHookup<SyncerEvent> > syncer_event_;
 
   // The event listener hookup registered for HandleAuthWatcherEvent.
   scoped_ptr<EventListenerHookup> authwatcher_hookup_;
@@ -1294,9 +1297,7 @@ bool SyncManager::SyncInternal::Init(
   allstatus_.WatchSyncerThread(syncer_thread());
 
   // Subscribe to the syncer thread's channel.
-  syncer_event_.reset(
-      NewEventListenerHookup(syncer_thread()->relay_channel(), this,
-          &SyncInternal::HandleSyncerEvent));
+  syncer_event_.reset(syncer_thread()->relay_channel()->AddObserver(this));
 
   bool attempting_auth = false;
   std::string username, auth_token;
@@ -1521,7 +1522,7 @@ void SyncManager::SyncInternal::OnIPAddressChanged() {
 // Listen to model changes, filter out ones initiated by the sync API, and
 // saves the rest (hopefully just backend Syncer changes resulting from
 // ApplyUpdates) to data_->changelist.
-void SyncManager::SyncInternal::HandleChangeEvent(
+void SyncManager::SyncInternal::HandleChannelEvent(
     const syncable::DirectoryChangeEvent& event) {
   if (event.todo == syncable::DirectoryChangeEvent::TRANSACTION_COMPLETE) {
     HandleTransactionCompleteChangeEvent(event);
@@ -1694,7 +1695,7 @@ SyncManager::Status SyncManager::SyncInternal::ComputeAggregatedStatus() {
   return return_status;
 }
 
-void SyncManager::SyncInternal::HandleSyncerEvent(const SyncerEvent& event) {
+void SyncManager::SyncInternal::HandleChannelEvent(const SyncerEvent& event) {
   if (!initialized()) {
     // This could be the first time that the syncer has completed a full
     // download; if so, we should signal that initialization is complete.
@@ -1778,9 +1779,7 @@ void SyncManager::SyncInternal::HandleAuthWatcherEvent(
                         << "up directory change event listener!";
           return;
         }
-        dir_change_hookup_.reset(NewEventListenerHookup(
-            lookup->changes_channel(), this,
-            &SyncInternal::HandleChangeEvent));
+        dir_change_hookup_.reset(lookup->AddChangeObserver(this));
       }
       if (InitialSyncEndedForAllEnabledTypes())
         MarkAndNotifyInitializationComplete();
@@ -1942,9 +1941,7 @@ void SyncManager::SyncInternal::SetupForTestMode(
                     << "up directory change event listener!";
       return;
     }
-    dir_change_hookup_.reset(NewEventListenerHookup(
-        lookup->changes_channel(), this,
-        &SyncInternal::HandleChangeEvent));
+    dir_change_hookup_.reset(lookup->AddChangeObserver(this));
   }
   MarkAndNotifyInitializationComplete();
 }
diff --git a/chrome/browser/sync/engine/syncer.cc b/chrome/browser/sync/engine/syncer.cc
index 24c471bf13687..2956a046c457a 100644
--- a/chrome/browser/sync/engine/syncer.cc
+++ b/chrome/browser/sync/engine/syncer.cc
@@ -55,24 +55,29 @@ using sessions::ConflictProgress;
 Syncer::Syncer(sessions::SyncSessionContext* context)
     : early_exit_requested_(false),
       max_commit_batch_size_(kDefaultMaxCommitBatchSize),
-      syncer_event_channel_(new SyncerEventChannel(SyncerEvent(
-          SyncerEvent::SHUTDOWN_USE_WITH_CARE))),
+      syncer_event_channel_(new SyncerEventChannel()),
       resolver_scoper_(context, &resolver_),
       event_channel_scoper_(context, syncer_event_channel_.get()),
       context_(context),
       updates_source_(sync_pb::GetUpdatesCallerInfo::UNKNOWN),
       pre_conflict_resolution_closure_(NULL) {
-  shutdown_channel_.reset(new ShutdownChannel(this));
+  shutdown_channel_.reset(new ShutdownChannel());
 
   ScopedDirLookup dir(context->directory_manager(), context->account_name());
   // The directory must be good here.
   CHECK(dir.good());
 }
 
+Syncer::~Syncer() {
+  syncer_event_channel_->Notify(
+      SyncerEvent(SyncerEvent::SHUTDOWN_USE_WITH_CARE));
+  shutdown_channel_->Notify(SyncerShutdownEvent(this));
+}
+
 void Syncer::RequestNudge(int milliseconds) {
   SyncerEvent event(SyncerEvent::REQUEST_SYNC_NUDGE);
   event.nudge_delay_milliseconds = milliseconds;
-  syncer_event_channel_->NotifyListeners(event);
+  syncer_event_channel_->Notify(event);
 }
 
 bool Syncer::SyncShare(sessions::SyncSession::Delegate* delegate) {
diff --git a/chrome/browser/sync/engine/syncer.h b/chrome/browser/sync/engine/syncer.h
index df28ee48031b1..27a8d708e175d 100644
--- a/chrome/browser/sync/engine/syncer.h
+++ b/chrome/browser/sync/engine/syncer.h
@@ -76,7 +76,7 @@ class Syncer {
   // The constructor may be called from a thread that is not the Syncer's
   // dedicated thread, to allow some flexibility in the setup.
   explicit Syncer(sessions::SyncSessionContext* context);
-  ~Syncer() {}
+  ~Syncer();
 
   // Called by other threads to tell the syncer to stop what it's doing
   // and return early from SyncShare, if possible.
diff --git a/chrome/browser/sync/engine/syncer_command.cc b/chrome/browser/sync/engine/syncer_command.cc
index 2c58795096d84..f95cf40113a49 100644
--- a/chrome/browser/sync/engine/syncer_command.cc
+++ b/chrome/browser/sync/engine/syncer_command.cc
@@ -34,11 +34,11 @@ void SyncerCommand::SendNotifications(SyncSession* session) {
     const sessions::SyncSessionSnapshot& snapshot(session->TakeSnapshot());
     event.snapshot = &snapshot;
     DCHECK(session->context()->syncer_event_channel());
-    session->context()->syncer_event_channel()->NotifyListeners(event);
+    session->context()->syncer_event_channel()->Notify(event);
     if (session->status_controller()->syncer_status().over_quota) {
       SyncerEvent quota_event(SyncerEvent::OVER_QUOTA);
       quota_event.snapshot = &snapshot;
-      session->context()->syncer_event_channel()->NotifyListeners(quota_event);
+      session->context()->syncer_event_channel()->Notify(quota_event);
     }
   }
 }
diff --git a/chrome/browser/sync/engine/syncer_end_command.cc b/chrome/browser/sync/engine/syncer_end_command.cc
index e1dae58354249..6d7f0914bedc8 100644
--- a/chrome/browser/sync/engine/syncer_end_command.cc
+++ b/chrome/browser/sync/engine/syncer_end_command.cc
@@ -41,7 +41,7 @@ void SyncerEndCommand::ExecuteImpl(sessions::SyncSession* session) {
   SyncerEvent event(SyncerEvent::SYNC_CYCLE_ENDED);
   sessions::SyncSessionSnapshot snapshot(session->TakeSnapshot());
   event.snapshot = &snapshot;
-  session->context()->syncer_event_channel()->NotifyListeners(event);
+  session->context()->syncer_event_channel()->Notify(event);
 }
 
 }  // namespace browser_sync
diff --git a/chrome/browser/sync/engine/syncer_thread.cc b/chrome/browser/sync/engine/syncer_thread.cc
index ed8b023c141ff..9fc69d4f2ffba 100644
--- a/chrome/browser/sync/engine/syncer_thread.cc
+++ b/chrome/browser/sync/engine/syncer_thread.cc
@@ -74,8 +74,7 @@ SyncerThread::SyncerThread(sessions::SyncSessionContext* context,
       session_context_(context),
       disable_idle_detection_(false) {
   DCHECK(context);
-  syncer_event_relay_channel_.reset(new SyncerEventChannel(SyncerEvent(
-      SyncerEvent::SHUTDOWN_USE_WITH_CARE)));
+  syncer_event_relay_channel_.reset(new SyncerEventChannel());
 
   if (context->directory_manager()) {
     directory_manager_hookup_.reset(NewEventListenerHookup(
@@ -90,6 +89,8 @@ SyncerThread::SyncerThread(sessions::SyncSessionContext* context,
 
 SyncerThread::~SyncerThread() {
   conn_mgr_hookup_.reset();
+  syncer_event_relay_channel_->Notify(SyncerEvent(
+      SyncerEvent::SHUTDOWN_USE_WITH_CARE));
   syncer_event_relay_channel_.reset();
   directory_manager_hookup_.reset();
   syncer_events_.reset();
@@ -305,7 +306,7 @@ void SyncerThread::ThreadMainLoop() {
 void SyncerThread::PauseUntilResumedOrQuit() {
   LOG(INFO) << "Syncer thread entering pause.";
   SyncerEvent event(SyncerEvent::PAUSED);
-  relay_channel()->NotifyListeners(event);
+  relay_channel()->Notify(event);
 
   // Thread will get stuck here until either a resume is requested
   // or shutdown is started.
@@ -315,7 +316,7 @@ void SyncerThread::PauseUntilResumedOrQuit() {
   // Notify that we have resumed if we are not shutting down.
   if (!vault_.stop_syncer_thread_) {
     SyncerEvent event(SyncerEvent::RESUMED);
-    relay_channel()->NotifyListeners(event);
+    relay_channel()->Notify(event);
   }
   LOG(INFO) << "Syncer thread exiting pause.";
 }
@@ -479,9 +480,9 @@ void SyncerThread::SetUpdatesSource(bool nudged, NudgeSource nudge_source,
   vault_.syncer_->set_updates_source(updates_source);
 }
 
-void SyncerThread::HandleSyncerEvent(const SyncerEvent& event) {
+void SyncerThread::HandleChannelEvent(const SyncerEvent& event) {
   AutoLock lock(lock_);
-  relay_channel()->NotifyListeners(event);
+  relay_channel()->Notify(event);
   if (SyncerEvent::REQUEST_SYNC_NUDGE != event.what_happened) {
     return;
   }
@@ -500,9 +501,8 @@ void SyncerThread::HandleDirectoryManagerEvent(
     session_context_->set_account_name(event.dirname);
     vault_.syncer_ = new Syncer(session_context_.get());
 
-    syncer_events_.reset(NewEventListenerHookup(
-        session_context_->syncer_event_channel(), this,
-        &SyncerThread::HandleSyncerEvent));
+    syncer_events_.reset(
+        session_context_->syncer_event_channel()->AddObserver(this));
     vault_field_changed_.Broadcast();
   }
 }
diff --git a/chrome/browser/sync/engine/syncer_thread.h b/chrome/browser/sync/engine/syncer_thread.h
index 8801b6387c47a..af6417e7f9ca6 100644
--- a/chrome/browser/sync/engine/syncer_thread.h
+++ b/chrome/browser/sync/engine/syncer_thread.h
@@ -46,7 +46,8 @@ struct SyncerEvent;
 struct SyncerShutdownEvent;
 
 class SyncerThread : public base::RefCountedThreadSafe<SyncerThread>,
-                     public sessions::SyncSession::Delegate {
+                     public sessions::SyncSession::Delegate,
+                     public ChannelEventHandler<SyncerEvent> {
   FRIEND_TEST_ALL_PREFIXES(SyncerThreadTest, CalculateSyncWaitTime);
   FRIEND_TEST_ALL_PREFIXES(SyncerThreadTest, CalculatePollingWaitTime);
   FRIEND_TEST_ALL_PREFIXES(SyncerThreadWithSyncerTest, Polling);
@@ -220,7 +221,7 @@ class SyncerThread : public base::RefCountedThreadSafe<SyncerThread>,
   void* Run();
   void HandleDirectoryManagerEvent(
       const syncable::DirectoryManagerEvent& event);
-  void HandleSyncerEvent(const SyncerEvent& event);
+  void HandleChannelEvent(const SyncerEvent& event);
 
   // SyncSession::Delegate implementation.
   virtual void OnSilencedUntil(const base::TimeTicks& silenced_until);
@@ -302,7 +303,7 @@ class SyncerThread : public base::RefCountedThreadSafe<SyncerThread>,
   void NudgeSyncImpl(int milliseconds_from_now, NudgeSource source);
 
   scoped_ptr<EventListenerHookup> directory_manager_hookup_;
-  scoped_ptr<EventListenerHookup> syncer_events_;
+  scoped_ptr<ChannelHookup<SyncerEvent> > syncer_events_;
 
 #if defined(OS_LINUX)
   // On Linux, we need this information in order to query idle time.
diff --git a/chrome/browser/sync/engine/syncer_thread_unittest.cc b/chrome/browser/sync/engine/syncer_thread_unittest.cc
index 946978b60b9ee..14f4e02af657c 100644
--- a/chrome/browser/sync/engine/syncer_thread_unittest.cc
+++ b/chrome/browser/sync/engine/syncer_thread_unittest.cc
@@ -13,6 +13,7 @@
 #include "chrome/browser/sync/engine/syncer_thread.h"
 #include "chrome/browser/sync/engine/syncer_types.h"
 #include "chrome/browser/sync/sessions/sync_session_context.h"
+#include "chrome/browser/sync/util/channel.h"
 #include "chrome/test/sync/engine/mock_server_connection.h"
 #include "chrome/test/sync/engine/test_directory_setter_upper.h"
 #include "testing/gmock/include/gmock/gmock.h"
@@ -32,7 +33,8 @@ typedef testing::Test SyncerThreadTest;
 typedef SyncerThread::WaitInterval WaitInterval;
 
 class SyncerThreadWithSyncerTest : public testing::Test,
-                                   public ModelSafeWorkerRegistrar {
+                                   public ModelSafeWorkerRegistrar,
+                                   public ChannelEventHandler<SyncerEvent> {
  public:
   SyncerThreadWithSyncerTest() : sync_cycle_ended_event_(false, false) {}
   virtual void SetUp() {
@@ -45,8 +47,7 @@ class SyncerThreadWithSyncerTest : public testing::Test,
         NULL, metadb_.manager(), this);
     syncer_thread_ = new SyncerThread(context, allstatus_.get());
     syncer_event_hookup_.reset(
-        NewEventListenerHookup(syncer_thread_->relay_channel(), this,
-            &SyncerThreadWithSyncerTest::HandleSyncerEvent));
+        syncer_thread_->relay_channel()->AddObserver(this));
     allstatus_->WatchSyncerThread(syncer_thread_);
     syncer_thread_->SetConnected(true);
     syncable::ModelTypeBitSet expected_types;
@@ -54,8 +55,9 @@ class SyncerThreadWithSyncerTest : public testing::Test,
     connection_->ExpectGetUpdatesRequestTypes(expected_types);
   }
   virtual void TearDown() {
-    syncer_thread_ = NULL;
+    syncer_event_hookup_.reset();
     allstatus_.reset();
+    syncer_thread_ = NULL;
     connection_.reset();
     metadb_.TearDown();
   }
@@ -98,7 +100,7 @@ class SyncerThreadWithSyncerTest : public testing::Test,
 
  private:
 
-  void HandleSyncerEvent(const SyncerEvent& event) {
+  void HandleChannelEvent(const SyncerEvent& event) {
     if (event.what_happened == SyncerEvent::SYNC_CYCLE_ENDED)
       sync_cycle_ended_event_.Signal();
   }
@@ -108,7 +110,7 @@ class SyncerThreadWithSyncerTest : public testing::Test,
   scoped_ptr<AllStatus> allstatus_;
   scoped_refptr<SyncerThread> syncer_thread_;
   scoped_refptr<ModelSafeWorker> worker_;
-  scoped_ptr<EventListenerHookup> syncer_event_hookup_;
+  scoped_ptr<ChannelHookup<SyncerEvent> > syncer_event_hookup_;
   base::WaitableEvent sync_cycle_ended_event_;
   DISALLOW_COPY_AND_ASSIGN(SyncerThreadWithSyncerTest);
 };
@@ -715,9 +717,9 @@ ACTION_P(SignalEvent, event) {
   event->Signal();
 }
 
-class ListenerMock {
+class ListenerMock : public ChannelEventHandler<SyncerEvent> {
  public:
-  MOCK_METHOD1(HandleEvent, void(const SyncerEvent&));
+  MOCK_METHOD1(HandleChannelEvent, void(const SyncerEvent&));
 };
 
 // TODO(skrul): Bug 39070.
@@ -730,13 +732,10 @@ TEST_F(SyncerThreadWithSyncerTest, DISABLED_Pause) {
   syncer_thread()->SetSyncerShortPollInterval(poll_interval);
 
   ListenerMock listener;
-  scoped_ptr<EventListenerHookup> hookup;
-  hookup.reset(
-      NewEventListenerHookup(syncer_thread()->relay_channel(),
-                             &listener,
-                             &ListenerMock::HandleEvent));
+  scoped_ptr<ChannelHookup<SyncerEvent> > hookup;
+  hookup.reset(syncer_thread()->relay_channel()->AddObserver(&listener));
 
-  EXPECT_CALL(listener, HandleEvent(
+  EXPECT_CALL(listener, HandleChannelEvent(
       Field(&SyncerEvent::what_happened, SyncerEvent::STATUS_CHANGED))).
       Times(AnyNumber());
 
@@ -745,7 +744,7 @@ TEST_F(SyncerThreadWithSyncerTest, DISABLED_Pause) {
   EXPECT_FALSE(syncer_thread()->RequestResume());
 
   // Wait for the initial sync to complete.
-  EXPECT_CALL(listener, HandleEvent(
+  EXPECT_CALL(listener, HandleChannelEvent(
       Field(&SyncerEvent::what_happened, SyncerEvent::SYNC_CYCLE_ENDED))).
       WillOnce(SignalEvent(&sync_cycle_ended_event));
   ASSERT_TRUE(syncer_thread()->Start());
@@ -753,14 +752,14 @@ TEST_F(SyncerThreadWithSyncerTest, DISABLED_Pause) {
   sync_cycle_ended_event.Wait();
 
   // Request a pause.
-  EXPECT_CALL(listener, HandleEvent(
+  EXPECT_CALL(listener, HandleChannelEvent(
       Field(&SyncerEvent::what_happened, SyncerEvent::PAUSED))).
       WillOnce(SignalEvent(&paused_event));
   ASSERT_TRUE(syncer_thread()->RequestPause());
   paused_event.Wait();
 
   // Resuming the pause.
-  EXPECT_CALL(listener, HandleEvent(
+  EXPECT_CALL(listener, HandleChannelEvent(
       Field(&SyncerEvent::what_happened, SyncerEvent::RESUMED))).
       WillOnce(SignalEvent(&resumed_event));
   ASSERT_TRUE(syncer_thread()->RequestResume());
@@ -770,7 +769,7 @@ TEST_F(SyncerThreadWithSyncerTest, DISABLED_Pause) {
   EXPECT_FALSE(syncer_thread()->RequestResume());
 
   // Request a pause.
-  EXPECT_CALL(listener, HandleEvent(
+  EXPECT_CALL(listener, HandleChannelEvent(
       Field(&SyncerEvent::what_happened, SyncerEvent::PAUSED))).
       WillOnce(SignalEvent(&paused_event));
   ASSERT_TRUE(syncer_thread()->RequestPause());
@@ -780,11 +779,11 @@ TEST_F(SyncerThreadWithSyncerTest, DISABLED_Pause) {
   syncer_thread()->NudgeSyncer(0, SyncerThread::kUnknown);
 
   // Resuming will cause the nudge to be processed and a sync cycle to run.
-  EXPECT_CALL(listener, HandleEvent(
+  EXPECT_CALL(listener, HandleChannelEvent(
       Field(&SyncerEvent::what_happened, SyncerEvent::RESUMED))).
       WillOnce(SignalEvent(&resumed_event));
   // Wait for the sync cycle to run.
-  EXPECT_CALL(listener, HandleEvent(
+  EXPECT_CALL(listener, HandleChannelEvent(
       Field(&SyncerEvent::what_happened, SyncerEvent::SYNC_CYCLE_ENDED))).
       WillOnce(SignalEvent(&sync_cycle_ended_event));
   ASSERT_TRUE(syncer_thread()->RequestResume());
diff --git a/chrome/browser/sync/engine/syncer_types.h b/chrome/browser/sync/engine/syncer_types.h
index 762d526cabc66..a842e87485e2a 100644
--- a/chrome/browser/sync/engine/syncer_types.h
+++ b/chrome/browser/sync/engine/syncer_types.h
@@ -8,7 +8,7 @@
 #include <map>
 #include <vector>
 
-#include "chrome/common/deprecated/event_sys.h"
+#include "chrome/browser/sync/util/channel.h"
 
 namespace syncable {
 class BaseTransaction;
@@ -128,15 +128,16 @@ struct SyncerEvent {
 };
 
 struct SyncerShutdownEvent {
-  typedef Syncer* EventType;
+  SyncerShutdownEvent(Syncer *syncer_ptr) : syncer(syncer_ptr) {}
+  Syncer* syncer;
   static bool IsChannelShutdownEvent(Syncer* syncer) {
     return true;
   }
 };
 
-typedef EventChannel<SyncerEvent, Lock> SyncerEventChannel;
+typedef Channel<SyncerEvent> SyncerEventChannel;
 
-typedef EventChannel<SyncerShutdownEvent, Lock> ShutdownChannel;
+typedef Channel<SyncerShutdownEvent> ShutdownChannel;
 
 // This struct is passed between parts of the syncer during the processing of
 // one sync loop. It lives on the stack. We don't expose the number of
diff --git a/chrome/browser/sync/engine/syncer_unittest.cc b/chrome/browser/sync/engine/syncer_unittest.cc
index 7573cdebd0eed..f7946f5671a95 100644
--- a/chrome/browser/sync/engine/syncer_unittest.cc
+++ b/chrome/browser/sync/engine/syncer_unittest.cc
@@ -103,7 +103,8 @@ const int64 kTestLogRequestTimestamp = 123456;
 
 class SyncerTest : public testing::Test,
                    public SyncSession::Delegate,
-                   public ModelSafeWorkerRegistrar {
+                   public ModelSafeWorkerRegistrar,
+                   public ChannelEventHandler<SyncerEvent> {
  protected:
   SyncerTest() : syncer_(NULL) {}
 
@@ -138,7 +139,7 @@ class SyncerTest : public testing::Test,
     }
   }
 
-  void HandleSyncerEvent(SyncerEvent event) {
+  void HandleChannelEvent(const SyncerEvent& event) {
     LOG(INFO) << "HandleSyncerEvent in unittest " << event.what_happened;
     // we only test for entry-specific events, not status changed ones.
     switch (event.what_happened) {
@@ -185,8 +186,7 @@ class SyncerTest : public testing::Test,
     ASSERT_TRUE(context_->syncer_event_channel());
     ASSERT_TRUE(context_->resolver());
 
-    hookup_.reset(NewEventListenerHookup(context_->syncer_event_channel(), this,
-                                         &SyncerTest::HandleSyncerEvent));
+    hookup_.reset(context_->syncer_event_channel()->AddObserver(this));
     session_.reset(new SyncSession(context_.get(), this));
 
     ScopedDirLookup dir(syncdb_.manager(), syncdb_.name());
@@ -421,7 +421,7 @@ class SyncerTest : public testing::Test,
 
   TestDirectorySetterUpper syncdb_;
   scoped_ptr<MockConnectionManager> mock_server_;
-  scoped_ptr<EventListenerHookup> hookup_;
+  scoped_ptr<ChannelHookup<SyncerEvent> > hookup_;
 
   Syncer* syncer_;
 
diff --git a/chrome/browser/sync/sessions/sync_session_unittest.cc b/chrome/browser/sync/sessions/sync_session_unittest.cc
index 4251f0dafaf0b..ae85137816dac 100644
--- a/chrome/browser/sync/sessions/sync_session_unittest.cc
+++ b/chrome/browser/sync/sessions/sync_session_unittest.cc
@@ -88,8 +88,7 @@ class SyncSessionTest : public testing::Test,
 
 TEST_F(SyncSessionTest, ScopedContextHelpers) {
   ConflictResolver resolver;
-  SyncerEventChannel* channel = new SyncerEventChannel(
-      SyncerEvent(SyncerEvent::SHUTDOWN_USE_WITH_CARE));
+  SyncerEventChannel* channel = new SyncerEventChannel();
   EXPECT_FALSE(context_->resolver());
   EXPECT_FALSE(context_->syncer_event_channel());
   {
@@ -100,6 +99,7 @@ TEST_F(SyncSessionTest, ScopedContextHelpers) {
   }
   EXPECT_FALSE(context_->resolver());
   EXPECT_FALSE(context_->syncer_event_channel());
+  channel->Notify(SyncerEvent(SyncerEvent::SHUTDOWN_USE_WITH_CARE));
   delete channel;
 }
 
diff --git a/chrome/browser/sync/syncable/syncable.cc b/chrome/browser/sync/syncable/syncable.cc
index ae0307cbb94cd..42e48674c538c 100644
--- a/chrome/browser/sync/syncable/syncable.cc
+++ b/chrome/browser/sync/syncable/syncable.cc
@@ -172,7 +172,6 @@ Directory::Kernel::Kernel(const FilePath& db_path,
       unsynced_metahandles(new MetahandleSet),
       dirty_metahandles(new MetahandleSet),
       channel(new Directory::Channel(syncable::DIRECTORY_DESTROYED)),
-      changes_channel(new Directory::ChangesChannel(kShutdownChangesEvent)),
       info_status(Directory::KERNEL_SHARE_INFO_VALID),
       persisted_info(info.kernel_info),
       cache_guid(info.cache_guid),
@@ -195,7 +194,7 @@ void Directory::Kernel::Release() {
 Directory::Kernel::~Kernel() {
   CHECK(0 == refcount);
   delete channel;
-  delete changes_channel;
+  changes_channel.Notify(kShutdownChangesEvent);
   delete unsynced_metahandles;
   delete unapplied_update_metahandles;
   delete dirty_metahandles;
@@ -898,6 +897,11 @@ void Directory::CheckTreeInvariants(syncable::BaseTransaction* trans,
   // pulling entries into RAM
 }
 
+browser_sync::ChannelHookup<DirectoryChangeEvent>* Directory::AddChangeObserver(
+    browser_sync::ChannelEventHandler<DirectoryChangeEvent>* observer) {
+  return kernel_->changes_channel.AddObserver(observer);
+}
+
 ///////////////////////////////////////////////////////////////////////////////
 // ScopedKernelLocks
 
@@ -954,14 +958,14 @@ void BaseTransaction::UnlockAndLog(OriginalEntries* originals_arg) {
   // Tell listeners to calculate changes while we still have the mutex.
   DirectoryChangeEvent event = { DirectoryChangeEvent::CALCULATE_CHANGES,
                                  originals.get(), this, writer_ };
-  dirkernel_->changes_channel->NotifyListeners(event);
+  dirkernel_->changes_channel.Notify(event);
 
   dirkernel_->transaction_mutex.Release();
 
   DirectoryChangeEvent complete_event =
       { DirectoryChangeEvent::TRANSACTION_COMPLETE,
         NULL, NULL, INVALID };
-  dirkernel_->changes_channel->NotifyListeners(complete_event);
+  dirkernel_->changes_channel.Notify(complete_event);
 }
 
 ReadTransaction::ReadTransaction(Directory* directory, const char* file,
diff --git a/chrome/browser/sync/syncable/syncable.h b/chrome/browser/sync/syncable/syncable.h
index b6e8dca37e18a..270d2bf56970f 100644
--- a/chrome/browser/sync/syncable/syncable.h
+++ b/chrome/browser/sync/syncable/syncable.h
@@ -27,6 +27,7 @@
 #include "chrome/browser/sync/syncable/path_name_cmp.h"
 #include "chrome/browser/sync/syncable/syncable_id.h"
 #include "chrome/browser/sync/syncable/model_type.h"
+#include "chrome/browser/sync/util/channel.h"
 #include "chrome/browser/sync/util/dbgq.h"
 #include "chrome/browser/sync/util/row_iterator.h"
 #include "chrome/browser/sync/util/sync_types.h"
@@ -674,6 +675,8 @@ class Directory {
                            TakeSnapshotGetsOnlyDirtyHandlesTest);
 
  public:
+  class EventListenerHookup;
+
   // Various data that the Directory::Kernel we are backing (persisting data
   // for) needs saved across runs of the application.
   struct PersistedKernelInfo {
@@ -782,6 +785,9 @@ class Directory {
   // Unique to each account / client pair.
   std::string cache_guid() const;
 
+  browser_sync::ChannelHookup<DirectoryChangeEvent>* AddChangeObserver(
+      browser_sync::ChannelEventHandler<DirectoryChangeEvent>* observer);
+
  protected:  // for friends, mainly used by Entry constructors
   EntryKernel* GetEntryByHandle(const int64 handle);
   EntryKernel* GetEntryByHandle(const int64 metahandle, ScopedKernelLock* lock);
@@ -818,7 +824,6 @@ class Directory {
   };
  public:
   typedef EventChannel<DirectoryEventTraits, Lock> Channel;
-  typedef EventChannel<DirectoryChangeEvent, Lock> ChangesChannel;
   typedef std::vector<int64> ChildHandles;
 
   // Returns the child meta handles for given parent id.
@@ -871,9 +876,6 @@ class Directory {
   inline Channel* channel() const {
     return kernel_->channel;
   }
-  inline ChangesChannel* changes_channel() const {
-    return kernel_->changes_channel;
-  }
 
   // Checks tree metadata consistency.
   // If full_scan is false, the function will avoid pulling any entries from the
@@ -1000,7 +1002,8 @@ class Directory {
     // The changes channel mutex is explicit because it must be locked
     // while holding the transaction mutex and released after
     // releasing the transaction mutex.
-    ChangesChannel* const changes_channel;
+    browser_sync::Channel<DirectoryChangeEvent> changes_channel;
+
     Lock changes_channel_mutex;
     KernelShareInfoStatus info_status;
 
diff --git a/chrome/browser/sync/util/channel.h b/chrome/browser/sync/util/channel.h
new file mode 100644
index 0000000000000..ba516b6e789e1
--- /dev/null
+++ b/chrome/browser/sync/util/channel.h
@@ -0,0 +1,140 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef CHROME_BROWSER_SYNC_UTIL_CHANNEL_H_
+#define CHROME_BROWSER_SYNC_UTIL_CHANNEL_H_
+
+///////////////////////////////////////////////////////////////////////////////
+//
+// OVERVIEW:
+//
+//   A threadsafe container for a list of observers.  Observers are able to
+//   remove themselves during iteration, and can be added on any thread.  This
+//   allows observers to safely remove themselves during notifications.  It
+//   also provides a handler when an observer is added that will remove the
+//   observer on destruction.
+//
+//   It is expected that all observers are removed before destruction.
+//   The channel owner should notify all observers to disconnect on shutdown if
+//   needed to ensure this.
+//
+// TYPICAL USAGE:
+//
+//   class MyWidget {
+//    public:
+//     ...
+//
+//     class Observer : public ChannelEventHandler<FooEvent> {
+//      public:
+//       virtual void HandleChannelEvent(const FooEvent& w) = 0;
+//     };
+//
+//     ChannelHookup<MyEvent>* AddObserver(Observer* obs) {
+//       return channel_.AddObserver(obs);
+//     }
+//
+//     void RemoveObserver(Observer* obs) {
+//       channel_.RemoveObserver(obs);
+//     }
+//
+//     void NotifyFoo(FooEvent& event) {
+//       channel_.Notify(event);
+//     }
+//
+//    private:
+//     Channel<FooEvent> channel_;
+//   };
+//
+//
+///////////////////////////////////////////////////////////////////////////////
+
+#include "base/lock.h"
+#include "base/observer_list.h"
+
+namespace browser_sync {
+
+template <typename EventType>
+class Channel;
+
+class EventHandler {
+};
+
+template <typename EventType>
+class ChannelEventHandler : public EventHandler {
+ public:
+  virtual void HandleChannelEvent(const EventType& event) = 0;
+};
+
+// This class manages a connection to a channel.  When it is destroyed, it
+// will remove the listener from the channel observer list.
+template <typename EventType>
+class ChannelHookup {
+ public:
+  ChannelHookup(Channel<EventType>* channel,
+                browser_sync::ChannelEventHandler<EventType>* handler)
+      : channel_(channel),
+        handler_(handler) {}
+  ~ChannelHookup() {
+    channel_->RemoveObserver(handler_);
+  }
+
+ private:
+  Channel<EventType>* channel_;
+  browser_sync::ChannelEventHandler<EventType>* handler_;
+};
+
+template <typename EventType>
+class Channel {
+ public:
+  typedef ObserverListBase<EventHandler> ChannelObserverList;
+
+  Channel() : locking_thread_(0) {}
+
+  ChannelHookup<EventType>* AddObserver(
+      ChannelEventHandler<EventType>* observer) {
+    AutoLock scoped_lock(event_handlers_mutex_);
+    event_handlers_.AddObserver(observer);
+    return new ChannelHookup<EventType>(this, observer);
+  }
+
+  void RemoveObserver(ChannelEventHandler<EventType>* observer) {
+    // This can be called in response to a notification, so we may already have
+    // locked this channel on this thread.
+    bool need_lock = (locking_thread_ != PlatformThread::CurrentId());
+    if (need_lock)
+      event_handlers_mutex_.Acquire();
+
+    event_handlers_mutex_.AssertAcquired();
+    event_handlers_.RemoveObserver(observer);
+    if (need_lock)
+      event_handlers_mutex_.Release();
+  }
+
+  void Notify(const EventType& event) {
+    AutoLock scoped_lock(event_handlers_mutex_);
+
+    // This may result in an observer trying to remove itself, so keep track
+    // of the thread we're locked on.
+    locking_thread_ = PlatformThread::CurrentId();
+
+    ChannelObserverList::Iterator it(event_handlers_);
+    EventHandler* obs;
+    while ((obs = it.GetNext()) != NULL) {
+      static_cast<ChannelEventHandler<EventType>* >(obs)->
+          HandleChannelEvent(event);
+    }
+
+    // Set back to an invalid thread id.
+    locking_thread_ = 0;
+  }
+
+ private:
+  Lock event_handlers_mutex_;
+  PlatformThreadId locking_thread_;
+  ObserverList<EventHandler> event_handlers_;
+};
+
+}  // namespace browser_sync
+
+#endif  // CHROME_BROWSER_SYNC_UTIL_CHANNEL_H_
diff --git a/chrome/browser/sync/util/channel_unittest.cc b/chrome/browser/sync/util/channel_unittest.cc
new file mode 100644
index 0000000000000..f2317dcc979b0
--- /dev/null
+++ b/chrome/browser/sync/util/channel_unittest.cc
@@ -0,0 +1,32 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "chrome/browser/sync/util/channel.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+struct TestEvent {
+  explicit TestEvent(int foo) : data(foo) {}
+  int data;
+};
+
+class TestObserver : public browser_sync::ChannelEventHandler<TestEvent> {
+ public:
+  virtual void HandleChannelEvent(const TestEvent& event) {
+    delete hookup;
+    hookup = 0;
+  }
+
+  browser_sync::ChannelHookup<TestEvent>* hookup;
+};
+
+TEST(ChannelTest, RemoveOnNotify) {
+  browser_sync::Channel<TestEvent> channel;
+  TestObserver observer;
+
+  observer.hookup = channel.AddObserver(&observer);
+
+  ASSERT_TRUE(0 != observer.hookup);
+  channel.Notify(TestEvent(1));
+  ASSERT_EQ(0, observer.hookup);
+}
diff --git a/chrome/chrome.gyp b/chrome/chrome.gyp
index 267966f72d443..f17ec101ee3d9 100644
--- a/chrome/chrome.gyp
+++ b/chrome/chrome.gyp
@@ -900,6 +900,7 @@
         'browser/sync/syncable/syncable_columns.h',
         'browser/sync/syncable/syncable_id.cc',
         'browser/sync/syncable/syncable_id.h',
+        'browser/sync/util/channel.h',
         'browser/sync/util/character_set_converters.h',
         'browser/sync/util/character_set_converters_posix.cc',
         'browser/sync/util/character_set_converters_win.cc',
diff --git a/chrome/chrome_tests.gypi b/chrome/chrome_tests.gypi
index 49690b4a302a4..50c785c25c651 100755
--- a/chrome/chrome_tests.gypi
+++ b/chrome/chrome_tests.gypi
@@ -1851,6 +1851,7 @@
         'browser/sync/syncable/directory_backing_store_unittest.cc',
         'browser/sync/syncable/syncable_id_unittest.cc',
         'browser/sync/syncable/syncable_unittest.cc',
+        'browser/sync/util/channel_unittest.cc',
         'browser/sync/util/crypto_helpers_unittest.cc',
         'browser/sync/util/extensions_activity_monitor_unittest.cc',
         'browser/sync/util/user_settings_unittest.cc',
-- 
GitLab