diff options
Diffstat (limited to 'src/bot/music.rs')
| -rw-r--r-- | src/bot/music.rs | 358 |
1 files changed, 358 insertions, 0 deletions
diff --git a/src/bot/music.rs b/src/bot/music.rs new file mode 100644 index 0000000..3677796 --- /dev/null +++ b/src/bot/music.rs @@ -0,0 +1,358 @@ +use std::future::Future; +use std::io::BufRead; +use std::sync::{Arc, Mutex}; +use std::thread; + +use log::{debug, info}; +use structopt::StructOpt; +use tokio02::sync::mpsc::UnboundedSender; +use tsclientlib::{ClientId, ConnectOptions, Identity, Invoker, MessageTarget}; + +use crate::audio_player::{AudioPlayerError, AudioPlayer, PollResult}; +use crate::command::Command; +use crate::playlist::Playlist; +use crate::teamspeak::TeamSpeakConnection; +use crate::youtube_dl::AudioMetadata; + +#[derive(Debug)] +pub struct Message { + pub target: MessageTarget, + pub invoker: Invoker, + pub text: String, +} + +#[derive(Debug, PartialEq, Eq)] +pub enum State { + Playing, + Paused, + Stopped, + EndOfStream, +} + +#[derive(Debug)] +pub enum MusicBotMessage { + TextMessage(Message), + StateChange(State), + Quit(String), +} + +pub struct MusicBot { + name: String, + player: Arc<AudioPlayer>, + teamspeak: Option<Arc<TeamSpeakConnection>>, + playlist: Arc<Mutex<Playlist>>, + state: Arc<Mutex<State>>, +} + +#[derive(Debug)] +pub struct MusicBotArgs { + pub name: String, + pub owner: Option<ClientId>, + pub local: bool, + pub address: String, + pub id: Identity, + pub channel: String, + pub verbose: u8, +} + +impl MusicBot { + pub async fn new(args: MusicBotArgs) -> (Arc<Self>, impl Future) { + let (tx, mut rx) = tokio02::sync::mpsc::unbounded_channel(); + let tx = Arc::new(Mutex::new(tx)); + let (player, connection) = if args.local { + info!("Starting in CLI mode"); + let audio_player = AudioPlayer::new(tx.clone(), None).unwrap(); + + (audio_player, None) + } else { + info!("Starting in TeamSpeak mode"); + + let con_config = ConnectOptions::new(args.address) + .version(tsclientlib::Version::Linux_3_3_2) + .name(args.name.clone()) + .identity(args.id) + .log_commands(args.verbose >= 1) + .log_packets(args.verbose >= 2) + .log_udp_packets(args.verbose >= 3) + .channel(args.channel); + + let connection = Arc::new( + TeamSpeakConnection::new(tx.clone(), con_config) + .await + .unwrap(), + ); + let cconnection = connection.clone(); + let audio_player = AudioPlayer::new( + tx.clone(), + Some(Box::new(move |samples| { + cconnection.send_audio_packet(samples); + })), + ) + .unwrap(); + + (audio_player, Some(connection)) + }; + + player.set_volume(0.5).unwrap(); + let player = Arc::new(player); + let playlist = Arc::new(Mutex::new(Playlist::new())); + + spawn_gstreamer_thread(player.clone(), tx.clone()); + + if args.local { + spawn_stdin_reader(tx); + } + + let bot = Arc::new(Self { + name: args.name, + player, + teamspeak: connection, + playlist, + state: Arc::new(Mutex::new(State::Stopped)), + }); + + let cbot = bot.clone(); + let msg_loop = async move { + 'outer: loop { + while let Some(msg) = rx.recv().await { + if let MusicBotMessage::Quit(reason) = msg { + cbot.with_teamspeak(|ts| ts.disconnect(&reason)); + break 'outer; + } + cbot.on_message(msg).await.unwrap(); + } + } + debug!("Left message loop"); + }; + + (bot, msg_loop) + } + + #[inline(always)] + fn with_teamspeak<F: Fn(&TeamSpeakConnection)>(&self, func: F) { + if let Some(ts) = &self.teamspeak { + func(&ts); + } + } + + fn start_playing_audio(&self, metadata: AudioMetadata) { + if let Some(title) = metadata.title { + self.send_message(&format!("Playing '{}'", title)); + self.set_description(&format!("Currently playing '{}'", title)); + } else { + self.send_message("Playing unknown title"); + self.set_description("Currently playing"); + } + self.player.reset().unwrap(); + self.player.set_source_url(metadata.url).unwrap(); + self.player.play().unwrap(); + } + + pub async fn add_audio(&self, url: String) { + match crate::youtube_dl::get_audio_download_url(url).await { + Ok(metadata) => { + info!("Found audio url: {}", metadata.url); + + let mut playlist = self.playlist.lock().expect("Mutex was not poisoned"); + playlist.push(metadata.clone()); + + if !self.player.is_started() { + if let Some(request) = playlist.pop() { + self.start_playing_audio(request); + } + } else { + if let Some(title) = metadata.title { + self.send_message(&format!("Added '{}' to playlist", title)); + } else { + self.send_message("Added to playlist"); + } + } + } + Err(e) => { + info!("Failed to find audio url: {}", e); + + self.send_message(&format!("Failed to find url: {}", e)); + } + } + } + + fn send_message(&self, text: &str) { + debug!("Sending message to TeamSpeak: {}", text); + + self.with_teamspeak(|ts| ts.send_message_to_channel(text)); + } + + fn set_nickname(&self, name: &str) { + info!("Setting TeamsSpeak nickname to {}", name); + + self.with_teamspeak(|ts| ts.set_nickname(name)); + } + + fn set_description(&self, desc: &str) { + info!("Setting TeamsSpeak description to {}", desc); + + self.with_teamspeak(|ts| ts.set_description(desc)); + } + + async fn on_text(&self, message: Message) -> Result<(), AudioPlayerError> { + let msg = message.text; + if msg.starts_with("!") { + let tokens = msg[1..].split_whitespace().collect::<Vec<_>>(); + + match Command::from_iter_safe(&tokens) { + Ok(args) => self.on_command(args).await?, + Err(e) if e.kind == structopt::clap::ErrorKind::HelpDisplayed => { + self.send_message(&format!("\n{}", e.message)); + } + _ => (), + } + } + + Ok(()) + } + + async fn on_command(&self, command: Command) -> Result<(), AudioPlayerError> { + match command { + Command::Play => { + let playlist = self.playlist.lock().expect("Mutex was not poisoned"); + + if !self.player.is_started() { + if !playlist.is_empty() { + self.player.stop_current()?; + } + } else { + self.player.play()?; + } + } + Command::Add { url } => { + // strip bbcode tags from url + let url = url.replace("[URL]", "").replace("[/URL]", ""); + + self.add_audio(url.to_string()).await; + } + Command::Pause => { + self.player.pause()?; + } + Command::Stop => { + self.player.reset()?; + } + Command::Next => { + let playlist = self.playlist.lock().expect("Mutex was not poisoned"); + if !playlist.is_empty() { + info!("Skipping to next track"); + self.player.stop_current()?; + } else { + info!("Playlist empty, cannot skip"); + self.player.reset()?; + } + } + Command::Clear => { + self.playlist + .lock() + .expect("Mutex was not poisoned") + .clear(); + } + Command::Volume { percent: volume } => { + let volume = volume.max(0.0).min(100.0) * 0.01; + self.player.set_volume(volume)?; + } + Command::Leave => { + self.quit(String::from("Leaving")); + } + } + + Ok(()) + } + + fn on_state(&self, state: State) -> Result<(), AudioPlayerError> { + let mut current_state = self.state.lock().unwrap(); + if *current_state != state { + match state { + State::Playing => { + self.set_nickname(&format!("{} - Playing", self.name)); + } + State::Paused => { + self.set_nickname(&format!("{} - Paused", self.name)); + } + State::Stopped => { + self.set_nickname(&self.name); + self.set_description(""); + } + State::EndOfStream => { + let next_track = self.playlist.lock().expect("Mutex was not poisoned").pop(); + if let Some(request) = next_track { + info!("Advancing playlist"); + + self.start_playing_audio(request); + } else { + self.set_nickname(&self.name); + self.set_description(""); + } + } + } + } + + *current_state = state; + + Ok(()) + } + + async fn on_message(&self, message: MusicBotMessage) -> Result<(), AudioPlayerError> { + match message { + MusicBotMessage::TextMessage(message) => { + if MessageTarget::Channel == message.target { + self.on_text(message).await?; + } + } + MusicBotMessage::StateChange(state) => { + self.on_state(state)?; + } + MusicBotMessage::Quit(_) => (), + } + + Ok(()) + } + + pub fn quit(&self, reason: String) { + self.player.quit(reason); + } +} + +fn spawn_stdin_reader(tx: Arc<Mutex<UnboundedSender<MusicBotMessage>>>) { + thread::spawn(move || { + let stdin = ::std::io::stdin(); + let lock = stdin.lock(); + for line in lock.lines() { + let line = line.unwrap(); + + let message = MusicBotMessage::TextMessage(Message { + target: MessageTarget::Server, + invoker: Invoker { + name: String::from("stdin"), + id: ClientId(0), + uid: None, + }, + text: line, + }); + + let tx = tx.lock().unwrap(); + tx.send(message).unwrap(); + } + }); +} + +fn spawn_gstreamer_thread( + player: Arc<AudioPlayer>, + tx: Arc<Mutex<UnboundedSender<MusicBotMessage>>>, +) { + thread::spawn(move || loop { + if player.poll() == PollResult::Quit { + break; + } + + tx.lock() + .unwrap() + .send(MusicBotMessage::StateChange(State::EndOfStream)) + .unwrap(); + }); +} |
