aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/audio_player.rs10
-rw-r--r--src/bot/master.rs42
-rw-r--r--src/bot/music.rs38
-rw-r--r--src/main.rs21
-rw-r--r--src/teamspeak/bbcode.rs6
-rw-r--r--src/teamspeak/mod.rs6
-rw-r--r--src/web_server.rs60
7 files changed, 146 insertions, 37 deletions
diff --git a/src/audio_player.rs b/src/audio_player.rs
index 9ed645d..cdb04d7 100644
--- a/src/audio_player.rs
+++ b/src/audio_player.rs
@@ -10,7 +10,7 @@ use gstreamer_audio::{StreamVolume, StreamVolumeFormat};
use crate::bot::{MusicBotMessage, State};
use glib::BoolError;
use log::{debug, error, info, warn};
-use std::sync::{Arc, Mutex};
+use std::sync::{Arc, RwLock};
use tokio02::sync::mpsc::UnboundedSender;
static GST_INIT: Once = Once::new();
@@ -34,7 +34,7 @@ pub struct AudioPlayer {
http_src: gst::Element,
volume: gst::Element,
- sender: Arc<Mutex<UnboundedSender<MusicBotMessage>>>,
+ sender: Arc<RwLock<UnboundedSender<MusicBotMessage>>>,
}
fn make_element(factoryname: &str, display_name: &str) -> Result<gst::Element, AudioPlayerError> {
@@ -83,7 +83,7 @@ fn add_decode_bin_new_pad_callback(
impl AudioPlayer {
pub fn new(
- sender: Arc<Mutex<UnboundedSender<MusicBotMessage>>>,
+ sender: Arc<RwLock<UnboundedSender<MusicBotMessage>>>,
callback: Option<Box<dyn FnMut(&[u8]) + Send>>,
) -> Result<Self, AudioPlayerError> {
GST_INIT.call_once(|| gst::init().unwrap());
@@ -280,13 +280,13 @@ impl AudioPlayer {
warn!("Failed to send \"quit\" app event: {}", e);
}
- let sender = self.sender.lock().unwrap();
+ let sender = self.sender.read().unwrap();
sender.send(MusicBotMessage::Quit(reason)).unwrap();
}
fn send_state(&self, state: State) {
info!("Sending state {:?} to application", state);
- let sender = self.sender.lock().unwrap();
+ let sender = self.sender.read().unwrap();
sender.send(MusicBotMessage::StateChange(state)).unwrap();
}
diff --git a/src/bot/master.rs b/src/bot/master.rs
index 2488064..bc38cca 100644
--- a/src/bot/master.rs
+++ b/src/bot/master.rs
@@ -1,12 +1,13 @@
use std::collections::HashMap;
use std::future::Future;
-use std::sync::{Arc, Mutex};
+use std::sync::{Arc, RwLock};
use futures::future::{FutureExt, TryFutureExt};
use futures01::future::Future as Future01;
use log::info;
use rand::{rngs::SmallRng, seq::SliceRandom, SeedableRng};
use serde::{Deserialize, Serialize};
+use tokio02::sync::mpsc::UnboundedSender;
use tsclientlib::{ClientId, ConnectOptions, Identity, MessageTarget};
use crate::audio_player::AudioPlayerError;
@@ -18,8 +19,9 @@ use crate::bot::{MusicBot, MusicBotArgs, MusicBotMessage};
pub struct MasterBot {
config: Arc<MasterConfig>,
- music_bots: Arc<Mutex<MusicBots>>,
+ music_bots: Arc<RwLock<MusicBots>>,
teamspeak: Arc<TeamSpeakConnection>,
+ sender: Arc<RwLock<UnboundedSender<MusicBotMessage>>>,
}
struct MusicBots {
@@ -32,7 +34,7 @@ struct MusicBots {
impl MasterBot {
pub async fn new(args: MasterArgs) -> (Arc<Self>, impl Future) {
let (tx, mut rx) = tokio02::sync::mpsc::unbounded_channel();
- let tx = Arc::new(Mutex::new(tx));
+ let tx = Arc::new(RwLock::new(tx));
info!("Starting in TeamSpeak mode");
let mut con_config = ConnectOptions::new(args.address.clone())
@@ -65,7 +67,7 @@ impl MasterBot {
let name_count = config.names.len();
let id_count = config.ids.len();
- let music_bots = Arc::new(Mutex::new(MusicBots {
+ let music_bots = Arc::new(RwLock::new(MusicBots {
rng: SmallRng::from_entropy(),
available_names: (0..name_count).collect(),
available_ids: (0..id_count).collect(),
@@ -76,6 +78,7 @@ impl MasterBot {
config,
music_bots,
teamspeak: connection,
+ sender: tx.clone(),
});
bot.teamspeak
@@ -83,8 +86,12 @@ impl MasterBot {
let cbot = bot.clone();
let msg_loop = async move {
- loop {
+ 'outer: loop {
while let Some(msg) = rx.recv().await {
+ if let MusicBotMessage::Quit(reason) = msg {
+ cbot.teamspeak.disconnect(&reason);
+ break 'outer;
+ }
cbot.on_message(msg).await.unwrap();
}
}
@@ -115,7 +122,7 @@ impl MasterBot {
ref mut available_names,
ref mut available_ids,
ref connected_bots,
- } = &mut *self.music_bots.lock().expect("Mutex was not poisoned");
+ } = &mut *self.music_bots.write().expect("RwLock was not poisoned");
for (_, bot) in connected_bots {
if bot.my_channel() == channel {
@@ -163,7 +170,7 @@ impl MasterBot {
let cmusic_bots = self.music_bots.clone();
let disconnect_cb = Box::new(move |n, name_index, id_index| {
- let mut music_bots = cmusic_bots.lock().expect("Mutex was not poisoned");
+ let mut music_bots = cmusic_bots.write().expect("RwLock was not poisoned");
music_bots.connected_bots.remove(&n);
music_bots.available_names.push(name_index);
music_bots.available_ids.push(id_index);
@@ -188,7 +195,7 @@ impl MasterBot {
if let Some(bot_args) = self.build_bot_args_for(id) {
let (bot, fut) = MusicBot::new(bot_args).await;
tokio::spawn(fut.unit_error().boxed().compat().map(|_| ()));
- let mut music_bots = self.music_bots.lock().expect("Mutex was not poisoned");
+ let mut music_bots = self.music_bots.write().expect("RwLock was not poisoned");
music_bots
.connected_bots
.insert(bot.name().to_string(), bot);
@@ -205,6 +212,25 @@ impl MasterBot {
Ok(())
}
+
+ pub fn names(&self) -> Vec<String> {
+ let music_bots = self.music_bots.read().unwrap();
+
+ music_bots
+ .connected_bots
+ .iter()
+ .map(|(_, b)| b.name().to_owned())
+ .collect()
+ }
+
+ pub fn quit(&self, reason: String) {
+ let music_bots = self.music_bots.read().unwrap();
+ for (_, bot) in &music_bots.connected_bots {
+ bot.quit(reason.clone())
+ }
+ let sender = self.sender.read().unwrap();
+ sender.send(MusicBotMessage::Quit(reason)).unwrap();
+ }
}
#[derive(Debug, Serialize, Deserialize)]
diff --git a/src/bot/music.rs b/src/bot/music.rs
index 2539695..a23ed3b 100644
--- a/src/bot/music.rs
+++ b/src/bot/music.rs
@@ -1,6 +1,6 @@
use std::future::Future;
use std::io::BufRead;
-use std::sync::{Arc, Mutex};
+use std::sync::{Arc, RwLock};
use std::thread;
use humantime;
@@ -71,8 +71,8 @@ pub struct MusicBot {
name: String,
player: Arc<AudioPlayer>,
teamspeak: Option<Arc<TeamSpeakConnection>>,
- playlist: Arc<Mutex<Playlist>>,
- state: Arc<Mutex<State>>,
+ playlist: Arc<RwLock<Playlist>>,
+ state: Arc<RwLock<State>>,
}
pub struct MusicBotArgs {
@@ -90,7 +90,7 @@ pub struct MusicBotArgs {
impl MusicBot {
pub async fn new(args: MusicBotArgs) -> (Arc<Self>, impl Future) {
let (tx, mut rx) = tokio02::sync::mpsc::unbounded_channel();
- let tx = Arc::new(Mutex::new(tx));
+ let tx = Arc::new(RwLock::new(tx));
let (player, connection) = if args.local {
info!("Starting in CLI mode");
let audio_player = AudioPlayer::new(tx.clone(), None).unwrap();
@@ -127,7 +127,7 @@ impl MusicBot {
player.set_volume(0.5).unwrap();
let player = Arc::new(player);
- let playlist = Arc::new(Mutex::new(Playlist::new()));
+ let playlist = Arc::new(RwLock::new(Playlist::new()));
spawn_gstreamer_thread(player.clone(), tx.clone());
@@ -140,7 +140,7 @@ impl MusicBot {
player,
teamspeak: connection,
playlist,
- state: Arc::new(Mutex::new(State::Stopped)),
+ state: Arc::new(RwLock::new(State::Stopped)),
});
let cbot = bot.clone();
@@ -190,7 +190,7 @@ impl MusicBot {
Ok(metadata) => {
info!("Found audio url: {}", metadata.url);
- let mut playlist = self.playlist.lock().expect("Mutex was not poisoned");
+ let mut playlist = self.playlist.write().expect("RwLock was not poisoned");
playlist.push(metadata.clone());
if !self.player.is_started() {
@@ -269,7 +269,7 @@ impl MusicBot {
async fn on_command(&self, command: Command) -> Result<(), AudioPlayerError> {
match command {
Command::Play => {
- let playlist = self.playlist.lock().expect("Mutex was not poisoned");
+ let playlist = self.playlist.read().expect("RwLock was not poisoned");
if !self.player.is_started() {
if !playlist.is_empty() {
@@ -303,7 +303,7 @@ impl MusicBot {
}
}
Command::Next => {
- let playlist = self.playlist.lock().expect("Mutex was not poisoned");
+ let playlist = self.playlist.read().expect("RwLock was not poisoned");
if !playlist.is_empty() {
info!("Skipping to next track");
self.player.stop_current()?;
@@ -314,8 +314,8 @@ impl MusicBot {
}
Command::Clear => {
self.playlist
- .lock()
- .expect("Mutex was not poisoned")
+ .write()
+ .expect("RwLock was not poisoned")
.clear();
}
Command::Volume { percent: volume } => {
@@ -331,7 +331,7 @@ impl MusicBot {
}
fn on_state(&self, state: State) -> Result<(), AudioPlayerError> {
- let mut current_state = self.state.lock().unwrap();
+ let mut current_state = self.state.write().unwrap();
if *current_state != state {
match state {
State::Playing => {
@@ -345,7 +345,11 @@ impl MusicBot {
self.set_description("");
}
State::EndOfStream => {
- let next_track = self.playlist.lock().expect("Mutex was not poisoned").pop();
+ let next_track = self
+ .playlist
+ .write()
+ .expect("RwLock was not poisoned")
+ .pop();
if let Some(request) = next_track {
info!("Advancing playlist");
@@ -401,7 +405,7 @@ impl MusicBot {
}
}
-fn spawn_stdin_reader(tx: Arc<Mutex<UnboundedSender<MusicBotMessage>>>) {
+fn spawn_stdin_reader(tx: Arc<RwLock<UnboundedSender<MusicBotMessage>>>) {
debug!("Spawning stdin reader thread");
thread::Builder::new()
.name(String::from("stdin reader"))
@@ -421,7 +425,7 @@ fn spawn_stdin_reader(tx: Arc<Mutex<UnboundedSender<MusicBotMessage>>>) {
text: line,
});
- let tx = tx.lock().unwrap();
+ let tx = tx.read().unwrap();
tx.send(message).unwrap();
}
})
@@ -430,7 +434,7 @@ fn spawn_stdin_reader(tx: Arc<Mutex<UnboundedSender<MusicBotMessage>>>) {
fn spawn_gstreamer_thread(
player: Arc<AudioPlayer>,
- tx: Arc<Mutex<UnboundedSender<MusicBotMessage>>>,
+ tx: Arc<RwLock<UnboundedSender<MusicBotMessage>>>,
) {
thread::Builder::new()
.name(String::from("gstreamer polling"))
@@ -439,7 +443,7 @@ fn spawn_gstreamer_thread(
break;
}
- tx.lock()
+ tx.read()
.unwrap()
.send(MusicBotMessage::StateChange(State::EndOfStream))
.unwrap();
diff --git a/src/main.rs b/src/main.rs
index 922162f..2559a2a 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,9 +1,10 @@
use std::fs::File;
use std::io::{Read, Write};
use std::path::PathBuf;
+use std::thread;
use futures::future::{FutureExt, TryFutureExt};
-use log::{debug, info};
+use log::{debug, error, info};
use structopt::clap::AppSettings;
use structopt::StructOpt;
use tsclientlib::Identity;
@@ -13,6 +14,7 @@ mod bot;
mod command;
mod playlist;
mod teamspeak;
+mod web_server;
mod youtube_dl;
use bot::{MasterArgs, MasterBot, MusicBot, MusicBotArgs};
@@ -116,7 +118,22 @@ fn run() -> Result<(), Box<dyn std::error::Error>> {
};
MusicBot::new(bot_args).await.1.await;
} else {
- MasterBot::new(bot_args).await.1.await;
+ let domain = bot_args.domain.clone();
+ let bind_address = bot_args.bind_address.clone();
+ let (bot, fut) = MasterBot::new(bot_args).await;
+
+ thread::spawn(|| {
+ let web_args = web_server::WebServerArgs {
+ domain,
+ bind_address,
+ bot,
+ };
+ if let Err(e) = web_server::start(web_args) {
+ error!("Error in web server: {}", e);
+ }
+ });
+
+ fut.await;
}
}
.unit_error()
diff --git a/src/teamspeak/bbcode.rs b/src/teamspeak/bbcode.rs
index 28be08a..91d576a 100644
--- a/src/teamspeak/bbcode.rs
+++ b/src/teamspeak/bbcode.rs
@@ -1,4 +1,4 @@
-use std::fmt::{Formatter, Display, Error};
+use std::fmt::{Display, Error, Formatter};
#[allow(dead_code)]
pub enum BbCode<'a> {
@@ -14,7 +14,9 @@ impl<'a> Display for BbCode<'a> {
BbCode::Bold(text) => fmt.write_fmt(format_args!("[B]{}[/B]", text))?,
BbCode::Italic(text) => fmt.write_fmt(format_args!("[I]{}[/I]", text))?,
BbCode::Underline(text) => fmt.write_fmt(format_args!("[U]{}[/U]", text))?,
- BbCode::Link(text, url) => fmt.write_fmt(format_args!("[URL={}]{}[/URL]", url, text))?,
+ BbCode::Link(text, url) => {
+ fmt.write_fmt(format_args!("[URL={}]{}[/URL]", url, text))?
+ }
};
Ok(())
diff --git a/src/teamspeak/mod.rs b/src/teamspeak/mod.rs
index 5ac0d44..7551e77 100644
--- a/src/teamspeak/mod.rs
+++ b/src/teamspeak/mod.rs
@@ -1,4 +1,4 @@
-use std::sync::{Arc, Mutex};
+use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use futures::compat::Future01CompatExt;
@@ -76,7 +76,7 @@ fn get_message<'a>(event: &Event) -> Option<MusicBotMessage> {
impl TeamSpeakConnection {
pub async fn new(
- tx: Arc<Mutex<UnboundedSender<MusicBotMessage>>>,
+ tx: Arc<RwLock<UnboundedSender<MusicBotMessage>>>,
options: ConnectOptions,
) -> Result<TeamSpeakConnection, tsclientlib::Error> {
let conn = Connection::new(options).compat().await?;
@@ -89,7 +89,7 @@ impl TeamSpeakConnection {
if let ConEvents(_conn, events) = e {
for event in *events {
if let Some(msg) = get_message(event) {
- let tx = tx.lock().expect("Mutex was not poisoned");
+ let tx = tx.read().expect("RwLock was not poisoned");
// Ignore the result because the receiver might get dropped first.
let _ = tx.send(msg);
}
diff --git a/src/web_server.rs b/src/web_server.rs
new file mode 100644
index 0000000..1edbc50
--- /dev/null
+++ b/src/web_server.rs
@@ -0,0 +1,60 @@
+use std::sync::Arc;
+
+use actix::{Actor, Addr, Handler, Message, SyncArbiter, SyncContext};
+use actix_web::{get, middleware::Logger, web, App, HttpResponse, HttpServer, Responder};
+
+use crate::bot::MasterBot;
+
+struct GetNames;
+
+impl Message for GetNames {
+ type Result = Result<Vec<String>, ()>;
+}
+
+#[get("/")]
+async fn index(bot: web::Data<Addr<BotExecutor>>) -> impl Responder {
+ let names = bot.send(GetNames).await.unwrap().unwrap();
+ HttpResponse::Ok().body(&format!("Music bots connected: {}", names.join(", ")))
+}
+
+pub struct WebServerArgs {
+ pub domain: String,
+ pub bind_address: String,
+ pub bot: Arc<MasterBot>,
+}
+
+#[actix_rt::main]
+pub async fn start(args: WebServerArgs) -> std::io::Result<()> {
+ let cbot = args.bot.clone();
+ let bot_addr: Addr<BotExecutor> = SyncArbiter::start(4, move || BotExecutor(cbot.clone()));
+
+ HttpServer::new(move || {
+ App::new()
+ .data(bot_addr.clone())
+ .wrap(Logger::default())
+ .service(index)
+ })
+ .bind(args.bind_address)?
+ .run()
+ .await?;
+
+ args.bot.quit(String::from("Stopping"));
+
+ Ok(())
+}
+
+pub struct BotExecutor(pub Arc<MasterBot>);
+
+impl Actor for BotExecutor {
+ type Context = SyncContext<Self>;
+}
+
+impl Handler<GetNames> for BotExecutor {
+ type Result = Result<Vec<String>, ()>;
+
+ fn handle(&mut self, _: GetNames, _: &mut Self::Context) -> Self::Result {
+ let bot = &self.0;
+
+ Ok(bot.names())
+ }
+}