aboutsummaryrefslogtreecommitdiffstats
path: root/src/bot/music.rs
diff options
context:
space:
mode:
authorJokler <jokler@protonmail.com>2020-09-29 15:18:47 +0200
committerGitHub <noreply@github.com>2020-09-29 15:18:47 +0200
commite44a251fe0e1b82c859515768e483f19b1b5aaf3 (patch)
tree6092b3db497ee0a795f70db695ff2adb3c16e5ee /src/bot/music.rs
parent130cde033795382b70a312846a8f2704a15d11e3 (diff)
parentbbe3e1fffc94e7e87237a331de7b09253b0aa3fb (diff)
downloadpokebot-e44a251fe0e1b82c859515768e483f19b1b5aaf3.tar.gz
pokebot-e44a251fe0e1b82c859515768e483f19b1b5aaf3.zip
Merge pull request #59 from Mavulp/update-dependencies
Upgrade dependencies & use tokio 0.2 exclusively
Diffstat (limited to 'src/bot/music.rs')
-rw-r--r--src/bot/music.rs176
1 files changed, 99 insertions, 77 deletions
diff --git a/src/bot/music.rs b/src/bot/music.rs
index 71e7b58..656a169 100644
--- a/src/bot/music.rs
+++ b/src/bot/music.rs
@@ -7,7 +7,7 @@ use std::time::Duration;
use log::{debug, info};
use serde::Serialize;
use structopt::StructOpt;
-use tokio02::sync::mpsc::UnboundedSender;
+use tokio::sync::mpsc::UnboundedSender;
use tsclientlib::{data, ChannelId, ClientId, ConnectOptions, Identity, Invoker, MessageTarget};
use crate::audio_player::{AudioPlayer, AudioPlayerError, PollResult};
@@ -52,7 +52,8 @@ pub enum MusicBotMessage {
client: ClientId,
old_channel: ChannelId,
},
- ChannelCreated(ChannelId),
+ ChannelAdded(ChannelId),
+ ClientAdded(ClientId),
ClientDisconnected {
id: ClientId,
client: Box<data::Client>,
@@ -64,7 +65,7 @@ pub enum MusicBotMessage {
pub struct MusicBot {
name: String,
player: Arc<AudioPlayer>,
- teamspeak: Option<Arc<TeamSpeakConnection>>,
+ teamspeak: Option<TeamSpeakConnection>,
playlist: Arc<RwLock<Playlist>>,
state: Arc<RwLock<State>>,
}
@@ -82,8 +83,8 @@ pub struct MusicBotArgs {
}
impl MusicBot {
- pub async fn new(args: MusicBotArgs) -> (Arc<Self>, impl Future) {
- let (tx, mut rx) = tokio02::sync::mpsc::unbounded_channel();
+ pub async fn new(args: MusicBotArgs) -> (Arc<Self>, impl Future<Output = ()>) {
+ let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let tx = Arc::new(RwLock::new(tx));
let (player, connection) = if args.local {
info!("Starting in CLI mode");
@@ -102,16 +103,15 @@ impl MusicBot {
.log_udp_packets(args.verbose >= 3)
.channel(args.channel);
- let connection = Arc::new(
- TeamSpeakConnection::new(tx.clone(), con_config)
- .await
- .unwrap(),
- );
- let cconnection = connection.clone();
+ let connection = TeamSpeakConnection::new(tx.clone(), con_config)
+ .await
+ .unwrap();
+ let mut cconnection = connection.clone();
let audio_player = AudioPlayer::new(
tx.clone(),
Some(Box::new(move |samples| {
- cconnection.send_audio_packet(samples);
+ let mut rt = tokio::runtime::Runtime::new().unwrap();
+ rt.block_on(cconnection.send_audio_packet(samples));
})),
)
.unwrap();
@@ -146,7 +146,10 @@ impl MusicBot {
'outer: loop {
while let Some(msg) = rx.recv().await {
if let MusicBotMessage::Quit(reason) = msg {
- cbot.with_teamspeak(|ts| ts.disconnect(&reason));
+ if let Some(ts) = &cbot.teamspeak {
+ let mut ts = ts.clone();
+ ts.disconnect(&reason).await;
+ }
disconnect_cb(name, name_index, id_index);
break 'outer;
}
@@ -156,31 +159,26 @@ impl MusicBot {
debug!("Left message loop");
};
- bot.update_name(State::EndOfStream);
+ bot.update_name(State::EndOfStream).await;
(bot, msg_loop)
}
- #[inline(always)]
- fn with_teamspeak<F: Fn(&TeamSpeakConnection)>(&self, func: F) {
- if let Some(ts) = &self.teamspeak {
- func(&ts);
- }
- }
-
- fn start_playing_audio(&self, metadata: AudioMetadata) {
+ async fn start_playing_audio(&self, metadata: AudioMetadata) {
let duration = if let Some(duration) = metadata.duration {
format!("({})", ts::bold(&humantime::format_duration(duration)))
} else {
format!("")
};
- self.send_message(&format!(
+ self.send_message(format!(
"Playing {} {}",
ts::underline(&metadata.title),
duration
- ));
- self.set_description(&format!("Currently playing '{}'", metadata.title));
+ ))
+ .await;
+ self.set_description(format!("Currently playing '{}'", metadata.title))
+ .await;
self.player.reset().unwrap();
self.player.set_metadata(metadata).unwrap();
self.player.play().unwrap();
@@ -192,12 +190,21 @@ impl MusicBot {
metadata.added_by = user;
info!("Found audio url: {}", metadata.url);
- let mut playlist = self.playlist.write().expect("RwLock was not poisoned");
- playlist.push(metadata.clone());
+ // RWLockGuard can not be kept around or the compiler complains that
+ // it might cross the await boundary
+ self.playlist
+ .write()
+ .expect("RwLock was not poisoned")
+ .push(metadata.clone());
if !self.player.is_started() {
- if let Some(request) = playlist.pop() {
- self.start_playing_audio(request);
+ let entry = self
+ .playlist
+ .write()
+ .expect("RwLock was not poisoned")
+ .pop();
+ if let Some(request) = entry {
+ self.start_playing_audio(request).await;
}
} else {
let duration = if let Some(duration) = metadata.duration {
@@ -206,17 +213,19 @@ impl MusicBot {
format!("")
};
- self.send_message(&format!(
+ self.send_message(format!(
"Added {}{} to playlist",
ts::underline(&metadata.title),
duration
- ));
+ ))
+ .await;
}
}
Err(e) => {
info!("Failed to find audio url: {}", e);
- self.send_message(&format!("Failed to find url: {}", e));
+ self.send_message(format!("Failed to find url: {}", e))
+ .await;
}
}
}
@@ -245,40 +254,52 @@ impl MusicBot {
self.playlist.read().unwrap().to_vec()
}
- pub fn my_channel(&self) -> ChannelId {
- self.teamspeak
- .as_ref()
- .map(|ts| ts.my_channel())
- .expect("my_channel needs ts")
+ pub async fn my_channel(&self) -> ChannelId {
+ let ts = self.teamspeak.as_ref().expect("my_channel needs ts");
+
+ let mut ts = ts.clone();
+ ts.my_channel().await
}
- fn user_count(&self, channel: ChannelId) -> u32 {
- self.teamspeak
- .as_ref()
- .map(|ts| ts.user_count(channel))
- .expect("user_count needs ts")
+ async fn user_count(&self, channel: ChannelId) -> u32 {
+ let ts = self.teamspeak.as_ref().expect("user_count needs ts");
+
+ let mut ts = ts.clone();
+ ts.user_count(channel).await
}
- fn send_message(&self, text: &str) {
+ async fn send_message(&self, text: String) {
debug!("Sending message to TeamSpeak: {}", text);
- self.with_teamspeak(|ts| ts.send_message_to_channel(text));
+ if let Some(ts) = &self.teamspeak {
+ let mut ts = ts.clone();
+ ts.send_message_to_channel(text).await;
+ }
}
- fn set_nickname(&self, name: &str) {
+ async fn set_nickname(&self, name: String) {
info!("Setting TeamSpeak nickname: {}", name);
- self.with_teamspeak(|ts| ts.set_nickname(name));
+ if let Some(ts) = &self.teamspeak {
+ let mut ts = ts.clone();
+ ts.set_nickname(name).await;
+ }
}
- fn set_description(&self, desc: &str) {
+ async fn set_description(&self, desc: String) {
info!("Setting TeamSpeak description: {}", desc);
- self.with_teamspeak(|ts| ts.set_description(desc));
+ if let Some(ts) = &self.teamspeak {
+ let mut ts = ts.clone();
+ ts.set_description(desc).await;
+ }
}
- fn subscribe_all(&self) {
- self.with_teamspeak(|ts| ts.subscribe_all());
+ async fn subscribe_all(&self) {
+ if let Some(ts) = &self.teamspeak {
+ let mut ts = ts.clone();
+ ts.subscribe_all().await;
+ }
}
async fn on_text(&self, message: Message) -> Result<(), AudioPlayerError> {
@@ -289,7 +310,7 @@ impl MusicBot {
match Command::from_iter_safe(&tokens) {
Ok(args) => self.on_command(args, message.invoker).await?,
Err(e) if e.kind == structopt::clap::ErrorKind::HelpDisplayed => {
- self.send_message(&format!("\n{}", e.message));
+ self.send_message(format!("\n{}", e.message)).await;
}
_ => (),
}
@@ -329,9 +350,10 @@ impl MusicBot {
}
Command::Seek { amount } => {
if let Ok(time) = self.player.seek(amount) {
- self.send_message(&format!("New position: {}", ts::bold(&time)));
+ self.send_message(format!("New position: {}", ts::bold(&time)))
+ .await;
} else {
- self.send_message("Failed to seek");
+ self.send_message(String::from("Failed to seek")).await;
}
}
Command::Next => {
@@ -352,7 +374,7 @@ impl MusicBot {
}
Command::Volume { volume } => {
self.player.change_volume(volume)?;
- self.update_name(self.state());
+ self.update_name(self.state()).await;
}
Command::Leave => {
self.quit(String::from("Leaving"));
@@ -362,18 +384,18 @@ impl MusicBot {
Ok(())
}
- fn update_name(&self, state: State) {
+ async fn update_name(&self, state: State) {
let volume = (self.volume() * 100.0).round();
let name = match state {
State::EndOfStream => format!("🎵 {} ({}%)", self.name, volume),
_ => format!("🎵 {} - {} ({}%)", self.name, state, volume),
};
- self.set_nickname(&name);
+ self.set_nickname(name).await;
}
- fn on_state(&self, state: State) -> Result<(), AudioPlayerError> {
- let mut current_state = self.state.write().unwrap();
- if *current_state != state {
+ async fn on_state(&self, state: State) -> Result<(), AudioPlayerError> {
+ let current_state = *self.state.read().unwrap();
+ if current_state != state {
match state {
State::EndOfStream => {
let next_track = self
@@ -384,24 +406,24 @@ impl MusicBot {
if let Some(request) = next_track {
info!("Advancing playlist");
- self.start_playing_audio(request);
+ self.start_playing_audio(request).await;
} else {
- self.update_name(state);
- self.set_description("");
+ self.update_name(state).await;
+ self.set_description(String::new()).await;
}
}
State::Stopped => {
- if *current_state != State::EndOfStream {
- self.update_name(state);
- self.set_description("");
+ if current_state != State::EndOfStream {
+ self.update_name(state).await;
+ self.set_description(String::new()).await;
}
}
- _ => self.update_name(state),
+ _ => self.update_name(state).await,
}
}
- if !(*current_state == State::EndOfStream && state == State::Stopped) {
- *current_state = state;
+ if !(current_state == State::EndOfStream && state == State::Stopped) {
+ *self.state.write().unwrap() = state;
}
Ok(())
@@ -418,28 +440,28 @@ impl MusicBot {
client: _,
old_channel,
} => {
- self.on_client_left_channel(old_channel);
+ self.on_client_left_channel(old_channel).await;
}
MusicBotMessage::ClientDisconnected { id: _, client } => {
let old_channel = client.channel;
- self.on_client_left_channel(old_channel);
+ self.on_client_left_channel(old_channel).await;
}
- MusicBotMessage::ChannelCreated(_) => {
+ MusicBotMessage::ChannelAdded(_) => {
// TODO Only subscribe to one channel
- self.subscribe_all();
+ self.subscribe_all().await;
}
MusicBotMessage::StateChange(state) => {
- self.on_state(state)?;
+ self.on_state(state).await?;
}
- MusicBotMessage::Quit(_) => (),
+ _ => (),
}
Ok(())
}
- fn on_client_left_channel(&self, old_channel: ChannelId) {
- let my_channel = self.my_channel();
- if old_channel == my_channel && self.user_count(my_channel) <= 1 {
+ async fn on_client_left_channel(&self, old_channel: ChannelId) {
+ let my_channel = self.my_channel().await;
+ if old_channel == my_channel && self.user_count(my_channel).await <= 1 {
self.quit(String::from("Channel is empty"));
}
}