diff options
| author | Jokler <jokler@protonmail.com> | 2020-10-15 13:11:54 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-10-15 13:11:54 +0000 |
| commit | 43974717fee9a98701c6efa2e7221cdbfe7e537e (patch) | |
| tree | 93fe1d75477ae3d1c8466611a2cedd7bed316aa2 /src/bot/music.rs | |
| parent | 23671b51b4e207574a63bce820acbf43169e2b6c (diff) | |
| parent | 4e1c2b9f04073294ecb8402486c20d9c01721598 (diff) | |
| download | pokebot-43974717fee9a98701c6efa2e7221cdbfe7e537e.tar.gz pokebot-43974717fee9a98701c6efa2e7221cdbfe7e537e.zip | |
Merge pull request #70 from Mavulp/actor-bots
Replace channels&locks with actors & log with slog
Diffstat (limited to 'src/bot/music.rs')
| -rw-r--r-- | src/bot/music.rs | 500 |
1 files changed, 273 insertions, 227 deletions
diff --git a/src/bot/music.rs b/src/bot/music.rs index 90305d0..a57b66c 100644 --- a/src/bot/music.rs +++ b/src/bot/music.rs @@ -1,25 +1,22 @@ -use std::future::Future; -use std::io::BufRead; -use std::sync::{Arc, RwLock}; -use std::thread; -use std::time::Duration; +use async_trait::async_trait; -use log::{debug, info}; use serde::Serialize; +use slog::{debug, info, Logger}; use structopt::StructOpt; -use tokio::sync::mpsc::UnboundedSender; use tsclientlib::{data, ChannelId, ClientId, Connection, Identity, Invoker, MessageTarget}; +use xtra::{spawn::Tokio, Actor, Address, Context, Handler, Message, WeakAddress}; -use crate::audio_player::{AudioPlayer, AudioPlayerError, PollResult}; +use crate::audio_player::{AudioPlayer, AudioPlayerError}; +use crate::bot::{BotDisonnected, Connect, MasterBot, Quit}; use crate::command::Command; use crate::command::VolumeChange; use crate::playlist::Playlist; use crate::teamspeak as ts; -use crate::youtube_dl::AudioMetadata; +use crate::youtube_dl::{self, AudioMetadata}; use ts::TeamSpeakConnection; #[derive(Debug)] -pub struct Message { +pub struct ChatMessage { pub target: MessageTarget, pub invoker: Invoker, pub text: String, @@ -33,6 +30,10 @@ pub enum State { EndOfStream, } +impl Message for State { + type Result = (); +} + impl std::fmt::Display for State { fn fmt(&self, fmt: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> { match self { @@ -47,7 +48,7 @@ impl std::fmt::Display for State { #[derive(Debug)] pub enum MusicBotMessage { - TextMessage(Message), + TextMessage(ChatMessage), ClientChannel { client: ClientId, old_channel: ChannelId, @@ -59,112 +60,95 @@ pub enum MusicBotMessage { client: Box<data::Client>, }, StateChange(State), - Quit(String), +} + +impl Message for MusicBotMessage { + type Result = Result<(), AudioPlayerError>; } pub struct MusicBot { name: String, - player: Arc<AudioPlayer>, + identity: Identity, + player: AudioPlayer, teamspeak: Option<TeamSpeakConnection>, - playlist: Arc<RwLock<Playlist>>, - state: Arc<RwLock<State>>, + master: Option<WeakAddress<MasterBot>>, + playlist: Playlist, + state: State, + logger: Logger, } pub struct MusicBotArgs { pub name: String, - pub name_index: usize, - pub id_index: usize, + pub master: Option<WeakAddress<MasterBot>>, pub local: bool, pub address: String, - pub id: Identity, + pub identity: Identity, pub channel: String, pub verbose: u8, - pub disconnect_cb: Box<dyn FnMut(String, usize, usize) + Send + Sync>, + pub logger: Logger, } impl MusicBot { - pub async fn new(args: MusicBotArgs) -> (Arc<Self>, impl Future<Output = ()>) { - let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); - let tx = Arc::new(RwLock::new(tx)); - let (player, connection) = if args.local { - info!("Starting in CLI mode"); - let audio_player = AudioPlayer::new(tx.clone(), None).unwrap(); - - (audio_player, None) - } else { - info!("Starting in TeamSpeak mode"); - - let con_config = Connection::build(args.address) - .version(tsclientlib::Version::Linux_3_3_2) - .name(format!("🎵 {}", args.name)) - .identity(args.id) - .log_commands(args.verbose >= 1) - .log_packets(args.verbose >= 2) - .log_udp_packets(args.verbose >= 3) - .channel(args.channel); - - let connection = TeamSpeakConnection::new(tx.clone(), con_config) - .await - .unwrap(); - let mut cconnection = connection.clone(); - let audio_player = AudioPlayer::new( - tx.clone(), - Some(Box::new(move |samples| { - let mut rt = tokio::runtime::Runtime::new().unwrap(); - rt.block_on(cconnection.send_audio_packet(samples)); - })), - ) - .unwrap(); - - (audio_player, Some(connection)) - }; - + pub async fn spawn(args: MusicBotArgs) -> Address<Self> { + let mut player = AudioPlayer::new(args.logger.clone()).unwrap(); player.change_volume(VolumeChange::Absolute(0.5)).unwrap(); - let player = Arc::new(player); - let playlist = Arc::new(RwLock::new(Playlist::new())); - spawn_gstreamer_thread(player.clone(), tx.clone()); + let playlist = Playlist::new(args.logger.clone()); - if args.local { - spawn_stdin_reader(tx); - } + let teamspeak = if args.local { + info!(args.logger, "Starting in CLI mode"); + player.setup_with_audio_callback(None).unwrap(); - let bot = Arc::new(Self { + None + } else { + Some(TeamSpeakConnection::new(args.logger.clone()).await.unwrap()) + }; + let bot = Self { name: args.name.clone(), + master: args.master, + identity: args.identity.clone(), player, - teamspeak: connection, + teamspeak, playlist, - state: Arc::new(RwLock::new(State::EndOfStream)), - }); - - let cbot = bot.clone(); - let mut disconnect_cb = args.disconnect_cb; - let name = args.name; - let name_index = args.name_index; - let id_index = args.id_index; - let msg_loop = async move { - 'outer: loop { - while let Some(msg) = rx.recv().await { - if let MusicBotMessage::Quit(reason) = msg { - if let Some(ts) = &cbot.teamspeak { - let mut ts = ts.clone(); - ts.disconnect(&reason).await; - } - disconnect_cb(name, name_index, id_index); - break 'outer; - } - cbot.on_message(msg).await.unwrap(); - } - } - debug!("Left message loop"); + state: State::EndOfStream, + logger: args.logger.clone(), }; - bot.update_name(State::EndOfStream).await; + let bot_addr = bot.create(None).spawn(&mut Tokio::Global); + + info!( + args.logger, + "Connecting"; + "name" => &args.name, + "channel" => &args.channel, + "address" => &args.address, + ); + + let opt = Connection::build(args.address) + .logger(args.logger.clone()) + .version(tsclientlib::Version::Linux_3_3_2) + .name(format!("🎵 {}", args.name)) + .identity(args.identity) + .log_commands(args.verbose >= 1) + .log_packets(args.verbose >= 2) + .log_udp_packets(args.verbose >= 3) + .channel(args.channel); + bot_addr.send(Connect(opt)).await.unwrap().unwrap(); + bot_addr + .send(MusicBotMessage::StateChange(State::EndOfStream)) + .await + .unwrap() + .unwrap(); + + if args.local { + debug!(args.logger, "Spawning stdin reader thread"); + spawn_stdin_reader(bot_addr.downgrade()); + } - (bot, msg_loop) + bot_addr } - async fn start_playing_audio(&self, metadata: AudioMetadata) { + async fn start_playing_audio(&mut self, metadata: AudioMetadata) { let duration = if let Some(duration) = metadata.duration { format!("({})", ts::bold(&humantime::format_duration(duration))) } else { @@ -184,25 +168,16 @@ impl MusicBot { self.player.play().unwrap(); } - pub async fn add_audio(&self, url: String, user: String) { - match crate::youtube_dl::get_audio_download_from_url(url).await { + pub async fn add_audio(&mut self, url: String, user: String) { + match youtube_dl::get_audio_download_from_url(url, &self.logger).await { Ok(mut metadata) => { metadata.added_by = user; - info!("Found audio url: {}", metadata.url); + info!(self.logger, "Found source"; "url" => &metadata.url); - // RWLockGuard can not be kept around or the compiler complains that - // it might cross the await boundary - self.playlist - .write() - .expect("RwLock was not poisoned") - .push(metadata.clone()); + self.playlist.push(metadata.clone()); if !self.player.is_started() { - let entry = self - .playlist - .write() - .expect("RwLock was not poisoned") - .pop(); + let entry = self.playlist.pop(); if let Some(request) = entry { self.start_playing_audio(request).await; } @@ -222,7 +197,7 @@ impl MusicBot { } } Err(e) => { - info!("Failed to find audio url: {}", e); + info!(self.logger, "Failed to find audio url"; "error" => &e); self.send_message(format!("Failed to find url: {}", e)) .await; @@ -235,74 +210,50 @@ impl MusicBot { } pub fn state(&self) -> State { - *self.state.read().expect("RwLock was not poisoned") + self.state } - pub fn volume(&self) -> f64 { + pub async fn volume(&self) -> f64 { self.player.volume() } - pub fn position(&self) -> Option<Duration> { - self.player.position() - } - - pub fn currently_playing(&self) -> Option<AudioMetadata> { - self.player.currently_playing() - } + pub async fn current_channel(&mut self) -> Option<ChannelId> { + let ts = self.teamspeak.as_mut().expect("current_channel needs ts"); - pub fn playlist_to_vec(&self) -> Vec<AudioMetadata> { - self.playlist.read().unwrap().to_vec() + ts.current_channel().await } - pub async fn my_channel(&self) -> ChannelId { - let ts = self.teamspeak.as_ref().expect("my_channel needs ts"); - - let mut ts = ts.clone(); - ts.my_channel().await - } + async fn user_count(&mut self, channel: ChannelId) -> u32 { + let ts = self.teamspeak.as_mut().expect("user_count needs ts"); - async fn user_count(&self, channel: ChannelId) -> u32 { - let ts = self.teamspeak.as_ref().expect("user_count needs ts"); - - let mut ts = ts.clone(); ts.user_count(channel).await } - async fn send_message(&self, text: String) { - debug!("Sending message to TeamSpeak: {}", text); + async fn send_message(&mut self, text: String) { + debug!(self.logger, "Sending message to TeamSpeak"; "message" => &text); - if let Some(ts) = &self.teamspeak { - let mut ts = ts.clone(); + if let Some(ts) = &mut self.teamspeak { ts.send_message_to_channel(text).await; } } - async fn set_nickname(&self, name: String) { - info!("Setting TeamSpeak nickname: {}", name); + async fn set_nickname(&mut self, name: String) { + info!(self.logger, "Setting TeamSpeak nickname"; "name" => &name); - if let Some(ts) = &self.teamspeak { - let mut ts = ts.clone(); + if let Some(ts) = &mut self.teamspeak { ts.set_nickname(name).await; } } - async fn set_description(&self, desc: String) { - info!("Setting TeamSpeak description: {}", desc); + async fn set_description(&mut self, desc: String) { + info!(self.logger, "Setting TeamSpeak description"; "description" => &desc); - if let Some(ts) = &self.teamspeak { - let mut ts = ts.clone(); + if let Some(ts) = &mut self.teamspeak { ts.set_description(desc).await; } } - async fn subscribe(&self, id: ChannelId) { - if let Some(ts) = &self.teamspeak { - let mut ts = ts.clone(); - ts.subscribe(id).await; - } - } - - async fn on_text(&self, message: Message) -> Result<(), AudioPlayerError> { + async fn on_text(&mut self, message: ChatMessage) -> Result<(), AudioPlayerError> { let msg = message.text; if msg.starts_with('!') { let tokens = msg[1..].split_whitespace().collect::<Vec<_>>(); @@ -319,13 +270,15 @@ impl MusicBot { Ok(()) } - async fn on_command(&self, command: Command, invoker: Invoker) -> Result<(), AudioPlayerError> { + async fn on_command( + &mut self, + command: Command, + invoker: Invoker, + ) -> Result<(), AudioPlayerError> { match command { Command::Play => { - let playlist = self.playlist.read().expect("RwLock was not poisoned"); - if !self.player.is_started() { - if !playlist.is_empty() { + if !self.playlist.is_empty() { self.player.stop_current()?; } } else { @@ -357,35 +310,32 @@ impl MusicBot { } } Command::Next => { - let playlist = self.playlist.read().expect("RwLock was not poisoned"); - if !playlist.is_empty() { - info!("Skipping to next track"); + if !self.playlist.is_empty() { + info!(self.logger, "Skipping to next track"); self.player.stop_current()?; } else { - info!("Playlist empty, cannot skip"); + info!(self.logger, "Playlist empty, cannot skip"); self.player.reset()?; } } Command::Clear => { - self.playlist - .write() - .expect("RwLock was not poisoned") - .clear(); + self.send_message(String::from("Cleared playlist")).await; + self.playlist.clear(); } Command::Volume { volume } => { self.player.change_volume(volume)?; self.update_name(self.state()).await; } Command::Leave => { - self.quit(String::from("Leaving")); + self.quit(String::from("Leaving"), true).await.unwrap(); } } Ok(()) } - async fn update_name(&self, state: State) { - let volume = (self.volume() * 100.0).round(); + async fn update_name(&mut self, state: State) { + let volume = (self.volume().await * 100.0).round(); let name = match state { State::EndOfStream => format!("🎵 {} ({}%)", self.name, volume), _ => format!("🎵 {} - {} ({}%)", self.name, state, volume), @@ -393,43 +343,39 @@ impl MusicBot { self.set_nickname(name).await; } - async fn on_state(&self, state: State) -> Result<(), AudioPlayerError> { - let current_state = *self.state.read().unwrap(); - if current_state != state { - match state { + async fn on_state(&mut self, new_state: State) -> Result<(), AudioPlayerError> { + if self.state != new_state { + match new_state { State::EndOfStream => { - let next_track = self - .playlist - .write() - .expect("RwLock was not poisoned") - .pop(); + self.player.reset()?; + let next_track = self.playlist.pop(); if let Some(request) = next_track { - info!("Advancing playlist"); + info!(self.logger, "Advancing playlist"); self.start_playing_audio(request).await; } else { - self.update_name(state).await; + self.update_name(new_state).await; self.set_description(String::new()).await; } } State::Stopped => { - if current_state != State::EndOfStream { - self.update_name(state).await; + if self.state != State::EndOfStream { + self.update_name(new_state).await; self.set_description(String::new()).await; } } - _ => self.update_name(state).await, + _ => self.update_name(new_state).await, } } - if !(current_state == State::EndOfStream && state == State::Stopped) { - *self.state.write().unwrap() = state; + if !(self.state == State::EndOfStream && new_state == State::Stopped) { + self.state = new_state; } Ok(()) } - async fn on_message(&self, message: MusicBotMessage) -> Result<(), AudioPlayerError> { + async fn on_message(&mut self, message: MusicBotMessage) -> Result<(), AudioPlayerError> { match message { MusicBotMessage::TextMessage(message) => { if MessageTarget::Channel == message.target { @@ -446,9 +392,6 @@ impl MusicBot { let old_channel = client.channel; self.on_client_left_channel(old_channel).await; } - MusicBotMessage::ChannelAdded(id) => { - self.subscribe(id).await; - } MusicBotMessage::StateChange(state) => { self.on_state(state).await?; } @@ -458,60 +401,163 @@ impl MusicBot { Ok(()) } - async fn on_client_left_channel(&self, old_channel: ChannelId) { - let my_channel = self.my_channel().await; - if old_channel == my_channel && self.user_count(my_channel).await <= 1 { - self.quit(String::from("Channel is empty")); + // FIXME logs an error if this music bot is the one leaving + async fn on_client_left_channel(&mut self, old_channel: ChannelId) { + let current_channel = match self.current_channel().await { + Some(c) => c, + None => { + return; + } + }; + if old_channel == current_channel && self.user_count(current_channel).await <= 1 { + self.quit(String::from("Channel is empty"), true) + .await + .unwrap(); + } + } + + pub async fn quit( + &mut self, + reason: String, + inform_master: bool, + ) -> Result<(), tsclientlib::Error> { + // FIXME logs errors if the bot is playing something because it tries to + // change its name and description + self.player.reset().unwrap(); + + let ts = self.teamspeak.as_mut().unwrap(); + ts.disconnect(&reason).await?; + + if inform_master { + if let Some(master) = &self.master { + master + .send(BotDisonnected { + name: self.name.clone(), + identity: self.identity.clone(), + }) + .await + .unwrap(); + } + } + + Ok(()) + } +} + +#[async_trait] +impl Actor for MusicBot { + async fn started(&mut self, ctx: &mut Context<Self>) { + let addr = ctx.address().unwrap().downgrade(); + self.player.register_bot(addr); + } +} + +#[async_trait] +impl Handler<Connect> for MusicBot { + async fn handle( + &mut self, + opt: Connect, + ctx: &mut Context<Self>, + ) -> Result<(), tsclientlib::Error> { + let addr = ctx.address().unwrap().downgrade(); + self.teamspeak + .as_mut() + .unwrap() + .connect_for_bot(opt.0, addr)?; + + let mut connection = self.teamspeak.as_ref().unwrap().clone(); + let handle = tokio::runtime::Handle::current(); + self.player + .setup_with_audio_callback(Some(Box::new(move |samples| { + handle.block_on(connection.send_audio_packet(samples)); + }))) + .unwrap(); + + Ok(()) + } +} + +pub struct GetName; +impl Message for GetName { + type Result = String; +} + +#[async_trait] +impl Handler<GetName> for MusicBot { + async fn handle(&mut self, _: GetName, _: &mut Context<Self>) -> String { + self.name().to_owned() + } +} + +pub struct GetBotData; +impl Message for GetBotData { + type Result = crate::web_server::BotData; +} + +#[async_trait] +impl Handler<GetBotData> for MusicBot { + async fn handle(&mut self, _: GetBotData, _: &mut Context<Self>) -> crate::web_server::BotData { + crate::web_server::BotData { + name: self.name.clone(), + playlist: self.playlist.to_vec(), + currently_playing: self.player.currently_playing(), + position: self.player.position(), + state: self.state(), + volume: self.volume().await, } } +} - pub fn quit(&self, reason: String) { - self.player.quit(reason); +pub struct GetChannel; +impl Message for GetChannel { + type Result = Option<ChannelId>; +} + +#[async_trait] +impl Handler<GetChannel> for MusicBot { + async fn handle(&mut self, _: GetChannel, _: &mut Context<Self>) -> Option<ChannelId> { + self.current_channel().await } } -fn spawn_stdin_reader(tx: Arc<RwLock<UnboundedSender<MusicBotMessage>>>) { - debug!("Spawning stdin reader thread"); - thread::Builder::new() - .name(String::from("stdin reader")) - .spawn(move || { - let stdin = ::std::io::stdin(); - let lock = stdin.lock(); - for line in lock.lines() { - let line = line.unwrap(); - - let message = MusicBotMessage::TextMessage(Message { - target: MessageTarget::Channel, - invoker: Invoker { - name: String::from("stdin"), - id: ClientId(0), - uid: None, - }, - text: line, - }); - - let tx = tx.read().unwrap(); - tx.send(message).unwrap(); - } - }) - .expect("Failed to spawn stdin reader thread"); +#[async_trait] +impl Handler<Quit> for MusicBot { + async fn handle(&mut self, q: Quit, _: &mut Context<Self>) -> Result<(), tsclientlib::Error> { + self.quit(q.0, false).await + } } -fn spawn_gstreamer_thread( - player: Arc<AudioPlayer>, - tx: Arc<RwLock<UnboundedSender<MusicBotMessage>>>, -) { - thread::Builder::new() - .name(String::from("gstreamer polling")) - .spawn(move || loop { - if player.poll() == PollResult::Quit { - break; - } +#[async_trait] +impl Handler<MusicBotMessage> for MusicBot { + async fn handle( + &mut self, + msg: MusicBotMessage, + _: &mut Context<Self>, + ) -> Result<(), AudioPlayerError> { + self.on_message(msg).await + } +} - tx.read() - .unwrap() - .send(MusicBotMessage::StateChange(State::EndOfStream)) - .unwrap(); - }) - .expect("Failed to spawn gstreamer thread"); +fn spawn_stdin_reader(addr: WeakAddress<MusicBot>) { + use tokio::io::AsyncBufReadExt; + + tokio::task::spawn(async move { + let stdin = tokio::io::stdin(); + let reader = tokio::io::BufReader::new(stdin); + let mut lines = reader.lines(); + + while let Some(line) = lines.next_line().await.unwrap() { + let message = MusicBotMessage::TextMessage(ChatMessage { + target: MessageTarget::Channel, + invoker: Invoker { + name: String::from("stdin"), + id: ClientId(0), + uid: None, + }, + text: line, + }); + + addr.send(message).await.unwrap().unwrap(); + } + }); } |
