diff options
| author | Felix Kaaman <tmtu@tmtu.ee> | 2020-01-07 22:12:28 +0200 |
|---|---|---|
| committer | Felix Kaaman <tmtu@tmtu.ee> | 2020-01-12 12:08:21 +0200 |
| commit | 193987e0c7185eb63827b2d91f1e2779c4e557d3 (patch) | |
| tree | 64535e2e534e57ef94a057f8010b87c88fc56021 /src/main.rs | |
| parent | fc62434581e5f7411177e7c30dd1f4543ec354be (diff) | |
| download | pokebot-193987e0c7185eb63827b2d91f1e2779c4e557d3.tar.gz pokebot-193987e0c7185eb63827b2d91f1e2779c4e557d3.zip | |
Add enterprise logging and split out gstreamer & teamspeak specific code
Diffstat (limited to 'src/main.rs')
| -rw-r--r-- | src/main.rs | 472 |
1 files changed, 315 insertions, 157 deletions
diff --git a/src/main.rs b/src/main.rs index cdb0bd6..c4fdd2f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,31 +1,39 @@ -use std::io::Read; +use std::io::{Read, BufRead}; use std::path::PathBuf; use std::str::FromStr; +use std::thread; +use std::sync::{Arc, Mutex}; use futures::{ - compat::Future01CompatExt, future::{FutureExt, TryFutureExt}, }; - -use futures01::future::Future; use structopt::clap::AppSettings; use structopt::StructOpt; - use tsclientlib::{ - events::Event, ConnectOptions, Connection, ConnectionLock, Event::ConEvents, Identity, - MessageTarget, + ConnectOptions, Identity, MessageTarget, Invoker, ClientId, }; +use log::{info, debug}; -use log::error; - +mod audio_player; +mod youtube_dl; +mod teamspeak; mod playlist; -mod state; -use state::State; + +use audio_player::*; +use teamspeak::*; +use playlist::*; +use std::sync::mpsc::Sender; #[derive(StructOpt, Debug)] #[structopt(raw(global_settings = "&[AppSettings::ColoredHelp]"))] struct Args { #[structopt( + short = "l", + long = "local", + help = "Run locally in text mode" + )] + local: bool, + #[structopt( short = "a", long = "address", default_value = "localhost", @@ -58,174 +66,324 @@ struct Args { // 3. Print udp packets } +pub struct Message { + pub target: MessageTarget, + pub invoker: Invoker, + pub text: String, +} + +#[derive(Debug, PartialEq, Eq)] +pub enum State { + Playing, + Paused, + Stopped, + EndOfStream, +} + +pub enum ApplicationMessage { + TextMessage(Message), + StateChange(State), +} + +struct Application { + player: Arc<AudioPlayer>, + teamspeak: Option<Arc<TeamSpeakConnection>>, + playlist: Arc<Mutex<Playlist>>, + state: Arc<Mutex<State>>, +} + +impl Application { + pub fn new(player: Arc<AudioPlayer>, playlist: Arc<Mutex<Playlist>>, teamspeak: Option<Arc<TeamSpeakConnection>>) -> Self { + Self { + player, + teamspeak, + playlist, + state: Arc::new(Mutex::new(State::Stopped)), + } + } + + #[inline(always)] + fn with_teamspeak<F: Fn(&TeamSpeakConnection)>(&self, func: F) { + if let Some(ts) = &self.teamspeak { + func(&ts); + } + } + + fn start_playing_audio(&self, request: AudioRequest) { + self.send_message(&format!("Playing '{}'", request.title)); + self.set_description(&format!("Currently playing '{}'", request.title)); + self.player.reset().unwrap(); + self.player.set_source_url(request.address).unwrap(); + self.player.play().unwrap(); + } + + pub fn add_audio(&self, url: String) { + if self.playlist.lock().expect("Mutex was not poisoned").is_full() { + info!("Audio playlist is full"); + self.send_message("Playlist is full"); + return; + } + + match youtube_dl::get_audio_download_url(url) { + Ok((audio_url, audio_title)) => { + info!("Found audio url: {}", audio_url); + + let request = AudioRequest { + title: audio_title, + address: audio_url, + }; + + let mut playlist = self.playlist.lock().expect("Mutex was not poisoned"); + playlist.push(request.clone()); + + if !self.player.is_started() { + if let Some(request) = playlist.pop() { + self.start_playing_audio(request); + } + } else { + self.send_message(&format!("Added '{}' to playlist", request.title)); + } + } + Err(e) => { + info!("Failed to find audio url: {}", e); + + self.send_message(&format!("Failed to find url: {}", e)); + } + } + } + + fn send_message(&self, text: &str) { + debug!("Sending message to TeamSpeak: {}", text); + + self.with_teamspeak(|ts| ts.send_message_to_channel(text)); + } + + fn set_nickname(&self, name: &str) { + info!("Setting TeamsSpeak nickname to {}", name); + + self.with_teamspeak(|ts| ts.set_nickname(name)); + } + + fn set_description(&self, desc: &str) { + info!("Setting TeamsSpeak description to {}", desc); + + self.with_teamspeak(|ts| ts.set_description(desc)); + } + + fn on_text(&self, message: Message) -> Result<(), AudioPlayerError> { + let msg = message.text; + if msg.starts_with("!") { + let tokens = msg[1..].split_whitespace().collect::<Vec<_>>(); + + match tokens.get(0).map(|t| *t) { + Some("add") => { + if let Some(url) = &tokens.get(1) { + // strip bbcode tags from url + let url = url.replace("[URL]", "").replace("[/URL]", ""); + + self.add_audio(url.to_string()); + } + } + Some("play") => { + if !self.player.is_started() { + self.player.stop_current(); + } else { + self.player.play()?; + } + } + Some("pause") => { + self.player.pause()?; + } + Some("stop") => { + self.player.reset()?; + } + Some("next") => { + let playlist = self.playlist.lock().expect("Mutex was not poisoned"); + if !playlist.is_empty() { + info!("Skipping to next track"); + self.player.stop_current(); + } else { + info!("Playlist empty, cannot skip"); + self.player.reset()?; + } + } + Some("clear") => { + self.playlist.lock().expect("Mutex was not poisoned").clear(); + } + Some("volume") => { + if let Some(&volume) = &tokens.get(1) { + if let Ok(volume) = f64::from_str(volume) { + let volume = volume.max(0.0).min(100.0) * 0.01; + + self.player.set_volume(volume)?; + } + } + } + _ => {} + } + } + + Ok(()) + } + + fn on_state(&self, state: State) -> Result<(), AudioPlayerError> { + let mut current_state = self.state.lock().unwrap(); + if *current_state != state { + match state { + State::Playing => { + self.set_nickname("PokeBot - Playing"); + } + State::Paused => { + self.set_nickname("PokeBot - Paused"); + } + State::Stopped => { + self.set_nickname("PokeBot"); + self.set_description(""); + } + State::EndOfStream => { + let next_track = self.playlist.lock().expect("Mutex was not poisoned").pop(); + if let Some(request) = next_track { + info!("Advancing playlist"); + + self.start_playing_audio(request); + } else { + self.set_nickname("PokeBot"); + self.set_description(""); + } + } + } + } + + *current_state = state; + + Ok(()) + } + + pub fn on_message(&self, message: ApplicationMessage) -> Result<(), AudioPlayerError> { + match message { + ApplicationMessage::TextMessage(message) => { + if let MessageTarget::Poke(who) = message.target { + info!("Poked by {}, joining their channel", who); + self.with_teamspeak(|ts| ts.join_channel_of_user(who)); + } else { + self.on_text(message)?; + } + } + ApplicationMessage::StateChange(state) => { + self.on_state(state)?; + } + } + + Ok(()) + } +} + fn main() { + log4rs::init_file("log4rs.yml", Default::default()).unwrap(); + tokio::run(async_main().unit_error().boxed().compat()); } async fn async_main() { + info!("Starting PokeBot!"); + // Parse command line options let args = Args::from_args(); - let id = if let Some(path) = args.id_path { - let mut file = std::fs::File::open(path).expect("Failed to open id file"); - let mut content = String::new(); - file.read_to_string(&mut content) - .expect("Failed to read id file"); + debug!("Received CLI arguments: {:?}", std::env::args()); + + let (tx, rx) = ::std::sync::mpsc::channel(); + let tx = Arc::new(Mutex::new(tx)); + let (player, connection) = if args.local { + info!("Starting in CLI mode"); + let audio_player = AudioPlayer::new(tx.clone(), None) + .unwrap(); - toml::from_str(&content).expect("Failed to parse id file") + (audio_player, None) } else { - Identity::create().expect("Failed to create id") - }; + info!("Starting in TeamSpeak mode"); - let mut con_config = ConnectOptions::new(args.address) - .version(tsclientlib::Version::Linux_3_3_2) - .name(String::from("PokeBot")) - .identity(id) - .log_commands(args.verbose >= 1) - .log_packets(args.verbose >= 2) - .log_udp_packets(args.verbose >= 3); + let id = if let Some(path) = args.id_path { + let mut file = std::fs::File::open(path).expect("Failed to open id file"); + let mut content = String::new(); + file.read_to_string(&mut content) + .expect("Failed to read id file"); - if let Some(channel) = args.default_channel { - con_config = con_config.channel(channel); - } + toml::from_str(&content).expect("Failed to parse id file") + } else { + Identity::create().expect("Failed to create id") + }; + + let mut con_config = ConnectOptions::new(args.address) + .version(tsclientlib::Version::Linux_3_3_2) + .name(String::from("PokeBot")) + .identity(id) + .log_commands(args.verbose >= 1) + .log_packets(args.verbose >= 2) + .log_udp_packets(args.verbose >= 3); + + if let Some(channel) = args.default_channel { + con_config = con_config.channel(channel); + } + + let connection = Arc::new(TeamSpeakConnection::new(tx.clone(), con_config).await.unwrap()); + let cconnection = connection.clone(); + let audio_player = AudioPlayer::new(tx.clone(), Some(Box::new(move |samples| { + cconnection.send_audio_packet(samples); + }))).unwrap(); + + (audio_player, Some(connection)) + }; + + player.set_volume(0.1).unwrap(); + let player = Arc::new(player); + let playlist = Arc::new(Mutex::new(Playlist::new())); + let application = Arc::new(Application::new(player.clone(), playlist.clone(), connection)); - //let (disconnect_send, disconnect_recv) = mpsc::unbounded(); - let conn = Connection::new(con_config).compat().await.unwrap(); + spawn_gstreamer_thread(player, tx.clone()); - let state = State::new(conn.clone()); - { - let packet = conn.lock().server.set_subscribed(true); - conn.send_packet(packet).compat().await.unwrap(); + if args.local { + spawn_stdin_reader(tx); } - //con.add_on_disconnect(Box::new( || { - //disconnect_send.unbounded_send(()).unwrap() - //})); - let inner_state = state.clone(); - conn.add_event_listener( - String::from("listener"), - Box::new(move |e| { - if let ConEvents(conn, events) = e { - for event in *events { - handle_event(&inner_state, &conn, event); - } - } - }), - ); loop { - state.poll().await; + while let Ok(msg) = rx.recv() { + application.on_message(msg).unwrap(); + } } - //let ctrl_c = tokio_signal::ctrl_c().flatten_stream(); +} - //let dc_fut = disconnect_recv.into_future().compat().fuse(); - //let ctrlc_fut = ctrl_c.into_future().compat().fuse(); - //ctrlc_fut.await.map_err(|(e, _)| e).unwrap(); +fn spawn_stdin_reader(tx: Arc<Mutex<Sender<ApplicationMessage>>>) { + thread::spawn(move || { + let stdin = ::std::io::stdin(); + let lock = stdin.lock(); + for line in lock.lines() { + let line = line.unwrap(); - //conn.disconnect(DisconnectOptions::new()) - //.compat() - //.await - //.unwrap(); + let message = ApplicationMessage::TextMessage( + Message { + target: MessageTarget::Server, + invoker: Invoker { + name: String::from("stdin"), + id: ClientId(0), + uid: None, + }, + text: line + } + ); - // TODO Should not be required - //std::process::exit(0); + let tx = tx.lock().unwrap(); + tx.send(message).unwrap(); + } + }); } -fn handle_event<'a>(state: &State, conn: &ConnectionLock<'a>, event: &Event) { - match event { - Event::Message { - from: target, - invoker: sender, - message: msg, - } => { - if let MessageTarget::Poke(who) = target { - let channel = conn - .clients - .get(&who) - .expect("can find poke sender") - .channel; - tokio::spawn( - conn.to_mut() - .get_client(&conn.own_client) - .expect("can get myself") - .set_channel(channel) - .map_err(|e| error!("Failed to switch channel: {}", e)), - ); - } else if sender.id != conn.own_client { - if msg.starts_with("!") { - let tokens = msg[1..].split_whitespace().collect::<Vec<_>>(); - match tokens.get(0).map(|t| *t) { - Some("test") => { - tokio::spawn( - conn.to_mut() - .send_message(*target, "works :)") - .map_err(|_| ()), - ); - } - Some("add") => { - let mut invalid = false; - if let Some(url) = &tokens.get(1) { - if url.len() > 11 { - tokio::spawn( - conn.to_mut().set_name("PokeBot - Loading").map_err(|_| ()), - ); - let trimmed = url[5..url.len() - 6].to_owned(); - let inner_state = state.clone(); - tokio::spawn( - async move { inner_state.add_audio(trimmed).await } - .unit_error() - .boxed() - .compat(), - ); - } else { - invalid = true; - } - } else { - invalid = true; - } - if invalid { - tokio::spawn( - conn.to_mut() - .send_message(MessageTarget::Channel, "Invalid Url") - .map_err(|_| ()), - ); - } - } - Some("volume") => { - if let Ok(volume) = f64::from_str(tokens[1]) { - if 0.0 <= volume && volume <= 100.0 { - state.volume(volume); - } else { - tokio::spawn( - conn.to_mut() - .send_message( - MessageTarget::Channel, - "Volume must be between 0 and 100", - ) - .map_err(|_| ()), - ); - } - } - } - Some("play") => { - state.play(); - } - Some("skip") => { - state.next(); - } - Some("clear") => { - state.clear(); - } - Some("pause") => { - state.pause(); - } - Some("stop") => { - state.stop(); - } - _ => (), - }; - } - } +fn spawn_gstreamer_thread(player: Arc<AudioPlayer>, tx: Arc<Mutex<Sender<ApplicationMessage>>>) { + thread::spawn(move || { + loop { + player.poll(); + + tx.lock().unwrap().send(ApplicationMessage::StateChange(State::EndOfStream)).unwrap(); } - _ => (), - } + }); } |
