aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/audio_player.rs57
-rw-r--r--src/bot/master.rs83
-rw-r--r--src/bot/music.rs105
-rw-r--r--src/main.rs21
-rw-r--r--src/playlist.rs10
-rw-r--r--src/teamspeak/bbcode.rs6
-rw-r--r--src/teamspeak/mod.rs6
-rw-r--r--src/web_server.rs122
-rw-r--r--src/web_server/api.rs48
-rw-r--r--src/web_server/bot_executor.rs63
-rw-r--r--src/web_server/default.rs24
-rw-r--r--src/web_server/front_end_cookie.rs60
-rw-r--r--src/web_server/tmtu.rs41
-rw-r--r--src/youtube_dl.rs19
14 files changed, 604 insertions, 61 deletions
diff --git a/src/audio_player.rs b/src/audio_player.rs
index 9ed645d..d231c72 100644
--- a/src/audio_player.rs
+++ b/src/audio_player.rs
@@ -10,9 +10,11 @@ use gstreamer_audio::{StreamVolume, StreamVolumeFormat};
use crate::bot::{MusicBotMessage, State};
use glib::BoolError;
use log::{debug, error, info, warn};
-use std::sync::{Arc, Mutex};
+use std::sync::{Arc, RwLock};
use tokio02::sync::mpsc::UnboundedSender;
+use crate::youtube_dl::AudioMetadata;
+
static GST_INIT: Once = Once::new();
#[derive(Copy, Clone, Debug)]
@@ -33,8 +35,10 @@ pub struct AudioPlayer {
bus: gst::Bus,
http_src: gst::Element,
+ volume_f64: RwLock<f64>,
volume: gst::Element,
- sender: Arc<Mutex<UnboundedSender<MusicBotMessage>>>,
+ sender: Arc<RwLock<UnboundedSender<MusicBotMessage>>>,
+ currently_playing: RwLock<Option<AudioMetadata>>,
}
fn make_element(factoryname: &str, display_name: &str) -> Result<gst::Element, AudioPlayerError> {
@@ -83,7 +87,7 @@ fn add_decode_bin_new_pad_callback(
impl AudioPlayer {
pub fn new(
- sender: Arc<Mutex<UnboundedSender<MusicBotMessage>>>,
+ sender: Arc<RwLock<UnboundedSender<MusicBotMessage>>>,
callback: Option<Box<dyn FnMut(&[u8]) + Send>>,
) -> Result<Self, AudioPlayerError> {
GST_INIT.call_once(|| gst::init().unwrap());
@@ -104,6 +108,12 @@ impl AudioPlayer {
pipeline.add(&audio_bin)?;
+ // The documentation says that we have to make sure to handle
+ // all messages if auto flushing is deactivated.
+ // I hope our way of reading messages is good enough.
+ //
+ // https://gstreamer.freedesktop.org/documentation/gstreamer/gstpipeline.html#gst_pipeline_set_auto_flush_bus
+ pipeline.set_auto_flush_bus(false);
pipeline.set_state(gst::State::Ready)?;
Ok(AudioPlayer {
@@ -111,8 +121,10 @@ impl AudioPlayer {
bus,
http_src,
+ volume_f64: RwLock::new(0.0),
volume,
sender,
+ currently_playing: RwLock::new(None),
})
}
@@ -173,7 +185,16 @@ impl AudioPlayer {
Ok((audio_bin, volume, ghost_pad))
}
- pub fn set_source_url(&self, location: String) -> Result<(), AudioPlayerError> {
+ pub fn set_metadata(&self, data: AudioMetadata) -> Result<(), AudioPlayerError> {
+ self.set_source_url(data.url.clone())?;
+
+ let mut currently_playing = self.currently_playing.write().unwrap();
+ *currently_playing = Some(data);
+
+ Ok(())
+ }
+
+ fn set_source_url(&self, location: String) -> Result<(), AudioPlayerError> {
info!("Setting location URI: {}", location);
self.http_src.set_property("location", &location)?;
@@ -181,6 +202,7 @@ impl AudioPlayer {
}
pub fn set_volume(&self, volume: f64) -> Result<(), AudioPlayerError> {
+ *self.volume_f64.write().unwrap() = volume;
let db = 50.0 * volume.log10();
info!("Setting volume: {} -> {} dB", volume, db);
@@ -203,9 +225,26 @@ impl AudioPlayer {
}
}
+ pub fn volume(&self) -> f64 {
+ *self.volume_f64.read().unwrap()
+ }
+
+ pub fn position(&self) -> Option<Duration> {
+ self.pipeline
+ .query_position::<gst::ClockTime>()
+ .and_then(|t| t.0.map(|v| Duration::from_nanos(v)))
+ }
+
+ pub fn currently_playing(&self) -> Option<AudioMetadata> {
+ self.currently_playing.read().unwrap().clone()
+ }
+
pub fn reset(&self) -> Result<(), AudioPlayerError> {
info!("Setting pipeline state to null");
+ let mut currently_playing = self.currently_playing.write().unwrap();
+ *currently_playing = None;
+
self.pipeline.set_state(gst::State::Null)?;
Ok(())
@@ -273,20 +312,20 @@ impl AudioPlayer {
pub fn quit(&self, reason: String) {
info!("Quitting audio player");
- if let Err(e) = self
+ if let Err(_) = self
.bus
.post(&gst::Message::new_application(gst::Structure::new_empty("quit")).build())
{
- warn!("Failed to send \"quit\" app event: {}", e);
+ warn!("Tried to send \"quit\" app event on flushing bus.");
}
- let sender = self.sender.lock().unwrap();
+ let sender = self.sender.read().unwrap();
sender.send(MusicBotMessage::Quit(reason)).unwrap();
}
fn send_state(&self, state: State) {
info!("Sending state {:?} to application", state);
- let sender = self.sender.lock().unwrap();
+ let sender = self.sender.read().unwrap();
sender.send(MusicBotMessage::StateChange(state)).unwrap();
}
@@ -362,7 +401,7 @@ impl AudioPlayer {
}
}
_ => {
- //debug!("{:?}", msg)
+ //debug!("Unhandled message on bus: {:?}", msg)
}
};
}
diff --git a/src/bot/master.rs b/src/bot/master.rs
index 2488064..755aaa1 100644
--- a/src/bot/master.rs
+++ b/src/bot/master.rs
@@ -1,12 +1,13 @@
use std::collections::HashMap;
use std::future::Future;
-use std::sync::{Arc, Mutex};
+use std::sync::{Arc, RwLock};
use futures::future::{FutureExt, TryFutureExt};
use futures01::future::Future as Future01;
use log::info;
use rand::{rngs::SmallRng, seq::SliceRandom, SeedableRng};
use serde::{Deserialize, Serialize};
+use tokio02::sync::mpsc::UnboundedSender;
use tsclientlib::{ClientId, ConnectOptions, Identity, MessageTarget};
use crate::audio_player::AudioPlayerError;
@@ -18,8 +19,9 @@ use crate::bot::{MusicBot, MusicBotArgs, MusicBotMessage};
pub struct MasterBot {
config: Arc<MasterConfig>,
- music_bots: Arc<Mutex<MusicBots>>,
+ music_bots: Arc<RwLock<MusicBots>>,
teamspeak: Arc<TeamSpeakConnection>,
+ sender: Arc<RwLock<UnboundedSender<MusicBotMessage>>>,
}
struct MusicBots {
@@ -32,7 +34,7 @@ struct MusicBots {
impl MasterBot {
pub async fn new(args: MasterArgs) -> (Arc<Self>, impl Future) {
let (tx, mut rx) = tokio02::sync::mpsc::unbounded_channel();
- let tx = Arc::new(Mutex::new(tx));
+ let tx = Arc::new(RwLock::new(tx));
info!("Starting in TeamSpeak mode");
let mut con_config = ConnectOptions::new(args.address.clone())
@@ -65,7 +67,7 @@ impl MasterBot {
let name_count = config.names.len();
let id_count = config.ids.len();
- let music_bots = Arc::new(Mutex::new(MusicBots {
+ let music_bots = Arc::new(RwLock::new(MusicBots {
rng: SmallRng::from_entropy(),
available_names: (0..name_count).collect(),
available_ids: (0..id_count).collect(),
@@ -76,6 +78,7 @@ impl MasterBot {
config,
music_bots,
teamspeak: connection,
+ sender: tx.clone(),
});
bot.teamspeak
@@ -83,8 +86,12 @@ impl MasterBot {
let cbot = bot.clone();
let msg_loop = async move {
- loop {
+ 'outer: loop {
while let Some(msg) = rx.recv().await {
+ if let MusicBotMessage::Quit(reason) = msg {
+ cbot.teamspeak.disconnect(&reason);
+ break 'outer;
+ }
cbot.on_message(msg).await.unwrap();
}
}
@@ -115,7 +122,7 @@ impl MasterBot {
ref mut available_names,
ref mut available_ids,
ref connected_bots,
- } = &mut *self.music_bots.lock().expect("Mutex was not poisoned");
+ } = &mut *self.music_bots.write().expect("RwLock was not poisoned");
for (_, bot) in connected_bots {
if bot.my_channel() == channel {
@@ -163,7 +170,7 @@ impl MasterBot {
let cmusic_bots = self.music_bots.clone();
let disconnect_cb = Box::new(move |n, name_index, id_index| {
- let mut music_bots = cmusic_bots.lock().expect("Mutex was not poisoned");
+ let mut music_bots = cmusic_bots.write().expect("RwLock was not poisoned");
music_bots.connected_bots.remove(&n);
music_bots.available_names.push(name_index);
music_bots.available_ids.push(id_index);
@@ -188,7 +195,7 @@ impl MasterBot {
if let Some(bot_args) = self.build_bot_args_for(id) {
let (bot, fut) = MusicBot::new(bot_args).await;
tokio::spawn(fut.unit_error().boxed().compat().map(|_| ()));
- let mut music_bots = self.music_bots.lock().expect("Mutex was not poisoned");
+ let mut music_bots = self.music_bots.write().expect("RwLock was not poisoned");
music_bots
.connected_bots
.insert(bot.name().to_string(), bot);
@@ -205,6 +212,62 @@ impl MasterBot {
Ok(())
}
+
+ pub fn bot_data(&self, name: String) -> Option<crate::web_server::BotData> {
+ let music_bots = self.music_bots.read().unwrap();
+ let bot = music_bots.connected_bots.get(&name)?;
+
+ Some(crate::web_server::BotData {
+ name: name,
+ state: bot.state(),
+ volume: bot.volume(),
+ position: bot.position(),
+ currently_playing: bot.currently_playing(),
+ playlist: bot.playlist_to_vec(),
+ })
+ }
+
+ pub fn bot_datas(&self) -> Vec<crate::web_server::BotData> {
+ let music_bots = self.music_bots.read().unwrap();
+
+ let len = music_bots.connected_bots.len();
+ let mut result = Vec::with_capacity(len);
+ for (name, bot) in &music_bots.connected_bots {
+ let bot_data = crate::web_server::BotData {
+ name: name.clone(),
+ state: bot.state(),
+ volume: bot.volume(),
+ position: bot.position(),
+ currently_playing: bot.currently_playing(),
+ playlist: bot.playlist_to_vec(),
+ };
+
+ result.push(bot_data);
+ }
+
+ result
+ }
+
+ pub fn bot_names(&self) -> Vec<String> {
+ let music_bots = self.music_bots.read().unwrap();
+
+ let len = music_bots.connected_bots.len();
+ let mut result = Vec::with_capacity(len);
+ for (name, _) in &music_bots.connected_bots {
+ result.push(name.clone());
+ }
+
+ result
+ }
+
+ pub fn quit(&self, reason: String) {
+ let music_bots = self.music_bots.read().unwrap();
+ for (_, bot) in &music_bots.connected_bots {
+ bot.quit(reason.clone())
+ }
+ let sender = self.sender.read().unwrap();
+ sender.send(MusicBotMessage::Quit(reason)).unwrap();
+ }
}
#[derive(Debug, Serialize, Deserialize)]
@@ -217,6 +280,8 @@ pub struct MasterArgs {
pub channel: Option<String>,
#[serde(default = "default_verbose")]
pub verbose: u8,
+ pub domain: String,
+ pub bind_address: String,
pub names: Vec<String>,
pub id: Identity,
pub ids: Vec<Identity>,
@@ -251,6 +316,8 @@ impl MasterArgs {
ids: self.ids,
local,
address,
+ domain: self.domain,
+ bind_address: self.bind_address,
id: self.id,
channel,
verbose,
diff --git a/src/bot/music.rs b/src/bot/music.rs
index 2539695..41976e5 100644
--- a/src/bot/music.rs
+++ b/src/bot/music.rs
@@ -1,10 +1,12 @@
use std::future::Future;
use std::io::BufRead;
-use std::sync::{Arc, Mutex};
+use std::sync::{Arc, RwLock};
use std::thread;
+use std::time::Duration;
use humantime;
use log::{debug, info};
+use serde::Serialize;
use structopt::StructOpt;
use tokio02::sync::mpsc::UnboundedSender;
use tsclientlib::{data, ChannelId, ClientId, ConnectOptions, Identity, Invoker, MessageTarget};
@@ -44,7 +46,7 @@ fn parse_seek(mut amount: &str) -> Result<Seek, ()> {
}
}
-#[derive(Debug, PartialEq, Eq)]
+#[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize)]
pub enum State {
Playing,
Paused,
@@ -52,6 +54,18 @@ pub enum State {
EndOfStream,
}
+impl std::fmt::Display for State {
+ fn fmt(&self, fmt: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
+ match self {
+ State::Playing => write!(fmt, "Playing"),
+ State::Paused => write!(fmt, "Paused"),
+ State::Stopped | State::EndOfStream => write!(fmt, "Stopped"),
+ }?;
+
+ Ok(())
+ }
+}
+
#[derive(Debug)]
pub enum MusicBotMessage {
TextMessage(Message),
@@ -71,8 +85,8 @@ pub struct MusicBot {
name: String,
player: Arc<AudioPlayer>,
teamspeak: Option<Arc<TeamSpeakConnection>>,
- playlist: Arc<Mutex<Playlist>>,
- state: Arc<Mutex<State>>,
+ playlist: Arc<RwLock<Playlist>>,
+ state: Arc<RwLock<State>>,
}
pub struct MusicBotArgs {
@@ -90,7 +104,7 @@ pub struct MusicBotArgs {
impl MusicBot {
pub async fn new(args: MusicBotArgs) -> (Arc<Self>, impl Future) {
let (tx, mut rx) = tokio02::sync::mpsc::unbounded_channel();
- let tx = Arc::new(Mutex::new(tx));
+ let tx = Arc::new(RwLock::new(tx));
let (player, connection) = if args.local {
info!("Starting in CLI mode");
let audio_player = AudioPlayer::new(tx.clone(), None).unwrap();
@@ -127,7 +141,7 @@ impl MusicBot {
player.set_volume(0.5).unwrap();
let player = Arc::new(player);
- let playlist = Arc::new(Mutex::new(Playlist::new()));
+ let playlist = Arc::new(RwLock::new(Playlist::new()));
spawn_gstreamer_thread(player.clone(), tx.clone());
@@ -140,7 +154,7 @@ impl MusicBot {
player,
teamspeak: connection,
playlist,
- state: Arc::new(Mutex::new(State::Stopped)),
+ state: Arc::new(RwLock::new(State::Stopped)),
});
let cbot = bot.clone();
@@ -173,24 +187,20 @@ impl MusicBot {
}
fn start_playing_audio(&self, metadata: AudioMetadata) {
- if let Some(title) = metadata.title {
- self.send_message(&format!("Playing {}", ts::underline(&title)));
- self.set_description(&format!("Currently playing '{}'", title));
- } else {
- self.send_message("Playing unknown title");
- self.set_description("Currently playing");
- }
+ self.send_message(&format!("Playing {}", ts::underline(&metadata.title)));
+ self.set_description(&format!("Currently playing '{}'", metadata.title));
self.player.reset().unwrap();
- self.player.set_source_url(metadata.url).unwrap();
+ self.player.set_metadata(metadata).unwrap();
self.player.play().unwrap();
}
- pub async fn add_audio(&self, url: String) {
+ pub async fn add_audio(&self, url: String, user: String) {
match crate::youtube_dl::get_audio_download_url(url).await {
- Ok(metadata) => {
+ Ok(mut metadata) => {
+ metadata.added_by = user;
info!("Found audio url: {}", metadata.url);
- let mut playlist = self.playlist.lock().expect("Mutex was not poisoned");
+ let mut playlist = self.playlist.write().expect("RwLock was not poisoned");
playlist.push(metadata.clone());
if !self.player.is_started() {
@@ -198,11 +208,10 @@ impl MusicBot {
self.start_playing_audio(request);
}
} else {
- if let Some(title) = metadata.title {
- self.send_message(&format!("Added {} to playlist", ts::underline(&title)));
- } else {
- self.send_message("Added to playlist");
- }
+ self.send_message(&format!(
+ "Added {} to playlist",
+ ts::underline(&metadata.title)
+ ));
}
}
Err(e) => {
@@ -217,6 +226,26 @@ impl MusicBot {
&self.name
}
+ pub fn state(&self) -> State {
+ *self.state.read().expect("RwLock was not poisoned")
+ }
+
+ pub fn volume(&self) -> f64 {
+ self.player.volume()
+ }
+
+ pub fn position(&self) -> Option<Duration> {
+ self.player.position()
+ }
+
+ pub fn currently_playing(&self) -> Option<AudioMetadata> {
+ self.player.currently_playing()
+ }
+
+ pub fn playlist_to_vec(&self) -> Vec<AudioMetadata> {
+ self.playlist.read().unwrap().to_vec()
+ }
+
pub fn my_channel(&self) -> ChannelId {
self.teamspeak
.as_ref()
@@ -255,7 +284,7 @@ impl MusicBot {
let tokens = msg[1..].split_whitespace().collect::<Vec<_>>();
match Command::from_iter_safe(&tokens) {
- Ok(args) => self.on_command(args).await?,
+ Ok(args) => self.on_command(args, message.invoker).await?,
Err(e) if e.kind == structopt::clap::ErrorKind::HelpDisplayed => {
self.send_message(&format!("\n{}", e.message));
}
@@ -266,10 +295,10 @@ impl MusicBot {
Ok(())
}
- async fn on_command(&self, command: Command) -> Result<(), AudioPlayerError> {
+ async fn on_command(&self, command: Command, invoker: Invoker) -> Result<(), AudioPlayerError> {
match command {
Command::Play => {
- let playlist = self.playlist.lock().expect("Mutex was not poisoned");
+ let playlist = self.playlist.read().expect("RwLock was not poisoned");
if !self.player.is_started() {
if !playlist.is_empty() {
@@ -283,7 +312,7 @@ impl MusicBot {
// strip bbcode tags from url
let url = url.replace("[URL]", "").replace("[/URL]", "");
- self.add_audio(url.to_string()).await;
+ self.add_audio(url.to_string(), invoker.name).await;
}
Command::Pause => {
self.player.pause()?;
@@ -303,7 +332,7 @@ impl MusicBot {
}
}
Command::Next => {
- let playlist = self.playlist.lock().expect("Mutex was not poisoned");
+ let playlist = self.playlist.read().expect("RwLock was not poisoned");
if !playlist.is_empty() {
info!("Skipping to next track");
self.player.stop_current()?;
@@ -314,8 +343,8 @@ impl MusicBot {
}
Command::Clear => {
self.playlist
- .lock()
- .expect("Mutex was not poisoned")
+ .write()
+ .expect("RwLock was not poisoned")
.clear();
}
Command::Volume { percent: volume } => {
@@ -331,7 +360,7 @@ impl MusicBot {
}
fn on_state(&self, state: State) -> Result<(), AudioPlayerError> {
- let mut current_state = self.state.lock().unwrap();
+ let mut current_state = self.state.write().unwrap();
if *current_state != state {
match state {
State::Playing => {
@@ -345,7 +374,11 @@ impl MusicBot {
self.set_description("");
}
State::EndOfStream => {
- let next_track = self.playlist.lock().expect("Mutex was not poisoned").pop();
+ let next_track = self
+ .playlist
+ .write()
+ .expect("RwLock was not poisoned")
+ .pop();
if let Some(request) = next_track {
info!("Advancing playlist");
@@ -401,7 +434,7 @@ impl MusicBot {
}
}
-fn spawn_stdin_reader(tx: Arc<Mutex<UnboundedSender<MusicBotMessage>>>) {
+fn spawn_stdin_reader(tx: Arc<RwLock<UnboundedSender<MusicBotMessage>>>) {
debug!("Spawning stdin reader thread");
thread::Builder::new()
.name(String::from("stdin reader"))
@@ -421,7 +454,7 @@ fn spawn_stdin_reader(tx: Arc<Mutex<UnboundedSender<MusicBotMessage>>>) {
text: line,
});
- let tx = tx.lock().unwrap();
+ let tx = tx.read().unwrap();
tx.send(message).unwrap();
}
})
@@ -430,7 +463,7 @@ fn spawn_stdin_reader(tx: Arc<Mutex<UnboundedSender<MusicBotMessage>>>) {
fn spawn_gstreamer_thread(
player: Arc<AudioPlayer>,
- tx: Arc<Mutex<UnboundedSender<MusicBotMessage>>>,
+ tx: Arc<RwLock<UnboundedSender<MusicBotMessage>>>,
) {
thread::Builder::new()
.name(String::from("gstreamer polling"))
@@ -439,7 +472,7 @@ fn spawn_gstreamer_thread(
break;
}
- tx.lock()
+ tx.read()
.unwrap()
.send(MusicBotMessage::StateChange(State::EndOfStream))
.unwrap();
diff --git a/src/main.rs b/src/main.rs
index 922162f..2559a2a 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,9 +1,10 @@
use std::fs::File;
use std::io::{Read, Write};
use std::path::PathBuf;
+use std::thread;
use futures::future::{FutureExt, TryFutureExt};
-use log::{debug, info};
+use log::{debug, error, info};
use structopt::clap::AppSettings;
use structopt::StructOpt;
use tsclientlib::Identity;
@@ -13,6 +14,7 @@ mod bot;
mod command;
mod playlist;
mod teamspeak;
+mod web_server;
mod youtube_dl;
use bot::{MasterArgs, MasterBot, MusicBot, MusicBotArgs};
@@ -116,7 +118,22 @@ fn run() -> Result<(), Box<dyn std::error::Error>> {
};
MusicBot::new(bot_args).await.1.await;
} else {
- MasterBot::new(bot_args).await.1.await;
+ let domain = bot_args.domain.clone();
+ let bind_address = bot_args.bind_address.clone();
+ let (bot, fut) = MasterBot::new(bot_args).await;
+
+ thread::spawn(|| {
+ let web_args = web_server::WebServerArgs {
+ domain,
+ bind_address,
+ bot,
+ };
+ if let Err(e) = web_server::start(web_args) {
+ error!("Error in web server: {}", e);
+ }
+ });
+
+ fut.await;
}
}
.unit_error()
diff --git a/src/playlist.rs b/src/playlist.rs
index 87c1c98..445f8a5 100644
--- a/src/playlist.rs
+++ b/src/playlist.rs
@@ -28,6 +28,16 @@ impl Playlist {
res
}
+ pub fn to_vec(&self) -> Vec<AudioMetadata> {
+ let (a, b) = self.data.as_slices();
+
+ let mut res = a.to_vec();
+ res.extend_from_slice(b);
+ res.reverse();
+
+ res
+ }
+
pub fn is_empty(&self) -> bool {
self.data.is_empty()
}
diff --git a/src/teamspeak/bbcode.rs b/src/teamspeak/bbcode.rs
index 28be08a..91d576a 100644
--- a/src/teamspeak/bbcode.rs
+++ b/src/teamspeak/bbcode.rs
@@ -1,4 +1,4 @@
-use std::fmt::{Formatter, Display, Error};
+use std::fmt::{Display, Error, Formatter};
#[allow(dead_code)]
pub enum BbCode<'a> {
@@ -14,7 +14,9 @@ impl<'a> Display for BbCode<'a> {
BbCode::Bold(text) => fmt.write_fmt(format_args!("[B]{}[/B]", text))?,
BbCode::Italic(text) => fmt.write_fmt(format_args!("[I]{}[/I]", text))?,
BbCode::Underline(text) => fmt.write_fmt(format_args!("[U]{}[/U]", text))?,
- BbCode::Link(text, url) => fmt.write_fmt(format_args!("[URL={}]{}[/URL]", url, text))?,
+ BbCode::Link(text, url) => {
+ fmt.write_fmt(format_args!("[URL={}]{}[/URL]", url, text))?
+ }
};
Ok(())
diff --git a/src/teamspeak/mod.rs b/src/teamspeak/mod.rs
index 5ac0d44..7551e77 100644
--- a/src/teamspeak/mod.rs
+++ b/src/teamspeak/mod.rs
@@ -1,4 +1,4 @@
-use std::sync::{Arc, Mutex};
+use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use futures::compat::Future01CompatExt;
@@ -76,7 +76,7 @@ fn get_message<'a>(event: &Event) -> Option<MusicBotMessage> {
impl TeamSpeakConnection {
pub async fn new(
- tx: Arc<Mutex<UnboundedSender<MusicBotMessage>>>,
+ tx: Arc<RwLock<UnboundedSender<MusicBotMessage>>>,
options: ConnectOptions,
) -> Result<TeamSpeakConnection, tsclientlib::Error> {
let conn = Connection::new(options).compat().await?;
@@ -89,7 +89,7 @@ impl TeamSpeakConnection {
if let ConEvents(_conn, events) = e {
for event in *events {
if let Some(msg) = get_message(event) {
- let tx = tx.lock().expect("Mutex was not poisoned");
+ let tx = tx.read().expect("RwLock was not poisoned");
// Ignore the result because the receiver might get dropped first.
let _ = tx.send(msg);
}
diff --git a/src/web_server.rs b/src/web_server.rs
new file mode 100644
index 0000000..01233f2
--- /dev/null
+++ b/src/web_server.rs
@@ -0,0 +1,122 @@
+use std::sync::Arc;
+use std::time::Duration;
+
+use actix::{Addr, SyncArbiter};
+use actix_web::{
+ get, http::header, middleware::Logger, post, web, App, HttpResponse, HttpServer, Responder,
+};
+use askama::actix_web::TemplateIntoResponse;
+use askama::Template;
+use serde::{Deserialize, Serialize};
+
+use crate::bot::MasterBot;
+use crate::youtube_dl::AudioMetadata;
+
+mod api;
+mod bot_executor;
+mod default;
+mod front_end_cookie;
+mod tmtu;
+pub use bot_executor::*;
+use front_end_cookie::FrontEnd;
+
+pub struct WebServerArgs {
+ pub domain: String,
+ pub bind_address: String,
+ pub bot: Arc<MasterBot>,
+}
+
+#[actix_rt::main]
+pub async fn start(args: WebServerArgs) -> std::io::Result<()> {
+ let cbot = args.bot.clone();
+ let bot_addr: Addr<BotExecutor> = SyncArbiter::start(4, move || BotExecutor(cbot.clone()));
+
+ HttpServer::new(move || {
+ App::new()
+ .data(bot_addr.clone())
+ .wrap(Logger::default())
+ .service(index)
+ .service(get_bot)
+ .service(post_front_end)
+ .service(
+ web::scope("/api")
+ .service(api::get_bot_list)
+ .service(api::get_bot),
+ )
+ .service(web::scope("/docs").service(get_api_docs))
+ .service(actix_files::Files::new("/static", "web_server/static/"))
+ })
+ .bind(args.bind_address)?
+ .run()
+ .await?;
+
+ args.bot.quit(String::from("Stopping"));
+
+ Ok(())
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "kebab-case")]
+struct FrontEndForm {
+ front_end: FrontEnd,
+}
+
+#[post("/front-end")]
+async fn post_front_end(form: web::Form<FrontEndForm>) -> impl Responder {
+ front_end_cookie::set_front_end(form.into_inner().front_end).await
+}
+
+#[derive(Debug, Serialize)]
+pub struct BotData {
+ pub name: String,
+ pub state: crate::bot::State,
+ pub volume: f64,
+ pub position: Option<Duration>,
+ pub currently_playing: Option<AudioMetadata>,
+ pub playlist: Vec<AudioMetadata>,
+}
+
+#[get("/")]
+async fn index(bot: web::Data<Addr<BotExecutor>>, front: FrontEnd) -> impl Responder {
+ match front {
+ FrontEnd::Default => default::index(bot).await,
+ FrontEnd::Tmtu => tmtu::index(bot).await,
+ }
+}
+
+#[get("/bot/{name}")]
+async fn get_bot(
+ bot: web::Data<Addr<BotExecutor>>,
+ name: web::Path<String>,
+ front: FrontEnd,
+) -> impl Responder {
+ match front {
+ FrontEnd::Tmtu => tmtu::get_bot(bot, name.into_inner()).await,
+ FrontEnd::Default => Ok(HttpResponse::Found().header(header::LOCATION, "/").finish()),
+ }
+}
+
+#[derive(Template)]
+#[template(path = "docs/api.htm")]
+struct ApiDocsTemplate;
+
+#[get("/api")]
+async fn get_api_docs() -> impl Responder {
+ ApiDocsTemplate.into_response()
+}
+
+mod filters {
+ use std::time::Duration;
+
+ pub fn fmt_duration(duration: &Option<Duration>) -> Result<String, askama::Error> {
+ if let Some(duration) = duration {
+ let secs = duration.as_secs();
+ let mins = secs / 60;
+ let submin_secs = secs % 60;
+
+ Ok(format!("{:02}:{:02}", mins, submin_secs))
+ } else {
+ Ok(String::from("--:--"))
+ }
+ }
+}
diff --git a/src/web_server/api.rs b/src/web_server/api.rs
new file mode 100644
index 0000000..4deedad
--- /dev/null
+++ b/src/web_server/api.rs
@@ -0,0 +1,48 @@
+use actix::Addr;
+use actix_web::{get, web, HttpResponse, Responder, ResponseError};
+use derive_more::Display;
+use serde::Serialize;
+
+use crate::web_server::{BotDataListRequest, BotDataRequest, BotExecutor};
+
+#[get("/bots")]
+pub async fn get_bot_list(bot: web::Data<Addr<BotExecutor>>) -> impl Responder {
+ let bot_datas = match bot.send(BotDataListRequest).await.unwrap() {
+ Ok(data) => data,
+ Err(_) => Vec::with_capacity(0),
+ };
+
+ web::Json(bot_datas)
+}
+
+#[get("/bots/{name}")]
+pub async fn get_bot(bot: web::Data<Addr<BotExecutor>>, name: web::Path<String>) -> impl Responder {
+ if let Some(bot_data) = bot.send(BotDataRequest(name.into_inner())).await.unwrap() {
+ Ok(web::Json(bot_data))
+ } else {
+ Err(ApiErrorKind::NotFound)
+ }
+}
+
+#[derive(Serialize)]
+struct ApiError {
+ error: String,
+ description: String,
+}
+
+#[derive(Debug, Display)]
+enum ApiErrorKind {
+ #[display(fmt = "Not Found")]
+ NotFound,
+}
+
+impl ResponseError for ApiErrorKind {
+ fn error_response(&self) -> HttpResponse {
+ match *self {
+ ApiErrorKind::NotFound => HttpResponse::NotFound().json(ApiError {
+ error: self.to_string(),
+ description: String::from("The requested resource was not found"),
+ }),
+ }
+ }
+}
diff --git a/src/web_server/bot_executor.rs b/src/web_server/bot_executor.rs
new file mode 100644
index 0000000..fde3c08
--- /dev/null
+++ b/src/web_server/bot_executor.rs
@@ -0,0 +1,63 @@
+use std::sync::Arc;
+
+use actix::{Actor, Handler, Message, SyncContext};
+
+use crate::bot::MasterBot;
+use crate::web_server::BotData;
+
+pub struct BotExecutor(pub Arc<MasterBot>);
+
+impl Actor for BotExecutor {
+ type Context = SyncContext<Self>;
+}
+
+pub struct BotNameListRequest;
+
+impl Message for BotNameListRequest {
+ // A plain Vec does not work for some reason
+ type Result = Result<Vec<String>, ()>;
+}
+
+impl Handler<BotNameListRequest> for BotExecutor {
+ type Result = Result<Vec<String>, ()>;
+
+ fn handle(&mut self, _: BotNameListRequest, _: &mut Self::Context) -> Self::Result {
+ let bot = &self.0;
+
+ Ok(bot.bot_names())
+ }
+}
+
+pub struct BotDataListRequest;
+
+impl Message for BotDataListRequest {
+ // A plain Vec does not work for some reason
+ type Result = Result<Vec<BotData>, ()>;
+}
+
+impl Handler<BotDataListRequest> for BotExecutor {
+ type Result = Result<Vec<BotData>, ()>;
+
+ fn handle(&mut self, _: BotDataListRequest, _: &mut Self::Context) -> Self::Result {
+ let bot = &self.0;
+
+ Ok(bot.bot_datas())
+ }
+}
+
+pub struct BotDataRequest(pub String);
+
+impl Message for BotDataRequest {
+ type Result = Option<BotData>;
+}
+
+impl Handler<BotDataRequest> for BotExecutor {
+ type Result = Option<BotData>;
+
+ fn handle(&mut self, r: BotDataRequest, _: &mut Self::Context) -> Self::Result {
+ let name = r.0;
+ let bot = &self.0;
+
+ bot.bot_data(name)
+ }
+}
diff --git a/src/web_server/default.rs b/src/web_server/default.rs
new file mode 100644
index 0000000..b3c8291
--- /dev/null
+++ b/src/web_server/default.rs
@@ -0,0 +1,24 @@
+use actix::Addr;
+use actix_web::{web, Error, HttpResponse};
+use askama::actix_web::TemplateIntoResponse;
+use askama::Template;
+
+use crate::web_server::{filters, BotData, BotDataListRequest, BotExecutor};
+
+#[derive(Template)]
+#[template(path = "index.htm")]
+struct OverviewTemplate<'a> {
+ bots: &'a [BotData],
+}
+
+pub async fn index(bot: web::Data<Addr<BotExecutor>>) -> Result<HttpResponse, Error> {
+ let bot_datas = match bot.send(BotDataListRequest).await.unwrap() {
+ Ok(data) => data,
+ Err(_) => Vec::with_capacity(0),
+ };
+
+ OverviewTemplate {
+ bots: &bot_datas[..],
+ }
+ .into_response()
+}
diff --git a/src/web_server/front_end_cookie.rs b/src/web_server/front_end_cookie.rs
new file mode 100644
index 0000000..4812d0d
--- /dev/null
+++ b/src/web_server/front_end_cookie.rs
@@ -0,0 +1,60 @@
+use futures::future::{ok, Ready};
+
+use actix_web::{
+ dev::Payload,
+ http::header::{COOKIE, LOCATION, SET_COOKIE},
+ FromRequest, HttpRequest, HttpResponse,
+};
+use serde::Deserialize;
+
+#[derive(PartialEq, Deserialize)]
+#[serde(rename_all = "lowercase")]
+pub enum FrontEnd {
+ Default,
+ Tmtu,
+}
+
+impl FrontEnd {
+ const COOKIE_NAME: &'static str = "front-end";
+
+ fn cookie(&self) -> String {
+ let name = match self {
+ FrontEnd::Default => "default",
+ FrontEnd::Tmtu => "tmtu",
+ };
+
+ format!("{}={}", Self::COOKIE_NAME, name)
+ }
+}
+
+impl FromRequest for FrontEnd {
+ type Error = ();
+ type Future = Ready<Result<Self, ()>>;
+ type Config = ();
+
+ fn from_request(req: &HttpRequest, _payload: &mut Payload) -> Self::Future {
+ for header in req.headers().get_all(COOKIE) {
+ if let Ok(value) = header.to_str() {
+ for c in value.split(';').map(|s| s.trim()) {
+ let mut split = c.split('=');
+ if Some(Self::COOKIE_NAME) == split.next() {
+ match split.next() {
+ Some("default") => return ok(FrontEnd::Default),
+ Some("tmtu") => return ok(FrontEnd::Tmtu),
+ _ => (),
+ }
+ }
+ }
+ }
+ }
+
+ ok(FrontEnd::Default)
+ }
+}
+
+pub fn set_front_end(front: FrontEnd) -> HttpResponse {
+ HttpResponse::Found()
+ .header(SET_COOKIE, front.cookie())
+ .header(LOCATION, "/")
+ .finish()
+}
diff --git a/src/web_server/tmtu.rs b/src/web_server/tmtu.rs
new file mode 100644
index 0000000..0645ee4
--- /dev/null
+++ b/src/web_server/tmtu.rs
@@ -0,0 +1,41 @@
+use actix::Addr;
+use actix_web::{http::header, web, Error, HttpResponse};
+use askama::actix_web::TemplateIntoResponse;
+use askama::Template;
+
+use crate::web_server::{filters, BotData, BotDataRequest, BotExecutor, BotNameListRequest};
+
+#[derive(Template)]
+#[template(path = "tmtu/index.htm")]
+struct TmtuTemplate {
+ bot_names: Vec<String>,
+ bot: Option<BotData>,
+}
+
+pub async fn index(bot: web::Data<Addr<BotExecutor>>) -> Result<HttpResponse, Error> {
+ let bot_names = bot.send(BotNameListRequest).await.unwrap().unwrap();
+
+ TmtuTemplate {
+ bot_names,
+ bot: None,
+ }
+ .into_response()
+}
+
+pub async fn get_bot(
+ bot: web::Data<Addr<BotExecutor>>,
+ name: String,
+) -> Result<HttpResponse, Error> {
+ let bot_names = bot.send(BotNameListRequest).await.unwrap().unwrap();
+
+ if let Some(bot) = bot.send(BotDataRequest(name)).await.unwrap() {
+ TmtuTemplate {
+ bot_names,
+ bot: Some(bot),
+ }
+ .into_response()
+ } else {
+ // TODO to 404 or not to 404
+ Ok(HttpResponse::Found().header(header::LOCATION, "/").finish())
+ }
+}
diff --git a/src/youtube_dl.rs b/src/youtube_dl.rs
index c6012f0..89b1477 100644
--- a/src/youtube_dl.rs
+++ b/src/youtube_dl.rs
@@ -1,3 +1,5 @@
+use std::time::Duration;
+
use futures::compat::Future01CompatExt;
use std::process::{Command, Stdio};
use tokio_process::CommandExt;
@@ -9,7 +11,22 @@ use log::debug;
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct AudioMetadata {
pub url: String,
- pub title: Option<String>,
+ pub webpage_url: String,
+ pub title: String,
+ pub thumbnail: Option<String>,
+ #[serde(default, deserialize_with = "duration_deserialize")]
+ pub duration: Option<Duration>,
+ #[serde(skip)]
+ pub added_by: String,
+}
+
+fn duration_deserialize<'de, D>(deserializer: D) -> Result<Option<Duration>, D::Error>
+where
+ D: serde::Deserializer<'de>,
+{
+ let dur: Option<f64> = Deserialize::deserialize(deserializer)?;
+
+ Ok(dur.map(|v| Duration::from_secs_f64(v)))
}
pub async fn get_audio_download_url(uri: String) -> Result<AudioMetadata, String> {