diff options
| author | Jokler <jokler@protonmail.com> | 2020-09-29 15:18:47 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-09-29 15:18:47 +0200 |
| commit | e44a251fe0e1b82c859515768e483f19b1b5aaf3 (patch) | |
| tree | 6092b3db497ee0a795f70db695ff2adb3c16e5ee /src/bot/music.rs | |
| parent | 130cde033795382b70a312846a8f2704a15d11e3 (diff) | |
| parent | bbe3e1fffc94e7e87237a331de7b09253b0aa3fb (diff) | |
| download | pokebot-e44a251fe0e1b82c859515768e483f19b1b5aaf3.tar.gz pokebot-e44a251fe0e1b82c859515768e483f19b1b5aaf3.zip | |
Merge pull request #59 from Mavulp/update-dependencies
Upgrade dependencies & use tokio 0.2 exclusively
Diffstat (limited to 'src/bot/music.rs')
| -rw-r--r-- | src/bot/music.rs | 176 |
1 files changed, 99 insertions, 77 deletions
diff --git a/src/bot/music.rs b/src/bot/music.rs index 71e7b58..656a169 100644 --- a/src/bot/music.rs +++ b/src/bot/music.rs @@ -7,7 +7,7 @@ use std::time::Duration; use log::{debug, info}; use serde::Serialize; use structopt::StructOpt; -use tokio02::sync::mpsc::UnboundedSender; +use tokio::sync::mpsc::UnboundedSender; use tsclientlib::{data, ChannelId, ClientId, ConnectOptions, Identity, Invoker, MessageTarget}; use crate::audio_player::{AudioPlayer, AudioPlayerError, PollResult}; @@ -52,7 +52,8 @@ pub enum MusicBotMessage { client: ClientId, old_channel: ChannelId, }, - ChannelCreated(ChannelId), + ChannelAdded(ChannelId), + ClientAdded(ClientId), ClientDisconnected { id: ClientId, client: Box<data::Client>, @@ -64,7 +65,7 @@ pub enum MusicBotMessage { pub struct MusicBot { name: String, player: Arc<AudioPlayer>, - teamspeak: Option<Arc<TeamSpeakConnection>>, + teamspeak: Option<TeamSpeakConnection>, playlist: Arc<RwLock<Playlist>>, state: Arc<RwLock<State>>, } @@ -82,8 +83,8 @@ pub struct MusicBotArgs { } impl MusicBot { - pub async fn new(args: MusicBotArgs) -> (Arc<Self>, impl Future) { - let (tx, mut rx) = tokio02::sync::mpsc::unbounded_channel(); + pub async fn new(args: MusicBotArgs) -> (Arc<Self>, impl Future<Output = ()>) { + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); let tx = Arc::new(RwLock::new(tx)); let (player, connection) = if args.local { info!("Starting in CLI mode"); @@ -102,16 +103,15 @@ impl MusicBot { .log_udp_packets(args.verbose >= 3) .channel(args.channel); - let connection = Arc::new( - TeamSpeakConnection::new(tx.clone(), con_config) - .await - .unwrap(), - ); - let cconnection = connection.clone(); + let connection = TeamSpeakConnection::new(tx.clone(), con_config) + .await + .unwrap(); + let mut cconnection = connection.clone(); let audio_player = AudioPlayer::new( tx.clone(), Some(Box::new(move |samples| { - cconnection.send_audio_packet(samples); + let mut rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(cconnection.send_audio_packet(samples)); })), ) .unwrap(); @@ -146,7 +146,10 @@ impl MusicBot { 'outer: loop { while let Some(msg) = rx.recv().await { if let MusicBotMessage::Quit(reason) = msg { - cbot.with_teamspeak(|ts| ts.disconnect(&reason)); + if let Some(ts) = &cbot.teamspeak { + let mut ts = ts.clone(); + ts.disconnect(&reason).await; + } disconnect_cb(name, name_index, id_index); break 'outer; } @@ -156,31 +159,26 @@ impl MusicBot { debug!("Left message loop"); }; - bot.update_name(State::EndOfStream); + bot.update_name(State::EndOfStream).await; (bot, msg_loop) } - #[inline(always)] - fn with_teamspeak<F: Fn(&TeamSpeakConnection)>(&self, func: F) { - if let Some(ts) = &self.teamspeak { - func(&ts); - } - } - - fn start_playing_audio(&self, metadata: AudioMetadata) { + async fn start_playing_audio(&self, metadata: AudioMetadata) { let duration = if let Some(duration) = metadata.duration { format!("({})", ts::bold(&humantime::format_duration(duration))) } else { format!("") }; - self.send_message(&format!( + self.send_message(format!( "Playing {} {}", ts::underline(&metadata.title), duration - )); - self.set_description(&format!("Currently playing '{}'", metadata.title)); + )) + .await; + self.set_description(format!("Currently playing '{}'", metadata.title)) + .await; self.player.reset().unwrap(); self.player.set_metadata(metadata).unwrap(); self.player.play().unwrap(); @@ -192,12 +190,21 @@ impl MusicBot { metadata.added_by = user; info!("Found audio url: {}", metadata.url); - let mut playlist = self.playlist.write().expect("RwLock was not poisoned"); - playlist.push(metadata.clone()); + // RWLockGuard can not be kept around or the compiler complains that + // it might cross the await boundary + self.playlist + .write() + .expect("RwLock was not poisoned") + .push(metadata.clone()); if !self.player.is_started() { - if let Some(request) = playlist.pop() { - self.start_playing_audio(request); + let entry = self + .playlist + .write() + .expect("RwLock was not poisoned") + .pop(); + if let Some(request) = entry { + self.start_playing_audio(request).await; } } else { let duration = if let Some(duration) = metadata.duration { @@ -206,17 +213,19 @@ impl MusicBot { format!("") }; - self.send_message(&format!( + self.send_message(format!( "Added {}{} to playlist", ts::underline(&metadata.title), duration - )); + )) + .await; } } Err(e) => { info!("Failed to find audio url: {}", e); - self.send_message(&format!("Failed to find url: {}", e)); + self.send_message(format!("Failed to find url: {}", e)) + .await; } } } @@ -245,40 +254,52 @@ impl MusicBot { self.playlist.read().unwrap().to_vec() } - pub fn my_channel(&self) -> ChannelId { - self.teamspeak - .as_ref() - .map(|ts| ts.my_channel()) - .expect("my_channel needs ts") + pub async fn my_channel(&self) -> ChannelId { + let ts = self.teamspeak.as_ref().expect("my_channel needs ts"); + + let mut ts = ts.clone(); + ts.my_channel().await } - fn user_count(&self, channel: ChannelId) -> u32 { - self.teamspeak - .as_ref() - .map(|ts| ts.user_count(channel)) - .expect("user_count needs ts") + async fn user_count(&self, channel: ChannelId) -> u32 { + let ts = self.teamspeak.as_ref().expect("user_count needs ts"); + + let mut ts = ts.clone(); + ts.user_count(channel).await } - fn send_message(&self, text: &str) { + async fn send_message(&self, text: String) { debug!("Sending message to TeamSpeak: {}", text); - self.with_teamspeak(|ts| ts.send_message_to_channel(text)); + if let Some(ts) = &self.teamspeak { + let mut ts = ts.clone(); + ts.send_message_to_channel(text).await; + } } - fn set_nickname(&self, name: &str) { + async fn set_nickname(&self, name: String) { info!("Setting TeamSpeak nickname: {}", name); - self.with_teamspeak(|ts| ts.set_nickname(name)); + if let Some(ts) = &self.teamspeak { + let mut ts = ts.clone(); + ts.set_nickname(name).await; + } } - fn set_description(&self, desc: &str) { + async fn set_description(&self, desc: String) { info!("Setting TeamSpeak description: {}", desc); - self.with_teamspeak(|ts| ts.set_description(desc)); + if let Some(ts) = &self.teamspeak { + let mut ts = ts.clone(); + ts.set_description(desc).await; + } } - fn subscribe_all(&self) { - self.with_teamspeak(|ts| ts.subscribe_all()); + async fn subscribe_all(&self) { + if let Some(ts) = &self.teamspeak { + let mut ts = ts.clone(); + ts.subscribe_all().await; + } } async fn on_text(&self, message: Message) -> Result<(), AudioPlayerError> { @@ -289,7 +310,7 @@ impl MusicBot { match Command::from_iter_safe(&tokens) { Ok(args) => self.on_command(args, message.invoker).await?, Err(e) if e.kind == structopt::clap::ErrorKind::HelpDisplayed => { - self.send_message(&format!("\n{}", e.message)); + self.send_message(format!("\n{}", e.message)).await; } _ => (), } @@ -329,9 +350,10 @@ impl MusicBot { } Command::Seek { amount } => { if let Ok(time) = self.player.seek(amount) { - self.send_message(&format!("New position: {}", ts::bold(&time))); + self.send_message(format!("New position: {}", ts::bold(&time))) + .await; } else { - self.send_message("Failed to seek"); + self.send_message(String::from("Failed to seek")).await; } } Command::Next => { @@ -352,7 +374,7 @@ impl MusicBot { } Command::Volume { volume } => { self.player.change_volume(volume)?; - self.update_name(self.state()); + self.update_name(self.state()).await; } Command::Leave => { self.quit(String::from("Leaving")); @@ -362,18 +384,18 @@ impl MusicBot { Ok(()) } - fn update_name(&self, state: State) { + async fn update_name(&self, state: State) { let volume = (self.volume() * 100.0).round(); let name = match state { State::EndOfStream => format!("🎵 {} ({}%)", self.name, volume), _ => format!("🎵 {} - {} ({}%)", self.name, state, volume), }; - self.set_nickname(&name); + self.set_nickname(name).await; } - fn on_state(&self, state: State) -> Result<(), AudioPlayerError> { - let mut current_state = self.state.write().unwrap(); - if *current_state != state { + async fn on_state(&self, state: State) -> Result<(), AudioPlayerError> { + let current_state = *self.state.read().unwrap(); + if current_state != state { match state { State::EndOfStream => { let next_track = self @@ -384,24 +406,24 @@ impl MusicBot { if let Some(request) = next_track { info!("Advancing playlist"); - self.start_playing_audio(request); + self.start_playing_audio(request).await; } else { - self.update_name(state); - self.set_description(""); + self.update_name(state).await; + self.set_description(String::new()).await; } } State::Stopped => { - if *current_state != State::EndOfStream { - self.update_name(state); - self.set_description(""); + if current_state != State::EndOfStream { + self.update_name(state).await; + self.set_description(String::new()).await; } } - _ => self.update_name(state), + _ => self.update_name(state).await, } } - if !(*current_state == State::EndOfStream && state == State::Stopped) { - *current_state = state; + if !(current_state == State::EndOfStream && state == State::Stopped) { + *self.state.write().unwrap() = state; } Ok(()) @@ -418,28 +440,28 @@ impl MusicBot { client: _, old_channel, } => { - self.on_client_left_channel(old_channel); + self.on_client_left_channel(old_channel).await; } MusicBotMessage::ClientDisconnected { id: _, client } => { let old_channel = client.channel; - self.on_client_left_channel(old_channel); + self.on_client_left_channel(old_channel).await; } - MusicBotMessage::ChannelCreated(_) => { + MusicBotMessage::ChannelAdded(_) => { // TODO Only subscribe to one channel - self.subscribe_all(); + self.subscribe_all().await; } MusicBotMessage::StateChange(state) => { - self.on_state(state)?; + self.on_state(state).await?; } - MusicBotMessage::Quit(_) => (), + _ => (), } Ok(()) } - fn on_client_left_channel(&self, old_channel: ChannelId) { - let my_channel = self.my_channel(); - if old_channel == my_channel && self.user_count(my_channel) <= 1 { + async fn on_client_left_channel(&self, old_channel: ChannelId) { + let my_channel = self.my_channel().await; + if old_channel == my_channel && self.user_count(my_channel).await <= 1 { self.quit(String::from("Channel is empty")); } } |
