diff options
| author | Jokler <jokler@protonmail.com> | 2020-01-30 15:55:41 +0100 |
|---|---|---|
| committer | Jokler <jokler@protonmail.com> | 2020-02-22 23:20:10 +0100 |
| commit | 757edd214f841e8d95e4c5430d7ead7a0e8fecbb (patch) | |
| tree | 3d0721d1d1f73c9bc1fd5ac23aef505e1051d5e5 /src | |
| parent | 2792ba9c8a7120a91b3bd2c6075e737690e73405 (diff) | |
| download | pokebot-757edd214f841e8d95e4c5430d7ead7a0e8fecbb.tar.gz pokebot-757edd214f841e8d95e4c5430d7ead7a0e8fecbb.zip | |
Spawn actix-web server with access to the bot
Additionally replace all Mutexes with RwLocks.
Hopefully this makes it possible for the web server to
serve many requests at once since they would just hold read locks.
Diffstat (limited to 'src')
| -rw-r--r-- | src/audio_player.rs | 10 | ||||
| -rw-r--r-- | src/bot/master.rs | 42 | ||||
| -rw-r--r-- | src/bot/music.rs | 38 | ||||
| -rw-r--r-- | src/main.rs | 21 | ||||
| -rw-r--r-- | src/teamspeak/bbcode.rs | 6 | ||||
| -rw-r--r-- | src/teamspeak/mod.rs | 6 | ||||
| -rw-r--r-- | src/web_server.rs | 60 |
7 files changed, 146 insertions, 37 deletions
diff --git a/src/audio_player.rs b/src/audio_player.rs index 9ed645d..cdb04d7 100644 --- a/src/audio_player.rs +++ b/src/audio_player.rs @@ -10,7 +10,7 @@ use gstreamer_audio::{StreamVolume, StreamVolumeFormat}; use crate::bot::{MusicBotMessage, State}; use glib::BoolError; use log::{debug, error, info, warn}; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, RwLock}; use tokio02::sync::mpsc::UnboundedSender; static GST_INIT: Once = Once::new(); @@ -34,7 +34,7 @@ pub struct AudioPlayer { http_src: gst::Element, volume: gst::Element, - sender: Arc<Mutex<UnboundedSender<MusicBotMessage>>>, + sender: Arc<RwLock<UnboundedSender<MusicBotMessage>>>, } fn make_element(factoryname: &str, display_name: &str) -> Result<gst::Element, AudioPlayerError> { @@ -83,7 +83,7 @@ fn add_decode_bin_new_pad_callback( impl AudioPlayer { pub fn new( - sender: Arc<Mutex<UnboundedSender<MusicBotMessage>>>, + sender: Arc<RwLock<UnboundedSender<MusicBotMessage>>>, callback: Option<Box<dyn FnMut(&[u8]) + Send>>, ) -> Result<Self, AudioPlayerError> { GST_INIT.call_once(|| gst::init().unwrap()); @@ -280,13 +280,13 @@ impl AudioPlayer { warn!("Failed to send \"quit\" app event: {}", e); } - let sender = self.sender.lock().unwrap(); + let sender = self.sender.read().unwrap(); sender.send(MusicBotMessage::Quit(reason)).unwrap(); } fn send_state(&self, state: State) { info!("Sending state {:?} to application", state); - let sender = self.sender.lock().unwrap(); + let sender = self.sender.read().unwrap(); sender.send(MusicBotMessage::StateChange(state)).unwrap(); } diff --git a/src/bot/master.rs b/src/bot/master.rs index 2488064..bc38cca 100644 --- a/src/bot/master.rs +++ b/src/bot/master.rs @@ -1,12 +1,13 @@ use std::collections::HashMap; use std::future::Future; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, RwLock}; use futures::future::{FutureExt, TryFutureExt}; use futures01::future::Future as Future01; use log::info; use rand::{rngs::SmallRng, seq::SliceRandom, SeedableRng}; use serde::{Deserialize, Serialize}; +use tokio02::sync::mpsc::UnboundedSender; use tsclientlib::{ClientId, ConnectOptions, Identity, MessageTarget}; use crate::audio_player::AudioPlayerError; @@ -18,8 +19,9 @@ use crate::bot::{MusicBot, MusicBotArgs, MusicBotMessage}; pub struct MasterBot { config: Arc<MasterConfig>, - music_bots: Arc<Mutex<MusicBots>>, + music_bots: Arc<RwLock<MusicBots>>, teamspeak: Arc<TeamSpeakConnection>, + sender: Arc<RwLock<UnboundedSender<MusicBotMessage>>>, } struct MusicBots { @@ -32,7 +34,7 @@ struct MusicBots { impl MasterBot { pub async fn new(args: MasterArgs) -> (Arc<Self>, impl Future) { let (tx, mut rx) = tokio02::sync::mpsc::unbounded_channel(); - let tx = Arc::new(Mutex::new(tx)); + let tx = Arc::new(RwLock::new(tx)); info!("Starting in TeamSpeak mode"); let mut con_config = ConnectOptions::new(args.address.clone()) @@ -65,7 +67,7 @@ impl MasterBot { let name_count = config.names.len(); let id_count = config.ids.len(); - let music_bots = Arc::new(Mutex::new(MusicBots { + let music_bots = Arc::new(RwLock::new(MusicBots { rng: SmallRng::from_entropy(), available_names: (0..name_count).collect(), available_ids: (0..id_count).collect(), @@ -76,6 +78,7 @@ impl MasterBot { config, music_bots, teamspeak: connection, + sender: tx.clone(), }); bot.teamspeak @@ -83,8 +86,12 @@ impl MasterBot { let cbot = bot.clone(); let msg_loop = async move { - loop { + 'outer: loop { while let Some(msg) = rx.recv().await { + if let MusicBotMessage::Quit(reason) = msg { + cbot.teamspeak.disconnect(&reason); + break 'outer; + } cbot.on_message(msg).await.unwrap(); } } @@ -115,7 +122,7 @@ impl MasterBot { ref mut available_names, ref mut available_ids, ref connected_bots, - } = &mut *self.music_bots.lock().expect("Mutex was not poisoned"); + } = &mut *self.music_bots.write().expect("RwLock was not poisoned"); for (_, bot) in connected_bots { if bot.my_channel() == channel { @@ -163,7 +170,7 @@ impl MasterBot { let cmusic_bots = self.music_bots.clone(); let disconnect_cb = Box::new(move |n, name_index, id_index| { - let mut music_bots = cmusic_bots.lock().expect("Mutex was not poisoned"); + let mut music_bots = cmusic_bots.write().expect("RwLock was not poisoned"); music_bots.connected_bots.remove(&n); music_bots.available_names.push(name_index); music_bots.available_ids.push(id_index); @@ -188,7 +195,7 @@ impl MasterBot { if let Some(bot_args) = self.build_bot_args_for(id) { let (bot, fut) = MusicBot::new(bot_args).await; tokio::spawn(fut.unit_error().boxed().compat().map(|_| ())); - let mut music_bots = self.music_bots.lock().expect("Mutex was not poisoned"); + let mut music_bots = self.music_bots.write().expect("RwLock was not poisoned"); music_bots .connected_bots .insert(bot.name().to_string(), bot); @@ -205,6 +212,25 @@ impl MasterBot { Ok(()) } + + pub fn names(&self) -> Vec<String> { + let music_bots = self.music_bots.read().unwrap(); + + music_bots + .connected_bots + .iter() + .map(|(_, b)| b.name().to_owned()) + .collect() + } + + pub fn quit(&self, reason: String) { + let music_bots = self.music_bots.read().unwrap(); + for (_, bot) in &music_bots.connected_bots { + bot.quit(reason.clone()) + } + let sender = self.sender.read().unwrap(); + sender.send(MusicBotMessage::Quit(reason)).unwrap(); + } } #[derive(Debug, Serialize, Deserialize)] diff --git a/src/bot/music.rs b/src/bot/music.rs index 2539695..a23ed3b 100644 --- a/src/bot/music.rs +++ b/src/bot/music.rs @@ -1,6 +1,6 @@ use std::future::Future; use std::io::BufRead; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, RwLock}; use std::thread; use humantime; @@ -71,8 +71,8 @@ pub struct MusicBot { name: String, player: Arc<AudioPlayer>, teamspeak: Option<Arc<TeamSpeakConnection>>, - playlist: Arc<Mutex<Playlist>>, - state: Arc<Mutex<State>>, + playlist: Arc<RwLock<Playlist>>, + state: Arc<RwLock<State>>, } pub struct MusicBotArgs { @@ -90,7 +90,7 @@ pub struct MusicBotArgs { impl MusicBot { pub async fn new(args: MusicBotArgs) -> (Arc<Self>, impl Future) { let (tx, mut rx) = tokio02::sync::mpsc::unbounded_channel(); - let tx = Arc::new(Mutex::new(tx)); + let tx = Arc::new(RwLock::new(tx)); let (player, connection) = if args.local { info!("Starting in CLI mode"); let audio_player = AudioPlayer::new(tx.clone(), None).unwrap(); @@ -127,7 +127,7 @@ impl MusicBot { player.set_volume(0.5).unwrap(); let player = Arc::new(player); - let playlist = Arc::new(Mutex::new(Playlist::new())); + let playlist = Arc::new(RwLock::new(Playlist::new())); spawn_gstreamer_thread(player.clone(), tx.clone()); @@ -140,7 +140,7 @@ impl MusicBot { player, teamspeak: connection, playlist, - state: Arc::new(Mutex::new(State::Stopped)), + state: Arc::new(RwLock::new(State::Stopped)), }); let cbot = bot.clone(); @@ -190,7 +190,7 @@ impl MusicBot { Ok(metadata) => { info!("Found audio url: {}", metadata.url); - let mut playlist = self.playlist.lock().expect("Mutex was not poisoned"); + let mut playlist = self.playlist.write().expect("RwLock was not poisoned"); playlist.push(metadata.clone()); if !self.player.is_started() { @@ -269,7 +269,7 @@ impl MusicBot { async fn on_command(&self, command: Command) -> Result<(), AudioPlayerError> { match command { Command::Play => { - let playlist = self.playlist.lock().expect("Mutex was not poisoned"); + let playlist = self.playlist.read().expect("RwLock was not poisoned"); if !self.player.is_started() { if !playlist.is_empty() { @@ -303,7 +303,7 @@ impl MusicBot { } } Command::Next => { - let playlist = self.playlist.lock().expect("Mutex was not poisoned"); + let playlist = self.playlist.read().expect("RwLock was not poisoned"); if !playlist.is_empty() { info!("Skipping to next track"); self.player.stop_current()?; @@ -314,8 +314,8 @@ impl MusicBot { } Command::Clear => { self.playlist - .lock() - .expect("Mutex was not poisoned") + .write() + .expect("RwLock was not poisoned") .clear(); } Command::Volume { percent: volume } => { @@ -331,7 +331,7 @@ impl MusicBot { } fn on_state(&self, state: State) -> Result<(), AudioPlayerError> { - let mut current_state = self.state.lock().unwrap(); + let mut current_state = self.state.write().unwrap(); if *current_state != state { match state { State::Playing => { @@ -345,7 +345,11 @@ impl MusicBot { self.set_description(""); } State::EndOfStream => { - let next_track = self.playlist.lock().expect("Mutex was not poisoned").pop(); + let next_track = self + .playlist + .write() + .expect("RwLock was not poisoned") + .pop(); if let Some(request) = next_track { info!("Advancing playlist"); @@ -401,7 +405,7 @@ impl MusicBot { } } -fn spawn_stdin_reader(tx: Arc<Mutex<UnboundedSender<MusicBotMessage>>>) { +fn spawn_stdin_reader(tx: Arc<RwLock<UnboundedSender<MusicBotMessage>>>) { debug!("Spawning stdin reader thread"); thread::Builder::new() .name(String::from("stdin reader")) @@ -421,7 +425,7 @@ fn spawn_stdin_reader(tx: Arc<Mutex<UnboundedSender<MusicBotMessage>>>) { text: line, }); - let tx = tx.lock().unwrap(); + let tx = tx.read().unwrap(); tx.send(message).unwrap(); } }) @@ -430,7 +434,7 @@ fn spawn_stdin_reader(tx: Arc<Mutex<UnboundedSender<MusicBotMessage>>>) { fn spawn_gstreamer_thread( player: Arc<AudioPlayer>, - tx: Arc<Mutex<UnboundedSender<MusicBotMessage>>>, + tx: Arc<RwLock<UnboundedSender<MusicBotMessage>>>, ) { thread::Builder::new() .name(String::from("gstreamer polling")) @@ -439,7 +443,7 @@ fn spawn_gstreamer_thread( break; } - tx.lock() + tx.read() .unwrap() .send(MusicBotMessage::StateChange(State::EndOfStream)) .unwrap(); diff --git a/src/main.rs b/src/main.rs index 922162f..2559a2a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,9 +1,10 @@ use std::fs::File; use std::io::{Read, Write}; use std::path::PathBuf; +use std::thread; use futures::future::{FutureExt, TryFutureExt}; -use log::{debug, info}; +use log::{debug, error, info}; use structopt::clap::AppSettings; use structopt::StructOpt; use tsclientlib::Identity; @@ -13,6 +14,7 @@ mod bot; mod command; mod playlist; mod teamspeak; +mod web_server; mod youtube_dl; use bot::{MasterArgs, MasterBot, MusicBot, MusicBotArgs}; @@ -116,7 +118,22 @@ fn run() -> Result<(), Box<dyn std::error::Error>> { }; MusicBot::new(bot_args).await.1.await; } else { - MasterBot::new(bot_args).await.1.await; + let domain = bot_args.domain.clone(); + let bind_address = bot_args.bind_address.clone(); + let (bot, fut) = MasterBot::new(bot_args).await; + + thread::spawn(|| { + let web_args = web_server::WebServerArgs { + domain, + bind_address, + bot, + }; + if let Err(e) = web_server::start(web_args) { + error!("Error in web server: {}", e); + } + }); + + fut.await; } } .unit_error() diff --git a/src/teamspeak/bbcode.rs b/src/teamspeak/bbcode.rs index 28be08a..91d576a 100644 --- a/src/teamspeak/bbcode.rs +++ b/src/teamspeak/bbcode.rs @@ -1,4 +1,4 @@ -use std::fmt::{Formatter, Display, Error}; +use std::fmt::{Display, Error, Formatter}; #[allow(dead_code)] pub enum BbCode<'a> { @@ -14,7 +14,9 @@ impl<'a> Display for BbCode<'a> { BbCode::Bold(text) => fmt.write_fmt(format_args!("[B]{}[/B]", text))?, BbCode::Italic(text) => fmt.write_fmt(format_args!("[I]{}[/I]", text))?, BbCode::Underline(text) => fmt.write_fmt(format_args!("[U]{}[/U]", text))?, - BbCode::Link(text, url) => fmt.write_fmt(format_args!("[URL={}]{}[/URL]", url, text))?, + BbCode::Link(text, url) => { + fmt.write_fmt(format_args!("[URL={}]{}[/URL]", url, text))? + } }; Ok(()) diff --git a/src/teamspeak/mod.rs b/src/teamspeak/mod.rs index 5ac0d44..7551e77 100644 --- a/src/teamspeak/mod.rs +++ b/src/teamspeak/mod.rs @@ -1,4 +1,4 @@ -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; use futures::compat::Future01CompatExt; @@ -76,7 +76,7 @@ fn get_message<'a>(event: &Event) -> Option<MusicBotMessage> { impl TeamSpeakConnection { pub async fn new( - tx: Arc<Mutex<UnboundedSender<MusicBotMessage>>>, + tx: Arc<RwLock<UnboundedSender<MusicBotMessage>>>, options: ConnectOptions, ) -> Result<TeamSpeakConnection, tsclientlib::Error> { let conn = Connection::new(options).compat().await?; @@ -89,7 +89,7 @@ impl TeamSpeakConnection { if let ConEvents(_conn, events) = e { for event in *events { if let Some(msg) = get_message(event) { - let tx = tx.lock().expect("Mutex was not poisoned"); + let tx = tx.read().expect("RwLock was not poisoned"); // Ignore the result because the receiver might get dropped first. let _ = tx.send(msg); } diff --git a/src/web_server.rs b/src/web_server.rs new file mode 100644 index 0000000..1edbc50 --- /dev/null +++ b/src/web_server.rs @@ -0,0 +1,60 @@ +use std::sync::Arc; + +use actix::{Actor, Addr, Handler, Message, SyncArbiter, SyncContext}; +use actix_web::{get, middleware::Logger, web, App, HttpResponse, HttpServer, Responder}; + +use crate::bot::MasterBot; + +struct GetNames; + +impl Message for GetNames { + type Result = Result<Vec<String>, ()>; +} + +#[get("/")] +async fn index(bot: web::Data<Addr<BotExecutor>>) -> impl Responder { + let names = bot.send(GetNames).await.unwrap().unwrap(); + HttpResponse::Ok().body(&format!("Music bots connected: {}", names.join(", "))) +} + +pub struct WebServerArgs { + pub domain: String, + pub bind_address: String, + pub bot: Arc<MasterBot>, +} + +#[actix_rt::main] +pub async fn start(args: WebServerArgs) -> std::io::Result<()> { + let cbot = args.bot.clone(); + let bot_addr: Addr<BotExecutor> = SyncArbiter::start(4, move || BotExecutor(cbot.clone())); + + HttpServer::new(move || { + App::new() + .data(bot_addr.clone()) + .wrap(Logger::default()) + .service(index) + }) + .bind(args.bind_address)? + .run() + .await?; + + args.bot.quit(String::from("Stopping")); + + Ok(()) +} + +pub struct BotExecutor(pub Arc<MasterBot>); + +impl Actor for BotExecutor { + type Context = SyncContext<Self>; +} + +impl Handler<GetNames> for BotExecutor { + type Result = Result<Vec<String>, ()>; + + fn handle(&mut self, _: GetNames, _: &mut Self::Context) -> Self::Result { + let bot = &self.0; + + Ok(bot.names()) + } +} |
