audio_pump.cc 8.54 KB
Newer Older
1
// Copyright 2016 The Chromium Authors. All rights reserved.
2 3 4
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

5
#include "remoting/protocol/audio_pump.h"
6

7
#include <memory>
8 9
#include <utility>

10 11 12
#include "base/bind.h"
#include "base/location.h"
#include "base/logging.h"
13
#include "base/macros.h"
14
#include "base/memory/ptr_util.h"
15
#include "base/single_thread_task_runner.h"
16
#include "base/threading/thread_task_runner_handle.h"
17 18 19 20
#include "media/base/audio_bus.h"
#include "media/base/audio_sample_types.h"
#include "media/base/channel_layout.h"
#include "media/base/channel_mixer.h"
21 22
#include "remoting/codec/audio_encoder.h"
#include "remoting/proto/audio.pb.h"
23
#include "remoting/protocol/audio_source.h"
24 25
#include "remoting/protocol/audio_stub.h"

26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
namespace {

int CalculateFrameCount(const remoting::AudioPacket& packet) {
  return packet.data(0).size() / packet.channels() / packet.bytes_per_sample();
}

std::unique_ptr<media::AudioBus> AudioPacketToAudioBus(
    const remoting::AudioPacket& packet) {
  const int frame_count = CalculateFrameCount(packet);
  DCHECK_GT(frame_count, 0);
  std::unique_ptr<media::AudioBus> result =
      media::AudioBus::Create(packet.channels(), frame_count);
  result->FromInterleaved<media::SignedInt16SampleTypeTraits>(
      reinterpret_cast<const int16_t*>(packet.data(0).data()), frame_count);
  return result;
}

std::unique_ptr<remoting::AudioPacket> AudioBusToAudioPacket(
    const media::AudioBus& packet) {
  std::unique_ptr<remoting::AudioPacket> result =
      base::MakeUnique<remoting::AudioPacket>();
  result->add_data()->resize(
      packet.channels() * packet.frames() * sizeof(int16_t));
  packet.ToInterleaved<media::SignedInt16SampleTypeTraits>(
      packet.frames(),
      reinterpret_cast<int16_t*>(&(result->mutable_data(0)->at(0))));
  result->set_encoding(remoting::AudioPacket::ENCODING_RAW);
  result->set_channels(
      static_cast<remoting::AudioPacket::Channels>(packet.channels()));
  result->set_bytes_per_sample(remoting::AudioPacket::BYTES_PER_SAMPLE_2);
  return result;
}

media::ChannelLayout RetrieveLayout(const remoting::AudioPacket& packet) {
  // This switch should match AudioPacket::Channels enum in audio.proto.
  switch (packet.channels()) {
    case remoting::AudioPacket::CHANNELS_INVALID:
      return media::CHANNEL_LAYOUT_UNSUPPORTED;
    case remoting::AudioPacket::CHANNELS_MONO:
      return media::CHANNEL_LAYOUT_MONO;
    case remoting::AudioPacket::CHANNELS_STEREO:
      return media::CHANNEL_LAYOUT_STEREO;
    case remoting::AudioPacket::CHANNELS_SURROUND:
      return media::CHANNEL_LAYOUT_SURROUND;
    case remoting::AudioPacket::CHANNELS_4_0:
      return media::CHANNEL_LAYOUT_4_0;
    case remoting::AudioPacket::CHANNELS_4_1:
      return media::CHANNEL_LAYOUT_4_1;
    case remoting::AudioPacket::CHANNELS_5_1:
      return media::CHANNEL_LAYOUT_5_1;
    case remoting::AudioPacket::CHANNELS_6_1:
      return media::CHANNEL_LAYOUT_6_1;
    case remoting::AudioPacket::CHANNELS_7_1:
      return media::CHANNEL_LAYOUT_7_1;
  }
  NOTREACHED() << "Invalid AudioPacket::Channels";
  return media::CHANNEL_LAYOUT_UNSUPPORTED;
}

}  // namespace

87
namespace remoting {
88
namespace protocol {
89

90 91 92
// Limit the data stored in the pending send buffers to 250ms.
const int kMaxBufferedIntervalMs = 250;

93
class AudioPump::Core {
94 95 96 97 98
 public:
  Core(base::WeakPtr<AudioPump> pump,
       std::unique_ptr<AudioSource> audio_source,
       std::unique_ptr<AudioEncoder> audio_encoder);
  ~Core();
99

100 101
  void Start();
  void Pause(bool pause);
102

103
  void OnPacketSent(int size);
104

105
 private:
106 107
  std::unique_ptr<AudioPacket> Downmix(std::unique_ptr<AudioPacket> packet);

108
  void EncodeAudioPacket(std::unique_ptr<AudioPacket> packet);
109

110
  base::ThreadChecker thread_checker_;
111

112
  base::WeakPtr<AudioPump> pump_;
113

114
  scoped_refptr<base::SingleThreadTaskRunner> pump_task_runner_;
115

116 117
  std::unique_ptr<AudioSource> audio_source_;
  std::unique_ptr<AudioEncoder> audio_encoder_;
118

119
  bool enabled_;
120

121 122 123
  // Number of bytes in the queue that have been encoded but haven't been sent
  // yet.
  int bytes_pending_;
124

125 126 127
  std::unique_ptr<media::ChannelMixer> mixer_;
  media::ChannelLayout mixer_input_layout_ = media::CHANNEL_LAYOUT_NONE;

128
  DISALLOW_COPY_AND_ASSIGN(Core);
129 130
};

131
AudioPump::Core::Core(base::WeakPtr<AudioPump> pump,
132
                      std::unique_ptr<AudioSource> audio_source,
133
                      std::unique_ptr<AudioEncoder> audio_encoder)
134 135
    : pump_(pump),
      pump_task_runner_(base::ThreadTaskRunnerHandle::Get()),
136
      audio_source_(std::move(audio_source)),
137
      audio_encoder_(std::move(audio_encoder)),
138 139
      enabled_(true),
      bytes_pending_(0) {
140 141 142 143 144 145 146 147 148 149
  thread_checker_.DetachFromThread();
}

AudioPump::Core::~Core() {
  DCHECK(thread_checker_.CalledOnValidThread());
}

void AudioPump::Core::Start() {
  DCHECK(thread_checker_.CalledOnValidThread());

150
  audio_source_->Start(
151 152 153 154 155 156 157 158 159
      base::Bind(&Core::EncodeAudioPacket, base::Unretained(this)));
}

void AudioPump::Core::Pause(bool pause) {
  DCHECK(thread_checker_.CalledOnValidThread());

  enabled_ = !pause;
}

160 161 162
void AudioPump::Core::OnPacketSent(int size) {
  DCHECK(thread_checker_.CalledOnValidThread());

163
  bytes_pending_ -= size;
164 165 166
  DCHECK_GE(bytes_pending_, 0);
}

167
void AudioPump::Core::EncodeAudioPacket(std::unique_ptr<AudioPacket> packet) {
168 169 170
  DCHECK(thread_checker_.CalledOnValidThread());
  DCHECK(packet);

171 172
  int max_buffered_bytes =
      audio_encoder_->GetBitrate() * kMaxBufferedIntervalMs / 1000 / 8;
173
  if (!enabled_ || bytes_pending_ > max_buffered_bytes) {
174
    return;
175 176 177 178 179
  }

  if (packet->channels() > AudioPacket::CHANNELS_STEREO) {
    packet = Downmix(std::move(packet));
  }
180

181
  std::unique_ptr<AudioPacket> encoded_packet =
182
      audio_encoder_->Encode(std::move(packet));
183 184

  // The audio encoder returns a null audio packet if there's no audio to send.
185
  if (!encoded_packet) {
186
    return;
187
  }
188

189 190 191 192 193 194
  int packet_size = encoded_packet->ByteSize();
  bytes_pending_ += packet_size;

  pump_task_runner_->PostTask(
      FROM_HERE, base::Bind(&AudioPump::SendAudioPacket, pump_,
                            base::Passed(&encoded_packet), packet_size));
195 196
}

197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225
std::unique_ptr<AudioPacket> AudioPump::Core::Downmix(
    std::unique_ptr<AudioPacket> packet) {
  DCHECK(thread_checker_.CalledOnValidThread());
  DCHECK(packet);
  DCHECK_EQ(packet->data_size(), 1);
  DCHECK_EQ(packet->bytes_per_sample(), AudioPacket::BYTES_PER_SAMPLE_2);

  const media::ChannelLayout input_layout = RetrieveLayout(*packet);
  DCHECK_NE(input_layout, media::CHANNEL_LAYOUT_UNSUPPORTED);
  DCHECK_NE(input_layout, media::CHANNEL_LAYOUT_MONO);
  DCHECK_NE(input_layout, media::CHANNEL_LAYOUT_STEREO);

  if (!mixer_ || mixer_input_layout_ != input_layout) {
    mixer_input_layout_ = input_layout;
    mixer_ = base::MakeUnique<media::ChannelMixer>(
        input_layout, media::CHANNEL_LAYOUT_STEREO);
  }

  std::unique_ptr<media::AudioBus> input = AudioPacketToAudioBus(*packet);
  DCHECK(input);
  std::unique_ptr<media::AudioBus> output =
      media::AudioBus::Create(AudioPacket::CHANNELS_STEREO, input->frames());
  mixer_->Transform(input.get(), output.get());

  std::unique_ptr<AudioPacket> result = AudioBusToAudioPacket(*output);
  result->set_sampling_rate(packet->sampling_rate());
  return result;
}

226 227
AudioPump::AudioPump(
    scoped_refptr<base::SingleThreadTaskRunner> audio_task_runner,
228
    std::unique_ptr<AudioSource> audio_source,
229
    std::unique_ptr<AudioEncoder> audio_encoder,
230
    AudioStub* audio_stub)
231 232 233 234 235
    : audio_task_runner_(audio_task_runner),
      audio_stub_(audio_stub),
      weak_factory_(this) {
  DCHECK(audio_stub_);

236
  core_.reset(new Core(weak_factory_.GetWeakPtr(), std::move(audio_source),
237
                       std::move(audio_encoder)));
238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256

  audio_task_runner_->PostTask(
      FROM_HERE, base::Bind(&Core::Start, base::Unretained(core_.get())));
}

AudioPump::~AudioPump() {
  DCHECK(thread_checker_.CalledOnValidThread());

  audio_task_runner_->DeleteSoon(FROM_HERE, core_.release());
}

void AudioPump::Pause(bool pause) {
  DCHECK(thread_checker_.CalledOnValidThread());

  audio_task_runner_->PostTask(
      FROM_HERE,
      base::Bind(&Core::Pause, base::Unretained(core_.get()), pause));
}

257
void AudioPump::SendAudioPacket(std::unique_ptr<AudioPacket> packet, int size) {
258 259 260
  DCHECK(thread_checker_.CalledOnValidThread());
  DCHECK(packet);

261
  audio_stub_->ProcessAudioPacket(
262
      std::move(packet),
263 264 265 266 267 268 269
      base::Bind(&AudioPump::OnPacketSent, weak_factory_.GetWeakPtr(), size));
}

void AudioPump::OnPacketSent(int size) {
  audio_task_runner_->PostTask(
      FROM_HERE,
      base::Bind(&Core::OnPacketSent, base::Unretained(core_.get()), size));
270 271
}

272
}  // namespace protocol
273
}  // namespace remoting