diff options
Diffstat (limited to 'src/bot')
| -rw-r--r-- | src/bot/master.rs | 68 | ||||
| -rw-r--r-- | src/bot/music.rs | 176 |
2 files changed, 138 insertions, 106 deletions
diff --git a/src/bot/master.rs b/src/bot/master.rs index dad2bed..fe3c3fe 100644 --- a/src/bot/master.rs +++ b/src/bot/master.rs @@ -2,12 +2,10 @@ use std::collections::HashMap; use std::future::Future; use std::sync::{Arc, RwLock}; -use futures::future::{FutureExt, TryFutureExt}; -use futures01::future::Future as Future01; use log::info; use rand::{rngs::SmallRng, seq::SliceRandom, SeedableRng}; use serde::{Deserialize, Serialize}; -use tokio02::sync::mpsc::UnboundedSender; +use tokio::sync::mpsc::UnboundedSender; use tsclientlib::{ClientId, ConnectOptions, Identity, MessageTarget}; use crate::audio_player::AudioPlayerError; @@ -20,7 +18,7 @@ use crate::bot::{MusicBot, MusicBotArgs, MusicBotMessage}; pub struct MasterBot { config: Arc<MasterConfig>, music_bots: Arc<RwLock<MusicBots>>, - teamspeak: Arc<TeamSpeakConnection>, + teamspeak: TeamSpeakConnection, sender: Arc<RwLock<UnboundedSender<MusicBotMessage>>>, } @@ -33,7 +31,7 @@ struct MusicBots { impl MasterBot { pub async fn new(args: MasterArgs) -> (Arc<Self>, impl Future) { - let (tx, mut rx) = tokio02::sync::mpsc::unbounded_channel(); + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); let tx = Arc::new(RwLock::new(tx)); info!("Starting in TeamSpeak mode"); @@ -49,11 +47,9 @@ impl MasterBot { con_config = con_config.channel(channel); } - let connection = Arc::new( - TeamSpeakConnection::new(tx.clone(), con_config) - .await - .unwrap(), - ); + let connection = TeamSpeakConnection::new(tx.clone(), con_config) + .await + .unwrap(); let config = Arc::new(MasterConfig { master_name: args.master_name, @@ -81,24 +77,22 @@ impl MasterBot { sender: tx.clone(), }); - bot.teamspeak - .set_description("Poke me if you want a music bot!"); - let cbot = bot.clone(); let msg_loop = async move { 'outer: loop { while let Some(msg) = rx.recv().await { match msg { MusicBotMessage::Quit(reason) => { - cbot.teamspeak.disconnect(&reason); + let mut cteamspeak = cbot.teamspeak.clone(); + cteamspeak.disconnect(&reason).await; break 'outer; } MusicBotMessage::ClientDisconnected { id, .. } => { - if id == cbot.my_id() { + if id == cbot.my_id().await { // TODO Reconnect since quit was not called break 'outer; } - }, + } _ => cbot.on_message(msg).await.unwrap(), } } @@ -108,13 +102,14 @@ impl MasterBot { (bot, msg_loop) } - fn build_bot_args_for(&self, id: ClientId) -> Result<MusicBotArgs, BotCreationError> { - let channel = match self.teamspeak.channel_of_user(id) { + async fn build_bot_args_for(&self, id: ClientId) -> Result<MusicBotArgs, BotCreationError> { + let mut cteamspeak = self.teamspeak.clone(); + let channel = match cteamspeak.channel_of_user(id).await { Some(channel) => channel, None => return Err(BotCreationError::UnfoundUser), }; - if channel == self.teamspeak.my_channel() { + if channel == cteamspeak.my_channel().await { return Err(BotCreationError::MasterChannel( self.config.master_name.clone(), )); @@ -128,14 +123,14 @@ impl MasterBot { } = &mut *self.music_bots.write().expect("RwLock was not poisoned"); for bot in connected_bots.values() { - if bot.my_channel() == channel { + if bot.my_channel().await == channel { return Err(BotCreationError::MultipleBots(bot.name().to_owned())); } } - let channel_path = self - .teamspeak + let channel_path = cteamspeak .channel_path_of_user(id) + .await .expect("can find poke sender"); available_names.shuffle(rng); @@ -181,16 +176,19 @@ impl MasterBot { } async fn spawn_bot_for(&self, id: ClientId) { - match self.build_bot_args_for(id) { + match self.build_bot_args_for(id).await { Ok(bot_args) => { let (bot, fut) = MusicBot::new(bot_args).await; - tokio::spawn(fut.unit_error().boxed().compat().map(|_| ())); + tokio::spawn(fut); let mut music_bots = self.music_bots.write().expect("RwLock was not poisoned"); music_bots .connected_bots .insert(bot.name().to_string(), bot); } - Err(e) => self.teamspeak.send_message_to_user(id, &e.to_string()), + Err(e) => { + let mut cteamspeak = self.teamspeak.clone(); + cteamspeak.send_message_to_user(id, e.to_string()).await + } } } @@ -202,9 +200,19 @@ impl MasterBot { self.spawn_bot_for(who).await; } } - MusicBotMessage::ChannelCreated(_) => { + MusicBotMessage::ChannelAdded(_) => { // TODO Only subscribe to one channel - self.teamspeak.subscribe_all(); + let mut cteamspeak = self.teamspeak.clone(); + cteamspeak.subscribe_all().await; + } + MusicBotMessage::ClientAdded(id) => { + let mut cteamspeak = self.teamspeak.clone(); + + if id == cteamspeak.my_id().await { + cteamspeak + .set_description(String::from("Poke me if you want a music bot!")) + .await; + } } _ => (), } @@ -212,8 +220,10 @@ impl MasterBot { Ok(()) } - fn my_id(&self) -> ClientId { - self.teamspeak.my_id() + async fn my_id(&self) -> ClientId { + let mut cteamspeak = self.teamspeak.clone(); + + cteamspeak.my_id().await } pub fn bot_data(&self, name: String) -> Option<crate::web_server::BotData> { diff --git a/src/bot/music.rs b/src/bot/music.rs index 71e7b58..656a169 100644 --- a/src/bot/music.rs +++ b/src/bot/music.rs @@ -7,7 +7,7 @@ use std::time::Duration; use log::{debug, info}; use serde::Serialize; use structopt::StructOpt; -use tokio02::sync::mpsc::UnboundedSender; +use tokio::sync::mpsc::UnboundedSender; use tsclientlib::{data, ChannelId, ClientId, ConnectOptions, Identity, Invoker, MessageTarget}; use crate::audio_player::{AudioPlayer, AudioPlayerError, PollResult}; @@ -52,7 +52,8 @@ pub enum MusicBotMessage { client: ClientId, old_channel: ChannelId, }, - ChannelCreated(ChannelId), + ChannelAdded(ChannelId), + ClientAdded(ClientId), ClientDisconnected { id: ClientId, client: Box<data::Client>, @@ -64,7 +65,7 @@ pub enum MusicBotMessage { pub struct MusicBot { name: String, player: Arc<AudioPlayer>, - teamspeak: Option<Arc<TeamSpeakConnection>>, + teamspeak: Option<TeamSpeakConnection>, playlist: Arc<RwLock<Playlist>>, state: Arc<RwLock<State>>, } @@ -82,8 +83,8 @@ pub struct MusicBotArgs { } impl MusicBot { - pub async fn new(args: MusicBotArgs) -> (Arc<Self>, impl Future) { - let (tx, mut rx) = tokio02::sync::mpsc::unbounded_channel(); + pub async fn new(args: MusicBotArgs) -> (Arc<Self>, impl Future<Output = ()>) { + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); let tx = Arc::new(RwLock::new(tx)); let (player, connection) = if args.local { info!("Starting in CLI mode"); @@ -102,16 +103,15 @@ impl MusicBot { .log_udp_packets(args.verbose >= 3) .channel(args.channel); - let connection = Arc::new( - TeamSpeakConnection::new(tx.clone(), con_config) - .await - .unwrap(), - ); - let cconnection = connection.clone(); + let connection = TeamSpeakConnection::new(tx.clone(), con_config) + .await + .unwrap(); + let mut cconnection = connection.clone(); let audio_player = AudioPlayer::new( tx.clone(), Some(Box::new(move |samples| { - cconnection.send_audio_packet(samples); + let mut rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(cconnection.send_audio_packet(samples)); })), ) .unwrap(); @@ -146,7 +146,10 @@ impl MusicBot { 'outer: loop { while let Some(msg) = rx.recv().await { if let MusicBotMessage::Quit(reason) = msg { - cbot.with_teamspeak(|ts| ts.disconnect(&reason)); + if let Some(ts) = &cbot.teamspeak { + let mut ts = ts.clone(); + ts.disconnect(&reason).await; + } disconnect_cb(name, name_index, id_index); break 'outer; } @@ -156,31 +159,26 @@ impl MusicBot { debug!("Left message loop"); }; - bot.update_name(State::EndOfStream); + bot.update_name(State::EndOfStream).await; (bot, msg_loop) } - #[inline(always)] - fn with_teamspeak<F: Fn(&TeamSpeakConnection)>(&self, func: F) { - if let Some(ts) = &self.teamspeak { - func(&ts); - } - } - - fn start_playing_audio(&self, metadata: AudioMetadata) { + async fn start_playing_audio(&self, metadata: AudioMetadata) { let duration = if let Some(duration) = metadata.duration { format!("({})", ts::bold(&humantime::format_duration(duration))) } else { format!("") }; - self.send_message(&format!( + self.send_message(format!( "Playing {} {}", ts::underline(&metadata.title), duration - )); - self.set_description(&format!("Currently playing '{}'", metadata.title)); + )) + .await; + self.set_description(format!("Currently playing '{}'", metadata.title)) + .await; self.player.reset().unwrap(); self.player.set_metadata(metadata).unwrap(); self.player.play().unwrap(); @@ -192,12 +190,21 @@ impl MusicBot { metadata.added_by = user; info!("Found audio url: {}", metadata.url); - let mut playlist = self.playlist.write().expect("RwLock was not poisoned"); - playlist.push(metadata.clone()); + // RWLockGuard can not be kept around or the compiler complains that + // it might cross the await boundary + self.playlist + .write() + .expect("RwLock was not poisoned") + .push(metadata.clone()); if !self.player.is_started() { - if let Some(request) = playlist.pop() { - self.start_playing_audio(request); + let entry = self + .playlist + .write() + .expect("RwLock was not poisoned") + .pop(); + if let Some(request) = entry { + self.start_playing_audio(request).await; } } else { let duration = if let Some(duration) = metadata.duration { @@ -206,17 +213,19 @@ impl MusicBot { format!("") }; - self.send_message(&format!( + self.send_message(format!( "Added {}{} to playlist", ts::underline(&metadata.title), duration - )); + )) + .await; } } Err(e) => { info!("Failed to find audio url: {}", e); - self.send_message(&format!("Failed to find url: {}", e)); + self.send_message(format!("Failed to find url: {}", e)) + .await; } } } @@ -245,40 +254,52 @@ impl MusicBot { self.playlist.read().unwrap().to_vec() } - pub fn my_channel(&self) -> ChannelId { - self.teamspeak - .as_ref() - .map(|ts| ts.my_channel()) - .expect("my_channel needs ts") + pub async fn my_channel(&self) -> ChannelId { + let ts = self.teamspeak.as_ref().expect("my_channel needs ts"); + + let mut ts = ts.clone(); + ts.my_channel().await } - fn user_count(&self, channel: ChannelId) -> u32 { - self.teamspeak - .as_ref() - .map(|ts| ts.user_count(channel)) - .expect("user_count needs ts") + async fn user_count(&self, channel: ChannelId) -> u32 { + let ts = self.teamspeak.as_ref().expect("user_count needs ts"); + + let mut ts = ts.clone(); + ts.user_count(channel).await } - fn send_message(&self, text: &str) { + async fn send_message(&self, text: String) { debug!("Sending message to TeamSpeak: {}", text); - self.with_teamspeak(|ts| ts.send_message_to_channel(text)); + if let Some(ts) = &self.teamspeak { + let mut ts = ts.clone(); + ts.send_message_to_channel(text).await; + } } - fn set_nickname(&self, name: &str) { + async fn set_nickname(&self, name: String) { info!("Setting TeamSpeak nickname: {}", name); - self.with_teamspeak(|ts| ts.set_nickname(name)); + if let Some(ts) = &self.teamspeak { + let mut ts = ts.clone(); + ts.set_nickname(name).await; + } } - fn set_description(&self, desc: &str) { + async fn set_description(&self, desc: String) { info!("Setting TeamSpeak description: {}", desc); - self.with_teamspeak(|ts| ts.set_description(desc)); + if let Some(ts) = &self.teamspeak { + let mut ts = ts.clone(); + ts.set_description(desc).await; + } } - fn subscribe_all(&self) { - self.with_teamspeak(|ts| ts.subscribe_all()); + async fn subscribe_all(&self) { + if let Some(ts) = &self.teamspeak { + let mut ts = ts.clone(); + ts.subscribe_all().await; + } } async fn on_text(&self, message: Message) -> Result<(), AudioPlayerError> { @@ -289,7 +310,7 @@ impl MusicBot { match Command::from_iter_safe(&tokens) { Ok(args) => self.on_command(args, message.invoker).await?, Err(e) if e.kind == structopt::clap::ErrorKind::HelpDisplayed => { - self.send_message(&format!("\n{}", e.message)); + self.send_message(format!("\n{}", e.message)).await; } _ => (), } @@ -329,9 +350,10 @@ impl MusicBot { } Command::Seek { amount } => { if let Ok(time) = self.player.seek(amount) { - self.send_message(&format!("New position: {}", ts::bold(&time))); + self.send_message(format!("New position: {}", ts::bold(&time))) + .await; } else { - self.send_message("Failed to seek"); + self.send_message(String::from("Failed to seek")).await; } } Command::Next => { @@ -352,7 +374,7 @@ impl MusicBot { } Command::Volume { volume } => { self.player.change_volume(volume)?; - self.update_name(self.state()); + self.update_name(self.state()).await; } Command::Leave => { self.quit(String::from("Leaving")); @@ -362,18 +384,18 @@ impl MusicBot { Ok(()) } - fn update_name(&self, state: State) { + async fn update_name(&self, state: State) { let volume = (self.volume() * 100.0).round(); let name = match state { State::EndOfStream => format!("🎵 {} ({}%)", self.name, volume), _ => format!("🎵 {} - {} ({}%)", self.name, state, volume), }; - self.set_nickname(&name); + self.set_nickname(name).await; } - fn on_state(&self, state: State) -> Result<(), AudioPlayerError> { - let mut current_state = self.state.write().unwrap(); - if *current_state != state { + async fn on_state(&self, state: State) -> Result<(), AudioPlayerError> { + let current_state = *self.state.read().unwrap(); + if current_state != state { match state { State::EndOfStream => { let next_track = self @@ -384,24 +406,24 @@ impl MusicBot { if let Some(request) = next_track { info!("Advancing playlist"); - self.start_playing_audio(request); + self.start_playing_audio(request).await; } else { - self.update_name(state); - self.set_description(""); + self.update_name(state).await; + self.set_description(String::new()).await; } } State::Stopped => { - if *current_state != State::EndOfStream { - self.update_name(state); - self.set_description(""); + if current_state != State::EndOfStream { + self.update_name(state).await; + self.set_description(String::new()).await; } } - _ => self.update_name(state), + _ => self.update_name(state).await, } } - if !(*current_state == State::EndOfStream && state == State::Stopped) { - *current_state = state; + if !(current_state == State::EndOfStream && state == State::Stopped) { + *self.state.write().unwrap() = state; } Ok(()) @@ -418,28 +440,28 @@ impl MusicBot { client: _, old_channel, } => { - self.on_client_left_channel(old_channel); + self.on_client_left_channel(old_channel).await; } MusicBotMessage::ClientDisconnected { id: _, client } => { let old_channel = client.channel; - self.on_client_left_channel(old_channel); + self.on_client_left_channel(old_channel).await; } - MusicBotMessage::ChannelCreated(_) => { + MusicBotMessage::ChannelAdded(_) => { // TODO Only subscribe to one channel - self.subscribe_all(); + self.subscribe_all().await; } MusicBotMessage::StateChange(state) => { - self.on_state(state)?; + self.on_state(state).await?; } - MusicBotMessage::Quit(_) => (), + _ => (), } Ok(()) } - fn on_client_left_channel(&self, old_channel: ChannelId) { - let my_channel = self.my_channel(); - if old_channel == my_channel && self.user_count(my_channel) <= 1 { + async fn on_client_left_channel(&self, old_channel: ChannelId) { + let my_channel = self.my_channel().await; + if old_channel == my_channel && self.user_count(my_channel).await <= 1 { self.quit(String::from("Channel is empty")); } } |
