diff options
Diffstat (limited to 'src/state.rs')
| -rw-r--r-- | src/state.rs | 436 |
1 files changed, 0 insertions, 436 deletions
diff --git a/src/state.rs b/src/state.rs deleted file mode 100644 index bf953ce..0000000 --- a/src/state.rs +++ /dev/null @@ -1,436 +0,0 @@ -use std::process::Command; -use std::sync::{Arc, Mutex}; - -use futures::compat::Future01CompatExt; -use futures01::{future::Future, sink::Sink}; -use futures_util::stream::StreamExt; -use tokio_process::CommandExt; - -use tsclientlib::{Connection, DisconnectOptions, MessageTarget}; - -use gst::prelude::*; -use gstreamer as gst; -use gstreamer_app as gst_app; - -use byte_slice_cast::*; - -use crate::playlist::{AudioRequest, Playlist}; - -#[derive(Clone)] -pub struct State { - conn: Arc<Connection>, - pipeline: Arc<gst::Pipeline>, - playlist: Arc<Mutex<Playlist>>, -} - -impl State { - pub fn new(conn: Connection) -> Self { - let conn_arc = Arc::new(conn); - - gst::init().unwrap(); - let pipeline = gst::Pipeline::new(Some("Ts Audio Player")); - let http_dl = gst::ElementFactory::make("souphttpsrc", Some("http-source")).unwrap(); - let decoder = gst::ElementFactory::make("decodebin", Some("video-decoder")).unwrap(); - pipeline.add_many(&[&http_dl, &decoder]).unwrap(); - - http_dl - .link(&decoder) - .expect("Can link https_dl to decoder"); - - let pipeline_weak = pipeline.downgrade(); - - let inner_conn = conn_arc.clone(); - decoder.connect_pad_added(move |_, decoder_src| { - let pipeline = match pipeline_weak.upgrade() { - Some(pipeline) => pipeline, - None => return, - }; - - let is_audio = { - let media_type = decoder_src.get_current_caps().and_then(|caps| { - caps.get_structure(0).map(|s| { - let name = s.get_name(); - name.starts_with("audio/") - }) - }); - - match media_type { - None => { - eprintln!( - "Failed to get media type from pad {}", - decoder_src.get_name() - ); - return; - } - Some(media_type) => media_type, - } - }; - - if is_audio { - let prev_volume = if let Some(volume) = pipeline.get_by_name("volume") { - volume - .get_property("volume") - .expect("Can get volume property") - .get_some::<f64>() - .unwrap_or(0.1) - } else { - 0.1 - }; - - let names = [ - "audio-converter", - "volume", - "audio-resampler", - "opus-encoder", - "app-sink", - ]; - for element in pipeline.iterate_elements() { - if let Ok(element) = element { - if names.contains(&&*element.get_name()) { - element.set_state(gst::State::Null).unwrap(); - pipeline - .remove_many(&[&element]) - .expect("Can remove element"); - } - } - } - - let convert = - gst::ElementFactory::make("audioconvert", Some("audio-converter")).unwrap(); - let volume = gst::ElementFactory::make("volume", Some("volume")).unwrap(); - let resample = - gst::ElementFactory::make("audioresample", Some("audio-resampler")).unwrap(); - let opus_enc = gst::ElementFactory::make("opusenc", Some("opus-encoder")).unwrap(); - let sink = gst::ElementFactory::make("appsink", Some("app-sink")).unwrap(); - - sink.set_property("async", &false) - .expect("Can make app-sink async"); - - volume - .set_property("volume", &prev_volume) - .expect("Can change volume"); - - { - let elements = &[&convert, &volume, &resample, &opus_enc, &sink]; - pipeline.add_many(elements).expect("Can add audio elements"); - gst::Element::link_many(elements).expect("Can link audio elements"); - - for e in elements { - e.sync_state_with_parent() - .expect("Can sync state with parent"); - } - } - - let appsink = sink - .dynamic_cast::<gst_app::AppSink>() - .expect("Sink is an Appsink"); - - appsink.set_caps(Some(&gst::Caps::new_simple("audio/x-opus", &[]))); - - let inner_conn = inner_conn.clone(); - appsink.set_callbacks( - gst_app::AppSinkCallbacks::new() - // Add a handler to the "new-sample" signal. - .new_sample(move |appsink| { - // Pull the sample in question out of the appsink's buffer. - let sample = appsink.pull_sample().map_err(|_| gst::FlowError::Eos)?; - let buffer = sample - .get_buffer() - .expect("Failed to get buffer from appsink"); - - let map = buffer - .map_readable() - .expect("Failed to map buffer readable"); - - let samples = map - .as_slice_of::<u8>() - .expect("Failed to interprete buffer as S16 PCM"); - - let packet = tsproto_packets::packets::OutAudio::new( - &tsproto_packets::packets::AudioData::C2S { - id: 0, - codec: tsproto_packets::packets::CodecType::OpusMusic, - data: &samples, - }, - ); - - let send_packet = inner_conn - .get_packet_sink() - .send(packet) - .map(|_| ()) - .map_err(|e| println!("Failed to send voice packet: {}", e)); - - tokio::run(send_packet); - - Ok(gst::FlowSuccess::Ok) - }) - .build(), - ); - - let convert_sink = convert - .get_static_pad("sink") - .expect("queue has no sinkpad"); - decoder_src - .link(&convert_sink) - .expect("Can link decoder src to convert sink"); - } - }); - - Self { - conn: conn_arc, - pipeline: Arc::new(pipeline), - playlist: Arc::new(Mutex::new(Playlist::new())), - } - } - - pub async fn add_audio(&self, url: String) { - if self - .playlist - .lock() - .expect("Mutex was not poisoned") - .is_full() - { - self.send_message(MessageTarget::Channel, "Playlist is full"); - return; - } - - let ytdl_args = [ - "--no-playlist", - "-f", - "bestaudio/best", - "-g", - "--get-filename", - "-o", - "%(title)s", - &url, - ]; - - let output = Command::new("youtube-dl") - .args(&ytdl_args) - .output_async() - .compat() - .await - .expect("youtube-dl is runnable"); - - if output.status.success() == false { - self.set_name("PokeBot"); - self.send_message(MessageTarget::Channel, "Failed to load url"); - return; - } - - let output_string = String::from_utf8(output.stdout).unwrap(); - let output_lines = output_string.lines().collect::<Vec<_>>(); - let address = output_lines[0]; - let title = output_lines[1]; - - let req = AudioRequest { - title: title.to_owned(), - address: address.to_owned(), - }; - - let state = self.pipeline.get_state(gst::ClockTime(None)).1; - if gst::State::Null == state { - self.set_name("PokeBot - Playing"); - self.start_audio(req); - } else { - match state { - gst::State::Playing => self.set_name("PokeBot - Playing"), - gst::State::Paused => self.set_name("PokeBot - Paused"), - gst::State::Ready => self.set_name("PokeBot - Stopped"), - gst::State::Null | gst::State::__Unknown(_) | gst::State::VoidPending => { - unreachable!() - } - } - - let title = req.title.clone(); - if self - .playlist - .lock() - .expect("Mutex was not poisoned") - .push(req) - == false - { - self.send_message(MessageTarget::Channel, "Playlist is full"); - } else { - self.send_message( - MessageTarget::Channel, - &format!("Added '{}' to the playlist", title), - ); - } - } - } - - pub async fn poll(&self) { - let bus = self - .pipeline - .get_bus() - .expect("Pipeline without bus. Shouldn't happen!"); - - let mut messages = gst::BusStream::new(&bus); - while let Some(msg) = messages.next().await { - use gst::MessageView; - - match msg.view() { - MessageView::Eos(..) => break, - MessageView::Error(err) => { - println!( - "Error from {:?}: {} ({:?})", - err.get_src().map(|s| s.get_path_string()), - err.get_error(), - err.get_debug() - ); - break; - } - _ => (), - }; - } - - self.next(); - } - - pub fn start_audio(&self, req: AudioRequest) { - self.pipeline - .set_state(gst::State::Null) - .expect("Can set state to null"); - - let http_src = self - .pipeline - .get_by_name("http-source") - .expect("Http source should be registered"); - - http_src - .set_property("location", &&req.address) - .expect("Can set location on http_dl"); - - self.pipeline - .set_state(gst::State::Playing) - .expect("Can set state to playing"); - - self.set_description(&format!("Currently playing: {}", req.title)); - } - - pub fn volume(&self, volume: f64) { - if let Some(vol_filter) = self.pipeline.get_by_name("volume") { - vol_filter - .set_property("volume", &(10.0f64.powf(volume / 50.0 - 2.0))) - .expect("can change volume"); - - // TODO Reflect volume in name - } - } - - pub fn play(&self) { - let http_src = self - .pipeline - .get_by_name("http-source") - .expect("Http source should be registered"); - - if http_src - .get_property("location") - .ok() - .and_then(|v| v.get::<String>().ok().and_then(|s| s.map(|s| s.is_empty()))) - .unwrap_or(true) - { - if self - .playlist - .lock() - .expect("Mutex was not poisoned") - .is_empty() - { - self.send_message(MessageTarget::Channel, "There is nothing to play"); - return; - } - } - - self.pipeline - .set_state(gst::State::Playing) - .expect("can play"); - - self.set_name("PokeBot - Playing"); - } - - pub fn next(&self) { - if let Some(req) = self.playlist.lock().expect("Mutex was not poisoned").pop() { - self.start_audio(req); - } else { - self.pipeline - .set_state(gst::State::Null) - .expect("Can set state to null"); - - let http_src = self - .pipeline - .get_by_name("http-source") - .expect("Http source should be registered"); - - http_src - .set_property("location", &"") - .expect("Can set location on http_dl"); - - self.set_name("PokeBot"); - } - } - - pub fn clear(&self) { - self.playlist - .lock() - .expect("Mutex was not poisoned") - .clear(); - - self.send_message(MessageTarget::Channel, "Playlist was cleared"); - } - - pub fn pause(&self) { - self.pipeline - .set_state(gst::State::Paused) - .expect("can pause"); - - self.set_name("PokeBot - Paused"); - } - - pub fn stop(&self) { - self.pipeline - .set_state(gst::State::Ready) - .expect("can stop"); - - self.set_name("PokeBot - Stopped"); - } - - pub async fn disconnect(&self) { - self.conn - .disconnect(DisconnectOptions::new()) - .compat() - .await; - } - - pub fn send_message(&self, target: MessageTarget, message: &str) { - tokio::spawn( - self.conn - .lock() - .to_mut() - .send_message(target, message) - .map_err(|e| println!("Failed to send message: {}", e)), - ); - } - - pub fn set_name(&self, name: &str) { - tokio::spawn( - self.conn - .lock() - .to_mut() - .set_name(name) - .map_err(|e| println!("Failed to change name: {}", e)), - ); - } - - pub fn set_description(&self, desc: &str) { - tokio::spawn( - self.conn - .lock() - .to_mut() - .get_client(&self.conn.lock().own_client) - .expect("Can get myself") - .set_description(desc) - .map_err(|e| println!("Failed to change description: {}", e)), - ); - } -} |
