diff options
| author | Jokler <jokler@protonmail.com> | 2020-01-12 18:11:02 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-01-12 18:11:02 +0100 |
| commit | 78690b2e2949ed3be38a136f1c6ac2866ac32df7 (patch) | |
| tree | 64535e2e534e57ef94a057f8010b87c88fc56021 /src | |
| parent | fc62434581e5f7411177e7c30dd1f4543ec354be (diff) | |
| parent | 193987e0c7185eb63827b2d91f1e2779c4e557d3 (diff) | |
| download | pokebot-78690b2e2949ed3be38a136f1c6ac2866ac32df7.tar.gz pokebot-78690b2e2949ed3be38a136f1c6ac2866ac32df7.zip | |
Merge pull request #1 from fkaa/refactoring
Add enterprise logging and split out gstreamer & teamspeak specific code
Diffstat (limited to 'src')
| -rw-r--r-- | src/audio_player.rs | 328 | ||||
| -rw-r--r-- | src/main.rs | 472 | ||||
| -rw-r--r-- | src/playlist.rs | 9 | ||||
| -rw-r--r-- | src/state.rs | 436 | ||||
| -rw-r--r-- | src/teamspeak.rs | 116 | ||||
| -rw-r--r-- | src/youtube_dl.rs | 38 |
6 files changed, 806 insertions, 593 deletions
diff --git a/src/audio_player.rs b/src/audio_player.rs new file mode 100644 index 0000000..6626417 --- /dev/null +++ b/src/audio_player.rs @@ -0,0 +1,328 @@ +use std::sync::Once; + +use gstreamer as gst; +use gst::prelude::*; +use gstreamer_app::{AppSink, AppSinkCallbacks}; +use gst::{GhostPad}; + +use log::{info, debug, warn, error}; +use std::sync::mpsc::Sender; +use std::sync::{Mutex, Arc}; +use crate::{State, ApplicationMessage}; +use glib::BoolError; + +static GST_INIT: Once = Once::new(); + +#[derive(Debug)] +pub enum AudioPlayerError { + GStreamerError(glib::error::BoolError), + StateChangeFailed, +} + +impl From<glib::error::BoolError> for AudioPlayerError { + fn from(err: BoolError) -> Self { + AudioPlayerError::GStreamerError(err) + } +} + +impl From<gst::StateChangeError> for AudioPlayerError { + fn from(_err: gst::StateChangeError) -> Self { + AudioPlayerError::StateChangeFailed + } +} + +pub struct AudioPlayer { + pipeline: gst::Pipeline, + bus: gst::Bus, + http_src: gst::Element, + + volume: gst::Element, + sender: Arc<Mutex<Sender<ApplicationMessage>>>, +} + +fn make_element(factoryname: &str, display_name: &str) -> Result<gst::Element, AudioPlayerError> { + Ok(gst::ElementFactory::make( + factoryname, + Some(display_name) + )?) +} + +fn link_elements(a: &gst::Element, b: &gst::Element) -> Result<(), AudioPlayerError> { + a.link(b)?; + + Ok(()) +} + +fn add_decode_bin_new_pad_callback( + decode_bin: &gst::Element, + audio_bin: gst::Bin, + ghost_pad: gst::GhostPad, +) { + decode_bin.connect_pad_added(move |_, new_pad| { + debug!("New pad received on decode bin"); + let name = if let Some(caps) = new_pad.get_current_caps() { + debug!("Pad caps: {}", caps.to_string()); + if let Some(structure) = caps.get_structure(0) { + Some(structure.get_name().to_string()) + } else { + None + } + } else { + None + }; + + match name.as_deref() { + Some("audio/x-raw") => { + if let Some(peer) = ghost_pad.get_peer() { + peer.unlink(&ghost_pad).unwrap(); + } + + info!("Found raw audio, linking audio bin"); + new_pad.link(&ghost_pad).unwrap(); + + audio_bin.sync_state_with_parent().unwrap(); + } + _ => {} + } + }); +} + +impl AudioPlayer { + pub fn new(sender: Arc<Mutex<Sender<ApplicationMessage>>>, callback: Option<Box<dyn FnMut(&[u8]) + Send>>) -> Result<Self, AudioPlayerError> { + GST_INIT.call_once(|| gst::init().unwrap()); + + info!("Creating audio player"); + + let pipeline = gst::Pipeline::new(Some("TeamSpeak Audio Player")); + let bus = pipeline.get_bus().unwrap(); + let http_src = make_element("souphttpsrc", "http source")?; + let decode_bin = make_element("decodebin", "decode bin")?; + pipeline.add_many(&[&http_src, &decode_bin])?; + + link_elements(&http_src, &decode_bin)?; + + let (audio_bin, volume, ghost_pad) = Self::create_audio_bin(callback)?; + + add_decode_bin_new_pad_callback( + &decode_bin, + audio_bin.clone(), + ghost_pad); + + pipeline.add(&audio_bin)?; + + pipeline.set_state(gst::State::Ready)?; + + Ok(AudioPlayer { + pipeline, + bus, + http_src, + + volume, + sender, + }) + } + + fn create_audio_bin(callback: Option<Box<dyn FnMut(&[u8]) + Send>>) -> Result<(gst::Bin, gst::Element, gst::GhostPad), AudioPlayerError> { + let audio_bin = gst::Bin::new(Some("audio bin")); + let queue = make_element("queue", "audio queue")?; + let convert = make_element("audioconvert", "audio converter")?; + let volume = make_element("volume", "volume")?; + let resample = make_element("audioresample", "audio resampler")?; + let pads = queue.get_sink_pads(); + let queue_sink_pad = pads + .first() + .unwrap(); + + audio_bin.add_many(&[ + &queue, + &convert, + &volume, + &resample, + ])?; + + if let Some(mut callback) = callback { + let opus_enc = make_element("opusenc", "opus encoder")?; + let sink = make_element("appsink", "app sink")?; + + let appsink = sink + .clone() + .dynamic_cast::<AppSink>() + .expect("Sink element is expected to be an appsink!"); + appsink.set_caps(Some(&gst::Caps::new_simple("audio/x-opus", &[]))); + let callbacks = AppSinkCallbacks::new() + .new_sample(move |sink| { + let sample = sink.pull_sample().map_err(|_| gst::FlowError::Eos)?; + let buffer = sample.get_buffer().ok_or(gst::FlowError::Error)?; + let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?; + let samples = map.as_slice(); + + callback(samples); + + Ok(gst::FlowSuccess::Ok) + }) + .build(); + appsink.set_callbacks(callbacks); + + audio_bin.add_many(&[ + &opus_enc, + &sink + ])?; + + gst::Element::link_many(&[ + &queue, + &convert, + &volume, + &resample, + &opus_enc, + &sink + ])?; + } else { + let sink = make_element("autoaudiosink", "auto audio sink")?; + + audio_bin.add_many(&[ + &sink + ])?; + + gst::Element::link_many(&[ + &queue, + &convert, + &volume, + &resample, + &sink + ])?; + }; + + let ghost_pad = GhostPad::new(Some("audio bin sink"), queue_sink_pad).unwrap(); + ghost_pad.set_active(true)?; + audio_bin.add_pad(&ghost_pad)?; + + Ok((audio_bin, volume, ghost_pad)) + } + + pub fn set_source_url(&self, location: String) -> Result<(), AudioPlayerError> { + info!("Setting location URI: {}", location); + self.http_src.set_property("location", &location)?; + + Ok(()) + } + + pub fn set_volume(&self, volume: f64) -> Result<(), AudioPlayerError> { + let log_volume = 1.0 - 10.0f64.powf(-volume * 2.0); + info!("Setting volume: {} -> {}", volume, log_volume); + + self.volume.set_property("volume", &log_volume)?; + + Ok(()) + } + + pub fn is_started(&self) -> bool { + let (_, current, pending) = self.pipeline.get_state(gst::ClockTime(None)); + + match (current, pending) { + (gst::State::Null, gst::State::VoidPending) => false, + (_, gst::State::Null) => false, + (gst::State::Ready, gst::State::VoidPending) => false, + _ => true + } + } + + pub fn reset(&self) -> Result<(), AudioPlayerError> { + info!("Setting pipeline state to null"); + + self.pipeline.set_state(gst::State::Null)?; + + Ok(()) + } + + pub fn play(&self) -> Result<(), AudioPlayerError> { + info!("Setting pipeline state to playing"); + + self.pipeline.set_state(gst::State::Playing)?; + + Ok(()) + } + + pub fn pause(&self) -> Result<(), AudioPlayerError> { + info!("Setting pipeline state to paused"); + + self.pipeline.set_state(gst::State::Paused)?; + + Ok(()) + } + + pub fn stop_current(&self) { + info!("Stopping pipeline, sending EOS"); + + let handled = self.http_src.send_event(gst::Event::new_eos().build()); + if !handled { + warn!("EOS event was not handled"); + } + } + + fn send_state(&self, state: State) { + info!("Sending state {:?} to application", state); + let sender = self.sender.lock().unwrap(); + sender.send(ApplicationMessage::StateChange(state)).unwrap(); + } + + pub fn poll(&self) { + debug!("Polling GStreamer"); + 'outer: loop { + while let Some(msg) = self.bus.timed_pop(gst::ClockTime(None)) { + use gst::MessageView; + + match msg.view() { + MessageView::StateChanged(state) => { + if let Some(src) = state.get_src() { + if src.get_name() != self.pipeline.get_name() { + continue; + } + } + + let old = state.get_old(); + let current = state.get_current(); + let pending = state.get_pending(); + + match (old, current, pending) { + (gst::State::Paused, gst::State::Playing, gst::State::VoidPending) => self.send_state(State::Playing), + (gst::State::Playing, gst::State::Paused, gst::State::VoidPending) => self.send_state(State::Paused), + (_, gst::State::Ready, gst::State::Null) => self.send_state(State::Stopped), + (_, gst::State::Null, gst::State::VoidPending) => self.send_state(State::Stopped), + _ => { + debug!("Pipeline transitioned from {:?} to {:?}, with {:?} pending", old, current, pending); + } + } + } + MessageView::Eos(..) => { + info!("End of stream reached"); + self.reset().unwrap(); + + break 'outer; + }, + MessageView::Warning(warn) => { + warn!( + "Warning from {:?}: {} ({:?})", + warn.get_src().map(|s| s.get_path_string()), + warn.get_error(), + warn.get_debug() + ); + break 'outer; + } + MessageView::Error(err) => { + error!( + "Error from {:?}: {} ({:?})", + err.get_src().map(|s| s.get_path_string()), + err.get_error(), + err.get_debug() + ); + break 'outer; + } + _ => { + // debug!("{:?}", msg) + }, + }; + } + } + debug!("Left GStreamer message loop"); + } +}
\ No newline at end of file diff --git a/src/main.rs b/src/main.rs index cdb0bd6..c4fdd2f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,31 +1,39 @@ -use std::io::Read; +use std::io::{Read, BufRead}; use std::path::PathBuf; use std::str::FromStr; +use std::thread; +use std::sync::{Arc, Mutex}; use futures::{ - compat::Future01CompatExt, future::{FutureExt, TryFutureExt}, }; - -use futures01::future::Future; use structopt::clap::AppSettings; use structopt::StructOpt; - use tsclientlib::{ - events::Event, ConnectOptions, Connection, ConnectionLock, Event::ConEvents, Identity, - MessageTarget, + ConnectOptions, Identity, MessageTarget, Invoker, ClientId, }; +use log::{info, debug}; -use log::error; - +mod audio_player; +mod youtube_dl; +mod teamspeak; mod playlist; -mod state; -use state::State; + +use audio_player::*; +use teamspeak::*; +use playlist::*; +use std::sync::mpsc::Sender; #[derive(StructOpt, Debug)] #[structopt(raw(global_settings = "&[AppSettings::ColoredHelp]"))] struct Args { #[structopt( + short = "l", + long = "local", + help = "Run locally in text mode" + )] + local: bool, + #[structopt( short = "a", long = "address", default_value = "localhost", @@ -58,174 +66,324 @@ struct Args { // 3. Print udp packets } +pub struct Message { + pub target: MessageTarget, + pub invoker: Invoker, + pub text: String, +} + +#[derive(Debug, PartialEq, Eq)] +pub enum State { + Playing, + Paused, + Stopped, + EndOfStream, +} + +pub enum ApplicationMessage { + TextMessage(Message), + StateChange(State), +} + +struct Application { + player: Arc<AudioPlayer>, + teamspeak: Option<Arc<TeamSpeakConnection>>, + playlist: Arc<Mutex<Playlist>>, + state: Arc<Mutex<State>>, +} + +impl Application { + pub fn new(player: Arc<AudioPlayer>, playlist: Arc<Mutex<Playlist>>, teamspeak: Option<Arc<TeamSpeakConnection>>) -> Self { + Self { + player, + teamspeak, + playlist, + state: Arc::new(Mutex::new(State::Stopped)), + } + } + + #[inline(always)] + fn with_teamspeak<F: Fn(&TeamSpeakConnection)>(&self, func: F) { + if let Some(ts) = &self.teamspeak { + func(&ts); + } + } + + fn start_playing_audio(&self, request: AudioRequest) { + self.send_message(&format!("Playing '{}'", request.title)); + self.set_description(&format!("Currently playing '{}'", request.title)); + self.player.reset().unwrap(); + self.player.set_source_url(request.address).unwrap(); + self.player.play().unwrap(); + } + + pub fn add_audio(&self, url: String) { + if self.playlist.lock().expect("Mutex was not poisoned").is_full() { + info!("Audio playlist is full"); + self.send_message("Playlist is full"); + return; + } + + match youtube_dl::get_audio_download_url(url) { + Ok((audio_url, audio_title)) => { + info!("Found audio url: {}", audio_url); + + let request = AudioRequest { + title: audio_title, + address: audio_url, + }; + + let mut playlist = self.playlist.lock().expect("Mutex was not poisoned"); + playlist.push(request.clone()); + + if !self.player.is_started() { + if let Some(request) = playlist.pop() { + self.start_playing_audio(request); + } + } else { + self.send_message(&format!("Added '{}' to playlist", request.title)); + } + } + Err(e) => { + info!("Failed to find audio url: {}", e); + + self.send_message(&format!("Failed to find url: {}", e)); + } + } + } + + fn send_message(&self, text: &str) { + debug!("Sending message to TeamSpeak: {}", text); + + self.with_teamspeak(|ts| ts.send_message_to_channel(text)); + } + + fn set_nickname(&self, name: &str) { + info!("Setting TeamsSpeak nickname to {}", name); + + self.with_teamspeak(|ts| ts.set_nickname(name)); + } + + fn set_description(&self, desc: &str) { + info!("Setting TeamsSpeak description to {}", desc); + + self.with_teamspeak(|ts| ts.set_description(desc)); + } + + fn on_text(&self, message: Message) -> Result<(), AudioPlayerError> { + let msg = message.text; + if msg.starts_with("!") { + let tokens = msg[1..].split_whitespace().collect::<Vec<_>>(); + + match tokens.get(0).map(|t| *t) { + Some("add") => { + if let Some(url) = &tokens.get(1) { + // strip bbcode tags from url + let url = url.replace("[URL]", "").replace("[/URL]", ""); + + self.add_audio(url.to_string()); + } + } + Some("play") => { + if !self.player.is_started() { + self.player.stop_current(); + } else { + self.player.play()?; + } + } + Some("pause") => { + self.player.pause()?; + } + Some("stop") => { + self.player.reset()?; + } + Some("next") => { + let playlist = self.playlist.lock().expect("Mutex was not poisoned"); + if !playlist.is_empty() { + info!("Skipping to next track"); + self.player.stop_current(); + } else { + info!("Playlist empty, cannot skip"); + self.player.reset()?; + } + } + Some("clear") => { + self.playlist.lock().expect("Mutex was not poisoned").clear(); + } + Some("volume") => { + if let Some(&volume) = &tokens.get(1) { + if let Ok(volume) = f64::from_str(volume) { + let volume = volume.max(0.0).min(100.0) * 0.01; + + self.player.set_volume(volume)?; + } + } + } + _ => {} + } + } + + Ok(()) + } + + fn on_state(&self, state: State) -> Result<(), AudioPlayerError> { + let mut current_state = self.state.lock().unwrap(); + if *current_state != state { + match state { + State::Playing => { + self.set_nickname("PokeBot - Playing"); + } + State::Paused => { + self.set_nickname("PokeBot - Paused"); + } + State::Stopped => { + self.set_nickname("PokeBot"); + self.set_description(""); + } + State::EndOfStream => { + let next_track = self.playlist.lock().expect("Mutex was not poisoned").pop(); + if let Some(request) = next_track { + info!("Advancing playlist"); + + self.start_playing_audio(request); + } else { + self.set_nickname("PokeBot"); + self.set_description(""); + } + } + } + } + + *current_state = state; + + Ok(()) + } + + pub fn on_message(&self, message: ApplicationMessage) -> Result<(), AudioPlayerError> { + match message { + ApplicationMessage::TextMessage(message) => { + if let MessageTarget::Poke(who) = message.target { + info!("Poked by {}, joining their channel", who); + self.with_teamspeak(|ts| ts.join_channel_of_user(who)); + } else { + self.on_text(message)?; + } + } + ApplicationMessage::StateChange(state) => { + self.on_state(state)?; + } + } + + Ok(()) + } +} + fn main() { + log4rs::init_file("log4rs.yml", Default::default()).unwrap(); + tokio::run(async_main().unit_error().boxed().compat()); } async fn async_main() { + info!("Starting PokeBot!"); + // Parse command line options let args = Args::from_args(); - let id = if let Some(path) = args.id_path { - let mut file = std::fs::File::open(path).expect("Failed to open id file"); - let mut content = String::new(); - file.read_to_string(&mut content) - .expect("Failed to read id file"); + debug!("Received CLI arguments: {:?}", std::env::args()); + + let (tx, rx) = ::std::sync::mpsc::channel(); + let tx = Arc::new(Mutex::new(tx)); + let (player, connection) = if args.local { + info!("Starting in CLI mode"); + let audio_player = AudioPlayer::new(tx.clone(), None) + .unwrap(); - toml::from_str(&content).expect("Failed to parse id file") + (audio_player, None) } else { - Identity::create().expect("Failed to create id") - }; + info!("Starting in TeamSpeak mode"); - let mut con_config = ConnectOptions::new(args.address) - .version(tsclientlib::Version::Linux_3_3_2) - .name(String::from("PokeBot")) - .identity(id) - .log_commands(args.verbose >= 1) - .log_packets(args.verbose >= 2) - .log_udp_packets(args.verbose >= 3); + let id = if let Some(path) = args.id_path { + let mut file = std::fs::File::open(path).expect("Failed to open id file"); + let mut content = String::new(); + file.read_to_string(&mut content) + .expect("Failed to read id file"); - if let Some(channel) = args.default_channel { - con_config = con_config.channel(channel); - } + toml::from_str(&content).expect("Failed to parse id file") + } else { + Identity::create().expect("Failed to create id") + }; + + let mut con_config = ConnectOptions::new(args.address) + .version(tsclientlib::Version::Linux_3_3_2) + .name(String::from("PokeBot")) + .identity(id) + .log_commands(args.verbose >= 1) + .log_packets(args.verbose >= 2) + .log_udp_packets(args.verbose >= 3); + + if let Some(channel) = args.default_channel { + con_config = con_config.channel(channel); + } + + let connection = Arc::new(TeamSpeakConnection::new(tx.clone(), con_config).await.unwrap()); + let cconnection = connection.clone(); + let audio_player = AudioPlayer::new(tx.clone(), Some(Box::new(move |samples| { + cconnection.send_audio_packet(samples); + }))).unwrap(); + + (audio_player, Some(connection)) + }; + + player.set_volume(0.1).unwrap(); + let player = Arc::new(player); + let playlist = Arc::new(Mutex::new(Playlist::new())); + let application = Arc::new(Application::new(player.clone(), playlist.clone(), connection)); - //let (disconnect_send, disconnect_recv) = mpsc::unbounded(); - let conn = Connection::new(con_config).compat().await.unwrap(); + spawn_gstreamer_thread(player, tx.clone()); - let state = State::new(conn.clone()); - { - let packet = conn.lock().server.set_subscribed(true); - conn.send_packet(packet).compat().await.unwrap(); + if args.local { + spawn_stdin_reader(tx); } - //con.add_on_disconnect(Box::new( || { - //disconnect_send.unbounded_send(()).unwrap() - //})); - let inner_state = state.clone(); - conn.add_event_listener( - String::from("listener"), - Box::new(move |e| { - if let ConEvents(conn, events) = e { - for event in *events { - handle_event(&inner_state, &conn, event); - } - } - }), - ); loop { - state.poll().await; + while let Ok(msg) = rx.recv() { + application.on_message(msg).unwrap(); + } } - //let ctrl_c = tokio_signal::ctrl_c().flatten_stream(); +} - //let dc_fut = disconnect_recv.into_future().compat().fuse(); - //let ctrlc_fut = ctrl_c.into_future().compat().fuse(); - //ctrlc_fut.await.map_err(|(e, _)| e).unwrap(); +fn spawn_stdin_reader(tx: Arc<Mutex<Sender<ApplicationMessage>>>) { + thread::spawn(move || { + let stdin = ::std::io::stdin(); + let lock = stdin.lock(); + for line in lock.lines() { + let line = line.unwrap(); - //conn.disconnect(DisconnectOptions::new()) - //.compat() - //.await - //.unwrap(); + let message = ApplicationMessage::TextMessage( + Message { + target: MessageTarget::Server, + invoker: Invoker { + name: String::from("stdin"), + id: ClientId(0), + uid: None, + }, + text: line + } + ); - // TODO Should not be required - //std::process::exit(0); + let tx = tx.lock().unwrap(); + tx.send(message).unwrap(); + } + }); } -fn handle_event<'a>(state: &State, conn: &ConnectionLock<'a>, event: &Event) { - match event { - Event::Message { - from: target, - invoker: sender, - message: msg, - } => { - if let MessageTarget::Poke(who) = target { - let channel = conn - .clients - .get(&who) - .expect("can find poke sender") - .channel; - tokio::spawn( - conn.to_mut() - .get_client(&conn.own_client) - .expect("can get myself") - .set_channel(channel) - .map_err(|e| error!("Failed to switch channel: {}", e)), - ); - } else if sender.id != conn.own_client { - if msg.starts_with("!") { - let tokens = msg[1..].split_whitespace().collect::<Vec<_>>(); - match tokens.get(0).map(|t| *t) { - Some("test") => { - tokio::spawn( - conn.to_mut() - .send_message(*target, "works :)") - .map_err(|_| ()), - ); - } - Some("add") => { - let mut invalid = false; - if let Some(url) = &tokens.get(1) { - if url.len() > 11 { - tokio::spawn( - conn.to_mut().set_name("PokeBot - Loading").map_err(|_| ()), - ); - let trimmed = url[5..url.len() - 6].to_owned(); - let inner_state = state.clone(); - tokio::spawn( - async move { inner_state.add_audio(trimmed).await } - .unit_error() - .boxed() - .compat(), - ); - } else { - invalid = true; - } - } else { - invalid = true; - } - if invalid { - tokio::spawn( - conn.to_mut() - .send_message(MessageTarget::Channel, "Invalid Url") - .map_err(|_| ()), - ); - } - } - Some("volume") => { - if let Ok(volume) = f64::from_str(tokens[1]) { - if 0.0 <= volume && volume <= 100.0 { - state.volume(volume); - } else { - tokio::spawn( - conn.to_mut() - .send_message( - MessageTarget::Channel, - "Volume must be between 0 and 100", - ) - .map_err(|_| ()), - ); - } - } - } - Some("play") => { - state.play(); - } - Some("skip") => { - state.next(); - } - Some("clear") => { - state.clear(); - } - Some("pause") => { - state.pause(); - } - Some("stop") => { - state.stop(); - } - _ => (), - }; - } - } +fn spawn_gstreamer_thread(player: Arc<AudioPlayer>, tx: Arc<Mutex<Sender<ApplicationMessage>>>) { + thread::spawn(move || { + loop { + player.poll(); + + tx.lock().unwrap().send(ApplicationMessage::StateChange(State::EndOfStream)).unwrap(); } - _ => (), - } + }); } diff --git a/src/playlist.rs b/src/playlist.rs index 6a3dabe..818a470 100644 --- a/src/playlist.rs +++ b/src/playlist.rs @@ -1,3 +1,5 @@ +use log::info; + pub struct Playlist { data: Vec<Option<AudioRequest>>, read: usize, @@ -20,6 +22,8 @@ impl Playlist { return false; } + info!("Adding {} to playlist", &req.title); + if self.data.len() < self.data.capacity() { self.data.push(Some(req)); } else { @@ -32,6 +36,7 @@ impl Playlist { self.is_full = true; } + true } @@ -51,6 +56,8 @@ impl Playlist { let res = self.data[self.read].take(); self.read += 1; + info!("Popping {:?} from playlist", res.as_ref().map(|r| &r.title)); + res } } @@ -60,6 +67,8 @@ impl Playlist { self.read = 0; self.write = 0; self.is_full = false; + + info!("Cleared playlist") } } diff --git a/src/state.rs b/src/state.rs deleted file mode 100644 index bf953ce..0000000 --- a/src/state.rs +++ /dev/null @@ -1,436 +0,0 @@ -use std::process::Command; -use std::sync::{Arc, Mutex}; - -use futures::compat::Future01CompatExt; -use futures01::{future::Future, sink::Sink}; -use futures_util::stream::StreamExt; -use tokio_process::CommandExt; - -use tsclientlib::{Connection, DisconnectOptions, MessageTarget}; - -use gst::prelude::*; -use gstreamer as gst; -use gstreamer_app as gst_app; - -use byte_slice_cast::*; - -use crate::playlist::{AudioRequest, Playlist}; - -#[derive(Clone)] -pub struct State { - conn: Arc<Connection>, - pipeline: Arc<gst::Pipeline>, - playlist: Arc<Mutex<Playlist>>, -} - -impl State { - pub fn new(conn: Connection) -> Self { - let conn_arc = Arc::new(conn); - - gst::init().unwrap(); - let pipeline = gst::Pipeline::new(Some("Ts Audio Player")); - let http_dl = gst::ElementFactory::make("souphttpsrc", Some("http-source")).unwrap(); - let decoder = gst::ElementFactory::make("decodebin", Some("video-decoder")).unwrap(); - pipeline.add_many(&[&http_dl, &decoder]).unwrap(); - - http_dl - .link(&decoder) - .expect("Can link https_dl to decoder"); - - let pipeline_weak = pipeline.downgrade(); - - let inner_conn = conn_arc.clone(); - decoder.connect_pad_added(move |_, decoder_src| { - let pipeline = match pipeline_weak.upgrade() { - Some(pipeline) => pipeline, - None => return, - }; - - let is_audio = { - let media_type = decoder_src.get_current_caps().and_then(|caps| { - caps.get_structure(0).map(|s| { - let name = s.get_name(); - name.starts_with("audio/") - }) - }); - - match media_type { - None => { - eprintln!( - "Failed to get media type from pad {}", - decoder_src.get_name() - ); - return; - } - Some(media_type) => media_type, - } - }; - - if is_audio { - let prev_volume = if let Some(volume) = pipeline.get_by_name("volume") { - volume - .get_property("volume") - .expect("Can get volume property") - .get_some::<f64>() - .unwrap_or(0.1) - } else { - 0.1 - }; - - let names = [ - "audio-converter", - "volume", - "audio-resampler", - "opus-encoder", - "app-sink", - ]; - for element in pipeline.iterate_elements() { - if let Ok(element) = element { - if names.contains(&&*element.get_name()) { - element.set_state(gst::State::Null).unwrap(); - pipeline - .remove_many(&[&element]) - .expect("Can remove element"); - } - } - } - - let convert = - gst::ElementFactory::make("audioconvert", Some("audio-converter")).unwrap(); - let volume = gst::ElementFactory::make("volume", Some("volume")).unwrap(); - let resample = - gst::ElementFactory::make("audioresample", Some("audio-resampler")).unwrap(); - let opus_enc = gst::ElementFactory::make("opusenc", Some("opus-encoder")).unwrap(); - let sink = gst::ElementFactory::make("appsink", Some("app-sink")).unwrap(); - - sink.set_property("async", &false) - .expect("Can make app-sink async"); - - volume - .set_property("volume", &prev_volume) - .expect("Can change volume"); - - { - let elements = &[&convert, &volume, &resample, &opus_enc, &sink]; - pipeline.add_many(elements).expect("Can add audio elements"); - gst::Element::link_many(elements).expect("Can link audio elements"); - - for e in elements { - e.sync_state_with_parent() - .expect("Can sync state with parent"); - } - } - - let appsink = sink - .dynamic_cast::<gst_app::AppSink>() - .expect("Sink is an Appsink"); - - appsink.set_caps(Some(&gst::Caps::new_simple("audio/x-opus", &[]))); - - let inner_conn = inner_conn.clone(); - appsink.set_callbacks( - gst_app::AppSinkCallbacks::new() - // Add a handler to the "new-sample" signal. - .new_sample(move |appsink| { - // Pull the sample in question out of the appsink's buffer. - let sample = appsink.pull_sample().map_err(|_| gst::FlowError::Eos)?; - let buffer = sample - .get_buffer() - .expect("Failed to get buffer from appsink"); - - let map = buffer - .map_readable() - .expect("Failed to map buffer readable"); - - let samples = map - .as_slice_of::<u8>() - .expect("Failed to interprete buffer as S16 PCM"); - - let packet = tsproto_packets::packets::OutAudio::new( - &tsproto_packets::packets::AudioData::C2S { - id: 0, - codec: tsproto_packets::packets::CodecType::OpusMusic, - data: &samples, - }, - ); - - let send_packet = inner_conn - .get_packet_sink() - .send(packet) - .map(|_| ()) - .map_err(|e| println!("Failed to send voice packet: {}", e)); - - tokio::run(send_packet); - - Ok(gst::FlowSuccess::Ok) - }) - .build(), - ); - - let convert_sink = convert - .get_static_pad("sink") - .expect("queue has no sinkpad"); - decoder_src - .link(&convert_sink) - .expect("Can link decoder src to convert sink"); - } - }); - - Self { - conn: conn_arc, - pipeline: Arc::new(pipeline), - playlist: Arc::new(Mutex::new(Playlist::new())), - } - } - - pub async fn add_audio(&self, url: String) { - if self - .playlist - .lock() - .expect("Mutex was not poisoned") - .is_full() - { - self.send_message(MessageTarget::Channel, "Playlist is full"); - return; - } - - let ytdl_args = [ - "--no-playlist", - "-f", - "bestaudio/best", - "-g", - "--get-filename", - "-o", - "%(title)s", - &url, - ]; - - let output = Command::new("youtube-dl") - .args(&ytdl_args) - .output_async() - .compat() - .await - .expect("youtube-dl is runnable"); - - if output.status.success() == false { - self.set_name("PokeBot"); - self.send_message(MessageTarget::Channel, "Failed to load url"); - return; - } - - let output_string = String::from_utf8(output.stdout).unwrap(); - let output_lines = output_string.lines().collect::<Vec<_>>(); - let address = output_lines[0]; - let title = output_lines[1]; - - let req = AudioRequest { - title: title.to_owned(), - address: address.to_owned(), - }; - - let state = self.pipeline.get_state(gst::ClockTime(None)).1; - if gst::State::Null == state { - self.set_name("PokeBot - Playing"); - self.start_audio(req); - } else { - match state { - gst::State::Playing => self.set_name("PokeBot - Playing"), - gst::State::Paused => self.set_name("PokeBot - Paused"), - gst::State::Ready => self.set_name("PokeBot - Stopped"), - gst::State::Null | gst::State::__Unknown(_) | gst::State::VoidPending => { - unreachable!() - } - } - - let title = req.title.clone(); - if self - .playlist - .lock() - .expect("Mutex was not poisoned") - .push(req) - == false - { - self.send_message(MessageTarget::Channel, "Playlist is full"); - } else { - self.send_message( - MessageTarget::Channel, - &format!("Added '{}' to the playlist", title), - ); - } - } - } - - pub async fn poll(&self) { - let bus = self - .pipeline - .get_bus() - .expect("Pipeline without bus. Shouldn't happen!"); - - let mut messages = gst::BusStream::new(&bus); - while let Some(msg) = messages.next().await { - use gst::MessageView; - - match msg.view() { - MessageView::Eos(..) => break, - MessageView::Error(err) => { - println!( - "Error from {:?}: {} ({:?})", - err.get_src().map(|s| s.get_path_string()), - err.get_error(), - err.get_debug() - ); - break; - } - _ => (), - }; - } - - self.next(); - } - - pub fn start_audio(&self, req: AudioRequest) { - self.pipeline - .set_state(gst::State::Null) - .expect("Can set state to null"); - - let http_src = self - .pipeline - .get_by_name("http-source") - .expect("Http source should be registered"); - - http_src - .set_property("location", &&req.address) - .expect("Can set location on http_dl"); - - self.pipeline - .set_state(gst::State::Playing) - .expect("Can set state to playing"); - - self.set_description(&format!("Currently playing: {}", req.title)); - } - - pub fn volume(&self, volume: f64) { - if let Some(vol_filter) = self.pipeline.get_by_name("volume") { - vol_filter - .set_property("volume", &(10.0f64.powf(volume / 50.0 - 2.0))) - .expect("can change volume"); - - // TODO Reflect volume in name - } - } - - pub fn play(&self) { - let http_src = self - .pipeline - .get_by_name("http-source") - .expect("Http source should be registered"); - - if http_src - .get_property("location") - .ok() - .and_then(|v| v.get::<String>().ok().and_then(|s| s.map(|s| s.is_empty()))) - .unwrap_or(true) - { - if self - .playlist - .lock() - .expect("Mutex was not poisoned") - .is_empty() - { - self.send_message(MessageTarget::Channel, "There is nothing to play"); - return; - } - } - - self.pipeline - .set_state(gst::State::Playing) - .expect("can play"); - - self.set_name("PokeBot - Playing"); - } - - pub fn next(&self) { - if let Some(req) = self.playlist.lock().expect("Mutex was not poisoned").pop() { - self.start_audio(req); - } else { - self.pipeline - .set_state(gst::State::Null) - .expect("Can set state to null"); - - let http_src = self - .pipeline - .get_by_name("http-source") - .expect("Http source should be registered"); - - http_src - .set_property("location", &"") - .expect("Can set location on http_dl"); - - self.set_name("PokeBot"); - } - } - - pub fn clear(&self) { - self.playlist - .lock() - .expect("Mutex was not poisoned") - .clear(); - - self.send_message(MessageTarget::Channel, "Playlist was cleared"); - } - - pub fn pause(&self) { - self.pipeline - .set_state(gst::State::Paused) - .expect("can pause"); - - self.set_name("PokeBot - Paused"); - } - - pub fn stop(&self) { - self.pipeline - .set_state(gst::State::Ready) - .expect("can stop"); - - self.set_name("PokeBot - Stopped"); - } - - pub async fn disconnect(&self) { - self.conn - .disconnect(DisconnectOptions::new()) - .compat() - .await; - } - - pub fn send_message(&self, target: MessageTarget, message: &str) { - tokio::spawn( - self.conn - .lock() - .to_mut() - .send_message(target, message) - .map_err(|e| println!("Failed to send message: {}", e)), - ); - } - - pub fn set_name(&self, name: &str) { - tokio::spawn( - self.conn - .lock() - .to_mut() - .set_name(name) - .map_err(|e| println!("Failed to change name: {}", e)), - ); - } - - pub fn set_description(&self, desc: &str) { - tokio::spawn( - self.conn - .lock() - .to_mut() - .get_client(&self.conn.lock().own_client) - .expect("Can get myself") - .set_description(desc) - .map_err(|e| println!("Failed to change description: {}", e)), - ); - } -} diff --git a/src/teamspeak.rs b/src/teamspeak.rs new file mode 100644 index 0000000..785eba2 --- /dev/null +++ b/src/teamspeak.rs @@ -0,0 +1,116 @@ +use futures::{ + compat::Future01CompatExt, +}; +use futures01::{future::Future, sink::Sink}; + +use tsclientlib::{Connection, ConnectOptions, events::Event, ClientId, MessageTarget}; +use tsclientlib::Event::ConEvents; +use crate::{ApplicationMessage, Message}; +use std::sync::mpsc::Sender; +use std::sync::{Mutex, Arc}; + +use log::{error}; + +pub struct TeamSpeakConnection { + conn: Connection, +} + +fn get_message<'a>(event: &Event) -> Option<Message> { + match event { + Event::Message { + from: target, + invoker: sender, + message: msg, + } => { + Some(Message { + target: target.clone(), + invoker: sender.clone(), + text: msg.clone(), + }) + } + _ => None, + } +} + +impl TeamSpeakConnection { + pub async fn new(tx: Arc<Mutex<Sender<ApplicationMessage>>>, options: ConnectOptions) -> Result<TeamSpeakConnection, tsclientlib::Error> { + let conn = Connection::new(options).compat().await?; + let packet = conn.lock().server.set_subscribed(true); + conn.send_packet(packet).compat().await.unwrap(); + + conn.add_event_listener( + String::from("listener"), + Box::new(move |e| { + if let ConEvents(_conn, events) = e { + for event in *events { + if let Some(msg) = get_message(event) { + let tx = tx.lock().unwrap(); + tx.send(ApplicationMessage::TextMessage(msg)).unwrap(); + } + } + } + }), + ); + + Ok(TeamSpeakConnection { + conn, + }) + } + + pub fn send_audio_packet(&self, samples: &[u8]) { + let packet = tsproto_packets::packets::OutAudio::new( + &tsproto_packets::packets::AudioData::C2S { + id: 0, + codec: tsproto_packets::packets::CodecType::OpusMusic, + data: samples, + }, + ); + + let send_packet = self.conn + .get_packet_sink() + .send(packet) + .map(|_| ()) + .map_err(|_| error!("Failed to send voice packet")); + + tokio::run(send_packet); + } + + pub fn join_channel_of_user(&self, id: ClientId) { + let channel = self.conn.lock() + .clients + .get(&id) + .expect("can find poke sender") + .channel; + tokio::spawn(self.conn.lock().to_mut() + .get_client(&self.conn.lock().own_client) + .expect("can get myself") + .set_channel(channel) + .map_err(|e| error!("Failed to switch channel: {}", e))); + } + + pub fn set_nickname(&self, name: &str) { + tokio::spawn(self.conn + .lock() + .to_mut() + .set_name(name) + .map_err(|e| error!("Failed to set nickname: {}", e))); + } + + pub fn set_description(&self, desc: &str) { + tokio::spawn( + self.conn + .lock() + .to_mut() + .get_client(&self.conn.lock().own_client) + .expect("Can get myself") + .set_description(desc) + .map_err(|e| error!("Failed to change description: {}", e))); + } + + pub fn send_message_to_channel(&self, text: &str) { + tokio::spawn(self.conn.lock().to_mut() + .send_message(MessageTarget::Channel, text) + .map_err(|e| error!("Failed to send message: {}", e))); + + } +} diff --git a/src/youtube_dl.rs b/src/youtube_dl.rs new file mode 100644 index 0000000..1eff302 --- /dev/null +++ b/src/youtube_dl.rs @@ -0,0 +1,38 @@ +use std::process::{Command, Stdio}; + +use log::{debug}; + +pub fn get_audio_download_url(uri: String) -> Result<(String, String), String> { + let ytdl_args = [ + "--no-playlist", + "-f", + "bestaudio/best", + "-g", + "--get-filename", + "-o", + "%(title)s", + &uri, + ]; + + let mut cmd = Command::new("youtube-dl"); + cmd.args(&ytdl_args); + cmd.stdin(Stdio::null()); + + debug!("yt-dl command: {:?}", cmd); + + let ytdl_output = cmd + .output() + .unwrap(); + + let output = String::from_utf8(ytdl_output.stdout.clone()).unwrap(); + + if ytdl_output.status.success() == false { + return Err(String::from_utf8(ytdl_output.stderr.clone()).unwrap()); + } + + let lines = output.lines().collect::<Vec<_>>(); + let url = lines[0].to_owned(); + let title = lines[1].to_owned(); + + Ok((url, title)) +}
\ No newline at end of file |
