diff options
| author | Jokler <jokler@protonmail.com> | 2020-10-14 00:19:27 +0200 |
|---|---|---|
| committer | Jokler <jokler@protonmail.com> | 2020-10-15 01:45:29 +0200 |
| commit | 4e1c2b9f04073294ecb8402486c20d9c01721598 (patch) | |
| tree | 93fe1d75477ae3d1c8466611a2cedd7bed316aa2 /src/bot/master.rs | |
| parent | 23671b51b4e207574a63bce820acbf43169e2b6c (diff) | |
| download | pokebot-4e1c2b9f04073294ecb8402486c20d9c01721598.tar.gz pokebot-4e1c2b9f04073294ecb8402486c20d9c01721598.zip | |
Replace channels&locks with actors & log with slog
Diffstat (limited to 'src/bot/master.rs')
| -rw-r--r-- | src/bot/master.rs | 363 |
1 files changed, 172 insertions, 191 deletions
diff --git a/src/bot/master.rs b/src/bot/master.rs index 1480e17..94332ac 100644 --- a/src/bot/master.rs +++ b/src/bot/master.rs @@ -1,41 +1,52 @@ use std::collections::HashMap; -use std::future::Future; -use std::sync::{Arc, RwLock}; -use log::info; +use async_trait::async_trait; +use futures::future; use rand::{rngs::SmallRng, seq::SliceRandom, SeedableRng}; use serde::{Deserialize, Serialize}; -use tokio::sync::mpsc::UnboundedSender; -use tsclientlib::{ClientId, Connection, Identity, MessageTarget}; +use slog::{error, info, o, trace, Logger}; +use tsclientlib::{ClientId, ConnectOptions, Connection, Identity, MessageTarget}; +use xtra::{spawn::Tokio, Actor, Address, Context, Handler, Message, WeakAddress}; use crate::audio_player::AudioPlayerError; use crate::teamspeak::TeamSpeakConnection; use crate::Args; -use crate::bot::{MusicBot, MusicBotArgs, MusicBotMessage}; +use crate::bot::{GetBotData, GetChannel, GetName, MusicBot, MusicBotArgs, MusicBotMessage}; pub struct MasterBot { - config: Arc<MasterConfig>, - music_bots: Arc<RwLock<MusicBots>>, + config: MasterConfig, + my_addr: Option<WeakAddress<Self>>, teamspeak: TeamSpeakConnection, - sender: Arc<RwLock<UnboundedSender<MusicBotMessage>>>, + available_names: Vec<String>, + available_ids: Vec<Identity>, + connected_bots: HashMap<String, Address<MusicBot>>, + rng: SmallRng, + logger: Logger, } -struct MusicBots { - rng: SmallRng, - available_names: Vec<usize>, - available_ids: Vec<usize>, - connected_bots: HashMap<String, Arc<MusicBot>>, +#[derive(Debug, Serialize, Deserialize)] +pub struct MasterArgs { + #[serde(default = "default_name")] + pub master_name: String, + pub address: String, + pub channel: Option<String>, + #[serde(default = "default_verbose")] + pub verbose: u8, + pub domain: String, + pub bind_address: String, + pub names: Vec<String>, + pub id: Option<Identity>, + pub ids: Option<Vec<Identity>>, } impl MasterBot { - pub async fn new(args: MasterArgs) -> (Arc<Self>, impl Future) { - let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); - let tx = Arc::new(RwLock::new(tx)); - info!("Starting in TeamSpeak mode"); + pub async fn spawn(args: MasterArgs, logger: Logger) -> Address<Self> { + info!(logger, "Starting in TeamSpeak mode"); let mut con_config = Connection::build(args.address.clone()) + .logger(logger.clone()) .version(tsclientlib::Version::Linux_3_3_2) .name(args.master_name.clone()) .identity(args.id.expect("identity should exist")) @@ -47,168 +58,116 @@ impl MasterBot { con_config = con_config.channel(channel); } - let connection = TeamSpeakConnection::new(tx.clone(), con_config) - .await - .unwrap(); + let connection = TeamSpeakConnection::new(logger.clone()).await.unwrap(); + trace!(logger, "Created teamspeak connection"); - let config = Arc::new(MasterConfig { + let config = MasterConfig { master_name: args.master_name, address: args.address, - names: args.names, - ids: args.ids.expect("identies should exists"), - local: args.local, verbose: args.verbose, - }); - - let name_count = config.names.len(); - let id_count = config.ids.len(); + }; - let music_bots = Arc::new(RwLock::new(MusicBots { + let bot_addr = Self { + config, + my_addr: None, + teamspeak: connection, + logger: logger.clone(), rng: SmallRng::from_entropy(), - available_names: (0..name_count).collect(), - available_ids: (0..id_count).collect(), + available_names: args.names, + available_ids: args.ids.expect("identities"), connected_bots: HashMap::new(), - })); + } + .create(None) + .spawn(&mut Tokio::Global); - let bot = Arc::new(Self { - config, - music_bots, - teamspeak: connection, - sender: tx.clone(), - }); - - let cbot = bot.clone(); - let msg_loop = async move { - 'outer: loop { - while let Some(msg) = rx.recv().await { - match msg { - MusicBotMessage::Quit(reason) => { - let mut cteamspeak = cbot.teamspeak.clone(); - cteamspeak.disconnect(&reason).await; - break 'outer; - } - MusicBotMessage::ClientDisconnected { id, .. } => { - if id == cbot.my_id().await { - // TODO Reconnect since quit was not called - break 'outer; - } - } - _ => cbot.on_message(msg).await.unwrap(), - } - } - } - }; + bot_addr.send(Connect(con_config)).await.unwrap().unwrap(); + trace!(logger, "Spawned master bot actor"); - (bot, msg_loop) + bot_addr } - async fn build_bot_args_for(&self, id: ClientId) -> Result<MusicBotArgs, BotCreationError> { - let mut cteamspeak = self.teamspeak.clone(); - let channel = match cteamspeak.channel_of_user(id).await { + async fn bot_args_for_client( + &mut self, + user_id: ClientId, + ) -> Result<MusicBotArgs, BotCreationError> { + let channel = match self.teamspeak.channel_of_user(user_id).await { Some(channel) => channel, None => return Err(BotCreationError::UnfoundUser), }; - if channel == cteamspeak.my_channel().await { + if channel == self.teamspeak.current_channel().await.unwrap() { return Err(BotCreationError::MasterChannel( self.config.master_name.clone(), )); } - let MusicBots { - ref mut rng, - ref mut available_names, - ref mut available_ids, - ref connected_bots, - } = &mut *self.music_bots.write().expect("RwLock was not poisoned"); - - for bot in connected_bots.values() { - if bot.my_channel().await == channel { - return Err(BotCreationError::MultipleBots(bot.name().to_owned())); + for bot in self.connected_bots.values() { + if bot.send(GetChannel).await.unwrap() == Some(channel) { + return Err(BotCreationError::MultipleBots( + bot.send(GetName).await.unwrap(), + )); } } - let channel_path = cteamspeak - .channel_path_of_user(id) + let channel_path = self + .teamspeak + .channel_path_of_user(user_id) .await .expect("can find poke sender"); - available_names.shuffle(rng); - let name_index = match available_names.pop() { + self.available_names.shuffle(&mut self.rng); + let name = match self.available_names.pop() { Some(v) => v, None => { return Err(BotCreationError::OutOfNames); } }; - let name = self.config.names[name_index].clone(); - available_ids.shuffle(rng); - let id_index = match available_ids.pop() { + self.available_ids.shuffle(&mut self.rng); + let identity = match self.available_ids.pop() { Some(v) => v, None => { return Err(BotCreationError::OutOfIdentities); } }; - let id = self.config.ids[id_index].clone(); - - let cmusic_bots = self.music_bots.clone(); - let disconnect_cb = Box::new(move |n, name_index, id_index| { - let mut music_bots = cmusic_bots.write().expect("RwLock was not poisoned"); - music_bots.connected_bots.remove(&n); - music_bots.available_names.push(name_index); - music_bots.available_ids.push(id_index); - }); - - info!("Connecting to {} on {}", channel_path, self.config.address); - Ok(MusicBotArgs { - name, - name_index, - id_index, - local: self.config.local, + name: name.clone(), + master: self.my_addr.clone(), address: self.config.address.clone(), - id, + identity, + local: false, channel: channel_path, verbose: self.config.verbose, - disconnect_cb, + logger: self.logger.new(o!("musicbot" => name)), }) } - async fn spawn_bot_for(&self, id: ClientId) { - match self.build_bot_args_for(id).await { + async fn spawn_bot_for_client(&mut self, id: ClientId) { + match self.bot_args_for_client(id).await { Ok(bot_args) => { - let (bot, fut) = MusicBot::new(bot_args).await; - tokio::spawn(fut); - let mut music_bots = self.music_bots.write().expect("RwLock was not poisoned"); - music_bots - .connected_bots - .insert(bot.name().to_string(), bot); - } - Err(e) => { - let mut cteamspeak = self.teamspeak.clone(); - cteamspeak.send_message_to_user(id, e.to_string()).await + let name = bot_args.name.clone(); + let bot = MusicBot::spawn(bot_args).await; + self.connected_bots.insert(name, bot); } + Err(e) => self.teamspeak.send_message_to_user(id, e.to_string()).await, } } - async fn on_message(&self, message: MusicBotMessage) -> Result<(), AudioPlayerError> { + async fn on_message(&mut self, message: MusicBotMessage) -> Result<(), AudioPlayerError> { match message { MusicBotMessage::TextMessage(message) => { if let MessageTarget::Poke(who) = message.target { - info!("Poked by {}, creating bot for their channel", who); - self.spawn_bot_for(who).await; + info!( + self.logger, + "Poked, creating bot"; "user" => %who + ); + self.spawn_bot_for_client(who).await; } } - MusicBotMessage::ChannelAdded(id) => { - let mut cteamspeak = self.teamspeak.clone(); - cteamspeak.subscribe(id).await; - } MusicBotMessage::ClientAdded(id) => { - let mut cteamspeak = self.teamspeak.clone(); - - if id == cteamspeak.my_id().await { - cteamspeak + if id == self.teamspeak.my_id().await { + self.teamspeak .set_description(String::from("Poke me if you want a music bot!")) .await; } @@ -219,41 +178,17 @@ impl MasterBot { Ok(()) } - async fn my_id(&self) -> ClientId { - let mut cteamspeak = self.teamspeak.clone(); + pub async fn bot_data(&self, name: String) -> Option<crate::web_server::BotData> { + let bot = self.connected_bots.get(&name)?; - cteamspeak.my_id().await + bot.send(GetBotData).await.ok() } - pub fn bot_data(&self, name: String) -> Option<crate::web_server::BotData> { - let music_bots = self.music_bots.read().unwrap(); - let bot = music_bots.connected_bots.get(&name)?; - - Some(crate::web_server::BotData { - name, - state: bot.state(), - volume: bot.volume(), - position: bot.position(), - currently_playing: bot.currently_playing(), - playlist: bot.playlist_to_vec(), - }) - } - - pub fn bot_datas(&self) -> Vec<crate::web_server::BotData> { - let music_bots = self.music_bots.read().unwrap(); - - let len = music_bots.connected_bots.len(); + pub async fn bot_datas(&self) -> Vec<crate::web_server::BotData> { + let len = self.connected_bots.len(); let mut result = Vec::with_capacity(len); - for (name, bot) in &music_bots.connected_bots { - let bot_data = crate::web_server::BotData { - name: name.clone(), - state: bot.state(), - volume: bot.volume(), - position: bot.position(), - currently_playing: bot.currently_playing(), - playlist: bot.playlist_to_vec(), - }; - + for bot in self.connected_bots.values() { + let bot_data = bot.send(GetBotData).await.unwrap(); result.push(bot_data); } @@ -261,24 +196,96 @@ impl MasterBot { } pub fn bot_names(&self) -> Vec<String> { - let music_bots = self.music_bots.read().unwrap(); - - let len = music_bots.connected_bots.len(); + let len = self.connected_bots.len(); let mut result = Vec::with_capacity(len); - for name in music_bots.connected_bots.keys() { + for name in self.connected_bots.keys() { result.push(name.clone()); } result } - pub fn quit(&self, reason: String) { - let music_bots = self.music_bots.read().unwrap(); - for bot in music_bots.connected_bots.values() { - bot.quit(reason.clone()) + fn on_bot_disconnect(&mut self, name: String, id: Identity) { + self.connected_bots.remove(&name); + self.available_names.push(name); + self.available_ids.push(id); + } + + pub async fn quit(&mut self, reason: String) -> Result<(), tsclientlib::Error> { + let futures = self + .connected_bots + .values() + .map(|b| b.send(Quit(reason.clone()))); + for res in future::join_all(futures).await { + if let Err(e) = res { + error!(self.logger, "Failed to shut down bot"; "error" => %e); + } } - let sender = self.sender.read().unwrap(); - sender.send(MusicBotMessage::Quit(reason)).unwrap(); + self.teamspeak.disconnect(&reason).await + } +} + +#[async_trait] +impl Actor for MasterBot { + async fn started(&mut self, ctx: &mut Context<Self>) { + self.my_addr = Some(ctx.address().unwrap().downgrade()); + } +} + +pub struct Connect(pub ConnectOptions); +impl Message for Connect { + type Result = Result<(), tsclientlib::Error>; +} + +#[async_trait] +impl Handler<Connect> for MasterBot { + async fn handle( + &mut self, + opt: Connect, + ctx: &mut Context<Self>, + ) -> Result<(), tsclientlib::Error> { + let addr = ctx.address().unwrap(); + self.teamspeak.connect_for_bot(opt.0, addr.downgrade())?; + Ok(()) + } +} + +pub struct Quit(pub String); +impl Message for Quit { + type Result = Result<(), tsclientlib::Error>; +} + +#[async_trait] +impl Handler<Quit> for MasterBot { + async fn handle(&mut self, q: Quit, _: &mut Context<Self>) -> Result<(), tsclientlib::Error> { + self.quit(q.0).await + } +} + +pub struct BotDisonnected { + pub name: String, + pub identity: Identity, +} + +impl Message for BotDisonnected { + type Result = (); +} + +#[async_trait] +impl Handler<BotDisonnected> for MasterBot { + async fn handle(&mut self, dc: BotDisonnected, _: &mut Context<Self>) { + self.on_bot_disconnect(dc.name, dc.identity); + } +} + +#[async_trait] +impl Handler<MusicBotMessage> for MasterBot { + async fn handle( + &mut self, + msg: MusicBotMessage, + _: &mut Context<Self>, + ) -> Result<(), AudioPlayerError> { + self.on_message(msg).await } } @@ -313,31 +320,10 @@ impl std::fmt::Display for BotCreationError { } } -#[derive(Debug, Serialize, Deserialize)] -pub struct MasterArgs { - #[serde(default = "default_name")] - pub master_name: String, - #[serde(default = "default_local")] - pub local: bool, - pub address: String, - pub channel: Option<String>, - #[serde(default = "default_verbose")] - pub verbose: u8, - pub domain: String, - pub bind_address: String, - pub names: Vec<String>, - pub id: Option<Identity>, - pub ids: Option<Vec<Identity>>, -} - fn default_name() -> String { String::from("PokeBot") } -fn default_local() -> bool { - false -} - fn default_verbose() -> u8 { 0 } @@ -345,7 +331,6 @@ fn default_verbose() -> u8 { impl MasterArgs { pub fn merge(self, args: Args) -> Self { let address = args.address.unwrap_or(self.address); - let local = args.local || self.local; let channel = args.master_channel.or(self.channel); let verbose = if args.verbose > 0 { args.verbose @@ -357,7 +342,6 @@ impl MasterArgs { master_name: self.master_name, names: self.names, ids: self.ids, - local, address, domain: self.domain, bind_address: self.bind_address, @@ -371,8 +355,5 @@ impl MasterArgs { pub struct MasterConfig { pub master_name: String, pub address: String, - pub names: Vec<String>, - pub ids: Vec<Identity>, - pub local: bool, pub verbose: u8, } |
