diff options
| -rw-r--r-- | .gitignore | 4 | ||||
| -rw-r--r-- | Cargo.lock | 19 | ||||
| -rw-r--r-- | Cargo.toml | 1 | ||||
| -rw-r--r-- | README.md | 13 | ||||
| -rw-r--r-- | config.toml.example | 18 | ||||
| -rw-r--r-- | src/audio_player.rs | 74 | ||||
| -rw-r--r-- | src/bot.rs | 5 | ||||
| -rw-r--r-- | src/bot/master.rs | 268 | ||||
| -rw-r--r-- | src/bot/music.rs | 408 | ||||
| -rw-r--r-- | src/command.rs | 2 | ||||
| -rw-r--r-- | src/main.rs | 419 | ||||
| -rw-r--r-- | src/teamspeak.rs | 157 | ||||
| -rw-r--r-- | src/youtube_dl.rs | 12 |
13 files changed, 981 insertions, 419 deletions
@@ -1,4 +1,4 @@ /target **/*.rs.bk -id.toml -log/
\ No newline at end of file +config.toml +log/ @@ -1786,6 +1786,7 @@ dependencies = [ "gstreamer-audio", "log", "log4rs", + "rand 0.7.3", "serde", "serde_json", "structopt", @@ -1914,22 +1915,23 @@ dependencies = [ "rand_isaac", "rand_jitter", "rand_os", - "rand_pcg", + "rand_pcg 0.1.2", "rand_xorshift", "winapi 0.3.8", ] [[package]] name = "rand" -version = "0.7.2" +version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ae1b169243eaf61759b8475a998f0a385e42042370f3a7dbaf35246eacc8412" +checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03" dependencies = [ "getrandom", "libc", "rand_chacha 0.2.1", "rand_core 0.5.1", "rand_hc 0.2.0", + "rand_pcg 0.2.1", ] [[package]] @@ -2039,6 +2041,15 @@ dependencies = [ ] [[package]] +name = "rand_pcg" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16abd0c1b639e9eb4d7c50c0b8100b0d0f849be2349829c740fe8e6eb4816429" +dependencies = [ + "rand_core 0.5.1", +] + +[[package]] name = "rand_xorshift" version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2569,7 +2580,7 @@ checksum = "7a6e24d9338a0a5be79593e2fa15a648add6138caa803e2d5bc782c371732ca9" dependencies = [ "cfg-if", "libc", - "rand 0.7.2", + "rand 0.7.3", "redox_syscall", "remove_dir_all", "winapi 0.3.8", @@ -33,3 +33,4 @@ gstreamer-audio = "0.15.0" byte-slice-cast = "0.3.5" serde_json = "1.0.44" serde = "1.0.104" +rand = { version = "0.7.3", features = ["small_rng"] } @@ -1,18 +1,23 @@ # pokebot ``` -pokebot 0.1.0 +pokebot 0.1.1 Jokler <jokler@protonmail.com> USAGE: - pokebot [FLAGS] [OPTIONS] + pokebot [FLAGS] [OPTIONS] [config_path] FLAGS: -h, --help Prints help information + -l, --local Run locally in text mode -V, --version Prints version information -v, --verbose Print the content of all packets OPTIONS: - -a, --address <address> The address of the server to connect to [default: localhost] - -i, --id <id_path> Identity file - good luck creating one + -a, --address <address> The address of the server to connect to + -g, --generate-identities <gen_id_count> Generate 'count' identities + -d, --master_channel <master_channel> The channel the master bot should connect to + +ARGS: + <config_path> Configuration file [default: config.toml] ``` diff --git a/config.toml.example b/config.toml.example new file mode 100644 index 0000000..c2fbc73 --- /dev/null +++ b/config.toml.example @@ -0,0 +1,18 @@ +# Name of the master bot +name = "PokeBot" +# Address of the server to connect to +address = "localhost" +# Channel for the master bot +channel = "Lobby" + +# Names for the music bots +names = ["MusicBot"] + +# Identity of the master bot +[id] +key = "insert-key-here" +counter = 0 +max_counter = 0 + +# Add identities for music bots here +# [[ids]] diff --git a/src/audio_player.rs b/src/audio_player.rs index 97a61cd..97ecfbf 100644 --- a/src/audio_player.rs +++ b/src/audio_player.rs @@ -6,7 +6,7 @@ use gstreamer as gst; use gstreamer_app::{AppSink, AppSinkCallbacks}; use gstreamer_audio::{StreamVolume, StreamVolumeFormat}; -use crate::{ApplicationMessage, State}; +use crate::bot::{MusicBotMessage, State}; use glib::BoolError; use log::{debug, error, info, warn}; use std::sync::{Arc, Mutex}; @@ -14,22 +14,10 @@ use tokio02::sync::mpsc::UnboundedSender; static GST_INIT: Once = Once::new(); -#[derive(Debug)] -pub enum AudioPlayerError { - GStreamerError(glib::error::BoolError), - StateChangeFailed, -} - -impl From<glib::error::BoolError> for AudioPlayerError { - fn from(err: BoolError) -> Self { - AudioPlayerError::GStreamerError(err) - } -} - -impl From<gst::StateChangeError> for AudioPlayerError { - fn from(_err: gst::StateChangeError) -> Self { - AudioPlayerError::StateChangeFailed - } +#[derive(PartialEq, Eq, Debug, Clone, Copy)] +pub enum PollResult { + Continue, + Quit, } pub struct AudioPlayer { @@ -38,7 +26,7 @@ pub struct AudioPlayer { http_src: gst::Element, volume: gst::Element, - sender: Arc<Mutex<UnboundedSender<ApplicationMessage>>>, + sender: Arc<Mutex<UnboundedSender<MusicBotMessage>>>, } fn make_element(factoryname: &str, display_name: &str) -> Result<gst::Element, AudioPlayerError> { @@ -87,7 +75,7 @@ fn add_decode_bin_new_pad_callback( impl AudioPlayer { pub fn new( - sender: Arc<Mutex<UnboundedSender<ApplicationMessage>>>, + sender: Arc<Mutex<UnboundedSender<MusicBotMessage>>>, callback: Option<Box<dyn FnMut(&[u8]) + Send>>, ) -> Result<Self, AudioPlayerError> { GST_INIT.call_once(|| gst::init().unwrap()); @@ -239,13 +227,27 @@ impl AudioPlayer { Ok(()) } + pub fn quit(&self, reason: String) { + info!("Quitting audio player"); + + if let Err(e) = self + .bus + .post(&gst::Message::new_application(gst::Structure::new_empty("quit")).build()) + { + warn!("Failed to send \"quit\" app event: {}", e); + } + + let sender = self.sender.lock().unwrap(); + sender.send(MusicBotMessage::Quit(reason)).unwrap(); + } + fn send_state(&self, state: State) { info!("Sending state {:?} to application", state); let sender = self.sender.lock().unwrap(); - sender.send(ApplicationMessage::StateChange(state)).unwrap(); + sender.send(MusicBotMessage::StateChange(state)).unwrap(); } - pub fn poll(&self) { + pub fn poll(&self) -> PollResult { debug!("Polling GStreamer"); 'outer: loop { while let Some(msg) = self.bus.timed_pop(gst::ClockTime(None)) { @@ -308,12 +310,40 @@ impl AudioPlayer { ); break 'outer; } + MessageView::Application(content) => { + if let Some(s) = content.get_structure() { + if s.get_name() == "quit" { + self.reset().unwrap(); + return PollResult::Quit; + } + } + } _ => { - // debug!("{:?}", msg) + //debug!("{:?}", msg) } }; } } debug!("Left GStreamer message loop"); + + PollResult::Continue + } +} + +#[derive(Debug)] +pub enum AudioPlayerError { + GStreamerError(glib::error::BoolError), + StateChangeFailed, +} + +impl From<glib::error::BoolError> for AudioPlayerError { + fn from(err: BoolError) -> Self { + AudioPlayerError::GStreamerError(err) + } +} + +impl From<gst::StateChangeError> for AudioPlayerError { + fn from(_err: gst::StateChangeError) -> Self { + AudioPlayerError::StateChangeFailed } } diff --git a/src/bot.rs b/src/bot.rs new file mode 100644 index 0000000..95809f4 --- /dev/null +++ b/src/bot.rs @@ -0,0 +1,5 @@ +mod master; +mod music; + +pub use master::*; +pub use music::*; diff --git a/src/bot/master.rs b/src/bot/master.rs new file mode 100644 index 0000000..2488064 --- /dev/null +++ b/src/bot/master.rs @@ -0,0 +1,268 @@ +use std::collections::HashMap; +use std::future::Future; +use std::sync::{Arc, Mutex}; + +use futures::future::{FutureExt, TryFutureExt}; +use futures01::future::Future as Future01; +use log::info; +use rand::{rngs::SmallRng, seq::SliceRandom, SeedableRng}; +use serde::{Deserialize, Serialize}; +use tsclientlib::{ClientId, ConnectOptions, Identity, MessageTarget}; + +use crate::audio_player::AudioPlayerError; +use crate::teamspeak::TeamSpeakConnection; + +use crate::Args; + +use crate::bot::{MusicBot, MusicBotArgs, MusicBotMessage}; + +pub struct MasterBot { + config: Arc<MasterConfig>, + music_bots: Arc<Mutex<MusicBots>>, + teamspeak: Arc<TeamSpeakConnection>, +} + +struct MusicBots { + rng: SmallRng, + available_names: Vec<usize>, + available_ids: Vec<usize>, + connected_bots: HashMap<String, Arc<MusicBot>>, +} + +impl MasterBot { + pub async fn new(args: MasterArgs) -> (Arc<Self>, impl Future) { + let (tx, mut rx) = tokio02::sync::mpsc::unbounded_channel(); + let tx = Arc::new(Mutex::new(tx)); + info!("Starting in TeamSpeak mode"); + + let mut con_config = ConnectOptions::new(args.address.clone()) + .version(tsclientlib::Version::Linux_3_3_2) + .name(args.master_name.clone()) + .identity(args.id) + .log_commands(args.verbose >= 1) + .log_packets(args.verbose >= 2) + .log_udp_packets(args.verbose >= 3); + + if let Some(channel) = args.channel { + con_config = con_config.channel(channel); + } + + let connection = Arc::new( + TeamSpeakConnection::new(tx.clone(), con_config) + .await + .unwrap(), + ); + + let config = Arc::new(MasterConfig { + master_name: args.master_name, + address: args.address, + names: args.names, + ids: args.ids, + local: args.local, + verbose: args.verbose, + }); + + let name_count = config.names.len(); + let id_count = config.ids.len(); + + let music_bots = Arc::new(Mutex::new(MusicBots { + rng: SmallRng::from_entropy(), + available_names: (0..name_count).collect(), + available_ids: (0..id_count).collect(), + connected_bots: HashMap::new(), + })); + + let bot = Arc::new(Self { + config, + music_bots, + teamspeak: connection, + }); + + bot.teamspeak + .set_description("Poke me if you want a music bot!"); + + let cbot = bot.clone(); + let msg_loop = async move { + loop { + while let Some(msg) = rx.recv().await { + cbot.on_message(msg).await.unwrap(); + } + } + }; + + (bot, msg_loop) + } + + fn build_bot_args_for(&self, id: ClientId) -> Option<MusicBotArgs> { + let channel = self + .teamspeak + .channel_of_user(id) + .expect("Can find poke sender"); + + if channel == self.teamspeak.my_channel() { + self.teamspeak.send_message_to_user( + id, + &format!( + "Joining the channel of \"{}\" is not allowed", + self.config.master_name + ), + ); + return None; + } + + let MusicBots { + ref mut rng, + ref mut available_names, + ref mut available_ids, + ref connected_bots, + } = &mut *self.music_bots.lock().expect("Mutex was not poisoned"); + + for (_, bot) in connected_bots { + if bot.my_channel() == channel { + self.teamspeak.send_message_to_user( + id, + &format!( + "\"{}\" is already in this channel. \ + Multiple bots in one channel are not allowed.", + bot.name() + ), + ); + return None; + } + } + + let channel_path = self + .teamspeak + .channel_path_of_user(id) + .expect("can find poke sender"); + + available_names.shuffle(rng); + let name_index = match available_names.pop() { + Some(v) => v, + None => { + self.teamspeak + .send_message_to_user(id, "Out of names. Too many bots are already connected!"); + return None; + } + }; + let name = self.config.names[name_index].clone(); + + available_ids.shuffle(rng); + let id_index = match available_ids.pop() { + Some(v) => v, + None => { + self.teamspeak.send_message_to_user( + id, + "Out of identities. Too many bots are already connected!", + ); + return None; + } + }; + + let id = self.config.ids[id_index].clone(); + + let cmusic_bots = self.music_bots.clone(); + let disconnect_cb = Box::new(move |n, name_index, id_index| { + let mut music_bots = cmusic_bots.lock().expect("Mutex was not poisoned"); + music_bots.connected_bots.remove(&n); + music_bots.available_names.push(name_index); + music_bots.available_ids.push(id_index); + }); + + info!("Connecting to {} on {}", channel_path, self.config.address); + + Some(MusicBotArgs { + name, + name_index, + id_index, + local: self.config.local, + address: self.config.address.clone(), + id, + channel: channel_path, + verbose: self.config.verbose, + disconnect_cb, + }) + } + + async fn spawn_bot_for(&self, id: ClientId) { + if let Some(bot_args) = self.build_bot_args_for(id) { + let (bot, fut) = MusicBot::new(bot_args).await; + tokio::spawn(fut.unit_error().boxed().compat().map(|_| ())); + let mut music_bots = self.music_bots.lock().expect("Mutex was not poisoned"); + music_bots + .connected_bots + .insert(bot.name().to_string(), bot); + } + } + + async fn on_message(&self, message: MusicBotMessage) -> Result<(), AudioPlayerError> { + if let MusicBotMessage::TextMessage(message) = message { + if let MessageTarget::Poke(who) = message.target { + info!("Poked by {}, creating bot for their channel", who); + self.spawn_bot_for(who).await; + } + } + + Ok(()) + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct MasterArgs { + #[serde(default = "default_name")] + pub master_name: String, + #[serde(default = "default_local")] + pub local: bool, + pub address: String, + pub channel: Option<String>, + #[serde(default = "default_verbose")] + pub verbose: u8, + pub names: Vec<String>, + pub id: Identity, + pub ids: Vec<Identity>, +} + +fn default_name() -> String { + String::from("PokeBot") +} + +fn default_local() -> bool { + false +} + +fn default_verbose() -> u8 { + 0 +} + +impl MasterArgs { + pub fn merge(self, args: Args) -> Self { + let address = args.address.unwrap_or(self.address); + let local = args.local || self.local; + let channel = args.master_channel.or(self.channel); + let verbose = if args.verbose > 0 { + args.verbose + } else { + self.verbose + }; + + Self { + master_name: self.master_name, + names: self.names, + ids: self.ids, + local, + address, + id: self.id, + channel, + verbose, + } + } +} + +pub struct MasterConfig { + pub master_name: String, + pub address: String, + pub names: Vec<String>, + pub ids: Vec<Identity>, + pub local: bool, + pub verbose: u8, +} diff --git a/src/bot/music.rs b/src/bot/music.rs new file mode 100644 index 0000000..dee1514 --- /dev/null +++ b/src/bot/music.rs @@ -0,0 +1,408 @@ +use std::future::Future; +use std::io::BufRead; +use std::sync::{Arc, Mutex}; +use std::thread; + +use log::{debug, info}; +use structopt::StructOpt; +use tokio02::sync::mpsc::UnboundedSender; +use tsclientlib::{data, ChannelId, ClientId, ConnectOptions, Identity, Invoker, MessageTarget}; + +use crate::audio_player::{AudioPlayer, AudioPlayerError, PollResult}; +use crate::command::Command; +use crate::playlist::Playlist; +use crate::teamspeak::TeamSpeakConnection; +use crate::youtube_dl::AudioMetadata; + +#[derive(Debug)] +pub struct Message { + pub target: MessageTarget, + pub invoker: Invoker, + pub text: String, +} + +#[derive(Debug, PartialEq, Eq)] +pub enum State { + Playing, + Paused, + Stopped, + EndOfStream, +} + +#[derive(Debug)] +pub enum MusicBotMessage { + TextMessage(Message), + ClientChannel { + client: ClientId, + old_channel: ChannelId, + }, + ClientDisconnected { + id: ClientId, + client: data::Client, + }, + StateChange(State), + Quit(String), +} + +pub struct MusicBot { + name: String, + player: Arc<AudioPlayer>, + teamspeak: Option<Arc<TeamSpeakConnection>>, + playlist: Arc<Mutex<Playlist>>, + state: Arc<Mutex<State>>, +} + +pub struct MusicBotArgs { + pub name: String, + pub name_index: usize, + pub id_index: usize, + pub local: bool, + pub address: String, + pub id: Identity, + pub channel: String, + pub verbose: u8, + pub disconnect_cb: Box<dyn FnMut(String, usize, usize) + Send + Sync>, +} + +impl MusicBot { + pub async fn new(args: MusicBotArgs) -> (Arc<Self>, impl Future) { + let (tx, mut rx) = tokio02::sync::mpsc::unbounded_channel(); + let tx = Arc::new(Mutex::new(tx)); + let (player, connection) = if args.local { + info!("Starting in CLI mode"); + let audio_player = AudioPlayer::new(tx.clone(), None).unwrap(); + + (audio_player, None) + } else { + info!("Starting in TeamSpeak mode"); + + let con_config = ConnectOptions::new(args.address) + .version(tsclientlib::Version::Linux_3_3_2) + .name(format!("🎵 {}", args.name)) + .identity(args.id) + .log_commands(args.verbose >= 1) + .log_packets(args.verbose >= 2) + .log_udp_packets(args.verbose >= 3) + .channel(args.channel); + + let connection = Arc::new( + TeamSpeakConnection::new(tx.clone(), con_config) + .await + .unwrap(), + ); + let cconnection = connection.clone(); + let audio_player = AudioPlayer::new( + tx.clone(), + Some(Box::new(move |samples| { + cconnection.send_audio_packet(samples); + })), + ) + .unwrap(); + + (audio_player, Some(connection)) + }; + + player.set_volume(0.5).unwrap(); + let player = Arc::new(player); + let playlist = Arc::new(Mutex::new(Playlist::new())); + + spawn_gstreamer_thread(player.clone(), tx.clone()); + + if args.local { + spawn_stdin_reader(tx); + } + + let bot = Arc::new(Self { + name: args.name.clone(), + player, + teamspeak: connection, + playlist, + state: Arc::new(Mutex::new(State::Stopped)), + }); + + let cbot = bot.clone(); + let mut disconnect_cb = args.disconnect_cb; + let name = args.name; + let name_index = args.name_index; + let id_index = args.id_index; + let msg_loop = async move { + 'outer: loop { + while let Some(msg) = rx.recv().await { + if let MusicBotMessage::Quit(reason) = msg { + cbot.with_teamspeak(|ts| ts.disconnect(&reason)); + disconnect_cb(name, name_index, id_index); + break 'outer; + } + cbot.on_message(msg).await.unwrap(); + } + } + debug!("Left message loop"); + }; + + (bot, msg_loop) + } + + #[inline(always)] + fn with_teamspeak<F: Fn(&TeamSpeakConnection)>(&self, func: F) { + if let Some(ts) = &self.teamspeak { + func(&ts); + } + } + + fn start_playing_audio(&self, metadata: AudioMetadata) { + if let Some(title) = metadata.title { + self.send_message(&format!("Playing '{}'", title)); + self.set_description(&format!("Currently playing '{}'", title)); + } else { + self.send_message("Playing unknown title"); + self.set_description("Currently playing"); + } + self.player.reset().unwrap(); + self.player.set_source_url(metadata.url).unwrap(); + self.player.play().unwrap(); + } + + pub async fn add_audio(&self, url: String) { + match crate::youtube_dl::get_audio_download_url(url).await { + Ok(metadata) => { + info!("Found audio url: {}", metadata.url); + + let mut playlist = self.playlist.lock().expect("Mutex was not poisoned"); + playlist.push(metadata.clone()); + + if !self.player.is_started() { + if let Some(request) = playlist.pop() { + self.start_playing_audio(request); + } + } else { + if let Some(title) = metadata.title { + self.send_message(&format!("Added '{}' to playlist", title)); + } else { + self.send_message("Added to playlist"); + } + } + } + Err(e) => { + info!("Failed to find audio url: {}", e); + + self.send_message(&format!("Failed to find url: {}", e)); + } + } + } + + pub fn name(&self) -> &str { + &self.name + } + + pub fn my_channel(&self) -> ChannelId { + self.teamspeak + .as_ref() + .map(|ts| ts.my_channel()) + .expect("my_channel needs ts") + } + + fn user_count(&self, channel: ChannelId) -> u32 { + self.teamspeak + .as_ref() + .map(|ts| ts.user_count(channel)) + .expect("user_count needs ts") + } + + fn send_message(&self, text: &str) { + debug!("Sending message to TeamSpeak: {}", text); + + self.with_teamspeak(|ts| ts.send_message_to_channel(text)); + } + + fn set_nickname(&self, name: &str) { + info!("Setting TeamsSpeak nickname to {}", name); + + self.with_teamspeak(|ts| ts.set_nickname(name)); + } + + fn set_description(&self, desc: &str) { + info!("Setting TeamsSpeak description to {}", desc); + + self.with_teamspeak(|ts| ts.set_description(desc)); + } + + async fn on_text(&self, message: Message) -> Result<(), AudioPlayerError> { + let msg = message.text; + if msg.starts_with("!") { + let tokens = msg[1..].split_whitespace().collect::<Vec<_>>(); + + match Command::from_iter_safe(&tokens) { + Ok(args) => self.on_command(args).await?, + Err(e) if e.kind == structopt::clap::ErrorKind::HelpDisplayed => { + self.send_message(&format!("\n{}", e.message)); + } + _ => (), + } + } + + Ok(()) + } + + async fn on_command(&self, command: Command) -> Result<(), AudioPlayerError> { + match command { + Command::Play => { + let playlist = self.playlist.lock().expect("Mutex was not poisoned"); + + if !self.player.is_started() { + if !playlist.is_empty() { + self.player.stop_current()?; + } + } else { + self.player.play()?; + } + } + Command::Add { url } => { + // strip bbcode tags from url + let url = url.replace("[URL]", "").replace("[/URL]", ""); + + self.add_audio(url.to_string()).await; + } + Command::Pause => { + self.player.pause()?; + } + Command::Stop => { + self.player.reset()?; + } + Command::Next => { + let playlist = self.playlist.lock().expect("Mutex was not poisoned"); + if !playlist.is_empty() { + info!("Skipping to next track"); + self.player.stop_current()?; + } else { + info!("Playlist empty, cannot skip"); + self.player.reset()?; + } + } + Command::Clear => { + self.playlist + .lock() + .expect("Mutex was not poisoned") + .clear(); + } + Command::Volume { percent: volume } => { + let volume = volume.max(0.0).min(100.0) * 0.01; + self.player.set_volume(volume)?; + } + Command::Leave => { + self.quit(String::from("Leaving")); + } + } + + Ok(()) + } + + fn on_state(&self, state: State) -> Result<(), AudioPlayerError> { + let mut current_state = self.state.lock().unwrap(); + if *current_state != state { + match state { + State::Playing => { + self.set_nickname(&format!("🎵 {} - Playing", self.name)); + } + State::Paused => { + self.set_nickname(&format!("🎵 {} - Paused", self.name)); + } + State::Stopped => { + self.set_nickname(&format!("🎵 {}", self.name)); + self.set_description(""); + } + State::EndOfStream => { + let next_track = self.playlist.lock().expect("Mutex was not poisoned").pop(); + if let Some(request) = next_track { + info!("Advancing playlist"); + + self.start_playing_audio(request); + } else { + self.set_nickname(&format!("🎵 {}", self.name)); + self.set_description(""); + } + } + } + } + + *current_state = state; + + Ok(()) + } + + async fn on_message(&self, message: MusicBotMessage) -> Result<(), AudioPlayerError> { + match message { + MusicBotMessage::TextMessage(message) => { + if MessageTarget::Channel == message.target { + self.on_text(message).await?; + } + } + MusicBotMessage::ClientChannel { + client: _, + old_channel, + } => { + self.on_client_left_channel(old_channel); + } + MusicBotMessage::ClientDisconnected { id: _, client } => { + let old_channel = client.channel; + self.on_client_left_channel(old_channel); + } + MusicBotMessage::StateChange(state) => { + self.on_state(state)?; + } + MusicBotMessage::Quit(_) => (), + } + + Ok(()) + } + + fn on_client_left_channel(&self, old_channel: ChannelId) { + let my_channel = self.my_channel(); + if old_channel == my_channel && self.user_count(my_channel) <= 1 { + self.quit(String::from("Channel is empty")); + } + } + + pub fn quit(&self, reason: String) { + self.player.quit(reason); + } +} + +fn spawn_stdin_reader(tx: Arc<Mutex<UnboundedSender<MusicBotMessage>>>) { + debug!("Spawning stdin reader thread"); + thread::spawn(move || { + let stdin = ::std::io::stdin(); + let lock = stdin.lock(); + for line in lock.lines() { + let line = line.unwrap(); + + let message = MusicBotMessage::TextMessage(Message { + target: MessageTarget::Channel, + invoker: Invoker { + name: String::from("stdin"), + id: ClientId(0), + uid: None, + }, + text: line, + }); + + let tx = tx.lock().unwrap(); + tx.send(message).unwrap(); + } + }); +} + +fn spawn_gstreamer_thread( + player: Arc<AudioPlayer>, + tx: Arc<Mutex<UnboundedSender<MusicBotMessage>>>, +) { + thread::spawn(move || loop { + if player.poll() == PollResult::Quit { + break; + } + + tx.lock() + .unwrap() + .send(MusicBotMessage::StateChange(State::EndOfStream)) + .unwrap(); + }); +} diff --git a/src/command.rs b/src/command.rs index fbc714c..3a39290 100644 --- a/src/command.rs +++ b/src/command.rs @@ -27,4 +27,6 @@ pub enum Command { Clear, /// Changes the volume to the specified value Volume { percent: f64 }, + /// Leaves the channel + Leave, } diff --git a/src/main.rs b/src/main.rs index f4f7559..922162f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,52 +1,51 @@ -use std::io::{BufRead, Read}; +use std::fs::File; +use std::io::{Read, Write}; use std::path::PathBuf; -use std::sync::{Arc, Mutex}; -use std::thread; use futures::future::{FutureExt, TryFutureExt}; use log::{debug, info}; use structopt::clap::AppSettings; use structopt::StructOpt; -use tokio02::sync::mpsc::UnboundedSender; -use tsclientlib::{ClientId, ConnectOptions, Identity, Invoker, MessageTarget}; +use tsclientlib::Identity; mod audio_player; +mod bot; mod command; mod playlist; mod teamspeak; mod youtube_dl; -use audio_player::*; -use playlist::*; -use teamspeak::*; -use youtube_dl::AudioMetadata; -use command::Command; +use bot::{MasterArgs, MasterBot, MusicBot, MusicBotArgs}; #[derive(StructOpt, Debug)] #[structopt(raw(global_settings = "&[AppSettings::ColoredHelp]"))] -struct Args { +pub struct Args { #[structopt(short = "l", long = "local", help = "Run locally in text mode")] local: bool, #[structopt( + short = "g", + long = "generate-identities", + help = "Generate 'count' identities" + )] + gen_id_count: Option<u8>, + #[structopt( short = "a", long = "address", - default_value = "localhost", help = "The address of the server to connect to" )] - address: String, + address: Option<String>, #[structopt( - short = "i", - long = "id", - help = "Identity file - good luck creating one", - parse(from_os_str) + help = "Configuration file", + parse(from_os_str), + default_value = "config.toml" )] - id_path: Option<PathBuf>, + config_path: PathBuf, #[structopt( - short = "c", - long = "channel", - help = "The channel the bot should connect to" + short = "d", + long = "master_channel", + help = "The channel the master bot should connect to" )] - default_channel: Option<String>, + master_channel: Option<String>, #[structopt( short = "v", long = "verbose", @@ -60,352 +59,70 @@ struct Args { // 3. Print udp packets } -#[derive(Debug)] -pub struct Message { - pub target: MessageTarget, - pub invoker: Invoker, - pub text: String, -} - -#[derive(Debug, PartialEq, Eq)] -pub enum State { - Playing, - Paused, - Stopped, - EndOfStream, -} - -#[derive(Debug)] -pub enum ApplicationMessage { - TextMessage(Message), - StateChange(State), -} - -struct Application { - player: Arc<AudioPlayer>, - teamspeak: Option<Arc<TeamSpeakConnection>>, - playlist: Arc<Mutex<Playlist>>, - state: Arc<Mutex<State>>, -} - -impl Application { - pub fn new( - player: Arc<AudioPlayer>, - playlist: Arc<Mutex<Playlist>>, - teamspeak: Option<Arc<TeamSpeakConnection>>, - ) -> Self { - Self { - player, - teamspeak, - playlist, - state: Arc::new(Mutex::new(State::Stopped)), - } - } - - #[inline(always)] - fn with_teamspeak<F: Fn(&TeamSpeakConnection)>(&self, func: F) { - if let Some(ts) = &self.teamspeak { - func(&ts); - } - } - - fn start_playing_audio(&self, metadata: AudioMetadata) { - if let Some(title) = metadata.title { - self.send_message(&format!("Playing '{}'", title)); - self.set_description(&format!("Currently playing '{}'", title)); - } else { - self.send_message("Playing unknown title"); - self.set_description("Currently playing"); - } - self.player.reset().unwrap(); - self.player.set_source_url(metadata.url).unwrap(); - self.player.play().unwrap(); - } - - pub async fn add_audio(&self, url: String) { - match youtube_dl::get_audio_download_url(url).await { - Ok(metadata) => { - info!("Found audio url: {}", metadata.url); - - let mut playlist = self.playlist.lock().expect("Mutex was not poisoned"); - playlist.push(metadata.clone()); - - if !self.player.is_started() { - if let Some(request) = playlist.pop() { - self.start_playing_audio(request); - } - } else { - if let Some(title) = metadata.title { - self.send_message(&format!("Added '{}' to playlist", title)); - } else { - self.send_message("Added to playlist"); - } - } - } - Err(e) => { - info!("Failed to find audio url: {}", e); - - self.send_message(&format!("Failed to find url: {}", e)); - } - } - } - - fn send_message(&self, text: &str) { - debug!("Sending message to TeamSpeak: {}", text); - - self.with_teamspeak(|ts| ts.send_message_to_channel(text)); - } - - fn set_nickname(&self, name: &str) { - info!("Setting TeamsSpeak nickname to {}", name); - - self.with_teamspeak(|ts| ts.set_nickname(name)); - } - - fn set_description(&self, desc: &str) { - info!("Setting TeamsSpeak description to {}", desc); - - self.with_teamspeak(|ts| ts.set_description(desc)); - } - - async fn on_text(&self, message: Message) -> Result<(), AudioPlayerError> { - let msg = message.text; - if msg.starts_with("!") { - let tokens = msg[1..].split_whitespace().collect::<Vec<_>>(); - - match Command::from_iter_safe(&tokens) { - Ok(args) => self.on_command(args).await?, - Err(e) if e.kind == structopt::clap::ErrorKind::HelpDisplayed => { - self.send_message(&format!("\n{}", e.message)); - } - _ => (), - } - } - - Ok(()) - } - - async fn on_command(&self, command: Command) -> Result<(), AudioPlayerError> { - match command { - Command::Play => { - let playlist = self.playlist.lock().expect("Mutex was not poisoned"); - - if !self.player.is_started() { - if !playlist.is_empty() { - self.player.stop_current()?; - } - } else { - self.player.play()?; - } - } - Command::Add { url } => { - // strip bbcode tags from url - let url = url.replace("[URL]", "").replace("[/URL]", ""); - - self.add_audio(url.to_string()).await; - } - Command::Pause => { - self.player.pause()?; - } - Command::Stop => { - self.player.reset()?; - } - Command::Next => { - let playlist = self.playlist.lock().expect("Mutex was not poisoned"); - if !playlist.is_empty() { - info!("Skipping to next track"); - self.player.stop_current()?; - } else { - info!("Playlist empty, cannot skip"); - self.player.reset()?; - } - } - Command::Clear => { - self.playlist - .lock() - .expect("Mutex was not poisoned") - .clear(); - } - Command::Volume { percent: volume } => { - let volume = volume.max(0.0).min(100.0) * 0.01; - self.player.set_volume(volume)?; - } - } - - Ok(()) - } - - fn on_state(&self, state: State) -> Result<(), AudioPlayerError> { - let mut current_state = self.state.lock().unwrap(); - if *current_state != state { - match state { - State::Playing => { - self.set_nickname("PokeBot - Playing"); - } - State::Paused => { - self.set_nickname("PokeBot - Paused"); - } - State::Stopped => { - self.set_nickname("PokeBot"); - self.set_description(""); - } - State::EndOfStream => { - let next_track = self.playlist.lock().expect("Mutex was not poisoned").pop(); - if let Some(request) = next_track { - info!("Advancing playlist"); - - self.start_playing_audio(request); - } else { - self.set_nickname("PokeBot"); - self.set_description(""); - } - } - } - } - - *current_state = state; - - Ok(()) - } - - pub async fn on_message(&self, message: ApplicationMessage) -> Result<(), AudioPlayerError> { - match message { - ApplicationMessage::TextMessage(message) => { - if let MessageTarget::Poke(who) = message.target { - info!("Poked by {}, joining their channel", who); - self.with_teamspeak(|ts| ts.join_channel_of_user(who)); - } else { - self.on_text(message).await?; - } - } - ApplicationMessage::StateChange(state) => { - self.on_state(state)?; - } - } - - Ok(()) +fn main() { + if let Err(e) = run() { + println!("Error: {}", e); } } -fn main() { +fn run() -> Result<(), Box<dyn std::error::Error>> { log4rs::init_file("log4rs.yml", Default::default()).unwrap(); - tokio::run(async_main().unit_error().boxed().compat()); -} - -async fn async_main() { - info!("Starting PokeBot!"); - // Parse command line options let args = Args::from_args(); - debug!("Received CLI arguments: {:?}", std::env::args()); + let mut file = File::open(&args.config_path)?; + let mut toml = String::new(); + file.read_to_string(&mut toml)?; - let (tx, mut rx) = tokio02::sync::mpsc::unbounded_channel(); - let tx = Arc::new(Mutex::new(tx)); - let (player, connection) = if args.local { - info!("Starting in CLI mode"); - let audio_player = AudioPlayer::new(tx.clone(), None).unwrap(); + let mut config: MasterArgs = toml::from_str(&toml)?; - (audio_player, None) - } else { - info!("Starting in TeamSpeak mode"); - - let id = if let Some(path) = args.id_path { - let mut file = std::fs::File::open(path).expect("Failed to open id file"); - let mut content = String::new(); - file.read_to_string(&mut content) - .expect("Failed to read id file"); - - toml::from_str(&content).expect("Failed to parse id file") - } else { - Identity::create().expect("Failed to create id") - }; - - let mut con_config = ConnectOptions::new(args.address) - .version(tsclientlib::Version::Linux_3_3_2) - .name(String::from("PokeBot")) - .identity(id) - .log_commands(args.verbose >= 1) - .log_packets(args.verbose >= 2) - .log_udp_packets(args.verbose >= 3); - - if let Some(channel) = args.default_channel { - con_config = con_config.channel(channel); + if let Some(count) = args.gen_id_count { + for _ in 0..count { + let id = Identity::create().expect("Failed to create id"); + config.ids.push(id); } - let connection = Arc::new( - TeamSpeakConnection::new(tx.clone(), con_config) - .await - .unwrap(), - ); - let cconnection = connection.clone(); - let audio_player = AudioPlayer::new( - tx.clone(), - Some(Box::new(move |samples| { - cconnection.send_audio_packet(samples); - })), - ) - .unwrap(); - - (audio_player, Some(connection)) - }; + let toml = toml::to_string(&config)?; + let mut file = File::create(&args.config_path)?; + file.write_all(toml.as_bytes())?; - player.set_volume(0.5).unwrap(); - let player = Arc::new(player); - let playlist = Arc::new(Mutex::new(Playlist::new())); - let application = Arc::new(Application::new( - player.clone(), - playlist.clone(), - connection, - )); - - spawn_gstreamer_thread(player, tx.clone()); - - if args.local { - spawn_stdin_reader(tx); - } - - loop { - while let Some(msg) = rx.recv().await { - application.on_message(msg).await.unwrap(); - } + return Ok(()); } -} -fn spawn_stdin_reader(tx: Arc<Mutex<UnboundedSender<ApplicationMessage>>>) { - thread::spawn(move || { - let stdin = ::std::io::stdin(); - let lock = stdin.lock(); - for line in lock.lines() { - let line = line.unwrap(); + let bot_args = config.merge(args); - let message = ApplicationMessage::TextMessage(Message { - target: MessageTarget::Server, - invoker: Invoker { - name: String::from("stdin"), - id: ClientId(0), - uid: None, - }, - text: line, - }); + info!("Starting PokeBot!"); + debug!("Received CLI arguments: {:?}", std::env::args()); - let tx = tx.lock().unwrap(); - tx.send(message).unwrap(); + tokio::run( + async { + if bot_args.local { + let name = bot_args.names[0].clone(); + let id = bot_args.ids[0].clone(); + + let disconnect_cb = Box::new(move |_, _, _| {}); + + let bot_args = MusicBotArgs { + name: name, + name_index: 0, + id_index: 0, + local: true, + address: bot_args.address.clone(), + id, + channel: String::from("local"), + verbose: bot_args.verbose, + disconnect_cb, + }; + MusicBot::new(bot_args).await.1.await; + } else { + MasterBot::new(bot_args).await.1.await; + } } - }); -} - -fn spawn_gstreamer_thread( - player: Arc<AudioPlayer>, - tx: Arc<Mutex<UnboundedSender<ApplicationMessage>>>, -) { - thread::spawn(move || loop { - player.poll(); + .unit_error() + .boxed() + .compat(), + ); - tx.lock() - .unwrap() - .send(ApplicationMessage::StateChange(State::EndOfStream)) - .unwrap(); - }); + Ok(()) } diff --git a/src/teamspeak.rs b/src/teamspeak.rs index 79dc1bc..b429869 100644 --- a/src/teamspeak.rs +++ b/src/teamspeak.rs @@ -1,36 +1,78 @@ +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; + use futures::compat::Future01CompatExt; use futures01::{future::Future, sink::Sink}; use tokio02::sync::mpsc::UnboundedSender; -use crate::{ApplicationMessage, Message}; -use std::sync::{Arc, Mutex}; use tsclientlib::Event::ConEvents; -use tsclientlib::{events::Event, ClientId, ConnectOptions, Connection, MessageTarget}; +use tsclientlib::{ + events::Event, ChannelId, ClientId, ConnectOptions, Connection, DisconnectOptions, + MessageTarget, Reason, +}; use log::error; +use crate::bot::{Message, MusicBotMessage}; + pub struct TeamSpeakConnection { conn: Connection, } -fn get_message<'a>(event: &Event) -> Option<Message> { +fn get_message<'a>(event: &Event) -> Option<MusicBotMessage> { + use tsclientlib::events::{PropertyId, PropertyValue}; + match event { Event::Message { from: target, invoker: sender, message: msg, - } => Some(Message { - target: target.clone(), + } => Some(MusicBotMessage::TextMessage(Message { + target: *target, invoker: sender.clone(), text: msg.clone(), - }), + })), + Event::PropertyChanged { + id: property, + old: from, + invoker: _, + } => match property { + PropertyId::ClientChannel(client) => { + if let PropertyValue::ChannelId(from) = from { + Some(MusicBotMessage::ClientChannel { + client: *client, + old_channel: *from, + }) + } else { + None + } + } + _ => None, + }, + Event::PropertyRemoved { + id: property, + old: client, + invoker: _, + } => match property { + PropertyId::Client(id) => { + if let PropertyValue::Client(client) = client { + Some(MusicBotMessage::ClientDisconnected { + id: *id, + client: client.clone(), + }) + } else { + None + } + } + _ => None, + }, _ => None, } } impl TeamSpeakConnection { pub async fn new( - tx: Arc<Mutex<UnboundedSender<ApplicationMessage>>>, + tx: Arc<Mutex<UnboundedSender<MusicBotMessage>>>, options: ConnectOptions, ) -> Result<TeamSpeakConnection, tsclientlib::Error> { let conn = Connection::new(options).compat().await?; @@ -43,8 +85,9 @@ impl TeamSpeakConnection { if let ConEvents(_conn, events) = e { for event in *events { if let Some(msg) = get_message(event) { - let tx = tx.lock().unwrap(); - tx.send(ApplicationMessage::TextMessage(msg)).unwrap(); + let tx = tx.lock().expect("Mutex was not poisoned"); + // Ignore the result because the receiver might get dropped first. + let _ = tx.send(msg); } } } @@ -72,23 +115,58 @@ impl TeamSpeakConnection { tokio::run(send_packet); } - pub fn join_channel_of_user(&self, id: ClientId) { - let channel = self - .conn - .lock() - .clients - .get(&id) - .expect("can find poke sender") - .channel; - tokio::spawn( - self.conn - .lock() - .to_mut() - .get_client(&self.conn.lock().own_client) - .expect("can get myself") - .set_channel(channel) - .map_err(|e| error!("Failed to switch channel: {}", e)), - ); + pub fn channel_of_user(&self, id: ClientId) -> Option<ChannelId> { + Some(self.conn.lock().clients.get(&id)?.channel) + } + + pub fn channel_path_of_user(&self, id: ClientId) -> Option<String> { + let conn = self.conn.lock(); + + let channel_id = conn.clients.get(&id)?.channel; + + let mut channel = conn + .channels + .get(&channel_id) + .expect("can find user channel"); + + let mut names = vec![&channel.name[..]]; + + // Channel 0 is the root channel + while channel.parent != ChannelId(0) { + names.push("/"); + channel = conn + .channels + .get(&channel.parent) + .expect("can find user channel"); + names.push(&channel.name); + } + + let mut path = String::new(); + while let Some(name) = names.pop() { + path.push_str(name); + } + + Some(path) + } + + pub fn my_channel(&self) -> ChannelId { + let conn = self.conn.lock(); + conn.clients + .get(&conn.own_client) + .expect("can find myself") + .channel + } + + pub fn user_count(&self, channel: ChannelId) -> u32 { + let conn = self.conn.lock(); + let mut count = 0; + for (_, client) in &conn.clients { + if client.channel == channel { + count += 1; + } + } + + count } pub fn set_nickname(&self, name: &str) { @@ -122,4 +200,29 @@ impl TeamSpeakConnection { .map_err(|e| error!("Failed to send message: {}", e)), ); } + + pub fn send_message_to_user(&self, id: ClientId, text: &str) { + tokio::spawn( + self.conn + .lock() + .to_mut() + .send_message(MessageTarget::Client(id), text) + .map_err(|e| error!("Failed to send message: {}", e)), + ); + } + + pub fn disconnect(&self, reason: &str) { + let opt = DisconnectOptions::new() + .reason(Reason::Clientdisconnect) + .message(reason); + tokio::spawn( + self.conn + .disconnect(opt) + .map_err(|e| error!("Failed to send message: {}", e)), + ); + // Might or might not be required to keep tokio running while the bot disconnects + tokio::spawn( + tokio::timer::Delay::new(Instant::now() + Duration::from_secs(1)).map_err(|_| ()), + ); + } } diff --git a/src/youtube_dl.rs b/src/youtube_dl.rs index a917c54..c6012f0 100644 --- a/src/youtube_dl.rs +++ b/src/youtube_dl.rs @@ -1,8 +1,8 @@ +use futures::compat::Future01CompatExt; use std::process::{Command, Stdio}; use tokio_process::CommandExt; -use futures::compat::Future01CompatExt; -use serde::{Serialize, Deserialize}; +use serde::{Deserialize, Serialize}; use log::debug; @@ -13,13 +13,7 @@ pub struct AudioMetadata { } pub async fn get_audio_download_url(uri: String) -> Result<AudioMetadata, String> { - let ytdl_args = [ - "--no-playlist", - "-f", - "bestaudio/best", - "-j", - &uri, - ]; + let ytdl_args = ["--no-playlist", "-f", "bestaudio/best", "-j", &uri]; let mut cmd = Command::new("youtube-dl"); cmd.args(&ytdl_args); |
