summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorJokler <jokler@protonmail.com>2020-01-12 18:11:02 +0100
committerGitHub <noreply@github.com>2020-01-12 18:11:02 +0100
commit78690b2e2949ed3be38a136f1c6ac2866ac32df7 (patch)
tree64535e2e534e57ef94a057f8010b87c88fc56021 /src
parentfc62434581e5f7411177e7c30dd1f4543ec354be (diff)
parent193987e0c7185eb63827b2d91f1e2779c4e557d3 (diff)
downloadpokebot-78690b2e2949ed3be38a136f1c6ac2866ac32df7.tar.gz
pokebot-78690b2e2949ed3be38a136f1c6ac2866ac32df7.zip
Merge pull request #1 from fkaa/refactoring
Add enterprise logging and split out gstreamer & teamspeak specific code
Diffstat (limited to 'src')
-rw-r--r--src/audio_player.rs328
-rw-r--r--src/main.rs472
-rw-r--r--src/playlist.rs9
-rw-r--r--src/state.rs436
-rw-r--r--src/teamspeak.rs116
-rw-r--r--src/youtube_dl.rs38
6 files changed, 806 insertions, 593 deletions
diff --git a/src/audio_player.rs b/src/audio_player.rs
new file mode 100644
index 0000000..6626417
--- /dev/null
+++ b/src/audio_player.rs
@@ -0,0 +1,328 @@
+use std::sync::Once;
+
+use gstreamer as gst;
+use gst::prelude::*;
+use gstreamer_app::{AppSink, AppSinkCallbacks};
+use gst::{GhostPad};
+
+use log::{info, debug, warn, error};
+use std::sync::mpsc::Sender;
+use std::sync::{Mutex, Arc};
+use crate::{State, ApplicationMessage};
+use glib::BoolError;
+
+static GST_INIT: Once = Once::new();
+
+#[derive(Debug)]
+pub enum AudioPlayerError {
+ GStreamerError(glib::error::BoolError),
+ StateChangeFailed,
+}
+
+impl From<glib::error::BoolError> for AudioPlayerError {
+ fn from(err: BoolError) -> Self {
+ AudioPlayerError::GStreamerError(err)
+ }
+}
+
+impl From<gst::StateChangeError> for AudioPlayerError {
+ fn from(_err: gst::StateChangeError) -> Self {
+ AudioPlayerError::StateChangeFailed
+ }
+}
+
+pub struct AudioPlayer {
+ pipeline: gst::Pipeline,
+ bus: gst::Bus,
+ http_src: gst::Element,
+
+ volume: gst::Element,
+ sender: Arc<Mutex<Sender<ApplicationMessage>>>,
+}
+
+fn make_element(factoryname: &str, display_name: &str) -> Result<gst::Element, AudioPlayerError> {
+ Ok(gst::ElementFactory::make(
+ factoryname,
+ Some(display_name)
+ )?)
+}
+
+fn link_elements(a: &gst::Element, b: &gst::Element) -> Result<(), AudioPlayerError> {
+ a.link(b)?;
+
+ Ok(())
+}
+
+fn add_decode_bin_new_pad_callback(
+ decode_bin: &gst::Element,
+ audio_bin: gst::Bin,
+ ghost_pad: gst::GhostPad,
+) {
+ decode_bin.connect_pad_added(move |_, new_pad| {
+ debug!("New pad received on decode bin");
+ let name = if let Some(caps) = new_pad.get_current_caps() {
+ debug!("Pad caps: {}", caps.to_string());
+ if let Some(structure) = caps.get_structure(0) {
+ Some(structure.get_name().to_string())
+ } else {
+ None
+ }
+ } else {
+ None
+ };
+
+ match name.as_deref() {
+ Some("audio/x-raw") => {
+ if let Some(peer) = ghost_pad.get_peer() {
+ peer.unlink(&ghost_pad).unwrap();
+ }
+
+ info!("Found raw audio, linking audio bin");
+ new_pad.link(&ghost_pad).unwrap();
+
+ audio_bin.sync_state_with_parent().unwrap();
+ }
+ _ => {}
+ }
+ });
+}
+
+impl AudioPlayer {
+ pub fn new(sender: Arc<Mutex<Sender<ApplicationMessage>>>, callback: Option<Box<dyn FnMut(&[u8]) + Send>>) -> Result<Self, AudioPlayerError> {
+ GST_INIT.call_once(|| gst::init().unwrap());
+
+ info!("Creating audio player");
+
+ let pipeline = gst::Pipeline::new(Some("TeamSpeak Audio Player"));
+ let bus = pipeline.get_bus().unwrap();
+ let http_src = make_element("souphttpsrc", "http source")?;
+ let decode_bin = make_element("decodebin", "decode bin")?;
+ pipeline.add_many(&[&http_src, &decode_bin])?;
+
+ link_elements(&http_src, &decode_bin)?;
+
+ let (audio_bin, volume, ghost_pad) = Self::create_audio_bin(callback)?;
+
+ add_decode_bin_new_pad_callback(
+ &decode_bin,
+ audio_bin.clone(),
+ ghost_pad);
+
+ pipeline.add(&audio_bin)?;
+
+ pipeline.set_state(gst::State::Ready)?;
+
+ Ok(AudioPlayer {
+ pipeline,
+ bus,
+ http_src,
+
+ volume,
+ sender,
+ })
+ }
+
+ fn create_audio_bin(callback: Option<Box<dyn FnMut(&[u8]) + Send>>) -> Result<(gst::Bin, gst::Element, gst::GhostPad), AudioPlayerError> {
+ let audio_bin = gst::Bin::new(Some("audio bin"));
+ let queue = make_element("queue", "audio queue")?;
+ let convert = make_element("audioconvert", "audio converter")?;
+ let volume = make_element("volume", "volume")?;
+ let resample = make_element("audioresample", "audio resampler")?;
+ let pads = queue.get_sink_pads();
+ let queue_sink_pad = pads
+ .first()
+ .unwrap();
+
+ audio_bin.add_many(&[
+ &queue,
+ &convert,
+ &volume,
+ &resample,
+ ])?;
+
+ if let Some(mut callback) = callback {
+ let opus_enc = make_element("opusenc", "opus encoder")?;
+ let sink = make_element("appsink", "app sink")?;
+
+ let appsink = sink
+ .clone()
+ .dynamic_cast::<AppSink>()
+ .expect("Sink element is expected to be an appsink!");
+ appsink.set_caps(Some(&gst::Caps::new_simple("audio/x-opus", &[])));
+ let callbacks = AppSinkCallbacks::new()
+ .new_sample(move |sink| {
+ let sample = sink.pull_sample().map_err(|_| gst::FlowError::Eos)?;
+ let buffer = sample.get_buffer().ok_or(gst::FlowError::Error)?;
+ let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?;
+ let samples = map.as_slice();
+
+ callback(samples);
+
+ Ok(gst::FlowSuccess::Ok)
+ })
+ .build();
+ appsink.set_callbacks(callbacks);
+
+ audio_bin.add_many(&[
+ &opus_enc,
+ &sink
+ ])?;
+
+ gst::Element::link_many(&[
+ &queue,
+ &convert,
+ &volume,
+ &resample,
+ &opus_enc,
+ &sink
+ ])?;
+ } else {
+ let sink = make_element("autoaudiosink", "auto audio sink")?;
+
+ audio_bin.add_many(&[
+ &sink
+ ])?;
+
+ gst::Element::link_many(&[
+ &queue,
+ &convert,
+ &volume,
+ &resample,
+ &sink
+ ])?;
+ };
+
+ let ghost_pad = GhostPad::new(Some("audio bin sink"), queue_sink_pad).unwrap();
+ ghost_pad.set_active(true)?;
+ audio_bin.add_pad(&ghost_pad)?;
+
+ Ok((audio_bin, volume, ghost_pad))
+ }
+
+ pub fn set_source_url(&self, location: String) -> Result<(), AudioPlayerError> {
+ info!("Setting location URI: {}", location);
+ self.http_src.set_property("location", &location)?;
+
+ Ok(())
+ }
+
+ pub fn set_volume(&self, volume: f64) -> Result<(), AudioPlayerError> {
+ let log_volume = 1.0 - 10.0f64.powf(-volume * 2.0);
+ info!("Setting volume: {} -> {}", volume, log_volume);
+
+ self.volume.set_property("volume", &log_volume)?;
+
+ Ok(())
+ }
+
+ pub fn is_started(&self) -> bool {
+ let (_, current, pending) = self.pipeline.get_state(gst::ClockTime(None));
+
+ match (current, pending) {
+ (gst::State::Null, gst::State::VoidPending) => false,
+ (_, gst::State::Null) => false,
+ (gst::State::Ready, gst::State::VoidPending) => false,
+ _ => true
+ }
+ }
+
+ pub fn reset(&self) -> Result<(), AudioPlayerError> {
+ info!("Setting pipeline state to null");
+
+ self.pipeline.set_state(gst::State::Null)?;
+
+ Ok(())
+ }
+
+ pub fn play(&self) -> Result<(), AudioPlayerError> {
+ info!("Setting pipeline state to playing");
+
+ self.pipeline.set_state(gst::State::Playing)?;
+
+ Ok(())
+ }
+
+ pub fn pause(&self) -> Result<(), AudioPlayerError> {
+ info!("Setting pipeline state to paused");
+
+ self.pipeline.set_state(gst::State::Paused)?;
+
+ Ok(())
+ }
+
+ pub fn stop_current(&self) {
+ info!("Stopping pipeline, sending EOS");
+
+ let handled = self.http_src.send_event(gst::Event::new_eos().build());
+ if !handled {
+ warn!("EOS event was not handled");
+ }
+ }
+
+ fn send_state(&self, state: State) {
+ info!("Sending state {:?} to application", state);
+ let sender = self.sender.lock().unwrap();
+ sender.send(ApplicationMessage::StateChange(state)).unwrap();
+ }
+
+ pub fn poll(&self) {
+ debug!("Polling GStreamer");
+ 'outer: loop {
+ while let Some(msg) = self.bus.timed_pop(gst::ClockTime(None)) {
+ use gst::MessageView;
+
+ match msg.view() {
+ MessageView::StateChanged(state) => {
+ if let Some(src) = state.get_src() {
+ if src.get_name() != self.pipeline.get_name() {
+ continue;
+ }
+ }
+
+ let old = state.get_old();
+ let current = state.get_current();
+ let pending = state.get_pending();
+
+ match (old, current, pending) {
+ (gst::State::Paused, gst::State::Playing, gst::State::VoidPending) => self.send_state(State::Playing),
+ (gst::State::Playing, gst::State::Paused, gst::State::VoidPending) => self.send_state(State::Paused),
+ (_, gst::State::Ready, gst::State::Null) => self.send_state(State::Stopped),
+ (_, gst::State::Null, gst::State::VoidPending) => self.send_state(State::Stopped),
+ _ => {
+ debug!("Pipeline transitioned from {:?} to {:?}, with {:?} pending", old, current, pending);
+ }
+ }
+ }
+ MessageView::Eos(..) => {
+ info!("End of stream reached");
+ self.reset().unwrap();
+
+ break 'outer;
+ },
+ MessageView::Warning(warn) => {
+ warn!(
+ "Warning from {:?}: {} ({:?})",
+ warn.get_src().map(|s| s.get_path_string()),
+ warn.get_error(),
+ warn.get_debug()
+ );
+ break 'outer;
+ }
+ MessageView::Error(err) => {
+ error!(
+ "Error from {:?}: {} ({:?})",
+ err.get_src().map(|s| s.get_path_string()),
+ err.get_error(),
+ err.get_debug()
+ );
+ break 'outer;
+ }
+ _ => {
+ // debug!("{:?}", msg)
+ },
+ };
+ }
+ }
+ debug!("Left GStreamer message loop");
+ }
+} \ No newline at end of file
diff --git a/src/main.rs b/src/main.rs
index cdb0bd6..c4fdd2f 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,31 +1,39 @@
-use std::io::Read;
+use std::io::{Read, BufRead};
use std::path::PathBuf;
use std::str::FromStr;
+use std::thread;
+use std::sync::{Arc, Mutex};
use futures::{
- compat::Future01CompatExt,
future::{FutureExt, TryFutureExt},
};
-
-use futures01::future::Future;
use structopt::clap::AppSettings;
use structopt::StructOpt;
-
use tsclientlib::{
- events::Event, ConnectOptions, Connection, ConnectionLock, Event::ConEvents, Identity,
- MessageTarget,
+ ConnectOptions, Identity, MessageTarget, Invoker, ClientId,
};
+use log::{info, debug};
-use log::error;
-
+mod audio_player;
+mod youtube_dl;
+mod teamspeak;
mod playlist;
-mod state;
-use state::State;
+
+use audio_player::*;
+use teamspeak::*;
+use playlist::*;
+use std::sync::mpsc::Sender;
#[derive(StructOpt, Debug)]
#[structopt(raw(global_settings = "&[AppSettings::ColoredHelp]"))]
struct Args {
#[structopt(
+ short = "l",
+ long = "local",
+ help = "Run locally in text mode"
+ )]
+ local: bool,
+ #[structopt(
short = "a",
long = "address",
default_value = "localhost",
@@ -58,174 +66,324 @@ struct Args {
// 3. Print udp packets
}
+pub struct Message {
+ pub target: MessageTarget,
+ pub invoker: Invoker,
+ pub text: String,
+}
+
+#[derive(Debug, PartialEq, Eq)]
+pub enum State {
+ Playing,
+ Paused,
+ Stopped,
+ EndOfStream,
+}
+
+pub enum ApplicationMessage {
+ TextMessage(Message),
+ StateChange(State),
+}
+
+struct Application {
+ player: Arc<AudioPlayer>,
+ teamspeak: Option<Arc<TeamSpeakConnection>>,
+ playlist: Arc<Mutex<Playlist>>,
+ state: Arc<Mutex<State>>,
+}
+
+impl Application {
+ pub fn new(player: Arc<AudioPlayer>, playlist: Arc<Mutex<Playlist>>, teamspeak: Option<Arc<TeamSpeakConnection>>) -> Self {
+ Self {
+ player,
+ teamspeak,
+ playlist,
+ state: Arc::new(Mutex::new(State::Stopped)),
+ }
+ }
+
+ #[inline(always)]
+ fn with_teamspeak<F: Fn(&TeamSpeakConnection)>(&self, func: F) {
+ if let Some(ts) = &self.teamspeak {
+ func(&ts);
+ }
+ }
+
+ fn start_playing_audio(&self, request: AudioRequest) {
+ self.send_message(&format!("Playing '{}'", request.title));
+ self.set_description(&format!("Currently playing '{}'", request.title));
+ self.player.reset().unwrap();
+ self.player.set_source_url(request.address).unwrap();
+ self.player.play().unwrap();
+ }
+
+ pub fn add_audio(&self, url: String) {
+ if self.playlist.lock().expect("Mutex was not poisoned").is_full() {
+ info!("Audio playlist is full");
+ self.send_message("Playlist is full");
+ return;
+ }
+
+ match youtube_dl::get_audio_download_url(url) {
+ Ok((audio_url, audio_title)) => {
+ info!("Found audio url: {}", audio_url);
+
+ let request = AudioRequest {
+ title: audio_title,
+ address: audio_url,
+ };
+
+ let mut playlist = self.playlist.lock().expect("Mutex was not poisoned");
+ playlist.push(request.clone());
+
+ if !self.player.is_started() {
+ if let Some(request) = playlist.pop() {
+ self.start_playing_audio(request);
+ }
+ } else {
+ self.send_message(&format!("Added '{}' to playlist", request.title));
+ }
+ }
+ Err(e) => {
+ info!("Failed to find audio url: {}", e);
+
+ self.send_message(&format!("Failed to find url: {}", e));
+ }
+ }
+ }
+
+ fn send_message(&self, text: &str) {
+ debug!("Sending message to TeamSpeak: {}", text);
+
+ self.with_teamspeak(|ts| ts.send_message_to_channel(text));
+ }
+
+ fn set_nickname(&self, name: &str) {
+ info!("Setting TeamsSpeak nickname to {}", name);
+
+ self.with_teamspeak(|ts| ts.set_nickname(name));
+ }
+
+ fn set_description(&self, desc: &str) {
+ info!("Setting TeamsSpeak description to {}", desc);
+
+ self.with_teamspeak(|ts| ts.set_description(desc));
+ }
+
+ fn on_text(&self, message: Message) -> Result<(), AudioPlayerError> {
+ let msg = message.text;
+ if msg.starts_with("!") {
+ let tokens = msg[1..].split_whitespace().collect::<Vec<_>>();
+
+ match tokens.get(0).map(|t| *t) {
+ Some("add") => {
+ if let Some(url) = &tokens.get(1) {
+ // strip bbcode tags from url
+ let url = url.replace("[URL]", "").replace("[/URL]", "");
+
+ self.add_audio(url.to_string());
+ }
+ }
+ Some("play") => {
+ if !self.player.is_started() {
+ self.player.stop_current();
+ } else {
+ self.player.play()?;
+ }
+ }
+ Some("pause") => {
+ self.player.pause()?;
+ }
+ Some("stop") => {
+ self.player.reset()?;
+ }
+ Some("next") => {
+ let playlist = self.playlist.lock().expect("Mutex was not poisoned");
+ if !playlist.is_empty() {
+ info!("Skipping to next track");
+ self.player.stop_current();
+ } else {
+ info!("Playlist empty, cannot skip");
+ self.player.reset()?;
+ }
+ }
+ Some("clear") => {
+ self.playlist.lock().expect("Mutex was not poisoned").clear();
+ }
+ Some("volume") => {
+ if let Some(&volume) = &tokens.get(1) {
+ if let Ok(volume) = f64::from_str(volume) {
+ let volume = volume.max(0.0).min(100.0) * 0.01;
+
+ self.player.set_volume(volume)?;
+ }
+ }
+ }
+ _ => {}
+ }
+ }
+
+ Ok(())
+ }
+
+ fn on_state(&self, state: State) -> Result<(), AudioPlayerError> {
+ let mut current_state = self.state.lock().unwrap();
+ if *current_state != state {
+ match state {
+ State::Playing => {
+ self.set_nickname("PokeBot - Playing");
+ }
+ State::Paused => {
+ self.set_nickname("PokeBot - Paused");
+ }
+ State::Stopped => {
+ self.set_nickname("PokeBot");
+ self.set_description("");
+ }
+ State::EndOfStream => {
+ let next_track = self.playlist.lock().expect("Mutex was not poisoned").pop();
+ if let Some(request) = next_track {
+ info!("Advancing playlist");
+
+ self.start_playing_audio(request);
+ } else {
+ self.set_nickname("PokeBot");
+ self.set_description("");
+ }
+ }
+ }
+ }
+
+ *current_state = state;
+
+ Ok(())
+ }
+
+ pub fn on_message(&self, message: ApplicationMessage) -> Result<(), AudioPlayerError> {
+ match message {
+ ApplicationMessage::TextMessage(message) => {
+ if let MessageTarget::Poke(who) = message.target {
+ info!("Poked by {}, joining their channel", who);
+ self.with_teamspeak(|ts| ts.join_channel_of_user(who));
+ } else {
+ self.on_text(message)?;
+ }
+ }
+ ApplicationMessage::StateChange(state) => {
+ self.on_state(state)?;
+ }
+ }
+
+ Ok(())
+ }
+}
+
fn main() {
+ log4rs::init_file("log4rs.yml", Default::default()).unwrap();
+
tokio::run(async_main().unit_error().boxed().compat());
}
async fn async_main() {
+ info!("Starting PokeBot!");
+
// Parse command line options
let args = Args::from_args();
- let id = if let Some(path) = args.id_path {
- let mut file = std::fs::File::open(path).expect("Failed to open id file");
- let mut content = String::new();
- file.read_to_string(&mut content)
- .expect("Failed to read id file");
+ debug!("Received CLI arguments: {:?}", std::env::args());
+
+ let (tx, rx) = ::std::sync::mpsc::channel();
+ let tx = Arc::new(Mutex::new(tx));
+ let (player, connection) = if args.local {
+ info!("Starting in CLI mode");
+ let audio_player = AudioPlayer::new(tx.clone(), None)
+ .unwrap();
- toml::from_str(&content).expect("Failed to parse id file")
+ (audio_player, None)
} else {
- Identity::create().expect("Failed to create id")
- };
+ info!("Starting in TeamSpeak mode");
- let mut con_config = ConnectOptions::new(args.address)
- .version(tsclientlib::Version::Linux_3_3_2)
- .name(String::from("PokeBot"))
- .identity(id)
- .log_commands(args.verbose >= 1)
- .log_packets(args.verbose >= 2)
- .log_udp_packets(args.verbose >= 3);
+ let id = if let Some(path) = args.id_path {
+ let mut file = std::fs::File::open(path).expect("Failed to open id file");
+ let mut content = String::new();
+ file.read_to_string(&mut content)
+ .expect("Failed to read id file");
- if let Some(channel) = args.default_channel {
- con_config = con_config.channel(channel);
- }
+ toml::from_str(&content).expect("Failed to parse id file")
+ } else {
+ Identity::create().expect("Failed to create id")
+ };
+
+ let mut con_config = ConnectOptions::new(args.address)
+ .version(tsclientlib::Version::Linux_3_3_2)
+ .name(String::from("PokeBot"))
+ .identity(id)
+ .log_commands(args.verbose >= 1)
+ .log_packets(args.verbose >= 2)
+ .log_udp_packets(args.verbose >= 3);
+
+ if let Some(channel) = args.default_channel {
+ con_config = con_config.channel(channel);
+ }
+
+ let connection = Arc::new(TeamSpeakConnection::new(tx.clone(), con_config).await.unwrap());
+ let cconnection = connection.clone();
+ let audio_player = AudioPlayer::new(tx.clone(), Some(Box::new(move |samples| {
+ cconnection.send_audio_packet(samples);
+ }))).unwrap();
+
+ (audio_player, Some(connection))
+ };
+
+ player.set_volume(0.1).unwrap();
+ let player = Arc::new(player);
+ let playlist = Arc::new(Mutex::new(Playlist::new()));
+ let application = Arc::new(Application::new(player.clone(), playlist.clone(), connection));
- //let (disconnect_send, disconnect_recv) = mpsc::unbounded();
- let conn = Connection::new(con_config).compat().await.unwrap();
+ spawn_gstreamer_thread(player, tx.clone());
- let state = State::new(conn.clone());
- {
- let packet = conn.lock().server.set_subscribed(true);
- conn.send_packet(packet).compat().await.unwrap();
+ if args.local {
+ spawn_stdin_reader(tx);
}
- //con.add_on_disconnect(Box::new( || {
- //disconnect_send.unbounded_send(()).unwrap()
- //}));
- let inner_state = state.clone();
- conn.add_event_listener(
- String::from("listener"),
- Box::new(move |e| {
- if let ConEvents(conn, events) = e {
- for event in *events {
- handle_event(&inner_state, &conn, event);
- }
- }
- }),
- );
loop {
- state.poll().await;
+ while let Ok(msg) = rx.recv() {
+ application.on_message(msg).unwrap();
+ }
}
- //let ctrl_c = tokio_signal::ctrl_c().flatten_stream();
+}
- //let dc_fut = disconnect_recv.into_future().compat().fuse();
- //let ctrlc_fut = ctrl_c.into_future().compat().fuse();
- //ctrlc_fut.await.map_err(|(e, _)| e).unwrap();
+fn spawn_stdin_reader(tx: Arc<Mutex<Sender<ApplicationMessage>>>) {
+ thread::spawn(move || {
+ let stdin = ::std::io::stdin();
+ let lock = stdin.lock();
+ for line in lock.lines() {
+ let line = line.unwrap();
- //conn.disconnect(DisconnectOptions::new())
- //.compat()
- //.await
- //.unwrap();
+ let message = ApplicationMessage::TextMessage(
+ Message {
+ target: MessageTarget::Server,
+ invoker: Invoker {
+ name: String::from("stdin"),
+ id: ClientId(0),
+ uid: None,
+ },
+ text: line
+ }
+ );
- // TODO Should not be required
- //std::process::exit(0);
+ let tx = tx.lock().unwrap();
+ tx.send(message).unwrap();
+ }
+ });
}
-fn handle_event<'a>(state: &State, conn: &ConnectionLock<'a>, event: &Event) {
- match event {
- Event::Message {
- from: target,
- invoker: sender,
- message: msg,
- } => {
- if let MessageTarget::Poke(who) = target {
- let channel = conn
- .clients
- .get(&who)
- .expect("can find poke sender")
- .channel;
- tokio::spawn(
- conn.to_mut()
- .get_client(&conn.own_client)
- .expect("can get myself")
- .set_channel(channel)
- .map_err(|e| error!("Failed to switch channel: {}", e)),
- );
- } else if sender.id != conn.own_client {
- if msg.starts_with("!") {
- let tokens = msg[1..].split_whitespace().collect::<Vec<_>>();
- match tokens.get(0).map(|t| *t) {
- Some("test") => {
- tokio::spawn(
- conn.to_mut()
- .send_message(*target, "works :)")
- .map_err(|_| ()),
- );
- }
- Some("add") => {
- let mut invalid = false;
- if let Some(url) = &tokens.get(1) {
- if url.len() > 11 {
- tokio::spawn(
- conn.to_mut().set_name("PokeBot - Loading").map_err(|_| ()),
- );
- let trimmed = url[5..url.len() - 6].to_owned();
- let inner_state = state.clone();
- tokio::spawn(
- async move { inner_state.add_audio(trimmed).await }
- .unit_error()
- .boxed()
- .compat(),
- );
- } else {
- invalid = true;
- }
- } else {
- invalid = true;
- }
- if invalid {
- tokio::spawn(
- conn.to_mut()
- .send_message(MessageTarget::Channel, "Invalid Url")
- .map_err(|_| ()),
- );
- }
- }
- Some("volume") => {
- if let Ok(volume) = f64::from_str(tokens[1]) {
- if 0.0 <= volume && volume <= 100.0 {
- state.volume(volume);
- } else {
- tokio::spawn(
- conn.to_mut()
- .send_message(
- MessageTarget::Channel,
- "Volume must be between 0 and 100",
- )
- .map_err(|_| ()),
- );
- }
- }
- }
- Some("play") => {
- state.play();
- }
- Some("skip") => {
- state.next();
- }
- Some("clear") => {
- state.clear();
- }
- Some("pause") => {
- state.pause();
- }
- Some("stop") => {
- state.stop();
- }
- _ => (),
- };
- }
- }
+fn spawn_gstreamer_thread(player: Arc<AudioPlayer>, tx: Arc<Mutex<Sender<ApplicationMessage>>>) {
+ thread::spawn(move || {
+ loop {
+ player.poll();
+
+ tx.lock().unwrap().send(ApplicationMessage::StateChange(State::EndOfStream)).unwrap();
}
- _ => (),
- }
+ });
}
diff --git a/src/playlist.rs b/src/playlist.rs
index 6a3dabe..818a470 100644
--- a/src/playlist.rs
+++ b/src/playlist.rs
@@ -1,3 +1,5 @@
+use log::info;
+
pub struct Playlist {
data: Vec<Option<AudioRequest>>,
read: usize,
@@ -20,6 +22,8 @@ impl Playlist {
return false;
}
+ info!("Adding {} to playlist", &req.title);
+
if self.data.len() < self.data.capacity() {
self.data.push(Some(req));
} else {
@@ -32,6 +36,7 @@ impl Playlist {
self.is_full = true;
}
+
true
}
@@ -51,6 +56,8 @@ impl Playlist {
let res = self.data[self.read].take();
self.read += 1;
+ info!("Popping {:?} from playlist", res.as_ref().map(|r| &r.title));
+
res
}
}
@@ -60,6 +67,8 @@ impl Playlist {
self.read = 0;
self.write = 0;
self.is_full = false;
+
+ info!("Cleared playlist")
}
}
diff --git a/src/state.rs b/src/state.rs
deleted file mode 100644
index bf953ce..0000000
--- a/src/state.rs
+++ /dev/null
@@ -1,436 +0,0 @@
-use std::process::Command;
-use std::sync::{Arc, Mutex};
-
-use futures::compat::Future01CompatExt;
-use futures01::{future::Future, sink::Sink};
-use futures_util::stream::StreamExt;
-use tokio_process::CommandExt;
-
-use tsclientlib::{Connection, DisconnectOptions, MessageTarget};
-
-use gst::prelude::*;
-use gstreamer as gst;
-use gstreamer_app as gst_app;
-
-use byte_slice_cast::*;
-
-use crate::playlist::{AudioRequest, Playlist};
-
-#[derive(Clone)]
-pub struct State {
- conn: Arc<Connection>,
- pipeline: Arc<gst::Pipeline>,
- playlist: Arc<Mutex<Playlist>>,
-}
-
-impl State {
- pub fn new(conn: Connection) -> Self {
- let conn_arc = Arc::new(conn);
-
- gst::init().unwrap();
- let pipeline = gst::Pipeline::new(Some("Ts Audio Player"));
- let http_dl = gst::ElementFactory::make("souphttpsrc", Some("http-source")).unwrap();
- let decoder = gst::ElementFactory::make("decodebin", Some("video-decoder")).unwrap();
- pipeline.add_many(&[&http_dl, &decoder]).unwrap();
-
- http_dl
- .link(&decoder)
- .expect("Can link https_dl to decoder");
-
- let pipeline_weak = pipeline.downgrade();
-
- let inner_conn = conn_arc.clone();
- decoder.connect_pad_added(move |_, decoder_src| {
- let pipeline = match pipeline_weak.upgrade() {
- Some(pipeline) => pipeline,
- None => return,
- };
-
- let is_audio = {
- let media_type = decoder_src.get_current_caps().and_then(|caps| {
- caps.get_structure(0).map(|s| {
- let name = s.get_name();
- name.starts_with("audio/")
- })
- });
-
- match media_type {
- None => {
- eprintln!(
- "Failed to get media type from pad {}",
- decoder_src.get_name()
- );
- return;
- }
- Some(media_type) => media_type,
- }
- };
-
- if is_audio {
- let prev_volume = if let Some(volume) = pipeline.get_by_name("volume") {
- volume
- .get_property("volume")
- .expect("Can get volume property")
- .get_some::<f64>()
- .unwrap_or(0.1)
- } else {
- 0.1
- };
-
- let names = [
- "audio-converter",
- "volume",
- "audio-resampler",
- "opus-encoder",
- "app-sink",
- ];
- for element in pipeline.iterate_elements() {
- if let Ok(element) = element {
- if names.contains(&&*element.get_name()) {
- element.set_state(gst::State::Null).unwrap();
- pipeline
- .remove_many(&[&element])
- .expect("Can remove element");
- }
- }
- }
-
- let convert =
- gst::ElementFactory::make("audioconvert", Some("audio-converter")).unwrap();
- let volume = gst::ElementFactory::make("volume", Some("volume")).unwrap();
- let resample =
- gst::ElementFactory::make("audioresample", Some("audio-resampler")).unwrap();
- let opus_enc = gst::ElementFactory::make("opusenc", Some("opus-encoder")).unwrap();
- let sink = gst::ElementFactory::make("appsink", Some("app-sink")).unwrap();
-
- sink.set_property("async", &false)
- .expect("Can make app-sink async");
-
- volume
- .set_property("volume", &prev_volume)
- .expect("Can change volume");
-
- {
- let elements = &[&convert, &volume, &resample, &opus_enc, &sink];
- pipeline.add_many(elements).expect("Can add audio elements");
- gst::Element::link_many(elements).expect("Can link audio elements");
-
- for e in elements {
- e.sync_state_with_parent()
- .expect("Can sync state with parent");
- }
- }
-
- let appsink = sink
- .dynamic_cast::<gst_app::AppSink>()
- .expect("Sink is an Appsink");
-
- appsink.set_caps(Some(&gst::Caps::new_simple("audio/x-opus", &[])));
-
- let inner_conn = inner_conn.clone();
- appsink.set_callbacks(
- gst_app::AppSinkCallbacks::new()
- // Add a handler to the "new-sample" signal.
- .new_sample(move |appsink| {
- // Pull the sample in question out of the appsink's buffer.
- let sample = appsink.pull_sample().map_err(|_| gst::FlowError::Eos)?;
- let buffer = sample
- .get_buffer()
- .expect("Failed to get buffer from appsink");
-
- let map = buffer
- .map_readable()
- .expect("Failed to map buffer readable");
-
- let samples = map
- .as_slice_of::<u8>()
- .expect("Failed to interprete buffer as S16 PCM");
-
- let packet = tsproto_packets::packets::OutAudio::new(
- &tsproto_packets::packets::AudioData::C2S {
- id: 0,
- codec: tsproto_packets::packets::CodecType::OpusMusic,
- data: &samples,
- },
- );
-
- let send_packet = inner_conn
- .get_packet_sink()
- .send(packet)
- .map(|_| ())
- .map_err(|e| println!("Failed to send voice packet: {}", e));
-
- tokio::run(send_packet);
-
- Ok(gst::FlowSuccess::Ok)
- })
- .build(),
- );
-
- let convert_sink = convert
- .get_static_pad("sink")
- .expect("queue has no sinkpad");
- decoder_src
- .link(&convert_sink)
- .expect("Can link decoder src to convert sink");
- }
- });
-
- Self {
- conn: conn_arc,
- pipeline: Arc::new(pipeline),
- playlist: Arc::new(Mutex::new(Playlist::new())),
- }
- }
-
- pub async fn add_audio(&self, url: String) {
- if self
- .playlist
- .lock()
- .expect("Mutex was not poisoned")
- .is_full()
- {
- self.send_message(MessageTarget::Channel, "Playlist is full");
- return;
- }
-
- let ytdl_args = [
- "--no-playlist",
- "-f",
- "bestaudio/best",
- "-g",
- "--get-filename",
- "-o",
- "%(title)s",
- &url,
- ];
-
- let output = Command::new("youtube-dl")
- .args(&ytdl_args)
- .output_async()
- .compat()
- .await
- .expect("youtube-dl is runnable");
-
- if output.status.success() == false {
- self.set_name("PokeBot");
- self.send_message(MessageTarget::Channel, "Failed to load url");
- return;
- }
-
- let output_string = String::from_utf8(output.stdout).unwrap();
- let output_lines = output_string.lines().collect::<Vec<_>>();
- let address = output_lines[0];
- let title = output_lines[1];
-
- let req = AudioRequest {
- title: title.to_owned(),
- address: address.to_owned(),
- };
-
- let state = self.pipeline.get_state(gst::ClockTime(None)).1;
- if gst::State::Null == state {
- self.set_name("PokeBot - Playing");
- self.start_audio(req);
- } else {
- match state {
- gst::State::Playing => self.set_name("PokeBot - Playing"),
- gst::State::Paused => self.set_name("PokeBot - Paused"),
- gst::State::Ready => self.set_name("PokeBot - Stopped"),
- gst::State::Null | gst::State::__Unknown(_) | gst::State::VoidPending => {
- unreachable!()
- }
- }
-
- let title = req.title.clone();
- if self
- .playlist
- .lock()
- .expect("Mutex was not poisoned")
- .push(req)
- == false
- {
- self.send_message(MessageTarget::Channel, "Playlist is full");
- } else {
- self.send_message(
- MessageTarget::Channel,
- &format!("Added '{}' to the playlist", title),
- );
- }
- }
- }
-
- pub async fn poll(&self) {
- let bus = self
- .pipeline
- .get_bus()
- .expect("Pipeline without bus. Shouldn't happen!");
-
- let mut messages = gst::BusStream::new(&bus);
- while let Some(msg) = messages.next().await {
- use gst::MessageView;
-
- match msg.view() {
- MessageView::Eos(..) => break,
- MessageView::Error(err) => {
- println!(
- "Error from {:?}: {} ({:?})",
- err.get_src().map(|s| s.get_path_string()),
- err.get_error(),
- err.get_debug()
- );
- break;
- }
- _ => (),
- };
- }
-
- self.next();
- }
-
- pub fn start_audio(&self, req: AudioRequest) {
- self.pipeline
- .set_state(gst::State::Null)
- .expect("Can set state to null");
-
- let http_src = self
- .pipeline
- .get_by_name("http-source")
- .expect("Http source should be registered");
-
- http_src
- .set_property("location", &&req.address)
- .expect("Can set location on http_dl");
-
- self.pipeline
- .set_state(gst::State::Playing)
- .expect("Can set state to playing");
-
- self.set_description(&format!("Currently playing: {}", req.title));
- }
-
- pub fn volume(&self, volume: f64) {
- if let Some(vol_filter) = self.pipeline.get_by_name("volume") {
- vol_filter
- .set_property("volume", &(10.0f64.powf(volume / 50.0 - 2.0)))
- .expect("can change volume");
-
- // TODO Reflect volume in name
- }
- }
-
- pub fn play(&self) {
- let http_src = self
- .pipeline
- .get_by_name("http-source")
- .expect("Http source should be registered");
-
- if http_src
- .get_property("location")
- .ok()
- .and_then(|v| v.get::<String>().ok().and_then(|s| s.map(|s| s.is_empty())))
- .unwrap_or(true)
- {
- if self
- .playlist
- .lock()
- .expect("Mutex was not poisoned")
- .is_empty()
- {
- self.send_message(MessageTarget::Channel, "There is nothing to play");
- return;
- }
- }
-
- self.pipeline
- .set_state(gst::State::Playing)
- .expect("can play");
-
- self.set_name("PokeBot - Playing");
- }
-
- pub fn next(&self) {
- if let Some(req) = self.playlist.lock().expect("Mutex was not poisoned").pop() {
- self.start_audio(req);
- } else {
- self.pipeline
- .set_state(gst::State::Null)
- .expect("Can set state to null");
-
- let http_src = self
- .pipeline
- .get_by_name("http-source")
- .expect("Http source should be registered");
-
- http_src
- .set_property("location", &"")
- .expect("Can set location on http_dl");
-
- self.set_name("PokeBot");
- }
- }
-
- pub fn clear(&self) {
- self.playlist
- .lock()
- .expect("Mutex was not poisoned")
- .clear();
-
- self.send_message(MessageTarget::Channel, "Playlist was cleared");
- }
-
- pub fn pause(&self) {
- self.pipeline
- .set_state(gst::State::Paused)
- .expect("can pause");
-
- self.set_name("PokeBot - Paused");
- }
-
- pub fn stop(&self) {
- self.pipeline
- .set_state(gst::State::Ready)
- .expect("can stop");
-
- self.set_name("PokeBot - Stopped");
- }
-
- pub async fn disconnect(&self) {
- self.conn
- .disconnect(DisconnectOptions::new())
- .compat()
- .await;
- }
-
- pub fn send_message(&self, target: MessageTarget, message: &str) {
- tokio::spawn(
- self.conn
- .lock()
- .to_mut()
- .send_message(target, message)
- .map_err(|e| println!("Failed to send message: {}", e)),
- );
- }
-
- pub fn set_name(&self, name: &str) {
- tokio::spawn(
- self.conn
- .lock()
- .to_mut()
- .set_name(name)
- .map_err(|e| println!("Failed to change name: {}", e)),
- );
- }
-
- pub fn set_description(&self, desc: &str) {
- tokio::spawn(
- self.conn
- .lock()
- .to_mut()
- .get_client(&self.conn.lock().own_client)
- .expect("Can get myself")
- .set_description(desc)
- .map_err(|e| println!("Failed to change description: {}", e)),
- );
- }
-}
diff --git a/src/teamspeak.rs b/src/teamspeak.rs
new file mode 100644
index 0000000..785eba2
--- /dev/null
+++ b/src/teamspeak.rs
@@ -0,0 +1,116 @@
+use futures::{
+ compat::Future01CompatExt,
+};
+use futures01::{future::Future, sink::Sink};
+
+use tsclientlib::{Connection, ConnectOptions, events::Event, ClientId, MessageTarget};
+use tsclientlib::Event::ConEvents;
+use crate::{ApplicationMessage, Message};
+use std::sync::mpsc::Sender;
+use std::sync::{Mutex, Arc};
+
+use log::{error};
+
+pub struct TeamSpeakConnection {
+ conn: Connection,
+}
+
+fn get_message<'a>(event: &Event) -> Option<Message> {
+ match event {
+ Event::Message {
+ from: target,
+ invoker: sender,
+ message: msg,
+ } => {
+ Some(Message {
+ target: target.clone(),
+ invoker: sender.clone(),
+ text: msg.clone(),
+ })
+ }
+ _ => None,
+ }
+}
+
+impl TeamSpeakConnection {
+ pub async fn new(tx: Arc<Mutex<Sender<ApplicationMessage>>>, options: ConnectOptions) -> Result<TeamSpeakConnection, tsclientlib::Error> {
+ let conn = Connection::new(options).compat().await?;
+ let packet = conn.lock().server.set_subscribed(true);
+ conn.send_packet(packet).compat().await.unwrap();
+
+ conn.add_event_listener(
+ String::from("listener"),
+ Box::new(move |e| {
+ if let ConEvents(_conn, events) = e {
+ for event in *events {
+ if let Some(msg) = get_message(event) {
+ let tx = tx.lock().unwrap();
+ tx.send(ApplicationMessage::TextMessage(msg)).unwrap();
+ }
+ }
+ }
+ }),
+ );
+
+ Ok(TeamSpeakConnection {
+ conn,
+ })
+ }
+
+ pub fn send_audio_packet(&self, samples: &[u8]) {
+ let packet = tsproto_packets::packets::OutAudio::new(
+ &tsproto_packets::packets::AudioData::C2S {
+ id: 0,
+ codec: tsproto_packets::packets::CodecType::OpusMusic,
+ data: samples,
+ },
+ );
+
+ let send_packet = self.conn
+ .get_packet_sink()
+ .send(packet)
+ .map(|_| ())
+ .map_err(|_| error!("Failed to send voice packet"));
+
+ tokio::run(send_packet);
+ }
+
+ pub fn join_channel_of_user(&self, id: ClientId) {
+ let channel = self.conn.lock()
+ .clients
+ .get(&id)
+ .expect("can find poke sender")
+ .channel;
+ tokio::spawn(self.conn.lock().to_mut()
+ .get_client(&self.conn.lock().own_client)
+ .expect("can get myself")
+ .set_channel(channel)
+ .map_err(|e| error!("Failed to switch channel: {}", e)));
+ }
+
+ pub fn set_nickname(&self, name: &str) {
+ tokio::spawn(self.conn
+ .lock()
+ .to_mut()
+ .set_name(name)
+ .map_err(|e| error!("Failed to set nickname: {}", e)));
+ }
+
+ pub fn set_description(&self, desc: &str) {
+ tokio::spawn(
+ self.conn
+ .lock()
+ .to_mut()
+ .get_client(&self.conn.lock().own_client)
+ .expect("Can get myself")
+ .set_description(desc)
+ .map_err(|e| error!("Failed to change description: {}", e)));
+ }
+
+ pub fn send_message_to_channel(&self, text: &str) {
+ tokio::spawn(self.conn.lock().to_mut()
+ .send_message(MessageTarget::Channel, text)
+ .map_err(|e| error!("Failed to send message: {}", e)));
+
+ }
+}
diff --git a/src/youtube_dl.rs b/src/youtube_dl.rs
new file mode 100644
index 0000000..1eff302
--- /dev/null
+++ b/src/youtube_dl.rs
@@ -0,0 +1,38 @@
+use std::process::{Command, Stdio};
+
+use log::{debug};
+
+pub fn get_audio_download_url(uri: String) -> Result<(String, String), String> {
+ let ytdl_args = [
+ "--no-playlist",
+ "-f",
+ "bestaudio/best",
+ "-g",
+ "--get-filename",
+ "-o",
+ "%(title)s",
+ &uri,
+ ];
+
+ let mut cmd = Command::new("youtube-dl");
+ cmd.args(&ytdl_args);
+ cmd.stdin(Stdio::null());
+
+ debug!("yt-dl command: {:?}", cmd);
+
+ let ytdl_output = cmd
+ .output()
+ .unwrap();
+
+ let output = String::from_utf8(ytdl_output.stdout.clone()).unwrap();
+
+ if ytdl_output.status.success() == false {
+ return Err(String::from_utf8(ytdl_output.stderr.clone()).unwrap());
+ }
+
+ let lines = output.lines().collect::<Vec<_>>();
+ let url = lines[0].to_owned();
+ let title = lines[1].to_owned();
+
+ Ok((url, title))
+} \ No newline at end of file