diff options
| author | Jokler <jokler.contact@gmail.com> | 2020-01-06 00:07:57 +0100 |
|---|---|---|
| committer | Jokler <jokler.contact@gmail.com> | 2020-01-06 00:07:57 +0100 |
| commit | 31fab217a2221d76c79bd0b7b0f2e97fbc23d06b (patch) | |
| tree | 399388eab5fecfd485b1f85fbb5770dca424f2ed /src | |
| download | pokebot-31fab217a2221d76c79bd0b7b0f2e97fbc23d06b.tar.gz pokebot-31fab217a2221d76c79bd0b7b0f2e97fbc23d06b.zip | |
Initial commit
Diffstat (limited to 'src')
| -rw-r--r-- | src/main.rs | 193 | ||||
| -rw-r--r-- | src/state.rs | 267 |
2 files changed, 460 insertions, 0 deletions
diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..e00ae9f --- /dev/null +++ b/src/main.rs @@ -0,0 +1,193 @@ +use std::str::FromStr; +use std::io::Read; +use std::path::PathBuf; + +use futures::{ + compat::Future01CompatExt, + future::{FutureExt, TryFutureExt}, +}; + +use futures01::{future::Future, stream::Stream, sync::mpsc}; +use structopt::clap::AppSettings; +use structopt::StructOpt; + +use tsclientlib::{ + events::Event, ChannelId, ConnectOptions, Connection, ConnectionLock, DisconnectOptions, + Event::ConEvents, Identity, MessageTarget, +}; + +use log::error; + +mod state; +use state::State; + +#[derive(StructOpt, Debug)] +#[structopt(raw(global_settings = "&[AppSettings::ColoredHelp]"))] +struct Args { + #[structopt( + short = "a", + long = "address", + default_value = "localhost", + help = "The address of the server to connect to" + )] + address: String, + #[structopt( + short = "i", + long = "id", + help = "Identity file - good luck creating one", + parse(from_os_str) + )] + id_path: Option<PathBuf>, + #[structopt( + short = "v", + long = "verbose", + help = "Print the content of all packets", + parse(from_occurrences) + )] + verbose: u8, + // 0. Print nothing + // 1. Print command string + // 2. Print packets + // 3. Print udp packets +} + +fn main() { + tokio::run(async_main().unit_error().boxed().compat()); +} + +async fn async_main() { + // Parse command line options + let args = Args::from_args(); + + let id = if let Some(path) = args.id_path { + let mut file = std::fs::File::open(path).expect("Failed to open id file"); + let mut content = String::new(); + file.read_to_string(&mut content) + .expect("Failed to read id file"); + + toml::from_str(&content).expect("Failed to parse id file") + } else { + Identity::create().expect("Failed to create id") + }; + + let con_config = ConnectOptions::new(args.address) + .version(tsclientlib::Version::Linux_3_3_2) + .name(String::from("PokeBot")) + .identity(id) + .log_commands(args.verbose >= 1) + .log_packets(args.verbose >= 2) + .log_udp_packets(args.verbose >= 3); + + //let (disconnect_send, disconnect_recv) = mpsc::unbounded(); + let con = Connection::new(con_config).compat().await.unwrap(); + + let mut state = State::new(con.clone()); + { + let packet = con.lock().server.set_subscribed(true); + con.send_packet(packet).compat().await; + } + //con.add_on_disconnect(Box::new( || { + //disconnect_send.unbounded_send(()).unwrap() + //})); + let inner_state = state.clone(); + con.add_event_listener( + String::from("listener"), + Box::new(move |e| { + if let ConEvents(con, events) = e { + for event in *events { + handle_event(&inner_state, &con, event); + } + } + }), + ); + + loop { + state.poll().await; + } + let ctrl_c = tokio_signal::ctrl_c().flatten_stream(); + + //let dc_fut = disconnect_recv.into_future().compat().fuse(); + //let ctrlc_fut = ctrl_c.into_future().compat().fuse(); + //ctrlc_fut.await.map_err(|(e, _)| e).unwrap(); + + con.disconnect(DisconnectOptions::new()) + .compat() + .await + .unwrap(); + + // TODO Should not be required + std::process::exit(0); +} + +fn handle_event<'a>(state: &State, con: &ConnectionLock<'a>, event: &Event) { + match event { + Event::Message { + from: target, + invoker: sender, + message: msg, + } => { + if let MessageTarget::Poke(who) = target { + let channel = con.clients.get(&who).expect("can find poke sender").channel; + tokio::spawn( + con.to_mut() + .get_client(&con.own_client) + .expect("can get myself") + .set_channel(channel) + .map_err(|e| error!("Failed to switch channel: {}", e)), + ); + } else if sender.id != con.own_client { + if msg.starts_with("!") { + let tokens = msg[1..].split_whitespace().collect::<Vec<_>>(); + match tokens.get(0).map(|t| *t) { + Some("test") => { + tokio::spawn( + con.to_mut() + .send_message(*target, "works :)") + .map_err(|_| ()), + ); + } + Some("add") => { + let mut invalid = false; + if let Some(url) = &tokens.get(1) { + if url.len() > 11 { + let trimmed = url[5..url.len() - 6].to_owned(); + state.add_audio(trimmed); + } else { + invalid = true; + } + } else { + invalid = true; + } + if invalid { + tokio::spawn( + con.to_mut() + .send_message(MessageTarget::Channel, "Invalid Url") + .map_err(|_| ()), + ); + } + } + Some("volume") => { + if let Ok(volume) = f64::from_str(tokens[1]) { + state.volume(volume / 100.0); + } + } + Some("play") => { + state.play(); + } + Some("pause") => { + state.pause(); + } + Some("stop") => { + state.stop(); + } + Some("quit") => { + //tokio::spawn(state.disconnect().unit_error().boxed().compat()); + } + _ => (), + }; + } + } + } + _ => (), + } +} diff --git a/src/state.rs b/src/state.rs new file mode 100644 index 0000000..a1c0d6a --- /dev/null +++ b/src/state.rs @@ -0,0 +1,267 @@ +use std::error::Error; +use std::process::{Command, Stdio}; +use std::sync::Arc; + +use futures::{ + compat::Future01CompatExt, + future::{FutureExt, TryFutureExt}, +}; +use futures01::{future::Future, sink::Sink}; + +use futures_util::stream::StreamExt; +use tsclientlib::{Connection, DisconnectOptions}; + +use gst::prelude::*; +use gstreamer as gst; +use gstreamer_app as gst_app; +use gstreamer_audio as gst_audio; + +use byte_slice_cast::*; + +#[derive(Clone)] +pub struct State { + conn: Arc<Connection>, + pipeline: Arc<gst::Pipeline>, +} + +impl State { + pub fn new(conn: Connection) -> Self { + let conn_arc = Arc::new(conn); + + gst::init().unwrap(); + let pipeline = gst::Pipeline::new(Some("Ts Audio Player")); + let http_dl = gst::ElementFactory::make("souphttpsrc", Some("http-source")).unwrap(); + let decoder = gst::ElementFactory::make("decodebin", Some("video-decoder")).unwrap(); + pipeline.add_many(&[&http_dl, &decoder]).unwrap(); + + http_dl + .link(&decoder) + .expect("Can link https_dl to decoder"); + + let pipeline_weak = pipeline.downgrade(); + + let inner_conn = conn_arc.clone(); + decoder.connect_pad_added(move |_, decoder_src| { + let pipeline = match pipeline_weak.upgrade() { + Some(pipeline) => pipeline, + None => return, + }; + + let is_audio = { + let media_type = decoder_src.get_current_caps().and_then(|caps| { + caps.get_structure(0).map(|s| { + let name = s.get_name(); + name.starts_with("audio/") + }) + }); + + match media_type { + None => { + eprintln!( + "Failed to get media type from pad {}", + decoder_src.get_name() + ); + return; + } + Some(media_type) => media_type, + } + }; + + if is_audio { + let names = [ + "audio-converter", + "volume", + "audio-resampler", + "opus-encoder", + "app-sink", + ]; + for element in pipeline.iterate_elements() { + if let Ok(element) = element { + if names.contains(&&*element.get_name()) { + element.set_state(gst::State::Null); + pipeline.remove_many(&[&element]); + } + } + } + + let convert = + gst::ElementFactory::make("audioconvert", Some("audio-converter")).unwrap(); + let volume = gst::ElementFactory::make("volume", Some("volume")).unwrap(); + let resample = + gst::ElementFactory::make("audioresample", Some("audio-resampler")).unwrap(); + let opus_enc = gst::ElementFactory::make("opusenc", Some("opus-encoder")).unwrap(); + let sink = gst::ElementFactory::make("appsink", Some("app-sink")).unwrap(); + + volume.set_property("volume", &0.2); + + { + let elements = &[&convert, &volume, &resample, &opus_enc, &sink]; + pipeline.add_many(elements).expect("Can add audio elements"); + gst::Element::link_many(elements).expect("Can link audio elements"); + + for e in elements { + e.sync_state_with_parent() + .expect("Can sync state with parent"); + } + } + + let appsink = sink + .dynamic_cast::<gst_app::AppSink>() + .expect("Sink element is expected to be an appsink!"); + + appsink.set_caps(Some(&gst::Caps::new_simple("audio/x-opus", &[]))); + + let inner_conn = inner_conn.clone(); + appsink.set_callbacks( + gst_app::AppSinkCallbacks::new() + // Add a handler to the "new-sample" signal. + .new_sample(move |appsink| { + // Pull the sample in question out of the appsink's buffer. + let sample = appsink.pull_sample().map_err(|_| gst::FlowError::Eos)?; + let buffer = sample + .get_buffer() + .expect("Failed to get buffer from appsink"); + + let map = buffer + .map_readable() + .expect("Failed to map buffer readable"); + + let samples = map + .as_slice_of::<u8>() + .expect("Failed to interprete buffer as S16 PCM"); + + let packet = tsproto_packets::packets::OutAudio::new( + &tsproto_packets::packets::AudioData::C2S { + id: 0, + codec: tsproto_packets::packets::CodecType::OpusMusic, + data: &samples, + }, + ); + + tokio::run( + inner_conn + .get_packet_sink() + .send(packet) + .map(|_| ()) + .map_err(|e| (println!("Failed to send voice packet."))), + ); + + Ok(gst::FlowSuccess::Ok) + }) + .build(), + ); + + let convert_sink = convert + .get_static_pad("sink") + .expect("queue has no sinkpad"); + decoder_src + .link(&convert_sink) + .expect("Can link decoder src to convert sink"); + } + }); + + Self { + conn: conn_arc, + pipeline: Arc::new(pipeline), + } + } + + pub fn add_audio<'a>(&self, uri: String) { + let ytdl_args = ["-f", "bestaudio", "-g", &uri, "-o", "-"]; + + let ytdl_output = Command::new("youtube-dl") + .args(&ytdl_args) + .stdin(Stdio::null()) + .output() + .unwrap(); + + if ytdl_output.status.success() == false { + return; + } + let dl_url: &str = &String::from_utf8(ytdl_output.stderr).unwrap(); + + self.pipeline + .set_state(gst::State::Ready) + .expect("Can set state to null"); + + let http_src = self + .pipeline + .get_by_name("http-source") + .expect("Http source should be registered"); + http_src + .set_property("location", &dl_url) + .expect("Can set location on http_dl"); + + self.pipeline + .set_state(gst::State::Playing) + .expect("Can set state to playing"); + } + + pub async fn poll(&self) { + let bus = self + .pipeline + .get_bus() + .expect("Pipeline without bus. Shouldn't happen!"); + + let mut messages = gst::BusStream::new(&bus); + while let Some(msg) = messages.next().await { + use gst::MessageView; + + // Determine whether we want to quit: on EOS or error message + // we quit, otherwise simply continue. + match msg.view() { + MessageView::Eos(..) => break, + MessageView::Error(err) => { + println!( + "Error from {:?}: {} ({:?})", + err.get_src().map(|s| s.get_path_string()), + err.get_error(), + err.get_debug() + ); + break; + } + _ => (), + }; + } + + self.pipeline + .set_state(gst::State::Null) + .expect("Can set state to null"); + } + + pub fn volume(&self, volume: f64) { + self.pipeline + .set_state(gst::State::Paused) + .expect("Can set state to null"); + let http_src = self + .pipeline + .get_by_name("http-source") + .expect("Http source should be registered") + .set_property("volume", &volume); + } + + pub fn play(&self) { + self.pipeline + .set_state(gst::State::Playing) + .expect("can play"); + } + + pub fn pause(&self) { + self.pipeline + .set_state(gst::State::Paused) + .expect("can pause"); + } + + pub fn stop(&self) { + self.pipeline + .set_state(gst::State::Ready) + .expect("can stop"); + } + + pub async fn disconnect(&self) { + self.conn + .disconnect(DisconnectOptions::new()) + .compat() + .await; + } +} |
