summaryrefslogtreecommitdiffstats
path: root/src/state.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/state.rs')
-rw-r--r--src/state.rs292
1 files changed, 195 insertions, 97 deletions
diff --git a/src/state.rs b/src/state.rs
index dfccd9a..8c6a290 100644
--- a/src/state.rs
+++ b/src/state.rs
@@ -1,27 +1,26 @@
-use std::error::Error;
-use std::process::{Command, Stdio};
-use std::sync::Arc;
-
-use futures::{
- compat::Future01CompatExt,
- future::{FutureExt, TryFutureExt},
-};
-use futures01::{future::Future, sink::Sink};
+use std::process::Command;
+use std::sync::{Arc, Mutex};
+use futures::compat::Future01CompatExt;
+use futures01::{future::Future, sink::Sink};
use futures_util::stream::StreamExt;
-use tsclientlib::{Connection, MessageTarget, DisconnectOptions};
+use tokio_process::CommandExt;
+
+use tsclientlib::{Connection, DisconnectOptions, MessageTarget};
use gst::prelude::*;
use gstreamer as gst;
use gstreamer_app as gst_app;
-use gstreamer_audio as gst_audio;
use byte_slice_cast::*;
+use crate::playlist::{AudioRequest, Playlist};
+
#[derive(Clone)]
pub struct State {
conn: Arc<Connection>,
pipeline: Arc<gst::Pipeline>,
+ playlist: Arc<Mutex<Playlist>>,
}
impl State {
@@ -68,6 +67,16 @@ impl State {
};
if is_audio {
+ let prev_volume = if let Some(volume) = pipeline.get_by_name("volume") {
+ volume
+ .get_property("volume")
+ .expect("Can get volume property")
+ .get_some::<f64>()
+ .unwrap_or(0.1)
+ } else {
+ 0.1
+ };
+
let names = [
"audio-converter",
"volume",
@@ -78,8 +87,10 @@ impl State {
for element in pipeline.iterate_elements() {
if let Ok(element) = element {
if names.contains(&&*element.get_name()) {
- element.set_state(gst::State::Null);
- pipeline.remove_many(&[&element]);
+ element.set_state(gst::State::Null).unwrap();
+ pipeline
+ .remove_many(&[&element])
+ .expect("Can remove element");
}
}
}
@@ -92,9 +103,12 @@ impl State {
let opus_enc = gst::ElementFactory::make("opusenc", Some("opus-encoder")).unwrap();
let sink = gst::ElementFactory::make("appsink", Some("app-sink")).unwrap();
- sink.set_property("async", &false);
+ sink.set_property("async", &false)
+ .expect("Can make app-sink async");
- volume.set_property("volume", &0.2);
+ volume
+ .set_property("volume", &prev_volume)
+ .expect("Can change volume");
{
let elements = &[&convert, &volume, &resample, &opus_enc, &sink];
@@ -109,7 +123,7 @@ impl State {
let appsink = sink
.dynamic_cast::<gst_app::AppSink>()
- .expect("Sink element is expected to be an appsink!");
+ .expect("Sink is an Appsink");
appsink.set_caps(Some(&gst::Caps::new_simple("audio/x-opus", &[])));
@@ -144,7 +158,7 @@ impl State {
.get_packet_sink()
.send(packet)
.map(|_| ())
- .map_err(|e| println!("Failed to send voice packet"));
+ .map_err(|e| println!("Failed to send voice packet: {}", e));
tokio::run(send_packet);
@@ -165,68 +179,77 @@ impl State {
Self {
conn: conn_arc,
pipeline: Arc::new(pipeline),
+ playlist: Arc::new(Mutex::new(Playlist::new())),
}
}
- pub fn add_audio<'a>(&self, uri: String) {
+ pub async fn add_audio(&self, url: String) {
+ if self
+ .playlist
+ .lock()
+ .expect("Mutex was not poisoned")
+ .is_full()
+ {
+ self.send_message(MessageTarget::Channel, "Playlist is full");
+ return;
+ }
+
let ytdl_args = [
"--no-playlist",
"-f",
"bestaudio/best",
"-g",
- &uri,
+ "--get-filename",
"-o",
- "-",
+ "%(title)s",
+ &url,
];
- let ytdl_output = Command::new("youtube-dl")
+ let output = Command::new("youtube-dl")
.args(&ytdl_args)
- .stdin(Stdio::null())
- .output()
- .unwrap();
+ .output_async()
+ .compat()
+ .await
+ .expect("youtube-dl is runnable");
- if ytdl_output.status.success() == false {
- tokio::spawn(
- self.conn
- .lock()
- .to_mut()
- .set_name("PokeBot")
- .map_err(|_| println!("Failed to change name")),
- );
- tokio::spawn(
- self.conn
- .lock()
- .to_mut()
- .send_message(MessageTarget::Channel, "Failed to load url")
- .map_err(|_| println!("Failed to change name")),
- );
+ if output.status.success() == false {
+ self.set_name("PokeBot");
+ self.send_message(MessageTarget::Channel, "Failed to load url");
return;
}
- let dl_url: &str = &String::from_utf8(ytdl_output.stderr).unwrap();
-
- self.pipeline
- .set_state(gst::State::Ready)
- .expect("Can set state to ready");
-
- let http_src = self
- .pipeline
- .get_by_name("http-source")
- .expect("Http source should be registered");
- http_src
- .set_property("location", &dl_url)
- .expect("Can set location on http_dl");
- self.pipeline
- .set_state(gst::State::Playing)
- .expect("Can set state to playing");
-
- tokio::spawn(
- self.conn
+ let output_string = String::from_utf8(output.stdout).unwrap();
+ let output_lines = output_string.lines().collect::<Vec<_>>();
+ let address = output_lines[0];
+ let title = output_lines[1];
+
+ let req = AudioRequest {
+ title: title.to_owned(),
+ address: address.to_owned(),
+ };
+
+ if gst::State::Null == self.pipeline.get_state(gst::ClockTime(None)).1 {
+ self.set_name("PokeBot - Playing");
+ self.start_audio(req);
+ } else {
+ self.set_name("PokeBot - Playing");
+
+ let title = req.title.clone();
+ if self
+ .playlist
.lock()
- .to_mut()
- .set_name("PokeBot - Playing")
- .map_err(|_| println!("Failed to change name")),
- );
+ .expect("Mutex was not poisoned")
+ .push(req)
+ == false
+ {
+ self.send_message(MessageTarget::Channel, "Playlist is full");
+ } else {
+ self.send_message(
+ MessageTarget::Channel,
+ &format!("Added '{}' to the playlist", title),
+ );
+ }
+ }
}
pub async fn poll(&self) {
@@ -239,8 +262,6 @@ impl State {
while let Some(msg) = messages.next().await {
use gst::MessageView;
- // Determine whether we want to quit: on EOS or error message
- // we quit, otherwise simply continue.
match msg.view() {
MessageView::Eos(..) => break,
MessageView::Error(err) => {
@@ -256,39 +277,96 @@ impl State {
};
}
- tokio::spawn(
- self.conn
- .lock()
- .to_mut()
- .set_name("PokeBot")
- .map_err(|_| println!("Failed to change name")),
- );
+ self.next();
+ }
+ pub fn start_audio(&self, req: AudioRequest) {
self.pipeline
.set_state(gst::State::Null)
.expect("Can set state to null");
+
+ let http_src = self
+ .pipeline
+ .get_by_name("http-source")
+ .expect("Http source should be registered");
+
+ http_src
+ .set_property("location", &&req.address)
+ .expect("Can set location on http_dl");
+
+ self.pipeline
+ .set_state(gst::State::Playing)
+ .expect("Can set state to playing");
+
+ self.set_description(&format!("Currently playing: {}", req.title));
}
pub fn volume(&self, volume: f64) {
- self.pipeline
- .get_by_name("volume")
- .expect("Volume filter should be registered")
- .set_property("volume", &volume)
- .expect("can change volume");
+ if let Some(vol_filter) = self.pipeline.get_by_name("volume") {
+ vol_filter
+ .set_property("volume", &(10.0f64.powf(volume / 50.0 - 2.0)))
+ .expect("can change volume");
+
+ // TODO Reflect volume in name
+ }
}
pub fn play(&self) {
+ let http_src = self
+ .pipeline
+ .get_by_name("http-source")
+ .expect("Http source should be registered");
+
+ if http_src
+ .get_property("location")
+ .ok()
+ .and_then(|v| v.get::<String>().ok().and_then(|s| s.map(|s| s.is_empty())))
+ .unwrap_or(true)
+ {
+ if self
+ .playlist
+ .lock()
+ .expect("Mutex was not poisoned")
+ .is_empty()
+ {
+ self.send_message(MessageTarget::Channel, "There is nothing to play");
+ return;
+ }
+ }
+
self.pipeline
.set_state(gst::State::Playing)
.expect("can play");
- tokio::spawn(
- self.conn
- .lock()
- .to_mut()
- .set_name("PokeBot - Playing")
- .map_err(|_| println!("Failed to change name")),
- );
+ self.set_name("PokeBot - Playing");
+ }
+
+ pub fn next(&self) {
+ if let Some(req) = self.playlist.lock().expect("Mutex was not poisoned").pop() {
+ self.start_audio(req);
+ } else {
+ self.pipeline
+ .set_state(gst::State::Null)
+ .expect("Can set state to null");
+
+ let http_src = self
+ .pipeline
+ .get_by_name("http-source")
+ .expect("Http source should be registered");
+
+ http_src
+ .set_property("location", &"")
+ .expect("Can set location on http_dl");
+
+ self.set_name("PokeBot");
+ }
+ }
+
+ pub fn clear(&self) {
+ self.playlist
+ .lock()
+ .expect("Mutex was not poisoned")
+ .clear();
}
pub fn pause(&self) {
@@ -296,13 +374,7 @@ impl State {
.set_state(gst::State::Paused)
.expect("can pause");
- tokio::spawn(
- self.conn
- .lock()
- .to_mut()
- .set_name("PokeBot - Paused")
- .map_err(|_| println!("Failed to change name")),
- );
+ self.set_name("PokeBot - Paused");
}
pub fn stop(&self) {
@@ -310,13 +382,7 @@ impl State {
.set_state(gst::State::Ready)
.expect("can stop");
- tokio::spawn(
- self.conn
- .lock()
- .to_mut()
- .set_name("PokeBot - Stopped")
- .map_err(|_| println!("Failed to change name")),
- );
+ self.set_name("PokeBot - Stopped");
}
pub async fn disconnect(&self) {
@@ -325,4 +391,36 @@ impl State {
.compat()
.await;
}
+
+ pub fn send_message(&self, target: MessageTarget, message: &str) {
+ tokio::spawn(
+ self.conn
+ .lock()
+ .to_mut()
+ .send_message(target, message)
+ .map_err(|e| println!("Failed to send message: {}", e)),
+ );
+ }
+
+ pub fn set_name(&self, name: &str) {
+ tokio::spawn(
+ self.conn
+ .lock()
+ .to_mut()
+ .set_name(name)
+ .map_err(|e| println!("Failed to change name: {}", e)),
+ );
+ }
+
+ pub fn set_description(&self, desc: &str) {
+ tokio::spawn(
+ self.conn
+ .lock()
+ .to_mut()
+ .get_client(&self.conn.lock().own_client)
+ .expect("Can get myself")
+ .set_description(desc)
+ .map_err(|e| println!("Failed to change description: {}", e)),
+ );
+ }
}