summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorJokler <jokler.contact@gmail.com>2020-01-06 00:07:57 +0100
committerJokler <jokler.contact@gmail.com>2020-01-06 00:07:57 +0100
commit31fab217a2221d76c79bd0b7b0f2e97fbc23d06b (patch)
tree399388eab5fecfd485b1f85fbb5770dca424f2ed /src
downloadpokebot-31fab217a2221d76c79bd0b7b0f2e97fbc23d06b.tar.gz
pokebot-31fab217a2221d76c79bd0b7b0f2e97fbc23d06b.zip
Initial commit
Diffstat (limited to 'src')
-rw-r--r--src/main.rs193
-rw-r--r--src/state.rs267
2 files changed, 460 insertions, 0 deletions
diff --git a/src/main.rs b/src/main.rs
new file mode 100644
index 0000000..e00ae9f
--- /dev/null
+++ b/src/main.rs
@@ -0,0 +1,193 @@
+use std::str::FromStr;
+use std::io::Read;
+use std::path::PathBuf;
+
+use futures::{
+ compat::Future01CompatExt,
+ future::{FutureExt, TryFutureExt},
+};
+
+use futures01::{future::Future, stream::Stream, sync::mpsc};
+use structopt::clap::AppSettings;
+use structopt::StructOpt;
+
+use tsclientlib::{
+ events::Event, ChannelId, ConnectOptions, Connection, ConnectionLock, DisconnectOptions,
+ Event::ConEvents, Identity, MessageTarget,
+};
+
+use log::error;
+
+mod state;
+use state::State;
+
+#[derive(StructOpt, Debug)]
+#[structopt(raw(global_settings = "&[AppSettings::ColoredHelp]"))]
+struct Args {
+ #[structopt(
+ short = "a",
+ long = "address",
+ default_value = "localhost",
+ help = "The address of the server to connect to"
+ )]
+ address: String,
+ #[structopt(
+ short = "i",
+ long = "id",
+ help = "Identity file - good luck creating one",
+ parse(from_os_str)
+ )]
+ id_path: Option<PathBuf>,
+ #[structopt(
+ short = "v",
+ long = "verbose",
+ help = "Print the content of all packets",
+ parse(from_occurrences)
+ )]
+ verbose: u8,
+ // 0. Print nothing
+ // 1. Print command string
+ // 2. Print packets
+ // 3. Print udp packets
+}
+
+fn main() {
+ tokio::run(async_main().unit_error().boxed().compat());
+}
+
+async fn async_main() {
+ // Parse command line options
+ let args = Args::from_args();
+
+ let id = if let Some(path) = args.id_path {
+ let mut file = std::fs::File::open(path).expect("Failed to open id file");
+ let mut content = String::new();
+ file.read_to_string(&mut content)
+ .expect("Failed to read id file");
+
+ toml::from_str(&content).expect("Failed to parse id file")
+ } else {
+ Identity::create().expect("Failed to create id")
+ };
+
+ let con_config = ConnectOptions::new(args.address)
+ .version(tsclientlib::Version::Linux_3_3_2)
+ .name(String::from("PokeBot"))
+ .identity(id)
+ .log_commands(args.verbose >= 1)
+ .log_packets(args.verbose >= 2)
+ .log_udp_packets(args.verbose >= 3);
+
+ //let (disconnect_send, disconnect_recv) = mpsc::unbounded();
+ let con = Connection::new(con_config).compat().await.unwrap();
+
+ let mut state = State::new(con.clone());
+ {
+ let packet = con.lock().server.set_subscribed(true);
+ con.send_packet(packet).compat().await;
+ }
+ //con.add_on_disconnect(Box::new( || {
+ //disconnect_send.unbounded_send(()).unwrap()
+ //}));
+ let inner_state = state.clone();
+ con.add_event_listener(
+ String::from("listener"),
+ Box::new(move |e| {
+ if let ConEvents(con, events) = e {
+ for event in *events {
+ handle_event(&inner_state, &con, event);
+ }
+ }
+ }),
+ );
+
+ loop {
+ state.poll().await;
+ }
+ let ctrl_c = tokio_signal::ctrl_c().flatten_stream();
+
+ //let dc_fut = disconnect_recv.into_future().compat().fuse();
+ //let ctrlc_fut = ctrl_c.into_future().compat().fuse();
+ //ctrlc_fut.await.map_err(|(e, _)| e).unwrap();
+
+ con.disconnect(DisconnectOptions::new())
+ .compat()
+ .await
+ .unwrap();
+
+ // TODO Should not be required
+ std::process::exit(0);
+}
+
+fn handle_event<'a>(state: &State, con: &ConnectionLock<'a>, event: &Event) {
+ match event {
+ Event::Message {
+ from: target,
+ invoker: sender,
+ message: msg,
+ } => {
+ if let MessageTarget::Poke(who) = target {
+ let channel = con.clients.get(&who).expect("can find poke sender").channel;
+ tokio::spawn(
+ con.to_mut()
+ .get_client(&con.own_client)
+ .expect("can get myself")
+ .set_channel(channel)
+ .map_err(|e| error!("Failed to switch channel: {}", e)),
+ );
+ } else if sender.id != con.own_client {
+ if msg.starts_with("!") {
+ let tokens = msg[1..].split_whitespace().collect::<Vec<_>>();
+ match tokens.get(0).map(|t| *t) {
+ Some("test") => {
+ tokio::spawn(
+ con.to_mut()
+ .send_message(*target, "works :)")
+ .map_err(|_| ()),
+ );
+ }
+ Some("add") => {
+ let mut invalid = false;
+ if let Some(url) = &tokens.get(1) {
+ if url.len() > 11 {
+ let trimmed = url[5..url.len() - 6].to_owned();
+ state.add_audio(trimmed);
+ } else {
+ invalid = true;
+ }
+ } else {
+ invalid = true;
+ }
+ if invalid {
+ tokio::spawn(
+ con.to_mut()
+ .send_message(MessageTarget::Channel, "Invalid Url")
+ .map_err(|_| ()),
+ );
+ }
+ }
+ Some("volume") => {
+ if let Ok(volume) = f64::from_str(tokens[1]) {
+ state.volume(volume / 100.0);
+ }
+ }
+ Some("play") => {
+ state.play();
+ }
+ Some("pause") => {
+ state.pause();
+ }
+ Some("stop") => {
+ state.stop();
+ }
+ Some("quit") => {
+ //tokio::spawn(state.disconnect().unit_error().boxed().compat());
+ }
+ _ => (),
+ };
+ }
+ }
+ }
+ _ => (),
+ }
+}
diff --git a/src/state.rs b/src/state.rs
new file mode 100644
index 0000000..a1c0d6a
--- /dev/null
+++ b/src/state.rs
@@ -0,0 +1,267 @@
+use std::error::Error;
+use std::process::{Command, Stdio};
+use std::sync::Arc;
+
+use futures::{
+ compat::Future01CompatExt,
+ future::{FutureExt, TryFutureExt},
+};
+use futures01::{future::Future, sink::Sink};
+
+use futures_util::stream::StreamExt;
+use tsclientlib::{Connection, DisconnectOptions};
+
+use gst::prelude::*;
+use gstreamer as gst;
+use gstreamer_app as gst_app;
+use gstreamer_audio as gst_audio;
+
+use byte_slice_cast::*;
+
+#[derive(Clone)]
+pub struct State {
+ conn: Arc<Connection>,
+ pipeline: Arc<gst::Pipeline>,
+}
+
+impl State {
+ pub fn new(conn: Connection) -> Self {
+ let conn_arc = Arc::new(conn);
+
+ gst::init().unwrap();
+ let pipeline = gst::Pipeline::new(Some("Ts Audio Player"));
+ let http_dl = gst::ElementFactory::make("souphttpsrc", Some("http-source")).unwrap();
+ let decoder = gst::ElementFactory::make("decodebin", Some("video-decoder")).unwrap();
+ pipeline.add_many(&[&http_dl, &decoder]).unwrap();
+
+ http_dl
+ .link(&decoder)
+ .expect("Can link https_dl to decoder");
+
+ let pipeline_weak = pipeline.downgrade();
+
+ let inner_conn = conn_arc.clone();
+ decoder.connect_pad_added(move |_, decoder_src| {
+ let pipeline = match pipeline_weak.upgrade() {
+ Some(pipeline) => pipeline,
+ None => return,
+ };
+
+ let is_audio = {
+ let media_type = decoder_src.get_current_caps().and_then(|caps| {
+ caps.get_structure(0).map(|s| {
+ let name = s.get_name();
+ name.starts_with("audio/")
+ })
+ });
+
+ match media_type {
+ None => {
+ eprintln!(
+ "Failed to get media type from pad {}",
+ decoder_src.get_name()
+ );
+ return;
+ }
+ Some(media_type) => media_type,
+ }
+ };
+
+ if is_audio {
+ let names = [
+ "audio-converter",
+ "volume",
+ "audio-resampler",
+ "opus-encoder",
+ "app-sink",
+ ];
+ for element in pipeline.iterate_elements() {
+ if let Ok(element) = element {
+ if names.contains(&&*element.get_name()) {
+ element.set_state(gst::State::Null);
+ pipeline.remove_many(&[&element]);
+ }
+ }
+ }
+
+ let convert =
+ gst::ElementFactory::make("audioconvert", Some("audio-converter")).unwrap();
+ let volume = gst::ElementFactory::make("volume", Some("volume")).unwrap();
+ let resample =
+ gst::ElementFactory::make("audioresample", Some("audio-resampler")).unwrap();
+ let opus_enc = gst::ElementFactory::make("opusenc", Some("opus-encoder")).unwrap();
+ let sink = gst::ElementFactory::make("appsink", Some("app-sink")).unwrap();
+
+ volume.set_property("volume", &0.2);
+
+ {
+ let elements = &[&convert, &volume, &resample, &opus_enc, &sink];
+ pipeline.add_many(elements).expect("Can add audio elements");
+ gst::Element::link_many(elements).expect("Can link audio elements");
+
+ for e in elements {
+ e.sync_state_with_parent()
+ .expect("Can sync state with parent");
+ }
+ }
+
+ let appsink = sink
+ .dynamic_cast::<gst_app::AppSink>()
+ .expect("Sink element is expected to be an appsink!");
+
+ appsink.set_caps(Some(&gst::Caps::new_simple("audio/x-opus", &[])));
+
+ let inner_conn = inner_conn.clone();
+ appsink.set_callbacks(
+ gst_app::AppSinkCallbacks::new()
+ // Add a handler to the "new-sample" signal.
+ .new_sample(move |appsink| {
+ // Pull the sample in question out of the appsink's buffer.
+ let sample = appsink.pull_sample().map_err(|_| gst::FlowError::Eos)?;
+ let buffer = sample
+ .get_buffer()
+ .expect("Failed to get buffer from appsink");
+
+ let map = buffer
+ .map_readable()
+ .expect("Failed to map buffer readable");
+
+ let samples = map
+ .as_slice_of::<u8>()
+ .expect("Failed to interprete buffer as S16 PCM");
+
+ let packet = tsproto_packets::packets::OutAudio::new(
+ &tsproto_packets::packets::AudioData::C2S {
+ id: 0,
+ codec: tsproto_packets::packets::CodecType::OpusMusic,
+ data: &samples,
+ },
+ );
+
+ tokio::run(
+ inner_conn
+ .get_packet_sink()
+ .send(packet)
+ .map(|_| ())
+ .map_err(|e| (println!("Failed to send voice packet."))),
+ );
+
+ Ok(gst::FlowSuccess::Ok)
+ })
+ .build(),
+ );
+
+ let convert_sink = convert
+ .get_static_pad("sink")
+ .expect("queue has no sinkpad");
+ decoder_src
+ .link(&convert_sink)
+ .expect("Can link decoder src to convert sink");
+ }
+ });
+
+ Self {
+ conn: conn_arc,
+ pipeline: Arc::new(pipeline),
+ }
+ }
+
+ pub fn add_audio<'a>(&self, uri: String) {
+ let ytdl_args = ["-f", "bestaudio", "-g", &uri, "-o", "-"];
+
+ let ytdl_output = Command::new("youtube-dl")
+ .args(&ytdl_args)
+ .stdin(Stdio::null())
+ .output()
+ .unwrap();
+
+ if ytdl_output.status.success() == false {
+ return;
+ }
+ let dl_url: &str = &String::from_utf8(ytdl_output.stderr).unwrap();
+
+ self.pipeline
+ .set_state(gst::State::Ready)
+ .expect("Can set state to null");
+
+ let http_src = self
+ .pipeline
+ .get_by_name("http-source")
+ .expect("Http source should be registered");
+ http_src
+ .set_property("location", &dl_url)
+ .expect("Can set location on http_dl");
+
+ self.pipeline
+ .set_state(gst::State::Playing)
+ .expect("Can set state to playing");
+ }
+
+ pub async fn poll(&self) {
+ let bus = self
+ .pipeline
+ .get_bus()
+ .expect("Pipeline without bus. Shouldn't happen!");
+
+ let mut messages = gst::BusStream::new(&bus);
+ while let Some(msg) = messages.next().await {
+ use gst::MessageView;
+
+ // Determine whether we want to quit: on EOS or error message
+ // we quit, otherwise simply continue.
+ match msg.view() {
+ MessageView::Eos(..) => break,
+ MessageView::Error(err) => {
+ println!(
+ "Error from {:?}: {} ({:?})",
+ err.get_src().map(|s| s.get_path_string()),
+ err.get_error(),
+ err.get_debug()
+ );
+ break;
+ }
+ _ => (),
+ };
+ }
+
+ self.pipeline
+ .set_state(gst::State::Null)
+ .expect("Can set state to null");
+ }
+
+ pub fn volume(&self, volume: f64) {
+ self.pipeline
+ .set_state(gst::State::Paused)
+ .expect("Can set state to null");
+ let http_src = self
+ .pipeline
+ .get_by_name("http-source")
+ .expect("Http source should be registered")
+ .set_property("volume", &volume);
+ }
+
+ pub fn play(&self) {
+ self.pipeline
+ .set_state(gst::State::Playing)
+ .expect("can play");
+ }
+
+ pub fn pause(&self) {
+ self.pipeline
+ .set_state(gst::State::Paused)
+ .expect("can pause");
+ }
+
+ pub fn stop(&self) {
+ self.pipeline
+ .set_state(gst::State::Ready)
+ .expect("can stop");
+ }
+
+ pub async fn disconnect(&self) {
+ self.conn
+ .disconnect(DisconnectOptions::new())
+ .compat()
+ .await;
+ }
+}