summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJokler <jokler@protonmail.com>2020-01-13 05:41:00 +0100
committerJokler <jokler@protonmail.com>2020-01-13 05:41:00 +0100
commit39b248df9c92b3a6bc94c3eb3e872e502b3cef7a (patch)
tree01e5200d69e42261cf3898f3f64bdae4eb3b75bf
parent5b9dea6a29faf4e1722dcc9d7437141c8a937a14 (diff)
downloadpokebot-39b248df9c92b3a6bc94c3eb3e872e502b3cef7a.tar.gz
pokebot-39b248df9c92b3a6bc94c3eb3e872e502b3cef7a.zip
Run cargo fmt
-rw-r--r--src/audio_player.rs116
-rw-r--r--src/main.rs100
-rw-r--r--src/playlist.rs1
-rw-r--r--src/teamspeak.rs73
-rw-r--r--src/youtube_dl.rs8
5 files changed, 150 insertions, 148 deletions
diff --git a/src/audio_player.rs b/src/audio_player.rs
index 0c2f06d..c4b4b26 100644
--- a/src/audio_player.rs
+++ b/src/audio_player.rs
@@ -1,16 +1,16 @@
use std::sync::Once;
-use gstreamer as gst;
use gst::prelude::*;
+use gst::GhostPad;
+use gstreamer as gst;
use gstreamer_app::{AppSink, AppSinkCallbacks};
use gstreamer_audio::{StreamVolume, StreamVolumeFormat};
-use gst::{GhostPad};
-use log::{info, debug, warn, error};
-use std::sync::mpsc::Sender;
-use std::sync::{Mutex, Arc};
-use crate::{State, ApplicationMessage};
+use crate::{ApplicationMessage, State};
use glib::BoolError;
+use log::{debug, error, info, warn};
+use std::sync::mpsc::Sender;
+use std::sync::{Arc, Mutex};
static GST_INIT: Once = Once::new();
@@ -42,10 +42,7 @@ pub struct AudioPlayer {
}
fn make_element(factoryname: &str, display_name: &str) -> Result<gst::Element, AudioPlayerError> {
- Ok(gst::ElementFactory::make(
- factoryname,
- Some(display_name)
- )?)
+ Ok(gst::ElementFactory::make(factoryname, Some(display_name))?)
}
fn link_elements(a: &gst::Element, b: &gst::Element) -> Result<(), AudioPlayerError> {
@@ -89,7 +86,10 @@ fn add_decode_bin_new_pad_callback(
}
impl AudioPlayer {
- pub fn new(sender: Arc<Mutex<Sender<ApplicationMessage>>>, callback: Option<Box<dyn FnMut(&[u8]) + Send>>) -> Result<Self, AudioPlayerError> {
+ pub fn new(
+ sender: Arc<Mutex<Sender<ApplicationMessage>>>,
+ callback: Option<Box<dyn FnMut(&[u8]) + Send>>,
+ ) -> Result<Self, AudioPlayerError> {
GST_INIT.call_once(|| gst::init().unwrap());
info!("Creating audio player");
@@ -104,10 +104,7 @@ impl AudioPlayer {
let (audio_bin, volume, ghost_pad) = Self::create_audio_bin(callback)?;
- add_decode_bin_new_pad_callback(
- &decode_bin,
- audio_bin.clone(),
- ghost_pad);
+ add_decode_bin_new_pad_callback(&decode_bin, audio_bin.clone(), ghost_pad);
pipeline.add(&audio_bin)?;
@@ -123,23 +120,18 @@ impl AudioPlayer {
})
}
- fn create_audio_bin(callback: Option<Box<dyn FnMut(&[u8]) + Send>>) -> Result<(gst::Bin, gst::Element, gst::GhostPad), AudioPlayerError> {
+ fn create_audio_bin(
+ callback: Option<Box<dyn FnMut(&[u8]) + Send>>,
+ ) -> Result<(gst::Bin, gst::Element, gst::GhostPad), AudioPlayerError> {
let audio_bin = gst::Bin::new(Some("audio bin"));
let queue = make_element("queue", "audio queue")?;
let convert = make_element("audioconvert", "audio converter")?;
let volume = make_element("volume", "volume")?;
let resample = make_element("audioresample", "audio resampler")?;
let pads = queue.get_sink_pads();
- let queue_sink_pad = pads
- .first()
- .unwrap();
+ let queue_sink_pad = pads.first().unwrap();
- audio_bin.add_many(&[
- &queue,
- &convert,
- &volume,
- &resample,
- ])?;
+ audio_bin.add_many(&[&queue, &convert, &volume, &resample])?;
if let Some(mut callback) = callback {
let opus_enc = make_element("opusenc", "opus encoder")?;
@@ -149,10 +141,10 @@ impl AudioPlayer {
.clone()
.dynamic_cast::<AppSink>()
.expect("Sink element is expected to be an appsink!");
- appsink.set_caps(Some(&gst::Caps::new_simple("audio/x-opus", &[
- ("channels", &(2i32)),
- ("rate", &(48_000i32)),
- ])));
+ appsink.set_caps(Some(&gst::Caps::new_simple(
+ "audio/x-opus",
+ &[("channels", &(2i32)), ("rate", &(48_000i32))],
+ )));
let callbacks = AppSinkCallbacks::new()
.new_sample(move |sink| {
let sample = sink.pull_sample().map_err(|_| gst::FlowError::Eos)?;
@@ -167,33 +159,15 @@ impl AudioPlayer {
.build();
appsink.set_callbacks(callbacks);
- audio_bin.add_many(&[
- &opus_enc,
- &sink
- ])?;
-
- gst::Element::link_many(&[
- &queue,
- &convert,
- &volume,
- &resample,
- &opus_enc,
- &sink
- ])?;
+ audio_bin.add_many(&[&opus_enc, &sink])?;
+
+ gst::Element::link_many(&[&queue, &convert, &volume, &resample, &opus_enc, &sink])?;
} else {
let sink = make_element("autoaudiosink", "auto audio sink")?;
- audio_bin.add_many(&[
- &sink
- ])?;
-
- gst::Element::link_many(&[
- &queue,
- &convert,
- &volume,
- &resample,
- &sink
- ])?;
+ audio_bin.add_many(&[&sink])?;
+
+ gst::Element::link_many(&[&queue, &convert, &volume, &resample, &sink])?;
};
let ghost_pad = GhostPad::new(Some("audio bin sink"), queue_sink_pad).unwrap();
@@ -214,11 +188,8 @@ impl AudioPlayer {
let db = 50.0 * volume.log10();
info!("Setting volume: {} -> {} dB", volume, db);
- let linear = StreamVolume::convert_volume(
- StreamVolumeFormat::Db,
- StreamVolumeFormat::Linear,
- db,
- );
+ let linear =
+ StreamVolume::convert_volume(StreamVolumeFormat::Db, StreamVolumeFormat::Linear, db);
self.volume.set_property("volume", &linear)?;
@@ -232,7 +203,7 @@ impl AudioPlayer {
(gst::State::Null, gst::State::VoidPending) => false,
(_, gst::State::Null) => false,
(gst::State::Ready, gst::State::VoidPending) => false,
- _ => true
+ _ => true,
}
}
@@ -263,7 +234,7 @@ impl AudioPlayer {
pub fn stop_current(&self) {
info!("Stopping pipeline, sending EOS");
- let handled = self.http_src.send_event(gst::Event::new_eos().build());
+ let handled = self.http_src.send_event(gst::Event::new_eos().build());
if !handled {
warn!("EOS event was not handled");
}
@@ -294,12 +265,23 @@ impl AudioPlayer {
let pending = state.get_pending();
match (old, current, pending) {
- (gst::State::Paused, gst::State::Playing, gst::State::VoidPending) => self.send_state(State::Playing),
- (gst::State::Playing, gst::State::Paused, gst::State::VoidPending) => self.send_state(State::Paused),
- (_, gst::State::Ready, gst::State::Null) => self.send_state(State::Stopped),
- (_, gst::State::Null, gst::State::VoidPending) => self.send_state(State::Stopped),
+ (gst::State::Paused, gst::State::Playing, gst::State::VoidPending) => {
+ self.send_state(State::Playing)
+ }
+ (gst::State::Playing, gst::State::Paused, gst::State::VoidPending) => {
+ self.send_state(State::Paused)
+ }
+ (_, gst::State::Ready, gst::State::Null) => {
+ self.send_state(State::Stopped)
+ }
+ (_, gst::State::Null, gst::State::VoidPending) => {
+ self.send_state(State::Stopped)
+ }
_ => {
- debug!("Pipeline transitioned from {:?} to {:?}, with {:?} pending", old, current, pending);
+ debug!(
+ "Pipeline transitioned from {:?} to {:?}, with {:?} pending",
+ old, current, pending
+ );
}
}
}
@@ -308,7 +290,7 @@ impl AudioPlayer {
self.reset().unwrap();
break 'outer;
- },
+ }
MessageView::Warning(warn) => {
warn!(
"Warning from {:?}: {} ({:?})",
@@ -329,7 +311,7 @@ impl AudioPlayer {
}
_ => {
// debug!("{:?}", msg)
- },
+ }
};
}
}
diff --git a/src/main.rs b/src/main.rs
index 1207d87..0941c27 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,37 +1,29 @@
-use std::io::{Read, BufRead};
+use std::io::{BufRead, Read};
use std::path::PathBuf;
use std::str::FromStr;
-use std::thread;
use std::sync::{Arc, Mutex};
+use std::thread;
-use futures::{
- future::{FutureExt, TryFutureExt},
-};
+use futures::future::{FutureExt, TryFutureExt};
+use log::{debug, info};
use structopt::clap::AppSettings;
use structopt::StructOpt;
-use tsclientlib::{
- ConnectOptions, Identity, MessageTarget, Invoker, ClientId,
-};
-use log::{info, debug};
+use tsclientlib::{ClientId, ConnectOptions, Identity, Invoker, MessageTarget};
mod audio_player;
-mod youtube_dl;
-mod teamspeak;
mod playlist;
+mod teamspeak;
+mod youtube_dl;
use audio_player::*;
-use teamspeak::*;
use playlist::*;
use std::sync::mpsc::Sender;
+use teamspeak::*;
#[derive(StructOpt, Debug)]
#[structopt(raw(global_settings = "&[AppSettings::ColoredHelp]"))]
struct Args {
- #[structopt(
- short = "l",
- long = "local",
- help = "Run locally in text mode"
- )]
+ #[structopt(short = "l", long = "local", help = "Run locally in text mode")]
local: bool,
#[structopt(
short = "a",
@@ -93,7 +85,11 @@ struct Application {
}
impl Application {
- pub fn new(player: Arc<AudioPlayer>, playlist: Arc<Mutex<Playlist>>, teamspeak: Option<Arc<TeamSpeakConnection>>) -> Self {
+ pub fn new(
+ player: Arc<AudioPlayer>,
+ playlist: Arc<Mutex<Playlist>>,
+ teamspeak: Option<Arc<TeamSpeakConnection>>,
+ ) -> Self {
Self {
player,
teamspeak,
@@ -118,7 +114,12 @@ impl Application {
}
pub fn add_audio(&self, url: String) {
- if self.playlist.lock().expect("Mutex was not poisoned").is_full() {
+ if self
+ .playlist
+ .lock()
+ .expect("Mutex was not poisoned")
+ .is_full()
+ {
info!("Audio playlist is full");
self.send_message("Playlist is full");
return;
@@ -212,7 +213,10 @@ impl Application {
}
}
Some("clear") => {
- self.playlist.lock().expect("Mutex was not poisoned").clear();
+ self.playlist
+ .lock()
+ .expect("Mutex was not poisoned")
+ .clear();
}
Some("volume") => {
if let Some(&volume) = &tokens.get(1) {
@@ -299,8 +303,7 @@ async fn async_main() {
let tx = Arc::new(Mutex::new(tx));
let (player, connection) = if args.local {
info!("Starting in CLI mode");
- let audio_player = AudioPlayer::new(tx.clone(), None)
- .unwrap();
+ let audio_player = AudioPlayer::new(tx.clone(), None).unwrap();
(audio_player, None)
} else {
@@ -329,11 +332,19 @@ async fn async_main() {
con_config = con_config.channel(channel);
}
- let connection = Arc::new(TeamSpeakConnection::new(tx.clone(), con_config).await.unwrap());
+ let connection = Arc::new(
+ TeamSpeakConnection::new(tx.clone(), con_config)
+ .await
+ .unwrap(),
+ );
let cconnection = connection.clone();
- let audio_player = AudioPlayer::new(tx.clone(), Some(Box::new(move |samples| {
- cconnection.send_audio_packet(samples);
- }))).unwrap();
+ let audio_player = AudioPlayer::new(
+ tx.clone(),
+ Some(Box::new(move |samples| {
+ cconnection.send_audio_packet(samples);
+ })),
+ )
+ .unwrap();
(audio_player, Some(connection))
};
@@ -341,7 +352,11 @@ async fn async_main() {
player.set_volume(0.5).unwrap();
let player = Arc::new(player);
let playlist = Arc::new(Mutex::new(Playlist::new()));
- let application = Arc::new(Application::new(player.clone(), playlist.clone(), connection));
+ let application = Arc::new(Application::new(
+ player.clone(),
+ playlist.clone(),
+ connection,
+ ));
spawn_gstreamer_thread(player, tx.clone());
@@ -363,17 +378,15 @@ fn spawn_stdin_reader(tx: Arc<Mutex<Sender<ApplicationMessage>>>) {
for line in lock.lines() {
let line = line.unwrap();
- let message = ApplicationMessage::TextMessage(
- Message {
- target: MessageTarget::Server,
- invoker: Invoker {
- name: String::from("stdin"),
- id: ClientId(0),
- uid: None,
- },
- text: line
- }
- );
+ let message = ApplicationMessage::TextMessage(Message {
+ target: MessageTarget::Server,
+ invoker: Invoker {
+ name: String::from("stdin"),
+ id: ClientId(0),
+ uid: None,
+ },
+ text: line,
+ });
let tx = tx.lock().unwrap();
tx.send(message).unwrap();
@@ -382,11 +395,12 @@ fn spawn_stdin_reader(tx: Arc<Mutex<Sender<ApplicationMessage>>>) {
}
fn spawn_gstreamer_thread(player: Arc<AudioPlayer>, tx: Arc<Mutex<Sender<ApplicationMessage>>>) {
- thread::spawn(move || {
- loop {
- player.poll();
+ thread::spawn(move || loop {
+ player.poll();
- tx.lock().unwrap().send(ApplicationMessage::StateChange(State::EndOfStream)).unwrap();
- }
+ tx.lock()
+ .unwrap()
+ .send(ApplicationMessage::StateChange(State::EndOfStream))
+ .unwrap();
});
}
diff --git a/src/playlist.rs b/src/playlist.rs
index e96d927..689a743 100644
--- a/src/playlist.rs
+++ b/src/playlist.rs
@@ -36,7 +36,6 @@ impl Playlist {
self.is_full = true;
}
-
true
}
diff --git a/src/teamspeak.rs b/src/teamspeak.rs
index 785eba2..2cd87e7 100644
--- a/src/teamspeak.rs
+++ b/src/teamspeak.rs
@@ -1,15 +1,13 @@
-use futures::{
- compat::Future01CompatExt,
-};
+use futures::compat::Future01CompatExt;
use futures01::{future::Future, sink::Sink};
-use tsclientlib::{Connection, ConnectOptions, events::Event, ClientId, MessageTarget};
-use tsclientlib::Event::ConEvents;
use crate::{ApplicationMessage, Message};
use std::sync::mpsc::Sender;
-use std::sync::{Mutex, Arc};
+use std::sync::{Arc, Mutex};
+use tsclientlib::Event::ConEvents;
+use tsclientlib::{events::Event, ClientId, ConnectOptions, Connection, MessageTarget};
-use log::{error};
+use log::error;
pub struct TeamSpeakConnection {
conn: Connection,
@@ -21,19 +19,20 @@ fn get_message<'a>(event: &Event) -> Option<Message> {
from: target,
invoker: sender,
message: msg,
- } => {
- Some(Message {
- target: target.clone(),
- invoker: sender.clone(),
- text: msg.clone(),
- })
- }
+ } => Some(Message {
+ target: target.clone(),
+ invoker: sender.clone(),
+ text: msg.clone(),
+ }),
_ => None,
}
}
impl TeamSpeakConnection {
- pub async fn new(tx: Arc<Mutex<Sender<ApplicationMessage>>>, options: ConnectOptions) -> Result<TeamSpeakConnection, tsclientlib::Error> {
+ pub async fn new(
+ tx: Arc<Mutex<Sender<ApplicationMessage>>>,
+ options: ConnectOptions,
+ ) -> Result<TeamSpeakConnection, tsclientlib::Error> {
let conn = Connection::new(options).compat().await?;
let packet = conn.lock().server.set_subscribed(true);
conn.send_packet(packet).compat().await.unwrap();
@@ -52,21 +51,19 @@ impl TeamSpeakConnection {
}),
);
- Ok(TeamSpeakConnection {
- conn,
- })
+ Ok(TeamSpeakConnection { conn })
}
pub fn send_audio_packet(&self, samples: &[u8]) {
- let packet = tsproto_packets::packets::OutAudio::new(
- &tsproto_packets::packets::AudioData::C2S {
+ let packet =
+ tsproto_packets::packets::OutAudio::new(&tsproto_packets::packets::AudioData::C2S {
id: 0,
codec: tsproto_packets::packets::CodecType::OpusMusic,
data: samples,
- },
- );
+ });
- let send_packet = self.conn
+ let send_packet = self
+ .conn
.get_packet_sink()
.send(packet)
.map(|_| ())
@@ -76,24 +73,32 @@ impl TeamSpeakConnection {
}
pub fn join_channel_of_user(&self, id: ClientId) {
- let channel = self.conn.lock()
+ let channel = self
+ .conn
+ .lock()
.clients
.get(&id)
.expect("can find poke sender")
.channel;
- tokio::spawn(self.conn.lock().to_mut()
+ tokio::spawn(
+ self.conn
+ .lock()
+ .to_mut()
.get_client(&self.conn.lock().own_client)
.expect("can get myself")
.set_channel(channel)
- .map_err(|e| error!("Failed to switch channel: {}", e)));
+ .map_err(|e| error!("Failed to switch channel: {}", e)),
+ );
}
pub fn set_nickname(&self, name: &str) {
- tokio::spawn(self.conn
+ tokio::spawn(
+ self.conn
.lock()
.to_mut()
.set_name(name)
- .map_err(|e| error!("Failed to set nickname: {}", e)));
+ .map_err(|e| error!("Failed to set nickname: {}", e)),
+ );
}
pub fn set_description(&self, desc: &str) {
@@ -104,13 +109,17 @@ impl TeamSpeakConnection {
.get_client(&self.conn.lock().own_client)
.expect("Can get myself")
.set_description(desc)
- .map_err(|e| error!("Failed to change description: {}", e)));
+ .map_err(|e| error!("Failed to change description: {}", e)),
+ );
}
pub fn send_message_to_channel(&self, text: &str) {
- tokio::spawn(self.conn.lock().to_mut()
+ tokio::spawn(
+ self.conn
+ .lock()
+ .to_mut()
.send_message(MessageTarget::Channel, text)
- .map_err(|e| error!("Failed to send message: {}", e)));
-
+ .map_err(|e| error!("Failed to send message: {}", e)),
+ );
}
}
diff --git a/src/youtube_dl.rs b/src/youtube_dl.rs
index 1eff302..ac4635a 100644
--- a/src/youtube_dl.rs
+++ b/src/youtube_dl.rs
@@ -1,6 +1,6 @@
use std::process::{Command, Stdio};
-use log::{debug};
+use log::debug;
pub fn get_audio_download_url(uri: String) -> Result<(String, String), String> {
let ytdl_args = [
@@ -20,9 +20,7 @@ pub fn get_audio_download_url(uri: String) -> Result<(String, String), String> {
debug!("yt-dl command: {:?}", cmd);
- let ytdl_output = cmd
- .output()
- .unwrap();
+ let ytdl_output = cmd.output().unwrap();
let output = String::from_utf8(ytdl_output.stdout.clone()).unwrap();
@@ -35,4 +33,4 @@ pub fn get_audio_download_url(uri: String) -> Result<(String, String), String> {
let title = lines[1].to_owned();
Ok((url, title))
-} \ No newline at end of file
+}