aboutsummaryrefslogtreecommitdiffstats
path: root/src/bot
diff options
context:
space:
mode:
Diffstat (limited to 'src/bot')
-rw-r--r--src/bot/master.rs83
-rw-r--r--src/bot/music.rs105
2 files changed, 144 insertions, 44 deletions
diff --git a/src/bot/master.rs b/src/bot/master.rs
index 2488064..755aaa1 100644
--- a/src/bot/master.rs
+++ b/src/bot/master.rs
@@ -1,12 +1,13 @@
use std::collections::HashMap;
use std::future::Future;
-use std::sync::{Arc, Mutex};
+use std::sync::{Arc, RwLock};
use futures::future::{FutureExt, TryFutureExt};
use futures01::future::Future as Future01;
use log::info;
use rand::{rngs::SmallRng, seq::SliceRandom, SeedableRng};
use serde::{Deserialize, Serialize};
+use tokio02::sync::mpsc::UnboundedSender;
use tsclientlib::{ClientId, ConnectOptions, Identity, MessageTarget};
use crate::audio_player::AudioPlayerError;
@@ -18,8 +19,9 @@ use crate::bot::{MusicBot, MusicBotArgs, MusicBotMessage};
pub struct MasterBot {
config: Arc<MasterConfig>,
- music_bots: Arc<Mutex<MusicBots>>,
+ music_bots: Arc<RwLock<MusicBots>>,
teamspeak: Arc<TeamSpeakConnection>,
+ sender: Arc<RwLock<UnboundedSender<MusicBotMessage>>>,
}
struct MusicBots {
@@ -32,7 +34,7 @@ struct MusicBots {
impl MasterBot {
pub async fn new(args: MasterArgs) -> (Arc<Self>, impl Future) {
let (tx, mut rx) = tokio02::sync::mpsc::unbounded_channel();
- let tx = Arc::new(Mutex::new(tx));
+ let tx = Arc::new(RwLock::new(tx));
info!("Starting in TeamSpeak mode");
let mut con_config = ConnectOptions::new(args.address.clone())
@@ -65,7 +67,7 @@ impl MasterBot {
let name_count = config.names.len();
let id_count = config.ids.len();
- let music_bots = Arc::new(Mutex::new(MusicBots {
+ let music_bots = Arc::new(RwLock::new(MusicBots {
rng: SmallRng::from_entropy(),
available_names: (0..name_count).collect(),
available_ids: (0..id_count).collect(),
@@ -76,6 +78,7 @@ impl MasterBot {
config,
music_bots,
teamspeak: connection,
+ sender: tx.clone(),
});
bot.teamspeak
@@ -83,8 +86,12 @@ impl MasterBot {
let cbot = bot.clone();
let msg_loop = async move {
- loop {
+ 'outer: loop {
while let Some(msg) = rx.recv().await {
+ if let MusicBotMessage::Quit(reason) = msg {
+ cbot.teamspeak.disconnect(&reason);
+ break 'outer;
+ }
cbot.on_message(msg).await.unwrap();
}
}
@@ -115,7 +122,7 @@ impl MasterBot {
ref mut available_names,
ref mut available_ids,
ref connected_bots,
- } = &mut *self.music_bots.lock().expect("Mutex was not poisoned");
+ } = &mut *self.music_bots.write().expect("RwLock was not poisoned");
for (_, bot) in connected_bots {
if bot.my_channel() == channel {
@@ -163,7 +170,7 @@ impl MasterBot {
let cmusic_bots = self.music_bots.clone();
let disconnect_cb = Box::new(move |n, name_index, id_index| {
- let mut music_bots = cmusic_bots.lock().expect("Mutex was not poisoned");
+ let mut music_bots = cmusic_bots.write().expect("RwLock was not poisoned");
music_bots.connected_bots.remove(&n);
music_bots.available_names.push(name_index);
music_bots.available_ids.push(id_index);
@@ -188,7 +195,7 @@ impl MasterBot {
if let Some(bot_args) = self.build_bot_args_for(id) {
let (bot, fut) = MusicBot::new(bot_args).await;
tokio::spawn(fut.unit_error().boxed().compat().map(|_| ()));
- let mut music_bots = self.music_bots.lock().expect("Mutex was not poisoned");
+ let mut music_bots = self.music_bots.write().expect("RwLock was not poisoned");
music_bots
.connected_bots
.insert(bot.name().to_string(), bot);
@@ -205,6 +212,62 @@ impl MasterBot {
Ok(())
}
+
+ pub fn bot_data(&self, name: String) -> Option<crate::web_server::BotData> {
+ let music_bots = self.music_bots.read().unwrap();
+ let bot = music_bots.connected_bots.get(&name)?;
+
+ Some(crate::web_server::BotData {
+ name: name,
+ state: bot.state(),
+ volume: bot.volume(),
+ position: bot.position(),
+ currently_playing: bot.currently_playing(),
+ playlist: bot.playlist_to_vec(),
+ })
+ }
+
+ pub fn bot_datas(&self) -> Vec<crate::web_server::BotData> {
+ let music_bots = self.music_bots.read().unwrap();
+
+ let len = music_bots.connected_bots.len();
+ let mut result = Vec::with_capacity(len);
+ for (name, bot) in &music_bots.connected_bots {
+ let bot_data = crate::web_server::BotData {
+ name: name.clone(),
+ state: bot.state(),
+ volume: bot.volume(),
+ position: bot.position(),
+ currently_playing: bot.currently_playing(),
+ playlist: bot.playlist_to_vec(),
+ };
+
+ result.push(bot_data);
+ }
+
+ result
+ }
+
+ pub fn bot_names(&self) -> Vec<String> {
+ let music_bots = self.music_bots.read().unwrap();
+
+ let len = music_bots.connected_bots.len();
+ let mut result = Vec::with_capacity(len);
+ for (name, _) in &music_bots.connected_bots {
+ result.push(name.clone());
+ }
+
+ result
+ }
+
+ pub fn quit(&self, reason: String) {
+ let music_bots = self.music_bots.read().unwrap();
+ for (_, bot) in &music_bots.connected_bots {
+ bot.quit(reason.clone())
+ }
+ let sender = self.sender.read().unwrap();
+ sender.send(MusicBotMessage::Quit(reason)).unwrap();
+ }
}
#[derive(Debug, Serialize, Deserialize)]
@@ -217,6 +280,8 @@ pub struct MasterArgs {
pub channel: Option<String>,
#[serde(default = "default_verbose")]
pub verbose: u8,
+ pub domain: String,
+ pub bind_address: String,
pub names: Vec<String>,
pub id: Identity,
pub ids: Vec<Identity>,
@@ -251,6 +316,8 @@ impl MasterArgs {
ids: self.ids,
local,
address,
+ domain: self.domain,
+ bind_address: self.bind_address,
id: self.id,
channel,
verbose,
diff --git a/src/bot/music.rs b/src/bot/music.rs
index 2539695..41976e5 100644
--- a/src/bot/music.rs
+++ b/src/bot/music.rs
@@ -1,10 +1,12 @@
use std::future::Future;
use std::io::BufRead;
-use std::sync::{Arc, Mutex};
+use std::sync::{Arc, RwLock};
use std::thread;
+use std::time::Duration;
use humantime;
use log::{debug, info};
+use serde::Serialize;
use structopt::StructOpt;
use tokio02::sync::mpsc::UnboundedSender;
use tsclientlib::{data, ChannelId, ClientId, ConnectOptions, Identity, Invoker, MessageTarget};
@@ -44,7 +46,7 @@ fn parse_seek(mut amount: &str) -> Result<Seek, ()> {
}
}
-#[derive(Debug, PartialEq, Eq)]
+#[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize)]
pub enum State {
Playing,
Paused,
@@ -52,6 +54,18 @@ pub enum State {
EndOfStream,
}
+impl std::fmt::Display for State {
+ fn fmt(&self, fmt: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
+ match self {
+ State::Playing => write!(fmt, "Playing"),
+ State::Paused => write!(fmt, "Paused"),
+ State::Stopped | State::EndOfStream => write!(fmt, "Stopped"),
+ }?;
+
+ Ok(())
+ }
+}
+
#[derive(Debug)]
pub enum MusicBotMessage {
TextMessage(Message),
@@ -71,8 +85,8 @@ pub struct MusicBot {
name: String,
player: Arc<AudioPlayer>,
teamspeak: Option<Arc<TeamSpeakConnection>>,
- playlist: Arc<Mutex<Playlist>>,
- state: Arc<Mutex<State>>,
+ playlist: Arc<RwLock<Playlist>>,
+ state: Arc<RwLock<State>>,
}
pub struct MusicBotArgs {
@@ -90,7 +104,7 @@ pub struct MusicBotArgs {
impl MusicBot {
pub async fn new(args: MusicBotArgs) -> (Arc<Self>, impl Future) {
let (tx, mut rx) = tokio02::sync::mpsc::unbounded_channel();
- let tx = Arc::new(Mutex::new(tx));
+ let tx = Arc::new(RwLock::new(tx));
let (player, connection) = if args.local {
info!("Starting in CLI mode");
let audio_player = AudioPlayer::new(tx.clone(), None).unwrap();
@@ -127,7 +141,7 @@ impl MusicBot {
player.set_volume(0.5).unwrap();
let player = Arc::new(player);
- let playlist = Arc::new(Mutex::new(Playlist::new()));
+ let playlist = Arc::new(RwLock::new(Playlist::new()));
spawn_gstreamer_thread(player.clone(), tx.clone());
@@ -140,7 +154,7 @@ impl MusicBot {
player,
teamspeak: connection,
playlist,
- state: Arc::new(Mutex::new(State::Stopped)),
+ state: Arc::new(RwLock::new(State::Stopped)),
});
let cbot = bot.clone();
@@ -173,24 +187,20 @@ impl MusicBot {
}
fn start_playing_audio(&self, metadata: AudioMetadata) {
- if let Some(title) = metadata.title {
- self.send_message(&format!("Playing {}", ts::underline(&title)));
- self.set_description(&format!("Currently playing '{}'", title));
- } else {
- self.send_message("Playing unknown title");
- self.set_description("Currently playing");
- }
+ self.send_message(&format!("Playing {}", ts::underline(&metadata.title)));
+ self.set_description(&format!("Currently playing '{}'", metadata.title));
self.player.reset().unwrap();
- self.player.set_source_url(metadata.url).unwrap();
+ self.player.set_metadata(metadata).unwrap();
self.player.play().unwrap();
}
- pub async fn add_audio(&self, url: String) {
+ pub async fn add_audio(&self, url: String, user: String) {
match crate::youtube_dl::get_audio_download_url(url).await {
- Ok(metadata) => {
+ Ok(mut metadata) => {
+ metadata.added_by = user;
info!("Found audio url: {}", metadata.url);
- let mut playlist = self.playlist.lock().expect("Mutex was not poisoned");
+ let mut playlist = self.playlist.write().expect("RwLock was not poisoned");
playlist.push(metadata.clone());
if !self.player.is_started() {
@@ -198,11 +208,10 @@ impl MusicBot {
self.start_playing_audio(request);
}
} else {
- if let Some(title) = metadata.title {
- self.send_message(&format!("Added {} to playlist", ts::underline(&title)));
- } else {
- self.send_message("Added to playlist");
- }
+ self.send_message(&format!(
+ "Added {} to playlist",
+ ts::underline(&metadata.title)
+ ));
}
}
Err(e) => {
@@ -217,6 +226,26 @@ impl MusicBot {
&self.name
}
+ pub fn state(&self) -> State {
+ *self.state.read().expect("RwLock was not poisoned")
+ }
+
+ pub fn volume(&self) -> f64 {
+ self.player.volume()
+ }
+
+ pub fn position(&self) -> Option<Duration> {
+ self.player.position()
+ }
+
+ pub fn currently_playing(&self) -> Option<AudioMetadata> {
+ self.player.currently_playing()
+ }
+
+ pub fn playlist_to_vec(&self) -> Vec<AudioMetadata> {
+ self.playlist.read().unwrap().to_vec()
+ }
+
pub fn my_channel(&self) -> ChannelId {
self.teamspeak
.as_ref()
@@ -255,7 +284,7 @@ impl MusicBot {
let tokens = msg[1..].split_whitespace().collect::<Vec<_>>();
match Command::from_iter_safe(&tokens) {
- Ok(args) => self.on_command(args).await?,
+ Ok(args) => self.on_command(args, message.invoker).await?,
Err(e) if e.kind == structopt::clap::ErrorKind::HelpDisplayed => {
self.send_message(&format!("\n{}", e.message));
}
@@ -266,10 +295,10 @@ impl MusicBot {
Ok(())
}
- async fn on_command(&self, command: Command) -> Result<(), AudioPlayerError> {
+ async fn on_command(&self, command: Command, invoker: Invoker) -> Result<(), AudioPlayerError> {
match command {
Command::Play => {
- let playlist = self.playlist.lock().expect("Mutex was not poisoned");
+ let playlist = self.playlist.read().expect("RwLock was not poisoned");
if !self.player.is_started() {
if !playlist.is_empty() {
@@ -283,7 +312,7 @@ impl MusicBot {
// strip bbcode tags from url
let url = url.replace("[URL]", "").replace("[/URL]", "");
- self.add_audio(url.to_string()).await;
+ self.add_audio(url.to_string(), invoker.name).await;
}
Command::Pause => {
self.player.pause()?;
@@ -303,7 +332,7 @@ impl MusicBot {
}
}
Command::Next => {
- let playlist = self.playlist.lock().expect("Mutex was not poisoned");
+ let playlist = self.playlist.read().expect("RwLock was not poisoned");
if !playlist.is_empty() {
info!("Skipping to next track");
self.player.stop_current()?;
@@ -314,8 +343,8 @@ impl MusicBot {
}
Command::Clear => {
self.playlist
- .lock()
- .expect("Mutex was not poisoned")
+ .write()
+ .expect("RwLock was not poisoned")
.clear();
}
Command::Volume { percent: volume } => {
@@ -331,7 +360,7 @@ impl MusicBot {
}
fn on_state(&self, state: State) -> Result<(), AudioPlayerError> {
- let mut current_state = self.state.lock().unwrap();
+ let mut current_state = self.state.write().unwrap();
if *current_state != state {
match state {
State::Playing => {
@@ -345,7 +374,11 @@ impl MusicBot {
self.set_description("");
}
State::EndOfStream => {
- let next_track = self.playlist.lock().expect("Mutex was not poisoned").pop();
+ let next_track = self
+ .playlist
+ .write()
+ .expect("RwLock was not poisoned")
+ .pop();
if let Some(request) = next_track {
info!("Advancing playlist");
@@ -401,7 +434,7 @@ impl MusicBot {
}
}
-fn spawn_stdin_reader(tx: Arc<Mutex<UnboundedSender<MusicBotMessage>>>) {
+fn spawn_stdin_reader(tx: Arc<RwLock<UnboundedSender<MusicBotMessage>>>) {
debug!("Spawning stdin reader thread");
thread::Builder::new()
.name(String::from("stdin reader"))
@@ -421,7 +454,7 @@ fn spawn_stdin_reader(tx: Arc<Mutex<UnboundedSender<MusicBotMessage>>>) {
text: line,
});
- let tx = tx.lock().unwrap();
+ let tx = tx.read().unwrap();
tx.send(message).unwrap();
}
})
@@ -430,7 +463,7 @@ fn spawn_stdin_reader(tx: Arc<Mutex<UnboundedSender<MusicBotMessage>>>) {
fn spawn_gstreamer_thread(
player: Arc<AudioPlayer>,
- tx: Arc<Mutex<UnboundedSender<MusicBotMessage>>>,
+ tx: Arc<RwLock<UnboundedSender<MusicBotMessage>>>,
) {
thread::Builder::new()
.name(String::from("gstreamer polling"))
@@ -439,7 +472,7 @@ fn spawn_gstreamer_thread(
break;
}
- tx.lock()
+ tx.read()
.unwrap()
.send(MusicBotMessage::StateChange(State::EndOfStream))
.unwrap();