diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/audio_player.rs | 57 | ||||
| -rw-r--r-- | src/bot/master.rs | 83 | ||||
| -rw-r--r-- | src/bot/music.rs | 105 | ||||
| -rw-r--r-- | src/main.rs | 21 | ||||
| -rw-r--r-- | src/playlist.rs | 10 | ||||
| -rw-r--r-- | src/teamspeak/bbcode.rs | 6 | ||||
| -rw-r--r-- | src/teamspeak/mod.rs | 6 | ||||
| -rw-r--r-- | src/web_server.rs | 122 | ||||
| -rw-r--r-- | src/web_server/api.rs | 48 | ||||
| -rw-r--r-- | src/web_server/bot_executor.rs | 63 | ||||
| -rw-r--r-- | src/web_server/default.rs | 24 | ||||
| -rw-r--r-- | src/web_server/front_end_cookie.rs | 60 | ||||
| -rw-r--r-- | src/web_server/tmtu.rs | 41 | ||||
| -rw-r--r-- | src/youtube_dl.rs | 19 |
14 files changed, 604 insertions, 61 deletions
diff --git a/src/audio_player.rs b/src/audio_player.rs index 9ed645d..d231c72 100644 --- a/src/audio_player.rs +++ b/src/audio_player.rs @@ -10,9 +10,11 @@ use gstreamer_audio::{StreamVolume, StreamVolumeFormat}; use crate::bot::{MusicBotMessage, State}; use glib::BoolError; use log::{debug, error, info, warn}; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, RwLock}; use tokio02::sync::mpsc::UnboundedSender; +use crate::youtube_dl::AudioMetadata; + static GST_INIT: Once = Once::new(); #[derive(Copy, Clone, Debug)] @@ -33,8 +35,10 @@ pub struct AudioPlayer { bus: gst::Bus, http_src: gst::Element, + volume_f64: RwLock<f64>, volume: gst::Element, - sender: Arc<Mutex<UnboundedSender<MusicBotMessage>>>, + sender: Arc<RwLock<UnboundedSender<MusicBotMessage>>>, + currently_playing: RwLock<Option<AudioMetadata>>, } fn make_element(factoryname: &str, display_name: &str) -> Result<gst::Element, AudioPlayerError> { @@ -83,7 +87,7 @@ fn add_decode_bin_new_pad_callback( impl AudioPlayer { pub fn new( - sender: Arc<Mutex<UnboundedSender<MusicBotMessage>>>, + sender: Arc<RwLock<UnboundedSender<MusicBotMessage>>>, callback: Option<Box<dyn FnMut(&[u8]) + Send>>, ) -> Result<Self, AudioPlayerError> { GST_INIT.call_once(|| gst::init().unwrap()); @@ -104,6 +108,12 @@ impl AudioPlayer { pipeline.add(&audio_bin)?; + // The documentation says that we have to make sure to handle + // all messages if auto flushing is deactivated. + // I hope our way of reading messages is good enough. + // + // https://gstreamer.freedesktop.org/documentation/gstreamer/gstpipeline.html#gst_pipeline_set_auto_flush_bus + pipeline.set_auto_flush_bus(false); pipeline.set_state(gst::State::Ready)?; Ok(AudioPlayer { @@ -111,8 +121,10 @@ impl AudioPlayer { bus, http_src, + volume_f64: RwLock::new(0.0), volume, sender, + currently_playing: RwLock::new(None), }) } @@ -173,7 +185,16 @@ impl AudioPlayer { Ok((audio_bin, volume, ghost_pad)) } - pub fn set_source_url(&self, location: String) -> Result<(), AudioPlayerError> { + pub fn set_metadata(&self, data: AudioMetadata) -> Result<(), AudioPlayerError> { + self.set_source_url(data.url.clone())?; + + let mut currently_playing = self.currently_playing.write().unwrap(); + *currently_playing = Some(data); + + Ok(()) + } + + fn set_source_url(&self, location: String) -> Result<(), AudioPlayerError> { info!("Setting location URI: {}", location); self.http_src.set_property("location", &location)?; @@ -181,6 +202,7 @@ impl AudioPlayer { } pub fn set_volume(&self, volume: f64) -> Result<(), AudioPlayerError> { + *self.volume_f64.write().unwrap() = volume; let db = 50.0 * volume.log10(); info!("Setting volume: {} -> {} dB", volume, db); @@ -203,9 +225,26 @@ impl AudioPlayer { } } + pub fn volume(&self) -> f64 { + *self.volume_f64.read().unwrap() + } + + pub fn position(&self) -> Option<Duration> { + self.pipeline + .query_position::<gst::ClockTime>() + .and_then(|t| t.0.map(|v| Duration::from_nanos(v))) + } + + pub fn currently_playing(&self) -> Option<AudioMetadata> { + self.currently_playing.read().unwrap().clone() + } + pub fn reset(&self) -> Result<(), AudioPlayerError> { info!("Setting pipeline state to null"); + let mut currently_playing = self.currently_playing.write().unwrap(); + *currently_playing = None; + self.pipeline.set_state(gst::State::Null)?; Ok(()) @@ -273,20 +312,20 @@ impl AudioPlayer { pub fn quit(&self, reason: String) { info!("Quitting audio player"); - if let Err(e) = self + if let Err(_) = self .bus .post(&gst::Message::new_application(gst::Structure::new_empty("quit")).build()) { - warn!("Failed to send \"quit\" app event: {}", e); + warn!("Tried to send \"quit\" app event on flushing bus."); } - let sender = self.sender.lock().unwrap(); + let sender = self.sender.read().unwrap(); sender.send(MusicBotMessage::Quit(reason)).unwrap(); } fn send_state(&self, state: State) { info!("Sending state {:?} to application", state); - let sender = self.sender.lock().unwrap(); + let sender = self.sender.read().unwrap(); sender.send(MusicBotMessage::StateChange(state)).unwrap(); } @@ -362,7 +401,7 @@ impl AudioPlayer { } } _ => { - //debug!("{:?}", msg) + //debug!("Unhandled message on bus: {:?}", msg) } }; } diff --git a/src/bot/master.rs b/src/bot/master.rs index 2488064..755aaa1 100644 --- a/src/bot/master.rs +++ b/src/bot/master.rs @@ -1,12 +1,13 @@ use std::collections::HashMap; use std::future::Future; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, RwLock}; use futures::future::{FutureExt, TryFutureExt}; use futures01::future::Future as Future01; use log::info; use rand::{rngs::SmallRng, seq::SliceRandom, SeedableRng}; use serde::{Deserialize, Serialize}; +use tokio02::sync::mpsc::UnboundedSender; use tsclientlib::{ClientId, ConnectOptions, Identity, MessageTarget}; use crate::audio_player::AudioPlayerError; @@ -18,8 +19,9 @@ use crate::bot::{MusicBot, MusicBotArgs, MusicBotMessage}; pub struct MasterBot { config: Arc<MasterConfig>, - music_bots: Arc<Mutex<MusicBots>>, + music_bots: Arc<RwLock<MusicBots>>, teamspeak: Arc<TeamSpeakConnection>, + sender: Arc<RwLock<UnboundedSender<MusicBotMessage>>>, } struct MusicBots { @@ -32,7 +34,7 @@ struct MusicBots { impl MasterBot { pub async fn new(args: MasterArgs) -> (Arc<Self>, impl Future) { let (tx, mut rx) = tokio02::sync::mpsc::unbounded_channel(); - let tx = Arc::new(Mutex::new(tx)); + let tx = Arc::new(RwLock::new(tx)); info!("Starting in TeamSpeak mode"); let mut con_config = ConnectOptions::new(args.address.clone()) @@ -65,7 +67,7 @@ impl MasterBot { let name_count = config.names.len(); let id_count = config.ids.len(); - let music_bots = Arc::new(Mutex::new(MusicBots { + let music_bots = Arc::new(RwLock::new(MusicBots { rng: SmallRng::from_entropy(), available_names: (0..name_count).collect(), available_ids: (0..id_count).collect(), @@ -76,6 +78,7 @@ impl MasterBot { config, music_bots, teamspeak: connection, + sender: tx.clone(), }); bot.teamspeak @@ -83,8 +86,12 @@ impl MasterBot { let cbot = bot.clone(); let msg_loop = async move { - loop { + 'outer: loop { while let Some(msg) = rx.recv().await { + if let MusicBotMessage::Quit(reason) = msg { + cbot.teamspeak.disconnect(&reason); + break 'outer; + } cbot.on_message(msg).await.unwrap(); } } @@ -115,7 +122,7 @@ impl MasterBot { ref mut available_names, ref mut available_ids, ref connected_bots, - } = &mut *self.music_bots.lock().expect("Mutex was not poisoned"); + } = &mut *self.music_bots.write().expect("RwLock was not poisoned"); for (_, bot) in connected_bots { if bot.my_channel() == channel { @@ -163,7 +170,7 @@ impl MasterBot { let cmusic_bots = self.music_bots.clone(); let disconnect_cb = Box::new(move |n, name_index, id_index| { - let mut music_bots = cmusic_bots.lock().expect("Mutex was not poisoned"); + let mut music_bots = cmusic_bots.write().expect("RwLock was not poisoned"); music_bots.connected_bots.remove(&n); music_bots.available_names.push(name_index); music_bots.available_ids.push(id_index); @@ -188,7 +195,7 @@ impl MasterBot { if let Some(bot_args) = self.build_bot_args_for(id) { let (bot, fut) = MusicBot::new(bot_args).await; tokio::spawn(fut.unit_error().boxed().compat().map(|_| ())); - let mut music_bots = self.music_bots.lock().expect("Mutex was not poisoned"); + let mut music_bots = self.music_bots.write().expect("RwLock was not poisoned"); music_bots .connected_bots .insert(bot.name().to_string(), bot); @@ -205,6 +212,62 @@ impl MasterBot { Ok(()) } + + pub fn bot_data(&self, name: String) -> Option<crate::web_server::BotData> { + let music_bots = self.music_bots.read().unwrap(); + let bot = music_bots.connected_bots.get(&name)?; + + Some(crate::web_server::BotData { + name: name, + state: bot.state(), + volume: bot.volume(), + position: bot.position(), + currently_playing: bot.currently_playing(), + playlist: bot.playlist_to_vec(), + }) + } + + pub fn bot_datas(&self) -> Vec<crate::web_server::BotData> { + let music_bots = self.music_bots.read().unwrap(); + + let len = music_bots.connected_bots.len(); + let mut result = Vec::with_capacity(len); + for (name, bot) in &music_bots.connected_bots { + let bot_data = crate::web_server::BotData { + name: name.clone(), + state: bot.state(), + volume: bot.volume(), + position: bot.position(), + currently_playing: bot.currently_playing(), + playlist: bot.playlist_to_vec(), + }; + + result.push(bot_data); + } + + result + } + + pub fn bot_names(&self) -> Vec<String> { + let music_bots = self.music_bots.read().unwrap(); + + let len = music_bots.connected_bots.len(); + let mut result = Vec::with_capacity(len); + for (name, _) in &music_bots.connected_bots { + result.push(name.clone()); + } + + result + } + + pub fn quit(&self, reason: String) { + let music_bots = self.music_bots.read().unwrap(); + for (_, bot) in &music_bots.connected_bots { + bot.quit(reason.clone()) + } + let sender = self.sender.read().unwrap(); + sender.send(MusicBotMessage::Quit(reason)).unwrap(); + } } #[derive(Debug, Serialize, Deserialize)] @@ -217,6 +280,8 @@ pub struct MasterArgs { pub channel: Option<String>, #[serde(default = "default_verbose")] pub verbose: u8, + pub domain: String, + pub bind_address: String, pub names: Vec<String>, pub id: Identity, pub ids: Vec<Identity>, @@ -251,6 +316,8 @@ impl MasterArgs { ids: self.ids, local, address, + domain: self.domain, + bind_address: self.bind_address, id: self.id, channel, verbose, diff --git a/src/bot/music.rs b/src/bot/music.rs index 2539695..41976e5 100644 --- a/src/bot/music.rs +++ b/src/bot/music.rs @@ -1,10 +1,12 @@ use std::future::Future; use std::io::BufRead; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, RwLock}; use std::thread; +use std::time::Duration; use humantime; use log::{debug, info}; +use serde::Serialize; use structopt::StructOpt; use tokio02::sync::mpsc::UnboundedSender; use tsclientlib::{data, ChannelId, ClientId, ConnectOptions, Identity, Invoker, MessageTarget}; @@ -44,7 +46,7 @@ fn parse_seek(mut amount: &str) -> Result<Seek, ()> { } } -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize)] pub enum State { Playing, Paused, @@ -52,6 +54,18 @@ pub enum State { EndOfStream, } +impl std::fmt::Display for State { + fn fmt(&self, fmt: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> { + match self { + State::Playing => write!(fmt, "Playing"), + State::Paused => write!(fmt, "Paused"), + State::Stopped | State::EndOfStream => write!(fmt, "Stopped"), + }?; + + Ok(()) + } +} + #[derive(Debug)] pub enum MusicBotMessage { TextMessage(Message), @@ -71,8 +85,8 @@ pub struct MusicBot { name: String, player: Arc<AudioPlayer>, teamspeak: Option<Arc<TeamSpeakConnection>>, - playlist: Arc<Mutex<Playlist>>, - state: Arc<Mutex<State>>, + playlist: Arc<RwLock<Playlist>>, + state: Arc<RwLock<State>>, } pub struct MusicBotArgs { @@ -90,7 +104,7 @@ pub struct MusicBotArgs { impl MusicBot { pub async fn new(args: MusicBotArgs) -> (Arc<Self>, impl Future) { let (tx, mut rx) = tokio02::sync::mpsc::unbounded_channel(); - let tx = Arc::new(Mutex::new(tx)); + let tx = Arc::new(RwLock::new(tx)); let (player, connection) = if args.local { info!("Starting in CLI mode"); let audio_player = AudioPlayer::new(tx.clone(), None).unwrap(); @@ -127,7 +141,7 @@ impl MusicBot { player.set_volume(0.5).unwrap(); let player = Arc::new(player); - let playlist = Arc::new(Mutex::new(Playlist::new())); + let playlist = Arc::new(RwLock::new(Playlist::new())); spawn_gstreamer_thread(player.clone(), tx.clone()); @@ -140,7 +154,7 @@ impl MusicBot { player, teamspeak: connection, playlist, - state: Arc::new(Mutex::new(State::Stopped)), + state: Arc::new(RwLock::new(State::Stopped)), }); let cbot = bot.clone(); @@ -173,24 +187,20 @@ impl MusicBot { } fn start_playing_audio(&self, metadata: AudioMetadata) { - if let Some(title) = metadata.title { - self.send_message(&format!("Playing {}", ts::underline(&title))); - self.set_description(&format!("Currently playing '{}'", title)); - } else { - self.send_message("Playing unknown title"); - self.set_description("Currently playing"); - } + self.send_message(&format!("Playing {}", ts::underline(&metadata.title))); + self.set_description(&format!("Currently playing '{}'", metadata.title)); self.player.reset().unwrap(); - self.player.set_source_url(metadata.url).unwrap(); + self.player.set_metadata(metadata).unwrap(); self.player.play().unwrap(); } - pub async fn add_audio(&self, url: String) { + pub async fn add_audio(&self, url: String, user: String) { match crate::youtube_dl::get_audio_download_url(url).await { - Ok(metadata) => { + Ok(mut metadata) => { + metadata.added_by = user; info!("Found audio url: {}", metadata.url); - let mut playlist = self.playlist.lock().expect("Mutex was not poisoned"); + let mut playlist = self.playlist.write().expect("RwLock was not poisoned"); playlist.push(metadata.clone()); if !self.player.is_started() { @@ -198,11 +208,10 @@ impl MusicBot { self.start_playing_audio(request); } } else { - if let Some(title) = metadata.title { - self.send_message(&format!("Added {} to playlist", ts::underline(&title))); - } else { - self.send_message("Added to playlist"); - } + self.send_message(&format!( + "Added {} to playlist", + ts::underline(&metadata.title) + )); } } Err(e) => { @@ -217,6 +226,26 @@ impl MusicBot { &self.name } + pub fn state(&self) -> State { + *self.state.read().expect("RwLock was not poisoned") + } + + pub fn volume(&self) -> f64 { + self.player.volume() + } + + pub fn position(&self) -> Option<Duration> { + self.player.position() + } + + pub fn currently_playing(&self) -> Option<AudioMetadata> { + self.player.currently_playing() + } + + pub fn playlist_to_vec(&self) -> Vec<AudioMetadata> { + self.playlist.read().unwrap().to_vec() + } + pub fn my_channel(&self) -> ChannelId { self.teamspeak .as_ref() @@ -255,7 +284,7 @@ impl MusicBot { let tokens = msg[1..].split_whitespace().collect::<Vec<_>>(); match Command::from_iter_safe(&tokens) { - Ok(args) => self.on_command(args).await?, + Ok(args) => self.on_command(args, message.invoker).await?, Err(e) if e.kind == structopt::clap::ErrorKind::HelpDisplayed => { self.send_message(&format!("\n{}", e.message)); } @@ -266,10 +295,10 @@ impl MusicBot { Ok(()) } - async fn on_command(&self, command: Command) -> Result<(), AudioPlayerError> { + async fn on_command(&self, command: Command, invoker: Invoker) -> Result<(), AudioPlayerError> { match command { Command::Play => { - let playlist = self.playlist.lock().expect("Mutex was not poisoned"); + let playlist = self.playlist.read().expect("RwLock was not poisoned"); if !self.player.is_started() { if !playlist.is_empty() { @@ -283,7 +312,7 @@ impl MusicBot { // strip bbcode tags from url let url = url.replace("[URL]", "").replace("[/URL]", ""); - self.add_audio(url.to_string()).await; + self.add_audio(url.to_string(), invoker.name).await; } Command::Pause => { self.player.pause()?; @@ -303,7 +332,7 @@ impl MusicBot { } } Command::Next => { - let playlist = self.playlist.lock().expect("Mutex was not poisoned"); + let playlist = self.playlist.read().expect("RwLock was not poisoned"); if !playlist.is_empty() { info!("Skipping to next track"); self.player.stop_current()?; @@ -314,8 +343,8 @@ impl MusicBot { } Command::Clear => { self.playlist - .lock() - .expect("Mutex was not poisoned") + .write() + .expect("RwLock was not poisoned") .clear(); } Command::Volume { percent: volume } => { @@ -331,7 +360,7 @@ impl MusicBot { } fn on_state(&self, state: State) -> Result<(), AudioPlayerError> { - let mut current_state = self.state.lock().unwrap(); + let mut current_state = self.state.write().unwrap(); if *current_state != state { match state { State::Playing => { @@ -345,7 +374,11 @@ impl MusicBot { self.set_description(""); } State::EndOfStream => { - let next_track = self.playlist.lock().expect("Mutex was not poisoned").pop(); + let next_track = self + .playlist + .write() + .expect("RwLock was not poisoned") + .pop(); if let Some(request) = next_track { info!("Advancing playlist"); @@ -401,7 +434,7 @@ impl MusicBot { } } -fn spawn_stdin_reader(tx: Arc<Mutex<UnboundedSender<MusicBotMessage>>>) { +fn spawn_stdin_reader(tx: Arc<RwLock<UnboundedSender<MusicBotMessage>>>) { debug!("Spawning stdin reader thread"); thread::Builder::new() .name(String::from("stdin reader")) @@ -421,7 +454,7 @@ fn spawn_stdin_reader(tx: Arc<Mutex<UnboundedSender<MusicBotMessage>>>) { text: line, }); - let tx = tx.lock().unwrap(); + let tx = tx.read().unwrap(); tx.send(message).unwrap(); } }) @@ -430,7 +463,7 @@ fn spawn_stdin_reader(tx: Arc<Mutex<UnboundedSender<MusicBotMessage>>>) { fn spawn_gstreamer_thread( player: Arc<AudioPlayer>, - tx: Arc<Mutex<UnboundedSender<MusicBotMessage>>>, + tx: Arc<RwLock<UnboundedSender<MusicBotMessage>>>, ) { thread::Builder::new() .name(String::from("gstreamer polling")) @@ -439,7 +472,7 @@ fn spawn_gstreamer_thread( break; } - tx.lock() + tx.read() .unwrap() .send(MusicBotMessage::StateChange(State::EndOfStream)) .unwrap(); diff --git a/src/main.rs b/src/main.rs index 922162f..2559a2a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,9 +1,10 @@ use std::fs::File; use std::io::{Read, Write}; use std::path::PathBuf; +use std::thread; use futures::future::{FutureExt, TryFutureExt}; -use log::{debug, info}; +use log::{debug, error, info}; use structopt::clap::AppSettings; use structopt::StructOpt; use tsclientlib::Identity; @@ -13,6 +14,7 @@ mod bot; mod command; mod playlist; mod teamspeak; +mod web_server; mod youtube_dl; use bot::{MasterArgs, MasterBot, MusicBot, MusicBotArgs}; @@ -116,7 +118,22 @@ fn run() -> Result<(), Box<dyn std::error::Error>> { }; MusicBot::new(bot_args).await.1.await; } else { - MasterBot::new(bot_args).await.1.await; + let domain = bot_args.domain.clone(); + let bind_address = bot_args.bind_address.clone(); + let (bot, fut) = MasterBot::new(bot_args).await; + + thread::spawn(|| { + let web_args = web_server::WebServerArgs { + domain, + bind_address, + bot, + }; + if let Err(e) = web_server::start(web_args) { + error!("Error in web server: {}", e); + } + }); + + fut.await; } } .unit_error() diff --git a/src/playlist.rs b/src/playlist.rs index 87c1c98..445f8a5 100644 --- a/src/playlist.rs +++ b/src/playlist.rs @@ -28,6 +28,16 @@ impl Playlist { res } + pub fn to_vec(&self) -> Vec<AudioMetadata> { + let (a, b) = self.data.as_slices(); + + let mut res = a.to_vec(); + res.extend_from_slice(b); + res.reverse(); + + res + } + pub fn is_empty(&self) -> bool { self.data.is_empty() } diff --git a/src/teamspeak/bbcode.rs b/src/teamspeak/bbcode.rs index 28be08a..91d576a 100644 --- a/src/teamspeak/bbcode.rs +++ b/src/teamspeak/bbcode.rs @@ -1,4 +1,4 @@ -use std::fmt::{Formatter, Display, Error}; +use std::fmt::{Display, Error, Formatter}; #[allow(dead_code)] pub enum BbCode<'a> { @@ -14,7 +14,9 @@ impl<'a> Display for BbCode<'a> { BbCode::Bold(text) => fmt.write_fmt(format_args!("[B]{}[/B]", text))?, BbCode::Italic(text) => fmt.write_fmt(format_args!("[I]{}[/I]", text))?, BbCode::Underline(text) => fmt.write_fmt(format_args!("[U]{}[/U]", text))?, - BbCode::Link(text, url) => fmt.write_fmt(format_args!("[URL={}]{}[/URL]", url, text))?, + BbCode::Link(text, url) => { + fmt.write_fmt(format_args!("[URL={}]{}[/URL]", url, text))? + } }; Ok(()) diff --git a/src/teamspeak/mod.rs b/src/teamspeak/mod.rs index 5ac0d44..7551e77 100644 --- a/src/teamspeak/mod.rs +++ b/src/teamspeak/mod.rs @@ -1,4 +1,4 @@ -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; use futures::compat::Future01CompatExt; @@ -76,7 +76,7 @@ fn get_message<'a>(event: &Event) -> Option<MusicBotMessage> { impl TeamSpeakConnection { pub async fn new( - tx: Arc<Mutex<UnboundedSender<MusicBotMessage>>>, + tx: Arc<RwLock<UnboundedSender<MusicBotMessage>>>, options: ConnectOptions, ) -> Result<TeamSpeakConnection, tsclientlib::Error> { let conn = Connection::new(options).compat().await?; @@ -89,7 +89,7 @@ impl TeamSpeakConnection { if let ConEvents(_conn, events) = e { for event in *events { if let Some(msg) = get_message(event) { - let tx = tx.lock().expect("Mutex was not poisoned"); + let tx = tx.read().expect("RwLock was not poisoned"); // Ignore the result because the receiver might get dropped first. let _ = tx.send(msg); } diff --git a/src/web_server.rs b/src/web_server.rs new file mode 100644 index 0000000..01233f2 --- /dev/null +++ b/src/web_server.rs @@ -0,0 +1,122 @@ +use std::sync::Arc; +use std::time::Duration; + +use actix::{Addr, SyncArbiter}; +use actix_web::{ + get, http::header, middleware::Logger, post, web, App, HttpResponse, HttpServer, Responder, +}; +use askama::actix_web::TemplateIntoResponse; +use askama::Template; +use serde::{Deserialize, Serialize}; + +use crate::bot::MasterBot; +use crate::youtube_dl::AudioMetadata; + +mod api; +mod bot_executor; +mod default; +mod front_end_cookie; +mod tmtu; +pub use bot_executor::*; +use front_end_cookie::FrontEnd; + +pub struct WebServerArgs { + pub domain: String, + pub bind_address: String, + pub bot: Arc<MasterBot>, +} + +#[actix_rt::main] +pub async fn start(args: WebServerArgs) -> std::io::Result<()> { + let cbot = args.bot.clone(); + let bot_addr: Addr<BotExecutor> = SyncArbiter::start(4, move || BotExecutor(cbot.clone())); + + HttpServer::new(move || { + App::new() + .data(bot_addr.clone()) + .wrap(Logger::default()) + .service(index) + .service(get_bot) + .service(post_front_end) + .service( + web::scope("/api") + .service(api::get_bot_list) + .service(api::get_bot), + ) + .service(web::scope("/docs").service(get_api_docs)) + .service(actix_files::Files::new("/static", "web_server/static/")) + }) + .bind(args.bind_address)? + .run() + .await?; + + args.bot.quit(String::from("Stopping")); + + Ok(()) +} + +#[derive(Deserialize)] +#[serde(rename_all = "kebab-case")] +struct FrontEndForm { + front_end: FrontEnd, +} + +#[post("/front-end")] +async fn post_front_end(form: web::Form<FrontEndForm>) -> impl Responder { + front_end_cookie::set_front_end(form.into_inner().front_end).await +} + +#[derive(Debug, Serialize)] +pub struct BotData { + pub name: String, + pub state: crate::bot::State, + pub volume: f64, + pub position: Option<Duration>, + pub currently_playing: Option<AudioMetadata>, + pub playlist: Vec<AudioMetadata>, +} + +#[get("/")] +async fn index(bot: web::Data<Addr<BotExecutor>>, front: FrontEnd) -> impl Responder { + match front { + FrontEnd::Default => default::index(bot).await, + FrontEnd::Tmtu => tmtu::index(bot).await, + } +} + +#[get("/bot/{name}")] +async fn get_bot( + bot: web::Data<Addr<BotExecutor>>, + name: web::Path<String>, + front: FrontEnd, +) -> impl Responder { + match front { + FrontEnd::Tmtu => tmtu::get_bot(bot, name.into_inner()).await, + FrontEnd::Default => Ok(HttpResponse::Found().header(header::LOCATION, "/").finish()), + } +} + +#[derive(Template)] +#[template(path = "docs/api.htm")] +struct ApiDocsTemplate; + +#[get("/api")] +async fn get_api_docs() -> impl Responder { + ApiDocsTemplate.into_response() +} + +mod filters { + use std::time::Duration; + + pub fn fmt_duration(duration: &Option<Duration>) -> Result<String, askama::Error> { + if let Some(duration) = duration { + let secs = duration.as_secs(); + let mins = secs / 60; + let submin_secs = secs % 60; + + Ok(format!("{:02}:{:02}", mins, submin_secs)) + } else { + Ok(String::from("--:--")) + } + } +} diff --git a/src/web_server/api.rs b/src/web_server/api.rs new file mode 100644 index 0000000..4deedad --- /dev/null +++ b/src/web_server/api.rs @@ -0,0 +1,48 @@ +use actix::Addr; +use actix_web::{get, web, HttpResponse, Responder, ResponseError}; +use derive_more::Display; +use serde::Serialize; + +use crate::web_server::{BotDataListRequest, BotDataRequest, BotExecutor}; + +#[get("/bots")] +pub async fn get_bot_list(bot: web::Data<Addr<BotExecutor>>) -> impl Responder { + let bot_datas = match bot.send(BotDataListRequest).await.unwrap() { + Ok(data) => data, + Err(_) => Vec::with_capacity(0), + }; + + web::Json(bot_datas) +} + +#[get("/bots/{name}")] +pub async fn get_bot(bot: web::Data<Addr<BotExecutor>>, name: web::Path<String>) -> impl Responder { + if let Some(bot_data) = bot.send(BotDataRequest(name.into_inner())).await.unwrap() { + Ok(web::Json(bot_data)) + } else { + Err(ApiErrorKind::NotFound) + } +} + +#[derive(Serialize)] +struct ApiError { + error: String, + description: String, +} + +#[derive(Debug, Display)] +enum ApiErrorKind { + #[display(fmt = "Not Found")] + NotFound, +} + +impl ResponseError for ApiErrorKind { + fn error_response(&self) -> HttpResponse { + match *self { + ApiErrorKind::NotFound => HttpResponse::NotFound().json(ApiError { + error: self.to_string(), + description: String::from("The requested resource was not found"), + }), + } + } +} diff --git a/src/web_server/bot_executor.rs b/src/web_server/bot_executor.rs new file mode 100644 index 0000000..fde3c08 --- /dev/null +++ b/src/web_server/bot_executor.rs @@ -0,0 +1,63 @@ +use std::sync::Arc; + +use actix::{Actor, Handler, Message, SyncContext}; + +use crate::bot::MasterBot; +use crate::web_server::BotData; + +pub struct BotExecutor(pub Arc<MasterBot>); + +impl Actor for BotExecutor { + type Context = SyncContext<Self>; +} + +pub struct BotNameListRequest; + +impl Message for BotNameListRequest { + // A plain Vec does not work for some reason + type Result = Result<Vec<String>, ()>; +} + +impl Handler<BotNameListRequest> for BotExecutor { + type Result = Result<Vec<String>, ()>; + + fn handle(&mut self, _: BotNameListRequest, _: &mut Self::Context) -> Self::Result { + let bot = &self.0; + + Ok(bot.bot_names()) + } +} + +pub struct BotDataListRequest; + +impl Message for BotDataListRequest { + // A plain Vec does not work for some reason + type Result = Result<Vec<BotData>, ()>; +} + +impl Handler<BotDataListRequest> for BotExecutor { + type Result = Result<Vec<BotData>, ()>; + + fn handle(&mut self, _: BotDataListRequest, _: &mut Self::Context) -> Self::Result { + let bot = &self.0; + + Ok(bot.bot_datas()) + } +} + +pub struct BotDataRequest(pub String); + +impl Message for BotDataRequest { + type Result = Option<BotData>; +} + +impl Handler<BotDataRequest> for BotExecutor { + type Result = Option<BotData>; + + fn handle(&mut self, r: BotDataRequest, _: &mut Self::Context) -> Self::Result { + let name = r.0; + let bot = &self.0; + + bot.bot_data(name) + } +} diff --git a/src/web_server/default.rs b/src/web_server/default.rs new file mode 100644 index 0000000..b3c8291 --- /dev/null +++ b/src/web_server/default.rs @@ -0,0 +1,24 @@ +use actix::Addr; +use actix_web::{web, Error, HttpResponse}; +use askama::actix_web::TemplateIntoResponse; +use askama::Template; + +use crate::web_server::{filters, BotData, BotDataListRequest, BotExecutor}; + +#[derive(Template)] +#[template(path = "index.htm")] +struct OverviewTemplate<'a> { + bots: &'a [BotData], +} + +pub async fn index(bot: web::Data<Addr<BotExecutor>>) -> Result<HttpResponse, Error> { + let bot_datas = match bot.send(BotDataListRequest).await.unwrap() { + Ok(data) => data, + Err(_) => Vec::with_capacity(0), + }; + + OverviewTemplate { + bots: &bot_datas[..], + } + .into_response() +} diff --git a/src/web_server/front_end_cookie.rs b/src/web_server/front_end_cookie.rs new file mode 100644 index 0000000..4812d0d --- /dev/null +++ b/src/web_server/front_end_cookie.rs @@ -0,0 +1,60 @@ +use futures::future::{ok, Ready}; + +use actix_web::{ + dev::Payload, + http::header::{COOKIE, LOCATION, SET_COOKIE}, + FromRequest, HttpRequest, HttpResponse, +}; +use serde::Deserialize; + +#[derive(PartialEq, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum FrontEnd { + Default, + Tmtu, +} + +impl FrontEnd { + const COOKIE_NAME: &'static str = "front-end"; + + fn cookie(&self) -> String { + let name = match self { + FrontEnd::Default => "default", + FrontEnd::Tmtu => "tmtu", + }; + + format!("{}={}", Self::COOKIE_NAME, name) + } +} + +impl FromRequest for FrontEnd { + type Error = (); + type Future = Ready<Result<Self, ()>>; + type Config = (); + + fn from_request(req: &HttpRequest, _payload: &mut Payload) -> Self::Future { + for header in req.headers().get_all(COOKIE) { + if let Ok(value) = header.to_str() { + for c in value.split(';').map(|s| s.trim()) { + let mut split = c.split('='); + if Some(Self::COOKIE_NAME) == split.next() { + match split.next() { + Some("default") => return ok(FrontEnd::Default), + Some("tmtu") => return ok(FrontEnd::Tmtu), + _ => (), + } + } + } + } + } + + ok(FrontEnd::Default) + } +} + +pub fn set_front_end(front: FrontEnd) -> HttpResponse { + HttpResponse::Found() + .header(SET_COOKIE, front.cookie()) + .header(LOCATION, "/") + .finish() +} diff --git a/src/web_server/tmtu.rs b/src/web_server/tmtu.rs new file mode 100644 index 0000000..0645ee4 --- /dev/null +++ b/src/web_server/tmtu.rs @@ -0,0 +1,41 @@ +use actix::Addr; +use actix_web::{http::header, web, Error, HttpResponse}; +use askama::actix_web::TemplateIntoResponse; +use askama::Template; + +use crate::web_server::{filters, BotData, BotDataRequest, BotExecutor, BotNameListRequest}; + +#[derive(Template)] +#[template(path = "tmtu/index.htm")] +struct TmtuTemplate { + bot_names: Vec<String>, + bot: Option<BotData>, +} + +pub async fn index(bot: web::Data<Addr<BotExecutor>>) -> Result<HttpResponse, Error> { + let bot_names = bot.send(BotNameListRequest).await.unwrap().unwrap(); + + TmtuTemplate { + bot_names, + bot: None, + } + .into_response() +} + +pub async fn get_bot( + bot: web::Data<Addr<BotExecutor>>, + name: String, +) -> Result<HttpResponse, Error> { + let bot_names = bot.send(BotNameListRequest).await.unwrap().unwrap(); + + if let Some(bot) = bot.send(BotDataRequest(name)).await.unwrap() { + TmtuTemplate { + bot_names, + bot: Some(bot), + } + .into_response() + } else { + // TODO to 404 or not to 404 + Ok(HttpResponse::Found().header(header::LOCATION, "/").finish()) + } +} diff --git a/src/youtube_dl.rs b/src/youtube_dl.rs index c6012f0..89b1477 100644 --- a/src/youtube_dl.rs +++ b/src/youtube_dl.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use futures::compat::Future01CompatExt; use std::process::{Command, Stdio}; use tokio_process::CommandExt; @@ -9,7 +11,22 @@ use log::debug; #[derive(Serialize, Deserialize, Clone, Debug)] pub struct AudioMetadata { pub url: String, - pub title: Option<String>, + pub webpage_url: String, + pub title: String, + pub thumbnail: Option<String>, + #[serde(default, deserialize_with = "duration_deserialize")] + pub duration: Option<Duration>, + #[serde(skip)] + pub added_by: String, +} + +fn duration_deserialize<'de, D>(deserializer: D) -> Result<Option<Duration>, D::Error> +where + D: serde::Deserializer<'de>, +{ + let dur: Option<f64> = Deserialize::deserialize(deserializer)?; + + Ok(dur.map(|v| Duration::from_secs_f64(v))) } pub async fn get_audio_download_url(uri: String) -> Result<AudioMetadata, String> { |
