summaryrefslogtreecommitdiffstats
path: root/src/audio_player.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/audio_player.rs')
-rw-r--r--src/audio_player.rs328
1 files changed, 328 insertions, 0 deletions
diff --git a/src/audio_player.rs b/src/audio_player.rs
new file mode 100644
index 0000000..6626417
--- /dev/null
+++ b/src/audio_player.rs
@@ -0,0 +1,328 @@
+use std::sync::Once;
+
+use gstreamer as gst;
+use gst::prelude::*;
+use gstreamer_app::{AppSink, AppSinkCallbacks};
+use gst::{GhostPad};
+
+use log::{info, debug, warn, error};
+use std::sync::mpsc::Sender;
+use std::sync::{Mutex, Arc};
+use crate::{State, ApplicationMessage};
+use glib::BoolError;
+
+static GST_INIT: Once = Once::new();
+
+#[derive(Debug)]
+pub enum AudioPlayerError {
+ GStreamerError(glib::error::BoolError),
+ StateChangeFailed,
+}
+
+impl From<glib::error::BoolError> for AudioPlayerError {
+ fn from(err: BoolError) -> Self {
+ AudioPlayerError::GStreamerError(err)
+ }
+}
+
+impl From<gst::StateChangeError> for AudioPlayerError {
+ fn from(_err: gst::StateChangeError) -> Self {
+ AudioPlayerError::StateChangeFailed
+ }
+}
+
+pub struct AudioPlayer {
+ pipeline: gst::Pipeline,
+ bus: gst::Bus,
+ http_src: gst::Element,
+
+ volume: gst::Element,
+ sender: Arc<Mutex<Sender<ApplicationMessage>>>,
+}
+
+fn make_element(factoryname: &str, display_name: &str) -> Result<gst::Element, AudioPlayerError> {
+ Ok(gst::ElementFactory::make(
+ factoryname,
+ Some(display_name)
+ )?)
+}
+
+fn link_elements(a: &gst::Element, b: &gst::Element) -> Result<(), AudioPlayerError> {
+ a.link(b)?;
+
+ Ok(())
+}
+
+fn add_decode_bin_new_pad_callback(
+ decode_bin: &gst::Element,
+ audio_bin: gst::Bin,
+ ghost_pad: gst::GhostPad,
+) {
+ decode_bin.connect_pad_added(move |_, new_pad| {
+ debug!("New pad received on decode bin");
+ let name = if let Some(caps) = new_pad.get_current_caps() {
+ debug!("Pad caps: {}", caps.to_string());
+ if let Some(structure) = caps.get_structure(0) {
+ Some(structure.get_name().to_string())
+ } else {
+ None
+ }
+ } else {
+ None
+ };
+
+ match name.as_deref() {
+ Some("audio/x-raw") => {
+ if let Some(peer) = ghost_pad.get_peer() {
+ peer.unlink(&ghost_pad).unwrap();
+ }
+
+ info!("Found raw audio, linking audio bin");
+ new_pad.link(&ghost_pad).unwrap();
+
+ audio_bin.sync_state_with_parent().unwrap();
+ }
+ _ => {}
+ }
+ });
+}
+
+impl AudioPlayer {
+ pub fn new(sender: Arc<Mutex<Sender<ApplicationMessage>>>, callback: Option<Box<dyn FnMut(&[u8]) + Send>>) -> Result<Self, AudioPlayerError> {
+ GST_INIT.call_once(|| gst::init().unwrap());
+
+ info!("Creating audio player");
+
+ let pipeline = gst::Pipeline::new(Some("TeamSpeak Audio Player"));
+ let bus = pipeline.get_bus().unwrap();
+ let http_src = make_element("souphttpsrc", "http source")?;
+ let decode_bin = make_element("decodebin", "decode bin")?;
+ pipeline.add_many(&[&http_src, &decode_bin])?;
+
+ link_elements(&http_src, &decode_bin)?;
+
+ let (audio_bin, volume, ghost_pad) = Self::create_audio_bin(callback)?;
+
+ add_decode_bin_new_pad_callback(
+ &decode_bin,
+ audio_bin.clone(),
+ ghost_pad);
+
+ pipeline.add(&audio_bin)?;
+
+ pipeline.set_state(gst::State::Ready)?;
+
+ Ok(AudioPlayer {
+ pipeline,
+ bus,
+ http_src,
+
+ volume,
+ sender,
+ })
+ }
+
+ fn create_audio_bin(callback: Option<Box<dyn FnMut(&[u8]) + Send>>) -> Result<(gst::Bin, gst::Element, gst::GhostPad), AudioPlayerError> {
+ let audio_bin = gst::Bin::new(Some("audio bin"));
+ let queue = make_element("queue", "audio queue")?;
+ let convert = make_element("audioconvert", "audio converter")?;
+ let volume = make_element("volume", "volume")?;
+ let resample = make_element("audioresample", "audio resampler")?;
+ let pads = queue.get_sink_pads();
+ let queue_sink_pad = pads
+ .first()
+ .unwrap();
+
+ audio_bin.add_many(&[
+ &queue,
+ &convert,
+ &volume,
+ &resample,
+ ])?;
+
+ if let Some(mut callback) = callback {
+ let opus_enc = make_element("opusenc", "opus encoder")?;
+ let sink = make_element("appsink", "app sink")?;
+
+ let appsink = sink
+ .clone()
+ .dynamic_cast::<AppSink>()
+ .expect("Sink element is expected to be an appsink!");
+ appsink.set_caps(Some(&gst::Caps::new_simple("audio/x-opus", &[])));
+ let callbacks = AppSinkCallbacks::new()
+ .new_sample(move |sink| {
+ let sample = sink.pull_sample().map_err(|_| gst::FlowError::Eos)?;
+ let buffer = sample.get_buffer().ok_or(gst::FlowError::Error)?;
+ let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?;
+ let samples = map.as_slice();
+
+ callback(samples);
+
+ Ok(gst::FlowSuccess::Ok)
+ })
+ .build();
+ appsink.set_callbacks(callbacks);
+
+ audio_bin.add_many(&[
+ &opus_enc,
+ &sink
+ ])?;
+
+ gst::Element::link_many(&[
+ &queue,
+ &convert,
+ &volume,
+ &resample,
+ &opus_enc,
+ &sink
+ ])?;
+ } else {
+ let sink = make_element("autoaudiosink", "auto audio sink")?;
+
+ audio_bin.add_many(&[
+ &sink
+ ])?;
+
+ gst::Element::link_many(&[
+ &queue,
+ &convert,
+ &volume,
+ &resample,
+ &sink
+ ])?;
+ };
+
+ let ghost_pad = GhostPad::new(Some("audio bin sink"), queue_sink_pad).unwrap();
+ ghost_pad.set_active(true)?;
+ audio_bin.add_pad(&ghost_pad)?;
+
+ Ok((audio_bin, volume, ghost_pad))
+ }
+
+ pub fn set_source_url(&self, location: String) -> Result<(), AudioPlayerError> {
+ info!("Setting location URI: {}", location);
+ self.http_src.set_property("location", &location)?;
+
+ Ok(())
+ }
+
+ pub fn set_volume(&self, volume: f64) -> Result<(), AudioPlayerError> {
+ let log_volume = 1.0 - 10.0f64.powf(-volume * 2.0);
+ info!("Setting volume: {} -> {}", volume, log_volume);
+
+ self.volume.set_property("volume", &log_volume)?;
+
+ Ok(())
+ }
+
+ pub fn is_started(&self) -> bool {
+ let (_, current, pending) = self.pipeline.get_state(gst::ClockTime(None));
+
+ match (current, pending) {
+ (gst::State::Null, gst::State::VoidPending) => false,
+ (_, gst::State::Null) => false,
+ (gst::State::Ready, gst::State::VoidPending) => false,
+ _ => true
+ }
+ }
+
+ pub fn reset(&self) -> Result<(), AudioPlayerError> {
+ info!("Setting pipeline state to null");
+
+ self.pipeline.set_state(gst::State::Null)?;
+
+ Ok(())
+ }
+
+ pub fn play(&self) -> Result<(), AudioPlayerError> {
+ info!("Setting pipeline state to playing");
+
+ self.pipeline.set_state(gst::State::Playing)?;
+
+ Ok(())
+ }
+
+ pub fn pause(&self) -> Result<(), AudioPlayerError> {
+ info!("Setting pipeline state to paused");
+
+ self.pipeline.set_state(gst::State::Paused)?;
+
+ Ok(())
+ }
+
+ pub fn stop_current(&self) {
+ info!("Stopping pipeline, sending EOS");
+
+ let handled = self.http_src.send_event(gst::Event::new_eos().build());
+ if !handled {
+ warn!("EOS event was not handled");
+ }
+ }
+
+ fn send_state(&self, state: State) {
+ info!("Sending state {:?} to application", state);
+ let sender = self.sender.lock().unwrap();
+ sender.send(ApplicationMessage::StateChange(state)).unwrap();
+ }
+
+ pub fn poll(&self) {
+ debug!("Polling GStreamer");
+ 'outer: loop {
+ while let Some(msg) = self.bus.timed_pop(gst::ClockTime(None)) {
+ use gst::MessageView;
+
+ match msg.view() {
+ MessageView::StateChanged(state) => {
+ if let Some(src) = state.get_src() {
+ if src.get_name() != self.pipeline.get_name() {
+ continue;
+ }
+ }
+
+ let old = state.get_old();
+ let current = state.get_current();
+ let pending = state.get_pending();
+
+ match (old, current, pending) {
+ (gst::State::Paused, gst::State::Playing, gst::State::VoidPending) => self.send_state(State::Playing),
+ (gst::State::Playing, gst::State::Paused, gst::State::VoidPending) => self.send_state(State::Paused),
+ (_, gst::State::Ready, gst::State::Null) => self.send_state(State::Stopped),
+ (_, gst::State::Null, gst::State::VoidPending) => self.send_state(State::Stopped),
+ _ => {
+ debug!("Pipeline transitioned from {:?} to {:?}, with {:?} pending", old, current, pending);
+ }
+ }
+ }
+ MessageView::Eos(..) => {
+ info!("End of stream reached");
+ self.reset().unwrap();
+
+ break 'outer;
+ },
+ MessageView::Warning(warn) => {
+ warn!(
+ "Warning from {:?}: {} ({:?})",
+ warn.get_src().map(|s| s.get_path_string()),
+ warn.get_error(),
+ warn.get_debug()
+ );
+ break 'outer;
+ }
+ MessageView::Error(err) => {
+ error!(
+ "Error from {:?}: {} ({:?})",
+ err.get_src().map(|s| s.get_path_string()),
+ err.get_error(),
+ err.get_debug()
+ );
+ break 'outer;
+ }
+ _ => {
+ // debug!("{:?}", msg)
+ },
+ };
+ }
+ }
+ debug!("Left GStreamer message loop");
+ }
+} \ No newline at end of file