aboutsummaryrefslogtreecommitdiffstats
path: root/src/bot
diff options
context:
space:
mode:
authorJokler <jokler@protonmail.com>2020-01-30 15:55:41 +0100
committerJokler <jokler@protonmail.com>2020-02-22 23:20:10 +0100
commit757edd214f841e8d95e4c5430d7ead7a0e8fecbb (patch)
tree3d0721d1d1f73c9bc1fd5ac23aef505e1051d5e5 /src/bot
parent2792ba9c8a7120a91b3bd2c6075e737690e73405 (diff)
downloadpokebot-757edd214f841e8d95e4c5430d7ead7a0e8fecbb.tar.gz
pokebot-757edd214f841e8d95e4c5430d7ead7a0e8fecbb.zip
Spawn actix-web server with access to the bot
Additionally replace all Mutexes with RwLocks. Hopefully this makes it possible for the web server to serve many requests at once since they would just hold read locks.
Diffstat (limited to 'src/bot')
-rw-r--r--src/bot/master.rs42
-rw-r--r--src/bot/music.rs38
2 files changed, 55 insertions, 25 deletions
diff --git a/src/bot/master.rs b/src/bot/master.rs
index 2488064..bc38cca 100644
--- a/src/bot/master.rs
+++ b/src/bot/master.rs
@@ -1,12 +1,13 @@
use std::collections::HashMap;
use std::future::Future;
-use std::sync::{Arc, Mutex};
+use std::sync::{Arc, RwLock};
use futures::future::{FutureExt, TryFutureExt};
use futures01::future::Future as Future01;
use log::info;
use rand::{rngs::SmallRng, seq::SliceRandom, SeedableRng};
use serde::{Deserialize, Serialize};
+use tokio02::sync::mpsc::UnboundedSender;
use tsclientlib::{ClientId, ConnectOptions, Identity, MessageTarget};
use crate::audio_player::AudioPlayerError;
@@ -18,8 +19,9 @@ use crate::bot::{MusicBot, MusicBotArgs, MusicBotMessage};
pub struct MasterBot {
config: Arc<MasterConfig>,
- music_bots: Arc<Mutex<MusicBots>>,
+ music_bots: Arc<RwLock<MusicBots>>,
teamspeak: Arc<TeamSpeakConnection>,
+ sender: Arc<RwLock<UnboundedSender<MusicBotMessage>>>,
}
struct MusicBots {
@@ -32,7 +34,7 @@ struct MusicBots {
impl MasterBot {
pub async fn new(args: MasterArgs) -> (Arc<Self>, impl Future) {
let (tx, mut rx) = tokio02::sync::mpsc::unbounded_channel();
- let tx = Arc::new(Mutex::new(tx));
+ let tx = Arc::new(RwLock::new(tx));
info!("Starting in TeamSpeak mode");
let mut con_config = ConnectOptions::new(args.address.clone())
@@ -65,7 +67,7 @@ impl MasterBot {
let name_count = config.names.len();
let id_count = config.ids.len();
- let music_bots = Arc::new(Mutex::new(MusicBots {
+ let music_bots = Arc::new(RwLock::new(MusicBots {
rng: SmallRng::from_entropy(),
available_names: (0..name_count).collect(),
available_ids: (0..id_count).collect(),
@@ -76,6 +78,7 @@ impl MasterBot {
config,
music_bots,
teamspeak: connection,
+ sender: tx.clone(),
});
bot.teamspeak
@@ -83,8 +86,12 @@ impl MasterBot {
let cbot = bot.clone();
let msg_loop = async move {
- loop {
+ 'outer: loop {
while let Some(msg) = rx.recv().await {
+ if let MusicBotMessage::Quit(reason) = msg {
+ cbot.teamspeak.disconnect(&reason);
+ break 'outer;
+ }
cbot.on_message(msg).await.unwrap();
}
}
@@ -115,7 +122,7 @@ impl MasterBot {
ref mut available_names,
ref mut available_ids,
ref connected_bots,
- } = &mut *self.music_bots.lock().expect("Mutex was not poisoned");
+ } = &mut *self.music_bots.write().expect("RwLock was not poisoned");
for (_, bot) in connected_bots {
if bot.my_channel() == channel {
@@ -163,7 +170,7 @@ impl MasterBot {
let cmusic_bots = self.music_bots.clone();
let disconnect_cb = Box::new(move |n, name_index, id_index| {
- let mut music_bots = cmusic_bots.lock().expect("Mutex was not poisoned");
+ let mut music_bots = cmusic_bots.write().expect("RwLock was not poisoned");
music_bots.connected_bots.remove(&n);
music_bots.available_names.push(name_index);
music_bots.available_ids.push(id_index);
@@ -188,7 +195,7 @@ impl MasterBot {
if let Some(bot_args) = self.build_bot_args_for(id) {
let (bot, fut) = MusicBot::new(bot_args).await;
tokio::spawn(fut.unit_error().boxed().compat().map(|_| ()));
- let mut music_bots = self.music_bots.lock().expect("Mutex was not poisoned");
+ let mut music_bots = self.music_bots.write().expect("RwLock was not poisoned");
music_bots
.connected_bots
.insert(bot.name().to_string(), bot);
@@ -205,6 +212,25 @@ impl MasterBot {
Ok(())
}
+
+ pub fn names(&self) -> Vec<String> {
+ let music_bots = self.music_bots.read().unwrap();
+
+ music_bots
+ .connected_bots
+ .iter()
+ .map(|(_, b)| b.name().to_owned())
+ .collect()
+ }
+
+ pub fn quit(&self, reason: String) {
+ let music_bots = self.music_bots.read().unwrap();
+ for (_, bot) in &music_bots.connected_bots {
+ bot.quit(reason.clone())
+ }
+ let sender = self.sender.read().unwrap();
+ sender.send(MusicBotMessage::Quit(reason)).unwrap();
+ }
}
#[derive(Debug, Serialize, Deserialize)]
diff --git a/src/bot/music.rs b/src/bot/music.rs
index 2539695..a23ed3b 100644
--- a/src/bot/music.rs
+++ b/src/bot/music.rs
@@ -1,6 +1,6 @@
use std::future::Future;
use std::io::BufRead;
-use std::sync::{Arc, Mutex};
+use std::sync::{Arc, RwLock};
use std::thread;
use humantime;
@@ -71,8 +71,8 @@ pub struct MusicBot {
name: String,
player: Arc<AudioPlayer>,
teamspeak: Option<Arc<TeamSpeakConnection>>,
- playlist: Arc<Mutex<Playlist>>,
- state: Arc<Mutex<State>>,
+ playlist: Arc<RwLock<Playlist>>,
+ state: Arc<RwLock<State>>,
}
pub struct MusicBotArgs {
@@ -90,7 +90,7 @@ pub struct MusicBotArgs {
impl MusicBot {
pub async fn new(args: MusicBotArgs) -> (Arc<Self>, impl Future) {
let (tx, mut rx) = tokio02::sync::mpsc::unbounded_channel();
- let tx = Arc::new(Mutex::new(tx));
+ let tx = Arc::new(RwLock::new(tx));
let (player, connection) = if args.local {
info!("Starting in CLI mode");
let audio_player = AudioPlayer::new(tx.clone(), None).unwrap();
@@ -127,7 +127,7 @@ impl MusicBot {
player.set_volume(0.5).unwrap();
let player = Arc::new(player);
- let playlist = Arc::new(Mutex::new(Playlist::new()));
+ let playlist = Arc::new(RwLock::new(Playlist::new()));
spawn_gstreamer_thread(player.clone(), tx.clone());
@@ -140,7 +140,7 @@ impl MusicBot {
player,
teamspeak: connection,
playlist,
- state: Arc::new(Mutex::new(State::Stopped)),
+ state: Arc::new(RwLock::new(State::Stopped)),
});
let cbot = bot.clone();
@@ -190,7 +190,7 @@ impl MusicBot {
Ok(metadata) => {
info!("Found audio url: {}", metadata.url);
- let mut playlist = self.playlist.lock().expect("Mutex was not poisoned");
+ let mut playlist = self.playlist.write().expect("RwLock was not poisoned");
playlist.push(metadata.clone());
if !self.player.is_started() {
@@ -269,7 +269,7 @@ impl MusicBot {
async fn on_command(&self, command: Command) -> Result<(), AudioPlayerError> {
match command {
Command::Play => {
- let playlist = self.playlist.lock().expect("Mutex was not poisoned");
+ let playlist = self.playlist.read().expect("RwLock was not poisoned");
if !self.player.is_started() {
if !playlist.is_empty() {
@@ -303,7 +303,7 @@ impl MusicBot {
}
}
Command::Next => {
- let playlist = self.playlist.lock().expect("Mutex was not poisoned");
+ let playlist = self.playlist.read().expect("RwLock was not poisoned");
if !playlist.is_empty() {
info!("Skipping to next track");
self.player.stop_current()?;
@@ -314,8 +314,8 @@ impl MusicBot {
}
Command::Clear => {
self.playlist
- .lock()
- .expect("Mutex was not poisoned")
+ .write()
+ .expect("RwLock was not poisoned")
.clear();
}
Command::Volume { percent: volume } => {
@@ -331,7 +331,7 @@ impl MusicBot {
}
fn on_state(&self, state: State) -> Result<(), AudioPlayerError> {
- let mut current_state = self.state.lock().unwrap();
+ let mut current_state = self.state.write().unwrap();
if *current_state != state {
match state {
State::Playing => {
@@ -345,7 +345,11 @@ impl MusicBot {
self.set_description("");
}
State::EndOfStream => {
- let next_track = self.playlist.lock().expect("Mutex was not poisoned").pop();
+ let next_track = self
+ .playlist
+ .write()
+ .expect("RwLock was not poisoned")
+ .pop();
if let Some(request) = next_track {
info!("Advancing playlist");
@@ -401,7 +405,7 @@ impl MusicBot {
}
}
-fn spawn_stdin_reader(tx: Arc<Mutex<UnboundedSender<MusicBotMessage>>>) {
+fn spawn_stdin_reader(tx: Arc<RwLock<UnboundedSender<MusicBotMessage>>>) {
debug!("Spawning stdin reader thread");
thread::Builder::new()
.name(String::from("stdin reader"))
@@ -421,7 +425,7 @@ fn spawn_stdin_reader(tx: Arc<Mutex<UnboundedSender<MusicBotMessage>>>) {
text: line,
});
- let tx = tx.lock().unwrap();
+ let tx = tx.read().unwrap();
tx.send(message).unwrap();
}
})
@@ -430,7 +434,7 @@ fn spawn_stdin_reader(tx: Arc<Mutex<UnboundedSender<MusicBotMessage>>>) {
fn spawn_gstreamer_thread(
player: Arc<AudioPlayer>,
- tx: Arc<Mutex<UnboundedSender<MusicBotMessage>>>,
+ tx: Arc<RwLock<UnboundedSender<MusicBotMessage>>>,
) {
thread::Builder::new()
.name(String::from("gstreamer polling"))
@@ -439,7 +443,7 @@ fn spawn_gstreamer_thread(
break;
}
- tx.lock()
+ tx.read()
.unwrap()
.send(MusicBotMessage::StateChange(State::EndOfStream))
.unwrap();