diff options
| author | Jokler <jokler@protonmail.com> | 2020-01-13 05:41:00 +0100 |
|---|---|---|
| committer | Jokler <jokler@protonmail.com> | 2020-01-13 05:41:00 +0100 |
| commit | 39b248df9c92b3a6bc94c3eb3e872e502b3cef7a (patch) | |
| tree | 01e5200d69e42261cf3898f3f64bdae4eb3b75bf /src | |
| parent | 5b9dea6a29faf4e1722dcc9d7437141c8a937a14 (diff) | |
| download | pokebot-39b248df9c92b3a6bc94c3eb3e872e502b3cef7a.tar.gz pokebot-39b248df9c92b3a6bc94c3eb3e872e502b3cef7a.zip | |
Run cargo fmt
Diffstat (limited to 'src')
| -rw-r--r-- | src/audio_player.rs | 116 | ||||
| -rw-r--r-- | src/main.rs | 100 | ||||
| -rw-r--r-- | src/playlist.rs | 1 | ||||
| -rw-r--r-- | src/teamspeak.rs | 73 | ||||
| -rw-r--r-- | src/youtube_dl.rs | 8 |
5 files changed, 150 insertions, 148 deletions
diff --git a/src/audio_player.rs b/src/audio_player.rs index 0c2f06d..c4b4b26 100644 --- a/src/audio_player.rs +++ b/src/audio_player.rs @@ -1,16 +1,16 @@ use std::sync::Once; -use gstreamer as gst; use gst::prelude::*; +use gst::GhostPad; +use gstreamer as gst; use gstreamer_app::{AppSink, AppSinkCallbacks}; use gstreamer_audio::{StreamVolume, StreamVolumeFormat}; -use gst::{GhostPad}; -use log::{info, debug, warn, error}; -use std::sync::mpsc::Sender; -use std::sync::{Mutex, Arc}; -use crate::{State, ApplicationMessage}; +use crate::{ApplicationMessage, State}; use glib::BoolError; +use log::{debug, error, info, warn}; +use std::sync::mpsc::Sender; +use std::sync::{Arc, Mutex}; static GST_INIT: Once = Once::new(); @@ -42,10 +42,7 @@ pub struct AudioPlayer { } fn make_element(factoryname: &str, display_name: &str) -> Result<gst::Element, AudioPlayerError> { - Ok(gst::ElementFactory::make( - factoryname, - Some(display_name) - )?) + Ok(gst::ElementFactory::make(factoryname, Some(display_name))?) } fn link_elements(a: &gst::Element, b: &gst::Element) -> Result<(), AudioPlayerError> { @@ -89,7 +86,10 @@ fn add_decode_bin_new_pad_callback( } impl AudioPlayer { - pub fn new(sender: Arc<Mutex<Sender<ApplicationMessage>>>, callback: Option<Box<dyn FnMut(&[u8]) + Send>>) -> Result<Self, AudioPlayerError> { + pub fn new( + sender: Arc<Mutex<Sender<ApplicationMessage>>>, + callback: Option<Box<dyn FnMut(&[u8]) + Send>>, + ) -> Result<Self, AudioPlayerError> { GST_INIT.call_once(|| gst::init().unwrap()); info!("Creating audio player"); @@ -104,10 +104,7 @@ impl AudioPlayer { let (audio_bin, volume, ghost_pad) = Self::create_audio_bin(callback)?; - add_decode_bin_new_pad_callback( - &decode_bin, - audio_bin.clone(), - ghost_pad); + add_decode_bin_new_pad_callback(&decode_bin, audio_bin.clone(), ghost_pad); pipeline.add(&audio_bin)?; @@ -123,23 +120,18 @@ impl AudioPlayer { }) } - fn create_audio_bin(callback: Option<Box<dyn FnMut(&[u8]) + Send>>) -> Result<(gst::Bin, gst::Element, gst::GhostPad), AudioPlayerError> { + fn create_audio_bin( + callback: Option<Box<dyn FnMut(&[u8]) + Send>>, + ) -> Result<(gst::Bin, gst::Element, gst::GhostPad), AudioPlayerError> { let audio_bin = gst::Bin::new(Some("audio bin")); let queue = make_element("queue", "audio queue")?; let convert = make_element("audioconvert", "audio converter")?; let volume = make_element("volume", "volume")?; let resample = make_element("audioresample", "audio resampler")?; let pads = queue.get_sink_pads(); - let queue_sink_pad = pads - .first() - .unwrap(); + let queue_sink_pad = pads.first().unwrap(); - audio_bin.add_many(&[ - &queue, - &convert, - &volume, - &resample, - ])?; + audio_bin.add_many(&[&queue, &convert, &volume, &resample])?; if let Some(mut callback) = callback { let opus_enc = make_element("opusenc", "opus encoder")?; @@ -149,10 +141,10 @@ impl AudioPlayer { .clone() .dynamic_cast::<AppSink>() .expect("Sink element is expected to be an appsink!"); - appsink.set_caps(Some(&gst::Caps::new_simple("audio/x-opus", &[ - ("channels", &(2i32)), - ("rate", &(48_000i32)), - ]))); + appsink.set_caps(Some(&gst::Caps::new_simple( + "audio/x-opus", + &[("channels", &(2i32)), ("rate", &(48_000i32))], + ))); let callbacks = AppSinkCallbacks::new() .new_sample(move |sink| { let sample = sink.pull_sample().map_err(|_| gst::FlowError::Eos)?; @@ -167,33 +159,15 @@ impl AudioPlayer { .build(); appsink.set_callbacks(callbacks); - audio_bin.add_many(&[ - &opus_enc, - &sink - ])?; - - gst::Element::link_many(&[ - &queue, - &convert, - &volume, - &resample, - &opus_enc, - &sink - ])?; + audio_bin.add_many(&[&opus_enc, &sink])?; + + gst::Element::link_many(&[&queue, &convert, &volume, &resample, &opus_enc, &sink])?; } else { let sink = make_element("autoaudiosink", "auto audio sink")?; - audio_bin.add_many(&[ - &sink - ])?; - - gst::Element::link_many(&[ - &queue, - &convert, - &volume, - &resample, - &sink - ])?; + audio_bin.add_many(&[&sink])?; + + gst::Element::link_many(&[&queue, &convert, &volume, &resample, &sink])?; }; let ghost_pad = GhostPad::new(Some("audio bin sink"), queue_sink_pad).unwrap(); @@ -214,11 +188,8 @@ impl AudioPlayer { let db = 50.0 * volume.log10(); info!("Setting volume: {} -> {} dB", volume, db); - let linear = StreamVolume::convert_volume( - StreamVolumeFormat::Db, - StreamVolumeFormat::Linear, - db, - ); + let linear = + StreamVolume::convert_volume(StreamVolumeFormat::Db, StreamVolumeFormat::Linear, db); self.volume.set_property("volume", &linear)?; @@ -232,7 +203,7 @@ impl AudioPlayer { (gst::State::Null, gst::State::VoidPending) => false, (_, gst::State::Null) => false, (gst::State::Ready, gst::State::VoidPending) => false, - _ => true + _ => true, } } @@ -263,7 +234,7 @@ impl AudioPlayer { pub fn stop_current(&self) { info!("Stopping pipeline, sending EOS"); - let handled = self.http_src.send_event(gst::Event::new_eos().build()); + let handled = self.http_src.send_event(gst::Event::new_eos().build()); if !handled { warn!("EOS event was not handled"); } @@ -294,12 +265,23 @@ impl AudioPlayer { let pending = state.get_pending(); match (old, current, pending) { - (gst::State::Paused, gst::State::Playing, gst::State::VoidPending) => self.send_state(State::Playing), - (gst::State::Playing, gst::State::Paused, gst::State::VoidPending) => self.send_state(State::Paused), - (_, gst::State::Ready, gst::State::Null) => self.send_state(State::Stopped), - (_, gst::State::Null, gst::State::VoidPending) => self.send_state(State::Stopped), + (gst::State::Paused, gst::State::Playing, gst::State::VoidPending) => { + self.send_state(State::Playing) + } + (gst::State::Playing, gst::State::Paused, gst::State::VoidPending) => { + self.send_state(State::Paused) + } + (_, gst::State::Ready, gst::State::Null) => { + self.send_state(State::Stopped) + } + (_, gst::State::Null, gst::State::VoidPending) => { + self.send_state(State::Stopped) + } _ => { - debug!("Pipeline transitioned from {:?} to {:?}, with {:?} pending", old, current, pending); + debug!( + "Pipeline transitioned from {:?} to {:?}, with {:?} pending", + old, current, pending + ); } } } @@ -308,7 +290,7 @@ impl AudioPlayer { self.reset().unwrap(); break 'outer; - }, + } MessageView::Warning(warn) => { warn!( "Warning from {:?}: {} ({:?})", @@ -329,7 +311,7 @@ impl AudioPlayer { } _ => { // debug!("{:?}", msg) - }, + } }; } } diff --git a/src/main.rs b/src/main.rs index 1207d87..0941c27 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,37 +1,29 @@ -use std::io::{Read, BufRead}; +use std::io::{BufRead, Read}; use std::path::PathBuf; use std::str::FromStr; -use std::thread; use std::sync::{Arc, Mutex}; +use std::thread; -use futures::{ - future::{FutureExt, TryFutureExt}, -}; +use futures::future::{FutureExt, TryFutureExt}; +use log::{debug, info}; use structopt::clap::AppSettings; use structopt::StructOpt; -use tsclientlib::{ - ConnectOptions, Identity, MessageTarget, Invoker, ClientId, -}; -use log::{info, debug}; +use tsclientlib::{ClientId, ConnectOptions, Identity, Invoker, MessageTarget}; mod audio_player; -mod youtube_dl; -mod teamspeak; mod playlist; +mod teamspeak; +mod youtube_dl; use audio_player::*; -use teamspeak::*; use playlist::*; use std::sync::mpsc::Sender; +use teamspeak::*; #[derive(StructOpt, Debug)] #[structopt(raw(global_settings = "&[AppSettings::ColoredHelp]"))] struct Args { - #[structopt( - short = "l", - long = "local", - help = "Run locally in text mode" - )] + #[structopt(short = "l", long = "local", help = "Run locally in text mode")] local: bool, #[structopt( short = "a", @@ -93,7 +85,11 @@ struct Application { } impl Application { - pub fn new(player: Arc<AudioPlayer>, playlist: Arc<Mutex<Playlist>>, teamspeak: Option<Arc<TeamSpeakConnection>>) -> Self { + pub fn new( + player: Arc<AudioPlayer>, + playlist: Arc<Mutex<Playlist>>, + teamspeak: Option<Arc<TeamSpeakConnection>>, + ) -> Self { Self { player, teamspeak, @@ -118,7 +114,12 @@ impl Application { } pub fn add_audio(&self, url: String) { - if self.playlist.lock().expect("Mutex was not poisoned").is_full() { + if self + .playlist + .lock() + .expect("Mutex was not poisoned") + .is_full() + { info!("Audio playlist is full"); self.send_message("Playlist is full"); return; @@ -212,7 +213,10 @@ impl Application { } } Some("clear") => { - self.playlist.lock().expect("Mutex was not poisoned").clear(); + self.playlist + .lock() + .expect("Mutex was not poisoned") + .clear(); } Some("volume") => { if let Some(&volume) = &tokens.get(1) { @@ -299,8 +303,7 @@ async fn async_main() { let tx = Arc::new(Mutex::new(tx)); let (player, connection) = if args.local { info!("Starting in CLI mode"); - let audio_player = AudioPlayer::new(tx.clone(), None) - .unwrap(); + let audio_player = AudioPlayer::new(tx.clone(), None).unwrap(); (audio_player, None) } else { @@ -329,11 +332,19 @@ async fn async_main() { con_config = con_config.channel(channel); } - let connection = Arc::new(TeamSpeakConnection::new(tx.clone(), con_config).await.unwrap()); + let connection = Arc::new( + TeamSpeakConnection::new(tx.clone(), con_config) + .await + .unwrap(), + ); let cconnection = connection.clone(); - let audio_player = AudioPlayer::new(tx.clone(), Some(Box::new(move |samples| { - cconnection.send_audio_packet(samples); - }))).unwrap(); + let audio_player = AudioPlayer::new( + tx.clone(), + Some(Box::new(move |samples| { + cconnection.send_audio_packet(samples); + })), + ) + .unwrap(); (audio_player, Some(connection)) }; @@ -341,7 +352,11 @@ async fn async_main() { player.set_volume(0.5).unwrap(); let player = Arc::new(player); let playlist = Arc::new(Mutex::new(Playlist::new())); - let application = Arc::new(Application::new(player.clone(), playlist.clone(), connection)); + let application = Arc::new(Application::new( + player.clone(), + playlist.clone(), + connection, + )); spawn_gstreamer_thread(player, tx.clone()); @@ -363,17 +378,15 @@ fn spawn_stdin_reader(tx: Arc<Mutex<Sender<ApplicationMessage>>>) { for line in lock.lines() { let line = line.unwrap(); - let message = ApplicationMessage::TextMessage( - Message { - target: MessageTarget::Server, - invoker: Invoker { - name: String::from("stdin"), - id: ClientId(0), - uid: None, - }, - text: line - } - ); + let message = ApplicationMessage::TextMessage(Message { + target: MessageTarget::Server, + invoker: Invoker { + name: String::from("stdin"), + id: ClientId(0), + uid: None, + }, + text: line, + }); let tx = tx.lock().unwrap(); tx.send(message).unwrap(); @@ -382,11 +395,12 @@ fn spawn_stdin_reader(tx: Arc<Mutex<Sender<ApplicationMessage>>>) { } fn spawn_gstreamer_thread(player: Arc<AudioPlayer>, tx: Arc<Mutex<Sender<ApplicationMessage>>>) { - thread::spawn(move || { - loop { - player.poll(); + thread::spawn(move || loop { + player.poll(); - tx.lock().unwrap().send(ApplicationMessage::StateChange(State::EndOfStream)).unwrap(); - } + tx.lock() + .unwrap() + .send(ApplicationMessage::StateChange(State::EndOfStream)) + .unwrap(); }); } diff --git a/src/playlist.rs b/src/playlist.rs index e96d927..689a743 100644 --- a/src/playlist.rs +++ b/src/playlist.rs @@ -36,7 +36,6 @@ impl Playlist { self.is_full = true; } - true } diff --git a/src/teamspeak.rs b/src/teamspeak.rs index 785eba2..2cd87e7 100644 --- a/src/teamspeak.rs +++ b/src/teamspeak.rs @@ -1,15 +1,13 @@ -use futures::{ - compat::Future01CompatExt, -}; +use futures::compat::Future01CompatExt; use futures01::{future::Future, sink::Sink}; -use tsclientlib::{Connection, ConnectOptions, events::Event, ClientId, MessageTarget}; -use tsclientlib::Event::ConEvents; use crate::{ApplicationMessage, Message}; use std::sync::mpsc::Sender; -use std::sync::{Mutex, Arc}; +use std::sync::{Arc, Mutex}; +use tsclientlib::Event::ConEvents; +use tsclientlib::{events::Event, ClientId, ConnectOptions, Connection, MessageTarget}; -use log::{error}; +use log::error; pub struct TeamSpeakConnection { conn: Connection, @@ -21,19 +19,20 @@ fn get_message<'a>(event: &Event) -> Option<Message> { from: target, invoker: sender, message: msg, - } => { - Some(Message { - target: target.clone(), - invoker: sender.clone(), - text: msg.clone(), - }) - } + } => Some(Message { + target: target.clone(), + invoker: sender.clone(), + text: msg.clone(), + }), _ => None, } } impl TeamSpeakConnection { - pub async fn new(tx: Arc<Mutex<Sender<ApplicationMessage>>>, options: ConnectOptions) -> Result<TeamSpeakConnection, tsclientlib::Error> { + pub async fn new( + tx: Arc<Mutex<Sender<ApplicationMessage>>>, + options: ConnectOptions, + ) -> Result<TeamSpeakConnection, tsclientlib::Error> { let conn = Connection::new(options).compat().await?; let packet = conn.lock().server.set_subscribed(true); conn.send_packet(packet).compat().await.unwrap(); @@ -52,21 +51,19 @@ impl TeamSpeakConnection { }), ); - Ok(TeamSpeakConnection { - conn, - }) + Ok(TeamSpeakConnection { conn }) } pub fn send_audio_packet(&self, samples: &[u8]) { - let packet = tsproto_packets::packets::OutAudio::new( - &tsproto_packets::packets::AudioData::C2S { + let packet = + tsproto_packets::packets::OutAudio::new(&tsproto_packets::packets::AudioData::C2S { id: 0, codec: tsproto_packets::packets::CodecType::OpusMusic, data: samples, - }, - ); + }); - let send_packet = self.conn + let send_packet = self + .conn .get_packet_sink() .send(packet) .map(|_| ()) @@ -76,24 +73,32 @@ impl TeamSpeakConnection { } pub fn join_channel_of_user(&self, id: ClientId) { - let channel = self.conn.lock() + let channel = self + .conn + .lock() .clients .get(&id) .expect("can find poke sender") .channel; - tokio::spawn(self.conn.lock().to_mut() + tokio::spawn( + self.conn + .lock() + .to_mut() .get_client(&self.conn.lock().own_client) .expect("can get myself") .set_channel(channel) - .map_err(|e| error!("Failed to switch channel: {}", e))); + .map_err(|e| error!("Failed to switch channel: {}", e)), + ); } pub fn set_nickname(&self, name: &str) { - tokio::spawn(self.conn + tokio::spawn( + self.conn .lock() .to_mut() .set_name(name) - .map_err(|e| error!("Failed to set nickname: {}", e))); + .map_err(|e| error!("Failed to set nickname: {}", e)), + ); } pub fn set_description(&self, desc: &str) { @@ -104,13 +109,17 @@ impl TeamSpeakConnection { .get_client(&self.conn.lock().own_client) .expect("Can get myself") .set_description(desc) - .map_err(|e| error!("Failed to change description: {}", e))); + .map_err(|e| error!("Failed to change description: {}", e)), + ); } pub fn send_message_to_channel(&self, text: &str) { - tokio::spawn(self.conn.lock().to_mut() + tokio::spawn( + self.conn + .lock() + .to_mut() .send_message(MessageTarget::Channel, text) - .map_err(|e| error!("Failed to send message: {}", e))); - + .map_err(|e| error!("Failed to send message: {}", e)), + ); } } diff --git a/src/youtube_dl.rs b/src/youtube_dl.rs index 1eff302..ac4635a 100644 --- a/src/youtube_dl.rs +++ b/src/youtube_dl.rs @@ -1,6 +1,6 @@ use std::process::{Command, Stdio}; -use log::{debug}; +use log::debug; pub fn get_audio_download_url(uri: String) -> Result<(String, String), String> { let ytdl_args = [ @@ -20,9 +20,7 @@ pub fn get_audio_download_url(uri: String) -> Result<(String, String), String> { debug!("yt-dl command: {:?}", cmd); - let ytdl_output = cmd - .output() - .unwrap(); + let ytdl_output = cmd.output().unwrap(); let output = String::from_utf8(ytdl_output.stdout.clone()).unwrap(); @@ -35,4 +33,4 @@ pub fn get_audio_download_url(uri: String) -> Result<(String, String), String> { let title = lines[1].to_owned(); Ok((url, title)) -}
\ No newline at end of file +} |
