aboutsummaryrefslogtreecommitdiffstats
path: root/src/audio_player.rs
diff options
context:
space:
mode:
authorJokler <jokler@protonmail.com>2020-10-15 13:11:54 +0000
committerGitHub <noreply@github.com>2020-10-15 13:11:54 +0000
commit43974717fee9a98701c6efa2e7221cdbfe7e537e (patch)
tree93fe1d75477ae3d1c8466611a2cedd7bed316aa2 /src/audio_player.rs
parent23671b51b4e207574a63bce820acbf43169e2b6c (diff)
parent4e1c2b9f04073294ecb8402486c20d9c01721598 (diff)
downloadpokebot-43974717fee9a98701c6efa2e7221cdbfe7e537e.tar.gz
pokebot-43974717fee9a98701c6efa2e7221cdbfe7e537e.zip
Merge pull request #70 from Mavulp/actor-bots
Replace channels&locks with actors & log with slog
Diffstat (limited to 'src/audio_player.rs')
-rw-r--r--src/audio_player.rs328
1 files changed, 155 insertions, 173 deletions
diff --git a/src/audio_player.rs b/src/audio_player.rs
index 1f6649f..23581f9 100644
--- a/src/audio_player.rs
+++ b/src/audio_player.rs
@@ -7,36 +7,31 @@ use gstreamer as gst;
use gstreamer_app::{AppSink, AppSinkCallbacks};
use gstreamer_audio::{StreamVolume, StreamVolumeFormat};
-use crate::bot::{MusicBotMessage, State};
use glib::BoolError;
-use log::{debug, error, info, warn};
-use std::sync::{Arc, RwLock};
-use tokio::sync::mpsc::UnboundedSender;
+use slog::{debug, error, info, warn, Logger};
+use xtra::WeakAddress;
+use crate::bot::{MusicBot, MusicBotMessage, State};
use crate::command::{Seek, VolumeChange};
use crate::youtube_dl::AudioMetadata;
static GST_INIT: Once = Once::new();
-#[derive(PartialEq, Eq, Debug, Clone, Copy)]
-pub enum PollResult {
- Continue,
- Quit,
-}
-
pub struct AudioPlayer {
pipeline: gst::Pipeline,
bus: gst::Bus,
http_src: gst::Element,
- volume_f64: RwLock<f64>,
+ volume_f64: f64,
volume: gst::Element,
- sender: Arc<RwLock<UnboundedSender<MusicBotMessage>>>,
- currently_playing: RwLock<Option<AudioMetadata>>,
+ currently_playing: Option<AudioMetadata>,
+
+ logger: Logger,
}
fn make_element(factoryname: &str, display_name: &str) -> Result<gst::Element, AudioPlayerError> {
- Ok(gst::ElementFactory::make(factoryname, Some(display_name))?)
+ Ok(gst::ElementFactory::make(factoryname, Some(display_name))
+ .map_err(|_| AudioPlayerError::MissingPlugin(factoryname.to_string()))?)
}
fn link_elements(a: &gst::Element, b: &gst::Element) -> Result<(), AudioPlayerError> {
@@ -49,11 +44,12 @@ fn add_decode_bin_new_pad_callback(
decode_bin: &gst::Element,
audio_bin: gst::Bin,
ghost_pad: gst::GhostPad,
+ logger: Logger,
) {
decode_bin.connect_pad_added(move |_, new_pad| {
- debug!("New pad received on decode bin");
+ debug!(logger, "New pad received on decode bin");
let name = if let Some(caps) = new_pad.get_current_caps() {
- debug!("Pad caps: {}", caps.to_string());
+ debug!(logger, "Found caps"; "caps" => caps.to_string());
if let Some(structure) = caps.get_structure(0) {
Some(structure.get_name().to_string())
} else {
@@ -68,7 +64,7 @@ fn add_decode_bin_new_pad_callback(
peer.unlink(&ghost_pad).unwrap();
}
- info!("Found raw audio, linking audio bin");
+ info!(logger, "Found raw audio, linking audio bin");
new_pad.link(&ghost_pad).unwrap();
audio_bin.sync_state_with_parent().unwrap();
@@ -77,27 +73,15 @@ fn add_decode_bin_new_pad_callback(
}
impl AudioPlayer {
- pub fn new(
- sender: Arc<RwLock<UnboundedSender<MusicBotMessage>>>,
- callback: Option<Box<dyn FnMut(&[u8]) + Send>>,
- ) -> Result<Self, AudioPlayerError> {
+ pub fn new(logger: Logger) -> Result<Self, AudioPlayerError> {
GST_INIT.call_once(|| gst::init().unwrap());
- info!("Creating audio player");
+ info!(logger, "Creating audio player");
let pipeline = gst::Pipeline::new(Some("TeamSpeak Audio Player"));
let bus = pipeline.get_bus().unwrap();
let http_src = make_element("souphttpsrc", "http source")?;
- let decode_bin = make_element("decodebin", "decode bin")?;
- pipeline.add_many(&[&http_src, &decode_bin])?;
-
- link_elements(&http_src, &decode_bin)?;
-
- let (audio_bin, volume, ghost_pad) = Self::create_audio_bin(callback)?;
-
- add_decode_bin_new_pad_callback(&decode_bin, audio_bin.clone(), ghost_pad);
-
- pipeline.add(&audio_bin)?;
+ let volume = make_element("volume", "volume")?;
// The documentation says that we have to make sure to handle
// all messages if auto flushing is deactivated.
@@ -111,26 +95,31 @@ impl AudioPlayer {
pipeline,
bus,
http_src,
+ logger,
- volume_f64: RwLock::new(0.0),
+ volume_f64: 0.0,
volume,
- sender,
- currently_playing: RwLock::new(None),
+ currently_playing: None,
})
}
- fn create_audio_bin(
+ pub fn setup_with_audio_callback(
+ &self,
callback: Option<Box<dyn FnMut(&[u8]) + Send>>,
- ) -> Result<(gst::Bin, gst::Element, gst::GhostPad), AudioPlayerError> {
+ ) -> Result<(), AudioPlayerError> {
+ let decode_bin = make_element("decodebin", "decode bin")?;
+ self.pipeline.add_many(&[&self.http_src, &decode_bin])?;
+
+ link_elements(&self.http_src, &decode_bin)?;
+
let audio_bin = gst::Bin::new(Some("audio bin"));
let queue = make_element("queue", "audio queue")?;
let convert = make_element("audioconvert", "audio converter")?;
- let volume = make_element("volume", "volume")?;
let resample = make_element("audioresample", "audio resampler")?;
let pads = queue.get_sink_pads();
let queue_sink_pad = pads.first().unwrap();
- audio_bin.add_many(&[&queue, &convert, &volume, &resample])?;
+ audio_bin.add_many(&[&queue, &convert, &self.volume, &resample])?;
if let Some(mut callback) = callback {
let opus_enc = make_element("opusenc", "opus encoder")?;
@@ -160,49 +149,64 @@ impl AudioPlayer {
audio_bin.add_many(&[&opus_enc, &sink])?;
- gst::Element::link_many(&[&queue, &convert, &volume, &resample, &opus_enc, &sink])?;
+ gst::Element::link_many(&[
+ &queue,
+ &convert,
+ &self.volume,
+ &resample,
+ &opus_enc,
+ &sink,
+ ])?;
} else {
let sink = make_element("autoaudiosink", "auto audio sink")?;
audio_bin.add_many(&[&sink])?;
- gst::Element::link_many(&[&queue, &convert, &volume, &resample, &sink])?;
+ gst::Element::link_many(&[&queue, &convert, &self.volume, &resample, &sink])?;
};
let ghost_pad = GhostPad::with_target(Some("audio bin sink"), queue_sink_pad).unwrap();
ghost_pad.set_active(true)?;
audio_bin.add_pad(&ghost_pad)?;
- Ok((audio_bin, volume, ghost_pad))
+ add_decode_bin_new_pad_callback(
+ &decode_bin,
+ audio_bin.clone(),
+ ghost_pad,
+ self.logger.clone(),
+ );
+
+ self.pipeline.add(&audio_bin)?;
+
+ Ok(())
}
- pub fn set_metadata(&self, data: AudioMetadata) -> Result<(), AudioPlayerError> {
+ pub fn set_metadata(&mut self, data: AudioMetadata) -> Result<(), AudioPlayerError> {
self.set_source_url(data.url.clone())?;
- let mut currently_playing = self.currently_playing.write().unwrap();
- *currently_playing = Some(data);
+ self.currently_playing = Some(data);
Ok(())
}
fn set_source_url(&self, location: String) -> Result<(), AudioPlayerError> {
- info!("Setting location URI: {}", location);
+ info!(self.logger, "Setting source"; "url" => &location);
self.http_src.set_property("location", &location)?;
Ok(())
}
- pub fn change_volume(&self, volume: VolumeChange) -> Result<(), AudioPlayerError> {
+ pub fn change_volume(&mut self, volume: VolumeChange) -> Result<(), AudioPlayerError> {
let new_volume = match volume {
- VolumeChange::Positive(vol) => self.volume() + vol,
- VolumeChange::Negative(vol) => self.volume() - vol,
+ VolumeChange::Positive(vol) => self.volume_f64 + vol,
+ VolumeChange::Negative(vol) => self.volume_f64 - vol,
VolumeChange::Absolute(vol) => vol,
};
let new_volume = new_volume.max(0.0).min(1.0);
- *self.volume_f64.write().unwrap() = new_volume;
+ self.volume_f64 = new_volume;
let db = 50.0 * new_volume.log10();
- info!("Setting volume: {} -> {} dB", new_volume, db);
+ info!(self.logger, "Setting volume"; "volume" => new_volume, "db" => db);
let linear =
StreamVolume::convert_volume(StreamVolumeFormat::Db, StreamVolumeFormat::Linear, db);
@@ -212,36 +216,10 @@ impl AudioPlayer {
Ok(())
}
- pub fn is_started(&self) -> bool {
- let (_, current, pending) = self.pipeline.get_state(gst::ClockTime(None));
-
- match (current, pending) {
- (gst::State::Null, gst::State::VoidPending) => false,
- (_, gst::State::Null) => false,
- (gst::State::Ready, gst::State::VoidPending) => false,
- _ => true,
- }
- }
-
- pub fn volume(&self) -> f64 {
- *self.volume_f64.read().unwrap()
- }
-
- pub fn position(&self) -> Option<Duration> {
- self.pipeline
- .query_position::<gst::ClockTime>()
- .and_then(|t| t.0.map(Duration::from_nanos))
- }
-
- pub fn currently_playing(&self) -> Option<AudioMetadata> {
- self.currently_playing.read().unwrap().clone()
- }
+ pub fn reset(&mut self) -> Result<(), AudioPlayerError> {
+ info!(self.logger, "Setting pipeline state"; "to" => "null");
- pub fn reset(&self) -> Result<(), AudioPlayerError> {
- info!("Setting pipeline state to null");
-
- let mut currently_playing = self.currently_playing.write().unwrap();
- *currently_playing = None;
+ self.currently_playing = None;
self.pipeline.set_state(gst::State::Null)?;
@@ -249,7 +227,7 @@ impl AudioPlayer {
}
pub fn play(&self) -> Result<(), AudioPlayerError> {
- info!("Setting pipeline state to playing");
+ info!(self.logger, "Setting pipeline state"; "to" => "playing");
self.pipeline.set_state(gst::State::Playing)?;
@@ -257,7 +235,7 @@ impl AudioPlayer {
}
pub fn pause(&self) -> Result<(), AudioPlayerError> {
- info!("Setting pipeline state to paused");
+ info!(self.logger, "Setting pipeline state"; "to" => "paused");
self.pipeline.set_state(gst::State::Paused)?;
@@ -289,7 +267,7 @@ impl AudioPlayer {
};
let time = humantime::format_duration(absolute);
- info!("Seeking to {}", time);
+ info!(self.logger, "Seeking"; "time" => %time);
self.pipeline.seek_simple(
gst::SeekFlags::FLUSH,
@@ -300,121 +278,125 @@ impl AudioPlayer {
}
pub fn stop_current(&self) -> Result<(), AudioPlayerError> {
- info!("Stopping pipeline, sending EOS");
+ info!(self.logger, "Stopping pipeline, sending EOS");
self.bus.post(&gst::message::Eos::new())?;
Ok(())
}
- pub fn quit(&self, reason: String) {
- info!("Quitting audio player");
-
- if self
- .bus
- .post(&gst::message::Application::new(gst::Structure::new_empty(
- "quit",
- )))
- .is_err()
- {
- warn!("Tried to send \"quit\" app event on flushing bus.");
+ pub fn is_started(&self) -> bool {
+ let (_, current, pending) = self.pipeline.get_state(gst::ClockTime(None));
+
+ match (current, pending) {
+ (gst::State::Null, gst::State::VoidPending) => false,
+ (_, gst::State::Null) => false,
+ (gst::State::Ready, gst::State::VoidPending) => false,
+ _ => true,
}
+ }
- let sender = self.sender.read().unwrap();
- sender.send(MusicBotMessage::Quit(reason)).unwrap();
+ pub fn volume(&self) -> f64 {
+ self.volume_f64
}
- fn send_state(&self, state: State) {
- info!("Sending state {:?} to application", state);
- let sender = self.sender.read().unwrap();
- sender.send(MusicBotMessage::StateChange(state)).unwrap();
+ pub fn position(&self) -> Option<Duration> {
+ self.pipeline
+ .query_position::<gst::ClockTime>()
+ .and_then(|t| t.0.map(Duration::from_nanos))
}
- pub fn poll(&self) -> PollResult {
- debug!("Polling GStreamer");
- 'outer: loop {
- while let Some(msg) = self.bus.timed_pop(gst::ClockTime(None)) {
- use gst::MessageView;
-
- match msg.view() {
- MessageView::StateChanged(state) => {
- if let Some(src) = state.get_src() {
- if src.get_name() != self.pipeline.get_name() {
- continue;
- }
- }
+ pub fn currently_playing(&self) -> Option<AudioMetadata> {
+ self.currently_playing.clone()
+ }
+
+ pub fn register_bot(&self, bot: WeakAddress<MusicBot>) {
+ let pipeline_name = self.pipeline.get_name();
+ debug!(self.logger, "Setting sync handler on gstreamer bus");
- let old = state.get_old();
- let current = state.get_current();
- let pending = state.get_pending();
-
- match (old, current, pending) {
- (gst::State::Paused, gst::State::Playing, gst::State::VoidPending) => {
- self.send_state(State::Playing)
- }
- (gst::State::Playing, gst::State::Paused, gst::State::VoidPending) => {
- self.send_state(State::Paused)
- }
- (_, gst::State::Ready, gst::State::Null) => {
- self.send_state(State::Stopped)
- }
- (_, gst::State::Null, gst::State::VoidPending) => {
- self.send_state(State::Stopped)
- }
- _ => {
- debug!(
- "Pipeline transitioned from {:?} to {:?}, with {:?} pending",
- old, current, pending
- );
- }
+ let logger = self.logger.clone();
+ let handle = tokio::runtime::Handle::current();
+ self.bus.set_sync_handler(move |_, msg| {
+ use gst::MessageView;
+
+ match msg.view() {
+ MessageView::StateChanged(state) => {
+ if let Some(src) = state.get_src() {
+ if src.get_name() != pipeline_name {
+ return gst::BusSyncReply::Drop;
}
}
- MessageView::Eos(..) => {
- info!("End of stream reached");
- self.reset().unwrap();
- break 'outer;
- }
- MessageView::Warning(warn) => {
- warn!(
- "Warning from {:?}: {} ({:?})",
- warn.get_src().map(|s| s.get_path_string()),
- warn.get_error(),
- warn.get_debug()
- );
- break 'outer;
- }
- MessageView::Error(err) => {
- error!(
- "Error from {:?}: {} ({:?})",
- err.get_src().map(|s| s.get_path_string()),
- err.get_error(),
- err.get_debug()
- );
- break 'outer;
- }
- MessageView::Application(content) => {
- if let Some(s) = content.get_structure() {
- if s.get_name() == "quit" {
- self.reset().unwrap();
- return PollResult::Quit;
- }
+ let old = state.get_old();
+ let current = state.get_current();
+ let pending = state.get_pending();
+
+ match (old, current, pending) {
+ (gst::State::Paused, gst::State::Playing, gst::State::VoidPending) => {
+ send_state(&handle, &bot, State::Playing);
+ }
+ (gst::State::Playing, gst::State::Paused, gst::State::VoidPending) => {
+ send_state(&handle, &bot, State::Paused);
+ }
+ (_, gst::State::Ready, gst::State::Null) => {
+ send_state(&handle, &bot, State::Stopped);
+ }
+ (_, gst::State::Null, gst::State::VoidPending) => {
+ send_state(&handle, &bot, State::Stopped);
+ }
+ _ => {
+ debug!(
+ logger,
+ "Pipeline transitioned";
+ "from" => ?old,
+ "to" => ?current,
+ "pending" => ?pending
+ );
}
}
- _ => {
- //debug!("Unhandled message on bus: {:?}", msg)
- }
- };
+ }
+ MessageView::Eos(..) => {
+ info!(logger, "End of stream reached");
+
+ send_state(&handle, &bot, State::EndOfStream);
+ }
+ MessageView::Warning(warn) => {
+ warn!(
+ logger,
+ "Received warning from bus";
+ "source" => ?warn.get_src().map(|s| s.get_path_string()),
+ "error" => %warn.get_error(),
+ "debug" => ?warn.get_debug()
+ );
+ }
+ MessageView::Error(err) => {
+ error!(
+ logger,
+ "Received error from bus";
+ "source" => ?err.get_src().map(|s| s.get_path_string()),
+ "error" => %err.get_error(),
+ "debug" => ?err.get_debug()
+ );
+
+ send_state(&handle, &bot, State::EndOfStream);
+ }
+ _ => {
+ //debug!("Unhandled message on bus: {:?}", msg)
+ }
}
- }
- debug!("Left GStreamer message loop");
- PollResult::Continue
+ gst::BusSyncReply::Drop
+ });
}
}
+fn send_state(handle: &tokio::runtime::Handle, addr: &WeakAddress<MusicBot>, state: State) {
+ handle.spawn(addr.send(MusicBotMessage::StateChange(state)));
+}
+
#[derive(Debug)]
pub enum AudioPlayerError {
+ MissingPlugin(String),
GStreamerError(glib::error::BoolError),
StateChangeFailed,
SeekError,