From 4e1c2b9f04073294ecb8402486c20d9c01721598 Mon Sep 17 00:00:00 2001 From: Jokler Date: Wed, 14 Oct 2020 00:19:27 +0200 Subject: Replace channels&locks with actors & log with slog --- src/audio_player.rs | 328 +++++++++++++++++++++++++--------------------------- 1 file changed, 155 insertions(+), 173 deletions(-) (limited to 'src/audio_player.rs') diff --git a/src/audio_player.rs b/src/audio_player.rs index 1f6649f..23581f9 100644 --- a/src/audio_player.rs +++ b/src/audio_player.rs @@ -7,36 +7,31 @@ use gstreamer as gst; use gstreamer_app::{AppSink, AppSinkCallbacks}; use gstreamer_audio::{StreamVolume, StreamVolumeFormat}; -use crate::bot::{MusicBotMessage, State}; use glib::BoolError; -use log::{debug, error, info, warn}; -use std::sync::{Arc, RwLock}; -use tokio::sync::mpsc::UnboundedSender; +use slog::{debug, error, info, warn, Logger}; +use xtra::WeakAddress; +use crate::bot::{MusicBot, MusicBotMessage, State}; use crate::command::{Seek, VolumeChange}; use crate::youtube_dl::AudioMetadata; static GST_INIT: Once = Once::new(); -#[derive(PartialEq, Eq, Debug, Clone, Copy)] -pub enum PollResult { - Continue, - Quit, -} - pub struct AudioPlayer { pipeline: gst::Pipeline, bus: gst::Bus, http_src: gst::Element, - volume_f64: RwLock, + volume_f64: f64, volume: gst::Element, - sender: Arc>>, - currently_playing: RwLock>, + currently_playing: Option, + + logger: Logger, } fn make_element(factoryname: &str, display_name: &str) -> Result { - Ok(gst::ElementFactory::make(factoryname, Some(display_name))?) + Ok(gst::ElementFactory::make(factoryname, Some(display_name)) + .map_err(|_| AudioPlayerError::MissingPlugin(factoryname.to_string()))?) } fn link_elements(a: &gst::Element, b: &gst::Element) -> Result<(), AudioPlayerError> { @@ -49,11 +44,12 @@ fn add_decode_bin_new_pad_callback( decode_bin: &gst::Element, audio_bin: gst::Bin, ghost_pad: gst::GhostPad, + logger: Logger, ) { decode_bin.connect_pad_added(move |_, new_pad| { - debug!("New pad received on decode bin"); + debug!(logger, "New pad received on decode bin"); let name = if let Some(caps) = new_pad.get_current_caps() { - debug!("Pad caps: {}", caps.to_string()); + debug!(logger, "Found caps"; "caps" => caps.to_string()); if let Some(structure) = caps.get_structure(0) { Some(structure.get_name().to_string()) } else { @@ -68,7 +64,7 @@ fn add_decode_bin_new_pad_callback( peer.unlink(&ghost_pad).unwrap(); } - info!("Found raw audio, linking audio bin"); + info!(logger, "Found raw audio, linking audio bin"); new_pad.link(&ghost_pad).unwrap(); audio_bin.sync_state_with_parent().unwrap(); @@ -77,27 +73,15 @@ fn add_decode_bin_new_pad_callback( } impl AudioPlayer { - pub fn new( - sender: Arc>>, - callback: Option>, - ) -> Result { + pub fn new(logger: Logger) -> Result { GST_INIT.call_once(|| gst::init().unwrap()); - info!("Creating audio player"); + info!(logger, "Creating audio player"); let pipeline = gst::Pipeline::new(Some("TeamSpeak Audio Player")); let bus = pipeline.get_bus().unwrap(); let http_src = make_element("souphttpsrc", "http source")?; - let decode_bin = make_element("decodebin", "decode bin")?; - pipeline.add_many(&[&http_src, &decode_bin])?; - - link_elements(&http_src, &decode_bin)?; - - let (audio_bin, volume, ghost_pad) = Self::create_audio_bin(callback)?; - - add_decode_bin_new_pad_callback(&decode_bin, audio_bin.clone(), ghost_pad); - - pipeline.add(&audio_bin)?; + let volume = make_element("volume", "volume")?; // The documentation says that we have to make sure to handle // all messages if auto flushing is deactivated. @@ -111,26 +95,31 @@ impl AudioPlayer { pipeline, bus, http_src, + logger, - volume_f64: RwLock::new(0.0), + volume_f64: 0.0, volume, - sender, - currently_playing: RwLock::new(None), + currently_playing: None, }) } - fn create_audio_bin( + pub fn setup_with_audio_callback( + &self, callback: Option>, - ) -> Result<(gst::Bin, gst::Element, gst::GhostPad), AudioPlayerError> { + ) -> Result<(), AudioPlayerError> { + let decode_bin = make_element("decodebin", "decode bin")?; + self.pipeline.add_many(&[&self.http_src, &decode_bin])?; + + link_elements(&self.http_src, &decode_bin)?; + let audio_bin = gst::Bin::new(Some("audio bin")); let queue = make_element("queue", "audio queue")?; let convert = make_element("audioconvert", "audio converter")?; - let volume = make_element("volume", "volume")?; let resample = make_element("audioresample", "audio resampler")?; let pads = queue.get_sink_pads(); let queue_sink_pad = pads.first().unwrap(); - audio_bin.add_many(&[&queue, &convert, &volume, &resample])?; + audio_bin.add_many(&[&queue, &convert, &self.volume, &resample])?; if let Some(mut callback) = callback { let opus_enc = make_element("opusenc", "opus encoder")?; @@ -160,49 +149,64 @@ impl AudioPlayer { audio_bin.add_many(&[&opus_enc, &sink])?; - gst::Element::link_many(&[&queue, &convert, &volume, &resample, &opus_enc, &sink])?; + gst::Element::link_many(&[ + &queue, + &convert, + &self.volume, + &resample, + &opus_enc, + &sink, + ])?; } else { let sink = make_element("autoaudiosink", "auto audio sink")?; audio_bin.add_many(&[&sink])?; - gst::Element::link_many(&[&queue, &convert, &volume, &resample, &sink])?; + gst::Element::link_many(&[&queue, &convert, &self.volume, &resample, &sink])?; }; let ghost_pad = GhostPad::with_target(Some("audio bin sink"), queue_sink_pad).unwrap(); ghost_pad.set_active(true)?; audio_bin.add_pad(&ghost_pad)?; - Ok((audio_bin, volume, ghost_pad)) + add_decode_bin_new_pad_callback( + &decode_bin, + audio_bin.clone(), + ghost_pad, + self.logger.clone(), + ); + + self.pipeline.add(&audio_bin)?; + + Ok(()) } - pub fn set_metadata(&self, data: AudioMetadata) -> Result<(), AudioPlayerError> { + pub fn set_metadata(&mut self, data: AudioMetadata) -> Result<(), AudioPlayerError> { self.set_source_url(data.url.clone())?; - let mut currently_playing = self.currently_playing.write().unwrap(); - *currently_playing = Some(data); + self.currently_playing = Some(data); Ok(()) } fn set_source_url(&self, location: String) -> Result<(), AudioPlayerError> { - info!("Setting location URI: {}", location); + info!(self.logger, "Setting source"; "url" => &location); self.http_src.set_property("location", &location)?; Ok(()) } - pub fn change_volume(&self, volume: VolumeChange) -> Result<(), AudioPlayerError> { + pub fn change_volume(&mut self, volume: VolumeChange) -> Result<(), AudioPlayerError> { let new_volume = match volume { - VolumeChange::Positive(vol) => self.volume() + vol, - VolumeChange::Negative(vol) => self.volume() - vol, + VolumeChange::Positive(vol) => self.volume_f64 + vol, + VolumeChange::Negative(vol) => self.volume_f64 - vol, VolumeChange::Absolute(vol) => vol, }; let new_volume = new_volume.max(0.0).min(1.0); - *self.volume_f64.write().unwrap() = new_volume; + self.volume_f64 = new_volume; let db = 50.0 * new_volume.log10(); - info!("Setting volume: {} -> {} dB", new_volume, db); + info!(self.logger, "Setting volume"; "volume" => new_volume, "db" => db); let linear = StreamVolume::convert_volume(StreamVolumeFormat::Db, StreamVolumeFormat::Linear, db); @@ -212,36 +216,10 @@ impl AudioPlayer { Ok(()) } - pub fn is_started(&self) -> bool { - let (_, current, pending) = self.pipeline.get_state(gst::ClockTime(None)); - - match (current, pending) { - (gst::State::Null, gst::State::VoidPending) => false, - (_, gst::State::Null) => false, - (gst::State::Ready, gst::State::VoidPending) => false, - _ => true, - } - } - - pub fn volume(&self) -> f64 { - *self.volume_f64.read().unwrap() - } - - pub fn position(&self) -> Option { - self.pipeline - .query_position::() - .and_then(|t| t.0.map(Duration::from_nanos)) - } - - pub fn currently_playing(&self) -> Option { - self.currently_playing.read().unwrap().clone() - } + pub fn reset(&mut self) -> Result<(), AudioPlayerError> { + info!(self.logger, "Setting pipeline state"; "to" => "null"); - pub fn reset(&self) -> Result<(), AudioPlayerError> { - info!("Setting pipeline state to null"); - - let mut currently_playing = self.currently_playing.write().unwrap(); - *currently_playing = None; + self.currently_playing = None; self.pipeline.set_state(gst::State::Null)?; @@ -249,7 +227,7 @@ impl AudioPlayer { } pub fn play(&self) -> Result<(), AudioPlayerError> { - info!("Setting pipeline state to playing"); + info!(self.logger, "Setting pipeline state"; "to" => "playing"); self.pipeline.set_state(gst::State::Playing)?; @@ -257,7 +235,7 @@ impl AudioPlayer { } pub fn pause(&self) -> Result<(), AudioPlayerError> { - info!("Setting pipeline state to paused"); + info!(self.logger, "Setting pipeline state"; "to" => "paused"); self.pipeline.set_state(gst::State::Paused)?; @@ -289,7 +267,7 @@ impl AudioPlayer { }; let time = humantime::format_duration(absolute); - info!("Seeking to {}", time); + info!(self.logger, "Seeking"; "time" => %time); self.pipeline.seek_simple( gst::SeekFlags::FLUSH, @@ -300,121 +278,125 @@ impl AudioPlayer { } pub fn stop_current(&self) -> Result<(), AudioPlayerError> { - info!("Stopping pipeline, sending EOS"); + info!(self.logger, "Stopping pipeline, sending EOS"); self.bus.post(&gst::message::Eos::new())?; Ok(()) } - pub fn quit(&self, reason: String) { - info!("Quitting audio player"); - - if self - .bus - .post(&gst::message::Application::new(gst::Structure::new_empty( - "quit", - ))) - .is_err() - { - warn!("Tried to send \"quit\" app event on flushing bus."); + pub fn is_started(&self) -> bool { + let (_, current, pending) = self.pipeline.get_state(gst::ClockTime(None)); + + match (current, pending) { + (gst::State::Null, gst::State::VoidPending) => false, + (_, gst::State::Null) => false, + (gst::State::Ready, gst::State::VoidPending) => false, + _ => true, } + } - let sender = self.sender.read().unwrap(); - sender.send(MusicBotMessage::Quit(reason)).unwrap(); + pub fn volume(&self) -> f64 { + self.volume_f64 } - fn send_state(&self, state: State) { - info!("Sending state {:?} to application", state); - let sender = self.sender.read().unwrap(); - sender.send(MusicBotMessage::StateChange(state)).unwrap(); + pub fn position(&self) -> Option { + self.pipeline + .query_position::() + .and_then(|t| t.0.map(Duration::from_nanos)) } - pub fn poll(&self) -> PollResult { - debug!("Polling GStreamer"); - 'outer: loop { - while let Some(msg) = self.bus.timed_pop(gst::ClockTime(None)) { - use gst::MessageView; - - match msg.view() { - MessageView::StateChanged(state) => { - if let Some(src) = state.get_src() { - if src.get_name() != self.pipeline.get_name() { - continue; - } - } + pub fn currently_playing(&self) -> Option { + self.currently_playing.clone() + } + + pub fn register_bot(&self, bot: WeakAddress) { + let pipeline_name = self.pipeline.get_name(); + debug!(self.logger, "Setting sync handler on gstreamer bus"); - let old = state.get_old(); - let current = state.get_current(); - let pending = state.get_pending(); - - match (old, current, pending) { - (gst::State::Paused, gst::State::Playing, gst::State::VoidPending) => { - self.send_state(State::Playing) - } - (gst::State::Playing, gst::State::Paused, gst::State::VoidPending) => { - self.send_state(State::Paused) - } - (_, gst::State::Ready, gst::State::Null) => { - self.send_state(State::Stopped) - } - (_, gst::State::Null, gst::State::VoidPending) => { - self.send_state(State::Stopped) - } - _ => { - debug!( - "Pipeline transitioned from {:?} to {:?}, with {:?} pending", - old, current, pending - ); - } + let logger = self.logger.clone(); + let handle = tokio::runtime::Handle::current(); + self.bus.set_sync_handler(move |_, msg| { + use gst::MessageView; + + match msg.view() { + MessageView::StateChanged(state) => { + if let Some(src) = state.get_src() { + if src.get_name() != pipeline_name { + return gst::BusSyncReply::Drop; } } - MessageView::Eos(..) => { - info!("End of stream reached"); - self.reset().unwrap(); - break 'outer; - } - MessageView::Warning(warn) => { - warn!( - "Warning from {:?}: {} ({:?})", - warn.get_src().map(|s| s.get_path_string()), - warn.get_error(), - warn.get_debug() - ); - break 'outer; - } - MessageView::Error(err) => { - error!( - "Error from {:?}: {} ({:?})", - err.get_src().map(|s| s.get_path_string()), - err.get_error(), - err.get_debug() - ); - break 'outer; - } - MessageView::Application(content) => { - if let Some(s) = content.get_structure() { - if s.get_name() == "quit" { - self.reset().unwrap(); - return PollResult::Quit; - } + let old = state.get_old(); + let current = state.get_current(); + let pending = state.get_pending(); + + match (old, current, pending) { + (gst::State::Paused, gst::State::Playing, gst::State::VoidPending) => { + send_state(&handle, &bot, State::Playing); + } + (gst::State::Playing, gst::State::Paused, gst::State::VoidPending) => { + send_state(&handle, &bot, State::Paused); + } + (_, gst::State::Ready, gst::State::Null) => { + send_state(&handle, &bot, State::Stopped); + } + (_, gst::State::Null, gst::State::VoidPending) => { + send_state(&handle, &bot, State::Stopped); + } + _ => { + debug!( + logger, + "Pipeline transitioned"; + "from" => ?old, + "to" => ?current, + "pending" => ?pending + ); } } - _ => { - //debug!("Unhandled message on bus: {:?}", msg) - } - }; + } + MessageView::Eos(..) => { + info!(logger, "End of stream reached"); + + send_state(&handle, &bot, State::EndOfStream); + } + MessageView::Warning(warn) => { + warn!( + logger, + "Received warning from bus"; + "source" => ?warn.get_src().map(|s| s.get_path_string()), + "error" => %warn.get_error(), + "debug" => ?warn.get_debug() + ); + } + MessageView::Error(err) => { + error!( + logger, + "Received error from bus"; + "source" => ?err.get_src().map(|s| s.get_path_string()), + "error" => %err.get_error(), + "debug" => ?err.get_debug() + ); + + send_state(&handle, &bot, State::EndOfStream); + } + _ => { + //debug!("Unhandled message on bus: {:?}", msg) + } } - } - debug!("Left GStreamer message loop"); - PollResult::Continue + gst::BusSyncReply::Drop + }); } } +fn send_state(handle: &tokio::runtime::Handle, addr: &WeakAddress, state: State) { + handle.spawn(addr.send(MusicBotMessage::StateChange(state))); +} + #[derive(Debug)] pub enum AudioPlayerError { + MissingPlugin(String), GStreamerError(glib::error::BoolError), StateChangeFailed, SeekError, -- cgit v1.2.3-70-g09d2