diff options
Diffstat (limited to 'src/bot')
| -rw-r--r-- | src/bot/master.rs | 363 | ||||
| -rw-r--r-- | src/bot/music.rs | 500 |
2 files changed, 445 insertions, 418 deletions
diff --git a/src/bot/master.rs b/src/bot/master.rs index 1480e17..94332ac 100644 --- a/src/bot/master.rs +++ b/src/bot/master.rs @@ -1,41 +1,52 @@ use std::collections::HashMap; -use std::future::Future; -use std::sync::{Arc, RwLock}; -use log::info; +use async_trait::async_trait; +use futures::future; use rand::{rngs::SmallRng, seq::SliceRandom, SeedableRng}; use serde::{Deserialize, Serialize}; -use tokio::sync::mpsc::UnboundedSender; -use tsclientlib::{ClientId, Connection, Identity, MessageTarget}; +use slog::{error, info, o, trace, Logger}; +use tsclientlib::{ClientId, ConnectOptions, Connection, Identity, MessageTarget}; +use xtra::{spawn::Tokio, Actor, Address, Context, Handler, Message, WeakAddress}; use crate::audio_player::AudioPlayerError; use crate::teamspeak::TeamSpeakConnection; use crate::Args; -use crate::bot::{MusicBot, MusicBotArgs, MusicBotMessage}; +use crate::bot::{GetBotData, GetChannel, GetName, MusicBot, MusicBotArgs, MusicBotMessage}; pub struct MasterBot { - config: Arc<MasterConfig>, - music_bots: Arc<RwLock<MusicBots>>, + config: MasterConfig, + my_addr: Option<WeakAddress<Self>>, teamspeak: TeamSpeakConnection, - sender: Arc<RwLock<UnboundedSender<MusicBotMessage>>>, + available_names: Vec<String>, + available_ids: Vec<Identity>, + connected_bots: HashMap<String, Address<MusicBot>>, + rng: SmallRng, + logger: Logger, } -struct MusicBots { - rng: SmallRng, - available_names: Vec<usize>, - available_ids: Vec<usize>, - connected_bots: HashMap<String, Arc<MusicBot>>, +#[derive(Debug, Serialize, Deserialize)] +pub struct MasterArgs { + #[serde(default = "default_name")] + pub master_name: String, + pub address: String, + pub channel: Option<String>, + #[serde(default = "default_verbose")] + pub verbose: u8, + pub domain: String, + pub bind_address: String, + pub names: Vec<String>, + pub id: Option<Identity>, + pub ids: Option<Vec<Identity>>, } impl MasterBot { - pub async fn new(args: MasterArgs) -> (Arc<Self>, impl Future) { - let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); - let tx = Arc::new(RwLock::new(tx)); - info!("Starting in TeamSpeak mode"); + pub async fn spawn(args: MasterArgs, logger: Logger) -> Address<Self> { + info!(logger, "Starting in TeamSpeak mode"); let mut con_config = Connection::build(args.address.clone()) + .logger(logger.clone()) .version(tsclientlib::Version::Linux_3_3_2) .name(args.master_name.clone()) .identity(args.id.expect("identity should exist")) @@ -47,168 +58,116 @@ impl MasterBot { con_config = con_config.channel(channel); } - let connection = TeamSpeakConnection::new(tx.clone(), con_config) - .await - .unwrap(); + let connection = TeamSpeakConnection::new(logger.clone()).await.unwrap(); + trace!(logger, "Created teamspeak connection"); - let config = Arc::new(MasterConfig { + let config = MasterConfig { master_name: args.master_name, address: args.address, - names: args.names, - ids: args.ids.expect("identies should exists"), - local: args.local, verbose: args.verbose, - }); - - let name_count = config.names.len(); - let id_count = config.ids.len(); + }; - let music_bots = Arc::new(RwLock::new(MusicBots { + let bot_addr = Self { + config, + my_addr: None, + teamspeak: connection, + logger: logger.clone(), rng: SmallRng::from_entropy(), - available_names: (0..name_count).collect(), - available_ids: (0..id_count).collect(), + available_names: args.names, + available_ids: args.ids.expect("identities"), connected_bots: HashMap::new(), - })); + } + .create(None) + .spawn(&mut Tokio::Global); - let bot = Arc::new(Self { - config, - music_bots, - teamspeak: connection, - sender: tx.clone(), - }); - - let cbot = bot.clone(); - let msg_loop = async move { - 'outer: loop { - while let Some(msg) = rx.recv().await { - match msg { - MusicBotMessage::Quit(reason) => { - let mut cteamspeak = cbot.teamspeak.clone(); - cteamspeak.disconnect(&reason).await; - break 'outer; - } - MusicBotMessage::ClientDisconnected { id, .. } => { - if id == cbot.my_id().await { - // TODO Reconnect since quit was not called - break 'outer; - } - } - _ => cbot.on_message(msg).await.unwrap(), - } - } - } - }; + bot_addr.send(Connect(con_config)).await.unwrap().unwrap(); + trace!(logger, "Spawned master bot actor"); - (bot, msg_loop) + bot_addr } - async fn build_bot_args_for(&self, id: ClientId) -> Result<MusicBotArgs, BotCreationError> { - let mut cteamspeak = self.teamspeak.clone(); - let channel = match cteamspeak.channel_of_user(id).await { + async fn bot_args_for_client( + &mut self, + user_id: ClientId, + ) -> Result<MusicBotArgs, BotCreationError> { + let channel = match self.teamspeak.channel_of_user(user_id).await { Some(channel) => channel, None => return Err(BotCreationError::UnfoundUser), }; - if channel == cteamspeak.my_channel().await { + if channel == self.teamspeak.current_channel().await.unwrap() { return Err(BotCreationError::MasterChannel( self.config.master_name.clone(), )); } - let MusicBots { - ref mut rng, - ref mut available_names, - ref mut available_ids, - ref connected_bots, - } = &mut *self.music_bots.write().expect("RwLock was not poisoned"); - - for bot in connected_bots.values() { - if bot.my_channel().await == channel { - return Err(BotCreationError::MultipleBots(bot.name().to_owned())); + for bot in self.connected_bots.values() { + if bot.send(GetChannel).await.unwrap() == Some(channel) { + return Err(BotCreationError::MultipleBots( + bot.send(GetName).await.unwrap(), + )); } } - let channel_path = cteamspeak - .channel_path_of_user(id) + let channel_path = self + .teamspeak + .channel_path_of_user(user_id) .await .expect("can find poke sender"); - available_names.shuffle(rng); - let name_index = match available_names.pop() { + self.available_names.shuffle(&mut self.rng); + let name = match self.available_names.pop() { Some(v) => v, None => { return Err(BotCreationError::OutOfNames); } }; - let name = self.config.names[name_index].clone(); - available_ids.shuffle(rng); - let id_index = match available_ids.pop() { + self.available_ids.shuffle(&mut self.rng); + let identity = match self.available_ids.pop() { Some(v) => v, None => { return Err(BotCreationError::OutOfIdentities); } }; - let id = self.config.ids[id_index].clone(); - - let cmusic_bots = self.music_bots.clone(); - let disconnect_cb = Box::new(move |n, name_index, id_index| { - let mut music_bots = cmusic_bots.write().expect("RwLock was not poisoned"); - music_bots.connected_bots.remove(&n); - music_bots.available_names.push(name_index); - music_bots.available_ids.push(id_index); - }); - - info!("Connecting to {} on {}", channel_path, self.config.address); - Ok(MusicBotArgs { - name, - name_index, - id_index, - local: self.config.local, + name: name.clone(), + master: self.my_addr.clone(), address: self.config.address.clone(), - id, + identity, + local: false, channel: channel_path, verbose: self.config.verbose, - disconnect_cb, + logger: self.logger.new(o!("musicbot" => name)), }) } - async fn spawn_bot_for(&self, id: ClientId) { - match self.build_bot_args_for(id).await { + async fn spawn_bot_for_client(&mut self, id: ClientId) { + match self.bot_args_for_client(id).await { Ok(bot_args) => { - let (bot, fut) = MusicBot::new(bot_args).await; - tokio::spawn(fut); - let mut music_bots = self.music_bots.write().expect("RwLock was not poisoned"); - music_bots - .connected_bots - .insert(bot.name().to_string(), bot); - } - Err(e) => { - let mut cteamspeak = self.teamspeak.clone(); - cteamspeak.send_message_to_user(id, e.to_string()).await + let name = bot_args.name.clone(); + let bot = MusicBot::spawn(bot_args).await; + self.connected_bots.insert(name, bot); } + Err(e) => self.teamspeak.send_message_to_user(id, e.to_string()).await, } } - async fn on_message(&self, message: MusicBotMessage) -> Result<(), AudioPlayerError> { + async fn on_message(&mut self, message: MusicBotMessage) -> Result<(), AudioPlayerError> { match message { MusicBotMessage::TextMessage(message) => { if let MessageTarget::Poke(who) = message.target { - info!("Poked by {}, creating bot for their channel", who); - self.spawn_bot_for(who).await; + info!( + self.logger, + "Poked, creating bot"; "user" => %who + ); + self.spawn_bot_for_client(who).await; } } - MusicBotMessage::ChannelAdded(id) => { - let mut cteamspeak = self.teamspeak.clone(); - cteamspeak.subscribe(id).await; - } MusicBotMessage::ClientAdded(id) => { - let mut cteamspeak = self.teamspeak.clone(); - - if id == cteamspeak.my_id().await { - cteamspeak + if id == self.teamspeak.my_id().await { + self.teamspeak .set_description(String::from("Poke me if you want a music bot!")) .await; } @@ -219,41 +178,17 @@ impl MasterBot { Ok(()) } - async fn my_id(&self) -> ClientId { - let mut cteamspeak = self.teamspeak.clone(); + pub async fn bot_data(&self, name: String) -> Option<crate::web_server::BotData> { + let bot = self.connected_bots.get(&name)?; - cteamspeak.my_id().await + bot.send(GetBotData).await.ok() } - pub fn bot_data(&self, name: String) -> Option<crate::web_server::BotData> { - let music_bots = self.music_bots.read().unwrap(); - let bot = music_bots.connected_bots.get(&name)?; - - Some(crate::web_server::BotData { - name, - state: bot.state(), - volume: bot.volume(), - position: bot.position(), - currently_playing: bot.currently_playing(), - playlist: bot.playlist_to_vec(), - }) - } - - pub fn bot_datas(&self) -> Vec<crate::web_server::BotData> { - let music_bots = self.music_bots.read().unwrap(); - - let len = music_bots.connected_bots.len(); + pub async fn bot_datas(&self) -> Vec<crate::web_server::BotData> { + let len = self.connected_bots.len(); let mut result = Vec::with_capacity(len); - for (name, bot) in &music_bots.connected_bots { - let bot_data = crate::web_server::BotData { - name: name.clone(), - state: bot.state(), - volume: bot.volume(), - position: bot.position(), - currently_playing: bot.currently_playing(), - playlist: bot.playlist_to_vec(), - }; - + for bot in self.connected_bots.values() { + let bot_data = bot.send(GetBotData).await.unwrap(); result.push(bot_data); } @@ -261,24 +196,96 @@ impl MasterBot { } pub fn bot_names(&self) -> Vec<String> { - let music_bots = self.music_bots.read().unwrap(); - - let len = music_bots.connected_bots.len(); + let len = self.connected_bots.len(); let mut result = Vec::with_capacity(len); - for name in music_bots.connected_bots.keys() { + for name in self.connected_bots.keys() { result.push(name.clone()); } result } - pub fn quit(&self, reason: String) { - let music_bots = self.music_bots.read().unwrap(); - for bot in music_bots.connected_bots.values() { - bot.quit(reason.clone()) + fn on_bot_disconnect(&mut self, name: String, id: Identity) { + self.connected_bots.remove(&name); + self.available_names.push(name); + self.available_ids.push(id); + } + + pub async fn quit(&mut self, reason: String) -> Result<(), tsclientlib::Error> { + let futures = self + .connected_bots + .values() + .map(|b| b.send(Quit(reason.clone()))); + for res in future::join_all(futures).await { + if let Err(e) = res { + error!(self.logger, "Failed to shut down bot"; "error" => %e); + } } - let sender = self.sender.read().unwrap(); - sender.send(MusicBotMessage::Quit(reason)).unwrap(); + self.teamspeak.disconnect(&reason).await + } +} + +#[async_trait] +impl Actor for MasterBot { + async fn started(&mut self, ctx: &mut Context<Self>) { + self.my_addr = Some(ctx.address().unwrap().downgrade()); + } +} + +pub struct Connect(pub ConnectOptions); +impl Message for Connect { + type Result = Result<(), tsclientlib::Error>; +} + +#[async_trait] +impl Handler<Connect> for MasterBot { + async fn handle( + &mut self, + opt: Connect, + ctx: &mut Context<Self>, + ) -> Result<(), tsclientlib::Error> { + let addr = ctx.address().unwrap(); + self.teamspeak.connect_for_bot(opt.0, addr.downgrade())?; + Ok(()) + } +} + +pub struct Quit(pub String); +impl Message for Quit { + type Result = Result<(), tsclientlib::Error>; +} + +#[async_trait] +impl Handler<Quit> for MasterBot { + async fn handle(&mut self, q: Quit, _: &mut Context<Self>) -> Result<(), tsclientlib::Error> { + self.quit(q.0).await + } +} + +pub struct BotDisonnected { + pub name: String, + pub identity: Identity, +} + +impl Message for BotDisonnected { + type Result = (); +} + +#[async_trait] +impl Handler<BotDisonnected> for MasterBot { + async fn handle(&mut self, dc: BotDisonnected, _: &mut Context<Self>) { + self.on_bot_disconnect(dc.name, dc.identity); + } +} + +#[async_trait] +impl Handler<MusicBotMessage> for MasterBot { + async fn handle( + &mut self, + msg: MusicBotMessage, + _: &mut Context<Self>, + ) -> Result<(), AudioPlayerError> { + self.on_message(msg).await } } @@ -313,31 +320,10 @@ impl std::fmt::Display for BotCreationError { } } -#[derive(Debug, Serialize, Deserialize)] -pub struct MasterArgs { - #[serde(default = "default_name")] - pub master_name: String, - #[serde(default = "default_local")] - pub local: bool, - pub address: String, - pub channel: Option<String>, - #[serde(default = "default_verbose")] - pub verbose: u8, - pub domain: String, - pub bind_address: String, - pub names: Vec<String>, - pub id: Option<Identity>, - pub ids: Option<Vec<Identity>>, -} - fn default_name() -> String { String::from("PokeBot") } -fn default_local() -> bool { - false -} - fn default_verbose() -> u8 { 0 } @@ -345,7 +331,6 @@ fn default_verbose() -> u8 { impl MasterArgs { pub fn merge(self, args: Args) -> Self { let address = args.address.unwrap_or(self.address); - let local = args.local || self.local; let channel = args.master_channel.or(self.channel); let verbose = if args.verbose > 0 { args.verbose @@ -357,7 +342,6 @@ impl MasterArgs { master_name: self.master_name, names: self.names, ids: self.ids, - local, address, domain: self.domain, bind_address: self.bind_address, @@ -371,8 +355,5 @@ impl MasterArgs { pub struct MasterConfig { pub master_name: String, pub address: String, - pub names: Vec<String>, - pub ids: Vec<Identity>, - pub local: bool, pub verbose: u8, } diff --git a/src/bot/music.rs b/src/bot/music.rs index 90305d0..a57b66c 100644 --- a/src/bot/music.rs +++ b/src/bot/music.rs @@ -1,25 +1,22 @@ -use std::future::Future; -use std::io::BufRead; -use std::sync::{Arc, RwLock}; -use std::thread; -use std::time::Duration; +use async_trait::async_trait; -use log::{debug, info}; use serde::Serialize; +use slog::{debug, info, Logger}; use structopt::StructOpt; -use tokio::sync::mpsc::UnboundedSender; use tsclientlib::{data, ChannelId, ClientId, Connection, Identity, Invoker, MessageTarget}; +use xtra::{spawn::Tokio, Actor, Address, Context, Handler, Message, WeakAddress}; -use crate::audio_player::{AudioPlayer, AudioPlayerError, PollResult}; +use crate::audio_player::{AudioPlayer, AudioPlayerError}; +use crate::bot::{BotDisonnected, Connect, MasterBot, Quit}; use crate::command::Command; use crate::command::VolumeChange; use crate::playlist::Playlist; use crate::teamspeak as ts; -use crate::youtube_dl::AudioMetadata; +use crate::youtube_dl::{self, AudioMetadata}; use ts::TeamSpeakConnection; #[derive(Debug)] -pub struct Message { +pub struct ChatMessage { pub target: MessageTarget, pub invoker: Invoker, pub text: String, @@ -33,6 +30,10 @@ pub enum State { EndOfStream, } +impl Message for State { + type Result = (); +} + impl std::fmt::Display for State { fn fmt(&self, fmt: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> { match self { @@ -47,7 +48,7 @@ impl std::fmt::Display for State { #[derive(Debug)] pub enum MusicBotMessage { - TextMessage(Message), + TextMessage(ChatMessage), ClientChannel { client: ClientId, old_channel: ChannelId, @@ -59,112 +60,95 @@ pub enum MusicBotMessage { client: Box<data::Client>, }, StateChange(State), - Quit(String), +} + +impl Message for MusicBotMessage { + type Result = Result<(), AudioPlayerError>; } pub struct MusicBot { name: String, - player: Arc<AudioPlayer>, + identity: Identity, + player: AudioPlayer, teamspeak: Option<TeamSpeakConnection>, - playlist: Arc<RwLock<Playlist>>, - state: Arc<RwLock<State>>, + master: Option<WeakAddress<MasterBot>>, + playlist: Playlist, + state: State, + logger: Logger, } pub struct MusicBotArgs { pub name: String, - pub name_index: usize, - pub id_index: usize, + pub master: Option<WeakAddress<MasterBot>>, pub local: bool, pub address: String, - pub id: Identity, + pub identity: Identity, pub channel: String, pub verbose: u8, - pub disconnect_cb: Box<dyn FnMut(String, usize, usize) + Send + Sync>, + pub logger: Logger, } impl MusicBot { - pub async fn new(args: MusicBotArgs) -> (Arc<Self>, impl Future<Output = ()>) { - let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); - let tx = Arc::new(RwLock::new(tx)); - let (player, connection) = if args.local { - info!("Starting in CLI mode"); - let audio_player = AudioPlayer::new(tx.clone(), None).unwrap(); - - (audio_player, None) - } else { - info!("Starting in TeamSpeak mode"); - - let con_config = Connection::build(args.address) - .version(tsclientlib::Version::Linux_3_3_2) - .name(format!("🎵 {}", args.name)) - .identity(args.id) - .log_commands(args.verbose >= 1) - .log_packets(args.verbose >= 2) - .log_udp_packets(args.verbose >= 3) - .channel(args.channel); - - let connection = TeamSpeakConnection::new(tx.clone(), con_config) - .await - .unwrap(); - let mut cconnection = connection.clone(); - let audio_player = AudioPlayer::new( - tx.clone(), - Some(Box::new(move |samples| { - let mut rt = tokio::runtime::Runtime::new().unwrap(); - rt.block_on(cconnection.send_audio_packet(samples)); - })), - ) - .unwrap(); - - (audio_player, Some(connection)) - }; - + pub async fn spawn(args: MusicBotArgs) -> Address<Self> { + let mut player = AudioPlayer::new(args.logger.clone()).unwrap(); player.change_volume(VolumeChange::Absolute(0.5)).unwrap(); - let player = Arc::new(player); - let playlist = Arc::new(RwLock::new(Playlist::new())); - spawn_gstreamer_thread(player.clone(), tx.clone()); + let playlist = Playlist::new(args.logger.clone()); - if args.local { - spawn_stdin_reader(tx); - } + let teamspeak = if args.local { + info!(args.logger, "Starting in CLI mode"); + player.setup_with_audio_callback(None).unwrap(); - let bot = Arc::new(Self { + None + } else { + Some(TeamSpeakConnection::new(args.logger.clone()).await.unwrap()) + }; + let bot = Self { name: args.name.clone(), + master: args.master, + identity: args.identity.clone(), player, - teamspeak: connection, + teamspeak, playlist, - state: Arc::new(RwLock::new(State::EndOfStream)), - }); - - let cbot = bot.clone(); - let mut disconnect_cb = args.disconnect_cb; - let name = args.name; - let name_index = args.name_index; - let id_index = args.id_index; - let msg_loop = async move { - 'outer: loop { - while let Some(msg) = rx.recv().await { - if let MusicBotMessage::Quit(reason) = msg { - if let Some(ts) = &cbot.teamspeak { - let mut ts = ts.clone(); - ts.disconnect(&reason).await; - } - disconnect_cb(name, name_index, id_index); - break 'outer; - } - cbot.on_message(msg).await.unwrap(); - } - } - debug!("Left message loop"); + state: State::EndOfStream, + logger: args.logger.clone(), }; - bot.update_name(State::EndOfStream).await; + let bot_addr = bot.create(None).spawn(&mut Tokio::Global); + + info!( + args.logger, + "Connecting"; + "name" => &args.name, + "channel" => &args.channel, + "address" => &args.address, + ); + + let opt = Connection::build(args.address) + .logger(args.logger.clone()) + .version(tsclientlib::Version::Linux_3_3_2) + .name(format!("🎵 {}", args.name)) + .identity(args.identity) + .log_commands(args.verbose >= 1) + .log_packets(args.verbose >= 2) + .log_udp_packets(args.verbose >= 3) + .channel(args.channel); + bot_addr.send(Connect(opt)).await.unwrap().unwrap(); + bot_addr + .send(MusicBotMessage::StateChange(State::EndOfStream)) + .await + .unwrap() + .unwrap(); + + if args.local { + debug!(args.logger, "Spawning stdin reader thread"); + spawn_stdin_reader(bot_addr.downgrade()); + } - (bot, msg_loop) + bot_addr } - async fn start_playing_audio(&self, metadata: AudioMetadata) { + async fn start_playing_audio(&mut self, metadata: AudioMetadata) { let duration = if let Some(duration) = metadata.duration { format!("({})", ts::bold(&humantime::format_duration(duration))) } else { @@ -184,25 +168,16 @@ impl MusicBot { self.player.play().unwrap(); } - pub async fn add_audio(&self, url: String, user: String) { - match crate::youtube_dl::get_audio_download_from_url(url).await { + pub async fn add_audio(&mut self, url: String, user: String) { + match youtube_dl::get_audio_download_from_url(url, &self.logger).await { Ok(mut metadata) => { metadata.added_by = user; - info!("Found audio url: {}", metadata.url); + info!(self.logger, "Found source"; "url" => &metadata.url); - // RWLockGuard can not be kept around or the compiler complains that - // it might cross the await boundary - self.playlist - .write() - .expect("RwLock was not poisoned") - .push(metadata.clone()); + self.playlist.push(metadata.clone()); if !self.player.is_started() { - let entry = self - .playlist - .write() - .expect("RwLock was not poisoned") - .pop(); + let entry = self.playlist.pop(); if let Some(request) = entry { self.start_playing_audio(request).await; } @@ -222,7 +197,7 @@ impl MusicBot { } } Err(e) => { - info!("Failed to find audio url: {}", e); + info!(self.logger, "Failed to find audio url"; "error" => &e); self.send_message(format!("Failed to find url: {}", e)) .await; @@ -235,74 +210,50 @@ impl MusicBot { } pub fn state(&self) -> State { - *self.state.read().expect("RwLock was not poisoned") + self.state } - pub fn volume(&self) -> f64 { + pub async fn volume(&self) -> f64 { self.player.volume() } - pub fn position(&self) -> Option<Duration> { - self.player.position() - } - - pub fn currently_playing(&self) -> Option<AudioMetadata> { - self.player.currently_playing() - } + pub async fn current_channel(&mut self) -> Option<ChannelId> { + let ts = self.teamspeak.as_mut().expect("current_channel needs ts"); - pub fn playlist_to_vec(&self) -> Vec<AudioMetadata> { - self.playlist.read().unwrap().to_vec() + ts.current_channel().await } - pub async fn my_channel(&self) -> ChannelId { - let ts = self.teamspeak.as_ref().expect("my_channel needs ts"); - - let mut ts = ts.clone(); - ts.my_channel().await - } + async fn user_count(&mut self, channel: ChannelId) -> u32 { + let ts = self.teamspeak.as_mut().expect("user_count needs ts"); - async fn user_count(&self, channel: ChannelId) -> u32 { - let ts = self.teamspeak.as_ref().expect("user_count needs ts"); - - let mut ts = ts.clone(); ts.user_count(channel).await } - async fn send_message(&self, text: String) { - debug!("Sending message to TeamSpeak: {}", text); + async fn send_message(&mut self, text: String) { + debug!(self.logger, "Sending message to TeamSpeak"; "message" => &text); - if let Some(ts) = &self.teamspeak { - let mut ts = ts.clone(); + if let Some(ts) = &mut self.teamspeak { ts.send_message_to_channel(text).await; } } - async fn set_nickname(&self, name: String) { - info!("Setting TeamSpeak nickname: {}", name); + async fn set_nickname(&mut self, name: String) { + info!(self.logger, "Setting TeamSpeak nickname"; "name" => &name); - if let Some(ts) = &self.teamspeak { - let mut ts = ts.clone(); + if let Some(ts) = &mut self.teamspeak { ts.set_nickname(name).await; } } - async fn set_description(&self, desc: String) { - info!("Setting TeamSpeak description: {}", desc); + async fn set_description(&mut self, desc: String) { + info!(self.logger, "Setting TeamSpeak description"; "description" => &desc); - if let Some(ts) = &self.teamspeak { - let mut ts = ts.clone(); + if let Some(ts) = &mut self.teamspeak { ts.set_description(desc).await; } } - async fn subscribe(&self, id: ChannelId) { - if let Some(ts) = &self.teamspeak { - let mut ts = ts.clone(); - ts.subscribe(id).await; - } - } - - async fn on_text(&self, message: Message) -> Result<(), AudioPlayerError> { + async fn on_text(&mut self, message: ChatMessage) -> Result<(), AudioPlayerError> { let msg = message.text; if msg.starts_with('!') { let tokens = msg[1..].split_whitespace().collect::<Vec<_>>(); @@ -319,13 +270,15 @@ impl MusicBot { Ok(()) } - async fn on_command(&self, command: Command, invoker: Invoker) -> Result<(), AudioPlayerError> { + async fn on_command( + &mut self, + command: Command, + invoker: Invoker, + ) -> Result<(), AudioPlayerError> { match command { Command::Play => { - let playlist = self.playlist.read().expect("RwLock was not poisoned"); - if !self.player.is_started() { - if !playlist.is_empty() { + if !self.playlist.is_empty() { self.player.stop_current()?; } } else { @@ -357,35 +310,32 @@ impl MusicBot { } } Command::Next => { - let playlist = self.playlist.read().expect("RwLock was not poisoned"); - if !playlist.is_empty() { - info!("Skipping to next track"); + if !self.playlist.is_empty() { + info!(self.logger, "Skipping to next track"); self.player.stop_current()?; } else { - info!("Playlist empty, cannot skip"); + info!(self.logger, "Playlist empty, cannot skip"); self.player.reset()?; } } Command::Clear => { - self.playlist - .write() - .expect("RwLock was not poisoned") - .clear(); + self.send_message(String::from("Cleared playlist")).await; + self.playlist.clear(); } Command::Volume { volume } => { self.player.change_volume(volume)?; self.update_name(self.state()).await; } Command::Leave => { - self.quit(String::from("Leaving")); + self.quit(String::from("Leaving"), true).await.unwrap(); } } Ok(()) } - async fn update_name(&self, state: State) { - let volume = (self.volume() * 100.0).round(); + async fn update_name(&mut self, state: State) { + let volume = (self.volume().await * 100.0).round(); let name = match state { State::EndOfStream => format!("🎵 {} ({}%)", self.name, volume), _ => format!("🎵 {} - {} ({}%)", self.name, state, volume), @@ -393,43 +343,39 @@ impl MusicBot { self.set_nickname(name).await; } - async fn on_state(&self, state: State) -> Result<(), AudioPlayerError> { - let current_state = *self.state.read().unwrap(); - if current_state != state { - match state { + async fn on_state(&mut self, new_state: State) -> Result<(), AudioPlayerError> { + if self.state != new_state { + match new_state { State::EndOfStream => { - let next_track = self - .playlist - .write() - .expect("RwLock was not poisoned") - .pop(); + self.player.reset()?; + let next_track = self.playlist.pop(); if let Some(request) = next_track { - info!("Advancing playlist"); + info!(self.logger, "Advancing playlist"); self.start_playing_audio(request).await; } else { - self.update_name(state).await; + self.update_name(new_state).await; self.set_description(String::new()).await; } } State::Stopped => { - if current_state != State::EndOfStream { - self.update_name(state).await; + if self.state != State::EndOfStream { + self.update_name(new_state).await; self.set_description(String::new()).await; } } - _ => self.update_name(state).await, + _ => self.update_name(new_state).await, } } - if !(current_state == State::EndOfStream && state == State::Stopped) { - *self.state.write().unwrap() = state; + if !(self.state == State::EndOfStream && new_state == State::Stopped) { + self.state = new_state; } Ok(()) } - async fn on_message(&self, message: MusicBotMessage) -> Result<(), AudioPlayerError> { + async fn on_message(&mut self, message: MusicBotMessage) -> Result<(), AudioPlayerError> { match message { MusicBotMessage::TextMessage(message) => { if MessageTarget::Channel == message.target { @@ -446,9 +392,6 @@ impl MusicBot { let old_channel = client.channel; self.on_client_left_channel(old_channel).await; } - MusicBotMessage::ChannelAdded(id) => { - self.subscribe(id).await; - } MusicBotMessage::StateChange(state) => { self.on_state(state).await?; } @@ -458,60 +401,163 @@ impl MusicBot { Ok(()) } - async fn on_client_left_channel(&self, old_channel: ChannelId) { - let my_channel = self.my_channel().await; - if old_channel == my_channel && self.user_count(my_channel).await <= 1 { - self.quit(String::from("Channel is empty")); + // FIXME logs an error if this music bot is the one leaving + async fn on_client_left_channel(&mut self, old_channel: ChannelId) { + let current_channel = match self.current_channel().await { + Some(c) => c, + None => { + return; + } + }; + if old_channel == current_channel && self.user_count(current_channel).await <= 1 { + self.quit(String::from("Channel is empty"), true) + .await + .unwrap(); + } + } + + pub async fn quit( + &mut self, + reason: String, + inform_master: bool, + ) -> Result<(), tsclientlib::Error> { + // FIXME logs errors if the bot is playing something because it tries to + // change its name and description + self.player.reset().unwrap(); + + let ts = self.teamspeak.as_mut().unwrap(); + ts.disconnect(&reason).await?; + + if inform_master { + if let Some(master) = &self.master { + master + .send(BotDisonnected { + name: self.name.clone(), + identity: self.identity.clone(), + }) + .await + .unwrap(); + } + } + + Ok(()) + } +} + +#[async_trait] +impl Actor for MusicBot { + async fn started(&mut self, ctx: &mut Context<Self>) { + let addr = ctx.address().unwrap().downgrade(); + self.player.register_bot(addr); + } +} + +#[async_trait] +impl Handler<Connect> for MusicBot { + async fn handle( + &mut self, + opt: Connect, + ctx: &mut Context<Self>, + ) -> Result<(), tsclientlib::Error> { + let addr = ctx.address().unwrap().downgrade(); + self.teamspeak + .as_mut() + .unwrap() + .connect_for_bot(opt.0, addr)?; + + let mut connection = self.teamspeak.as_ref().unwrap().clone(); + let handle = tokio::runtime::Handle::current(); + self.player + .setup_with_audio_callback(Some(Box::new(move |samples| { + handle.block_on(connection.send_audio_packet(samples)); + }))) + .unwrap(); + + Ok(()) + } +} + +pub struct GetName; +impl Message for GetName { + type Result = String; +} + +#[async_trait] +impl Handler<GetName> for MusicBot { + async fn handle(&mut self, _: GetName, _: &mut Context<Self>) -> String { + self.name().to_owned() + } +} + +pub struct GetBotData; +impl Message for GetBotData { + type Result = crate::web_server::BotData; +} + +#[async_trait] +impl Handler<GetBotData> for MusicBot { + async fn handle(&mut self, _: GetBotData, _: &mut Context<Self>) -> crate::web_server::BotData { + crate::web_server::BotData { + name: self.name.clone(), + playlist: self.playlist.to_vec(), + currently_playing: self.player.currently_playing(), + position: self.player.position(), + state: self.state(), + volume: self.volume().await, } } +} - pub fn quit(&self, reason: String) { - self.player.quit(reason); +pub struct GetChannel; +impl Message for GetChannel { + type Result = Option<ChannelId>; +} + +#[async_trait] +impl Handler<GetChannel> for MusicBot { + async fn handle(&mut self, _: GetChannel, _: &mut Context<Self>) -> Option<ChannelId> { + self.current_channel().await } } -fn spawn_stdin_reader(tx: Arc<RwLock<UnboundedSender<MusicBotMessage>>>) { - debug!("Spawning stdin reader thread"); - thread::Builder::new() - .name(String::from("stdin reader")) - .spawn(move || { - let stdin = ::std::io::stdin(); - let lock = stdin.lock(); - for line in lock.lines() { - let line = line.unwrap(); - - let message = MusicBotMessage::TextMessage(Message { - target: MessageTarget::Channel, - invoker: Invoker { - name: String::from("stdin"), - id: ClientId(0), - uid: None, - }, - text: line, - }); - - let tx = tx.read().unwrap(); - tx.send(message).unwrap(); - } - }) - .expect("Failed to spawn stdin reader thread"); +#[async_trait] +impl Handler<Quit> for MusicBot { + async fn handle(&mut self, q: Quit, _: &mut Context<Self>) -> Result<(), tsclientlib::Error> { + self.quit(q.0, false).await + } } -fn spawn_gstreamer_thread( - player: Arc<AudioPlayer>, - tx: Arc<RwLock<UnboundedSender<MusicBotMessage>>>, -) { - thread::Builder::new() - .name(String::from("gstreamer polling")) - .spawn(move || loop { - if player.poll() == PollResult::Quit { - break; - } +#[async_trait] +impl Handler<MusicBotMessage> for MusicBot { + async fn handle( + &mut self, + msg: MusicBotMessage, + _: &mut Context<Self>, + ) -> Result<(), AudioPlayerError> { + self.on_message(msg).await + } +} - tx.read() - .unwrap() - .send(MusicBotMessage::StateChange(State::EndOfStream)) - .unwrap(); - }) - .expect("Failed to spawn gstreamer thread"); +fn spawn_stdin_reader(addr: WeakAddress<MusicBot>) { + use tokio::io::AsyncBufReadExt; + + tokio::task::spawn(async move { + let stdin = tokio::io::stdin(); + let reader = tokio::io::BufReader::new(stdin); + let mut lines = reader.lines(); + + while let Some(line) = lines.next_line().await.unwrap() { + let message = MusicBotMessage::TextMessage(ChatMessage { + target: MessageTarget::Channel, + invoker: Invoker { + name: String::from("stdin"), + id: ClientId(0), + uid: None, + }, + text: line, + }); + + addr.send(message).await.unwrap().unwrap(); + } + }); } |
