diff options
Diffstat (limited to 'src/state.rs')
| -rw-r--r-- | src/state.rs | 292 |
1 files changed, 195 insertions, 97 deletions
diff --git a/src/state.rs b/src/state.rs index dfccd9a..8c6a290 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,27 +1,26 @@ -use std::error::Error; -use std::process::{Command, Stdio}; -use std::sync::Arc; - -use futures::{ - compat::Future01CompatExt, - future::{FutureExt, TryFutureExt}, -}; -use futures01::{future::Future, sink::Sink}; +use std::process::Command; +use std::sync::{Arc, Mutex}; +use futures::compat::Future01CompatExt; +use futures01::{future::Future, sink::Sink}; use futures_util::stream::StreamExt; -use tsclientlib::{Connection, MessageTarget, DisconnectOptions}; +use tokio_process::CommandExt; + +use tsclientlib::{Connection, DisconnectOptions, MessageTarget}; use gst::prelude::*; use gstreamer as gst; use gstreamer_app as gst_app; -use gstreamer_audio as gst_audio; use byte_slice_cast::*; +use crate::playlist::{AudioRequest, Playlist}; + #[derive(Clone)] pub struct State { conn: Arc<Connection>, pipeline: Arc<gst::Pipeline>, + playlist: Arc<Mutex<Playlist>>, } impl State { @@ -68,6 +67,16 @@ impl State { }; if is_audio { + let prev_volume = if let Some(volume) = pipeline.get_by_name("volume") { + volume + .get_property("volume") + .expect("Can get volume property") + .get_some::<f64>() + .unwrap_or(0.1) + } else { + 0.1 + }; + let names = [ "audio-converter", "volume", @@ -78,8 +87,10 @@ impl State { for element in pipeline.iterate_elements() { if let Ok(element) = element { if names.contains(&&*element.get_name()) { - element.set_state(gst::State::Null); - pipeline.remove_many(&[&element]); + element.set_state(gst::State::Null).unwrap(); + pipeline + .remove_many(&[&element]) + .expect("Can remove element"); } } } @@ -92,9 +103,12 @@ impl State { let opus_enc = gst::ElementFactory::make("opusenc", Some("opus-encoder")).unwrap(); let sink = gst::ElementFactory::make("appsink", Some("app-sink")).unwrap(); - sink.set_property("async", &false); + sink.set_property("async", &false) + .expect("Can make app-sink async"); - volume.set_property("volume", &0.2); + volume + .set_property("volume", &prev_volume) + .expect("Can change volume"); { let elements = &[&convert, &volume, &resample, &opus_enc, &sink]; @@ -109,7 +123,7 @@ impl State { let appsink = sink .dynamic_cast::<gst_app::AppSink>() - .expect("Sink element is expected to be an appsink!"); + .expect("Sink is an Appsink"); appsink.set_caps(Some(&gst::Caps::new_simple("audio/x-opus", &[]))); @@ -144,7 +158,7 @@ impl State { .get_packet_sink() .send(packet) .map(|_| ()) - .map_err(|e| println!("Failed to send voice packet")); + .map_err(|e| println!("Failed to send voice packet: {}", e)); tokio::run(send_packet); @@ -165,68 +179,77 @@ impl State { Self { conn: conn_arc, pipeline: Arc::new(pipeline), + playlist: Arc::new(Mutex::new(Playlist::new())), } } - pub fn add_audio<'a>(&self, uri: String) { + pub async fn add_audio(&self, url: String) { + if self + .playlist + .lock() + .expect("Mutex was not poisoned") + .is_full() + { + self.send_message(MessageTarget::Channel, "Playlist is full"); + return; + } + let ytdl_args = [ "--no-playlist", "-f", "bestaudio/best", "-g", - &uri, + "--get-filename", "-o", - "-", + "%(title)s", + &url, ]; - let ytdl_output = Command::new("youtube-dl") + let output = Command::new("youtube-dl") .args(&ytdl_args) - .stdin(Stdio::null()) - .output() - .unwrap(); + .output_async() + .compat() + .await + .expect("youtube-dl is runnable"); - if ytdl_output.status.success() == false { - tokio::spawn( - self.conn - .lock() - .to_mut() - .set_name("PokeBot") - .map_err(|_| println!("Failed to change name")), - ); - tokio::spawn( - self.conn - .lock() - .to_mut() - .send_message(MessageTarget::Channel, "Failed to load url") - .map_err(|_| println!("Failed to change name")), - ); + if output.status.success() == false { + self.set_name("PokeBot"); + self.send_message(MessageTarget::Channel, "Failed to load url"); return; } - let dl_url: &str = &String::from_utf8(ytdl_output.stderr).unwrap(); - - self.pipeline - .set_state(gst::State::Ready) - .expect("Can set state to ready"); - - let http_src = self - .pipeline - .get_by_name("http-source") - .expect("Http source should be registered"); - http_src - .set_property("location", &dl_url) - .expect("Can set location on http_dl"); - self.pipeline - .set_state(gst::State::Playing) - .expect("Can set state to playing"); - - tokio::spawn( - self.conn + let output_string = String::from_utf8(output.stdout).unwrap(); + let output_lines = output_string.lines().collect::<Vec<_>>(); + let address = output_lines[0]; + let title = output_lines[1]; + + let req = AudioRequest { + title: title.to_owned(), + address: address.to_owned(), + }; + + if gst::State::Null == self.pipeline.get_state(gst::ClockTime(None)).1 { + self.set_name("PokeBot - Playing"); + self.start_audio(req); + } else { + self.set_name("PokeBot - Playing"); + + let title = req.title.clone(); + if self + .playlist .lock() - .to_mut() - .set_name("PokeBot - Playing") - .map_err(|_| println!("Failed to change name")), - ); + .expect("Mutex was not poisoned") + .push(req) + == false + { + self.send_message(MessageTarget::Channel, "Playlist is full"); + } else { + self.send_message( + MessageTarget::Channel, + &format!("Added '{}' to the playlist", title), + ); + } + } } pub async fn poll(&self) { @@ -239,8 +262,6 @@ impl State { while let Some(msg) = messages.next().await { use gst::MessageView; - // Determine whether we want to quit: on EOS or error message - // we quit, otherwise simply continue. match msg.view() { MessageView::Eos(..) => break, MessageView::Error(err) => { @@ -256,39 +277,96 @@ impl State { }; } - tokio::spawn( - self.conn - .lock() - .to_mut() - .set_name("PokeBot") - .map_err(|_| println!("Failed to change name")), - ); + self.next(); + } + pub fn start_audio(&self, req: AudioRequest) { self.pipeline .set_state(gst::State::Null) .expect("Can set state to null"); + + let http_src = self + .pipeline + .get_by_name("http-source") + .expect("Http source should be registered"); + + http_src + .set_property("location", &&req.address) + .expect("Can set location on http_dl"); + + self.pipeline + .set_state(gst::State::Playing) + .expect("Can set state to playing"); + + self.set_description(&format!("Currently playing: {}", req.title)); } pub fn volume(&self, volume: f64) { - self.pipeline - .get_by_name("volume") - .expect("Volume filter should be registered") - .set_property("volume", &volume) - .expect("can change volume"); + if let Some(vol_filter) = self.pipeline.get_by_name("volume") { + vol_filter + .set_property("volume", &(10.0f64.powf(volume / 50.0 - 2.0))) + .expect("can change volume"); + + // TODO Reflect volume in name + } } pub fn play(&self) { + let http_src = self + .pipeline + .get_by_name("http-source") + .expect("Http source should be registered"); + + if http_src + .get_property("location") + .ok() + .and_then(|v| v.get::<String>().ok().and_then(|s| s.map(|s| s.is_empty()))) + .unwrap_or(true) + { + if self + .playlist + .lock() + .expect("Mutex was not poisoned") + .is_empty() + { + self.send_message(MessageTarget::Channel, "There is nothing to play"); + return; + } + } + self.pipeline .set_state(gst::State::Playing) .expect("can play"); - tokio::spawn( - self.conn - .lock() - .to_mut() - .set_name("PokeBot - Playing") - .map_err(|_| println!("Failed to change name")), - ); + self.set_name("PokeBot - Playing"); + } + + pub fn next(&self) { + if let Some(req) = self.playlist.lock().expect("Mutex was not poisoned").pop() { + self.start_audio(req); + } else { + self.pipeline + .set_state(gst::State::Null) + .expect("Can set state to null"); + + let http_src = self + .pipeline + .get_by_name("http-source") + .expect("Http source should be registered"); + + http_src + .set_property("location", &"") + .expect("Can set location on http_dl"); + + self.set_name("PokeBot"); + } + } + + pub fn clear(&self) { + self.playlist + .lock() + .expect("Mutex was not poisoned") + .clear(); } pub fn pause(&self) { @@ -296,13 +374,7 @@ impl State { .set_state(gst::State::Paused) .expect("can pause"); - tokio::spawn( - self.conn - .lock() - .to_mut() - .set_name("PokeBot - Paused") - .map_err(|_| println!("Failed to change name")), - ); + self.set_name("PokeBot - Paused"); } pub fn stop(&self) { @@ -310,13 +382,7 @@ impl State { .set_state(gst::State::Ready) .expect("can stop"); - tokio::spawn( - self.conn - .lock() - .to_mut() - .set_name("PokeBot - Stopped") - .map_err(|_| println!("Failed to change name")), - ); + self.set_name("PokeBot - Stopped"); } pub async fn disconnect(&self) { @@ -325,4 +391,36 @@ impl State { .compat() .await; } + + pub fn send_message(&self, target: MessageTarget, message: &str) { + tokio::spawn( + self.conn + .lock() + .to_mut() + .send_message(target, message) + .map_err(|e| println!("Failed to send message: {}", e)), + ); + } + + pub fn set_name(&self, name: &str) { + tokio::spawn( + self.conn + .lock() + .to_mut() + .set_name(name) + .map_err(|e| println!("Failed to change name: {}", e)), + ); + } + + pub fn set_description(&self, desc: &str) { + tokio::spawn( + self.conn + .lock() + .to_mut() + .get_client(&self.conn.lock().own_client) + .expect("Can get myself") + .set_description(desc) + .map_err(|e| println!("Failed to change description: {}", e)), + ); + } } |
