aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/audio_player.rs328
-rw-r--r--src/bot/master.rs363
-rw-r--r--src/bot/music.rs500
-rw-r--r--src/log_bridge.rs101
-rw-r--r--src/main.rs154
-rw-r--r--src/playlist.rs16
-rw-r--r--src/teamspeak/mod.rs245
-rw-r--r--src/web_server.rs34
-rw-r--r--src/web_server/api.rs17
-rw-r--r--src/web_server/bot_data.rs47
-rw-r--r--src/web_server/bot_executor.rs63
-rw-r--r--src/web_server/default.rs17
-rw-r--r--src/web_server/tmtu.rs17
-rw-r--r--src/youtube_dl.rs15
14 files changed, 1067 insertions, 850 deletions
diff --git a/src/audio_player.rs b/src/audio_player.rs
index 1f6649f..23581f9 100644
--- a/src/audio_player.rs
+++ b/src/audio_player.rs
@@ -7,36 +7,31 @@ use gstreamer as gst;
use gstreamer_app::{AppSink, AppSinkCallbacks};
use gstreamer_audio::{StreamVolume, StreamVolumeFormat};
-use crate::bot::{MusicBotMessage, State};
use glib::BoolError;
-use log::{debug, error, info, warn};
-use std::sync::{Arc, RwLock};
-use tokio::sync::mpsc::UnboundedSender;
+use slog::{debug, error, info, warn, Logger};
+use xtra::WeakAddress;
+use crate::bot::{MusicBot, MusicBotMessage, State};
use crate::command::{Seek, VolumeChange};
use crate::youtube_dl::AudioMetadata;
static GST_INIT: Once = Once::new();
-#[derive(PartialEq, Eq, Debug, Clone, Copy)]
-pub enum PollResult {
- Continue,
- Quit,
-}
-
pub struct AudioPlayer {
pipeline: gst::Pipeline,
bus: gst::Bus,
http_src: gst::Element,
- volume_f64: RwLock<f64>,
+ volume_f64: f64,
volume: gst::Element,
- sender: Arc<RwLock<UnboundedSender<MusicBotMessage>>>,
- currently_playing: RwLock<Option<AudioMetadata>>,
+ currently_playing: Option<AudioMetadata>,
+
+ logger: Logger,
}
fn make_element(factoryname: &str, display_name: &str) -> Result<gst::Element, AudioPlayerError> {
- Ok(gst::ElementFactory::make(factoryname, Some(display_name))?)
+ Ok(gst::ElementFactory::make(factoryname, Some(display_name))
+ .map_err(|_| AudioPlayerError::MissingPlugin(factoryname.to_string()))?)
}
fn link_elements(a: &gst::Element, b: &gst::Element) -> Result<(), AudioPlayerError> {
@@ -49,11 +44,12 @@ fn add_decode_bin_new_pad_callback(
decode_bin: &gst::Element,
audio_bin: gst::Bin,
ghost_pad: gst::GhostPad,
+ logger: Logger,
) {
decode_bin.connect_pad_added(move |_, new_pad| {
- debug!("New pad received on decode bin");
+ debug!(logger, "New pad received on decode bin");
let name = if let Some(caps) = new_pad.get_current_caps() {
- debug!("Pad caps: {}", caps.to_string());
+ debug!(logger, "Found caps"; "caps" => caps.to_string());
if let Some(structure) = caps.get_structure(0) {
Some(structure.get_name().to_string())
} else {
@@ -68,7 +64,7 @@ fn add_decode_bin_new_pad_callback(
peer.unlink(&ghost_pad).unwrap();
}
- info!("Found raw audio, linking audio bin");
+ info!(logger, "Found raw audio, linking audio bin");
new_pad.link(&ghost_pad).unwrap();
audio_bin.sync_state_with_parent().unwrap();
@@ -77,27 +73,15 @@ fn add_decode_bin_new_pad_callback(
}
impl AudioPlayer {
- pub fn new(
- sender: Arc<RwLock<UnboundedSender<MusicBotMessage>>>,
- callback: Option<Box<dyn FnMut(&[u8]) + Send>>,
- ) -> Result<Self, AudioPlayerError> {
+ pub fn new(logger: Logger) -> Result<Self, AudioPlayerError> {
GST_INIT.call_once(|| gst::init().unwrap());
- info!("Creating audio player");
+ info!(logger, "Creating audio player");
let pipeline = gst::Pipeline::new(Some("TeamSpeak Audio Player"));
let bus = pipeline.get_bus().unwrap();
let http_src = make_element("souphttpsrc", "http source")?;
- let decode_bin = make_element("decodebin", "decode bin")?;
- pipeline.add_many(&[&http_src, &decode_bin])?;
-
- link_elements(&http_src, &decode_bin)?;
-
- let (audio_bin, volume, ghost_pad) = Self::create_audio_bin(callback)?;
-
- add_decode_bin_new_pad_callback(&decode_bin, audio_bin.clone(), ghost_pad);
-
- pipeline.add(&audio_bin)?;
+ let volume = make_element("volume", "volume")?;
// The documentation says that we have to make sure to handle
// all messages if auto flushing is deactivated.
@@ -111,26 +95,31 @@ impl AudioPlayer {
pipeline,
bus,
http_src,
+ logger,
- volume_f64: RwLock::new(0.0),
+ volume_f64: 0.0,
volume,
- sender,
- currently_playing: RwLock::new(None),
+ currently_playing: None,
})
}
- fn create_audio_bin(
+ pub fn setup_with_audio_callback(
+ &self,
callback: Option<Box<dyn FnMut(&[u8]) + Send>>,
- ) -> Result<(gst::Bin, gst::Element, gst::GhostPad), AudioPlayerError> {
+ ) -> Result<(), AudioPlayerError> {
+ let decode_bin = make_element("decodebin", "decode bin")?;
+ self.pipeline.add_many(&[&self.http_src, &decode_bin])?;
+
+ link_elements(&self.http_src, &decode_bin)?;
+
let audio_bin = gst::Bin::new(Some("audio bin"));
let queue = make_element("queue", "audio queue")?;
let convert = make_element("audioconvert", "audio converter")?;
- let volume = make_element("volume", "volume")?;
let resample = make_element("audioresample", "audio resampler")?;
let pads = queue.get_sink_pads();
let queue_sink_pad = pads.first().unwrap();
- audio_bin.add_many(&[&queue, &convert, &volume, &resample])?;
+ audio_bin.add_many(&[&queue, &convert, &self.volume, &resample])?;
if let Some(mut callback) = callback {
let opus_enc = make_element("opusenc", "opus encoder")?;
@@ -160,49 +149,64 @@ impl AudioPlayer {
audio_bin.add_many(&[&opus_enc, &sink])?;
- gst::Element::link_many(&[&queue, &convert, &volume, &resample, &opus_enc, &sink])?;
+ gst::Element::link_many(&[
+ &queue,
+ &convert,
+ &self.volume,
+ &resample,
+ &opus_enc,
+ &sink,
+ ])?;
} else {
let sink = make_element("autoaudiosink", "auto audio sink")?;
audio_bin.add_many(&[&sink])?;
- gst::Element::link_many(&[&queue, &convert, &volume, &resample, &sink])?;
+ gst::Element::link_many(&[&queue, &convert, &self.volume, &resample, &sink])?;
};
let ghost_pad = GhostPad::with_target(Some("audio bin sink"), queue_sink_pad).unwrap();
ghost_pad.set_active(true)?;
audio_bin.add_pad(&ghost_pad)?;
- Ok((audio_bin, volume, ghost_pad))
+ add_decode_bin_new_pad_callback(
+ &decode_bin,
+ audio_bin.clone(),
+ ghost_pad,
+ self.logger.clone(),
+ );
+
+ self.pipeline.add(&audio_bin)?;
+
+ Ok(())
}
- pub fn set_metadata(&self, data: AudioMetadata) -> Result<(), AudioPlayerError> {
+ pub fn set_metadata(&mut self, data: AudioMetadata) -> Result<(), AudioPlayerError> {
self.set_source_url(data.url.clone())?;
- let mut currently_playing = self.currently_playing.write().unwrap();
- *currently_playing = Some(data);
+ self.currently_playing = Some(data);
Ok(())
}
fn set_source_url(&self, location: String) -> Result<(), AudioPlayerError> {
- info!("Setting location URI: {}", location);
+ info!(self.logger, "Setting source"; "url" => &location);
self.http_src.set_property("location", &location)?;
Ok(())
}
- pub fn change_volume(&self, volume: VolumeChange) -> Result<(), AudioPlayerError> {
+ pub fn change_volume(&mut self, volume: VolumeChange) -> Result<(), AudioPlayerError> {
let new_volume = match volume {
- VolumeChange::Positive(vol) => self.volume() + vol,
- VolumeChange::Negative(vol) => self.volume() - vol,
+ VolumeChange::Positive(vol) => self.volume_f64 + vol,
+ VolumeChange::Negative(vol) => self.volume_f64 - vol,
VolumeChange::Absolute(vol) => vol,
};
let new_volume = new_volume.max(0.0).min(1.0);
- *self.volume_f64.write().unwrap() = new_volume;
+ self.volume_f64 = new_volume;
let db = 50.0 * new_volume.log10();
- info!("Setting volume: {} -> {} dB", new_volume, db);
+ info!(self.logger, "Setting volume"; "volume" => new_volume, "db" => db);
let linear =
StreamVolume::convert_volume(StreamVolumeFormat::Db, StreamVolumeFormat::Linear, db);
@@ -212,36 +216,10 @@ impl AudioPlayer {
Ok(())
}
- pub fn is_started(&self) -> bool {
- let (_, current, pending) = self.pipeline.get_state(gst::ClockTime(None));
-
- match (current, pending) {
- (gst::State::Null, gst::State::VoidPending) => false,
- (_, gst::State::Null) => false,
- (gst::State::Ready, gst::State::VoidPending) => false,
- _ => true,
- }
- }
-
- pub fn volume(&self) -> f64 {
- *self.volume_f64.read().unwrap()
- }
-
- pub fn position(&self) -> Option<Duration> {
- self.pipeline
- .query_position::<gst::ClockTime>()
- .and_then(|t| t.0.map(Duration::from_nanos))
- }
-
- pub fn currently_playing(&self) -> Option<AudioMetadata> {
- self.currently_playing.read().unwrap().clone()
- }
+ pub fn reset(&mut self) -> Result<(), AudioPlayerError> {
+ info!(self.logger, "Setting pipeline state"; "to" => "null");
- pub fn reset(&self) -> Result<(), AudioPlayerError> {
- info!("Setting pipeline state to null");
-
- let mut currently_playing = self.currently_playing.write().unwrap();
- *currently_playing = None;
+ self.currently_playing = None;
self.pipeline.set_state(gst::State::Null)?;
@@ -249,7 +227,7 @@ impl AudioPlayer {
}
pub fn play(&self) -> Result<(), AudioPlayerError> {
- info!("Setting pipeline state to playing");
+ info!(self.logger, "Setting pipeline state"; "to" => "playing");
self.pipeline.set_state(gst::State::Playing)?;
@@ -257,7 +235,7 @@ impl AudioPlayer {
}
pub fn pause(&self) -> Result<(), AudioPlayerError> {
- info!("Setting pipeline state to paused");
+ info!(self.logger, "Setting pipeline state"; "to" => "paused");
self.pipeline.set_state(gst::State::Paused)?;
@@ -289,7 +267,7 @@ impl AudioPlayer {
};
let time = humantime::format_duration(absolute);
- info!("Seeking to {}", time);
+ info!(self.logger, "Seeking"; "time" => %time);
self.pipeline.seek_simple(
gst::SeekFlags::FLUSH,
@@ -300,121 +278,125 @@ impl AudioPlayer {
}
pub fn stop_current(&self) -> Result<(), AudioPlayerError> {
- info!("Stopping pipeline, sending EOS");
+ info!(self.logger, "Stopping pipeline, sending EOS");
self.bus.post(&gst::message::Eos::new())?;
Ok(())
}
- pub fn quit(&self, reason: String) {
- info!("Quitting audio player");
-
- if self
- .bus
- .post(&gst::message::Application::new(gst::Structure::new_empty(
- "quit",
- )))
- .is_err()
- {
- warn!("Tried to send \"quit\" app event on flushing bus.");
+ pub fn is_started(&self) -> bool {
+ let (_, current, pending) = self.pipeline.get_state(gst::ClockTime(None));
+
+ match (current, pending) {
+ (gst::State::Null, gst::State::VoidPending) => false,
+ (_, gst::State::Null) => false,
+ (gst::State::Ready, gst::State::VoidPending) => false,
+ _ => true,
}
+ }
- let sender = self.sender.read().unwrap();
- sender.send(MusicBotMessage::Quit(reason)).unwrap();
+ pub fn volume(&self) -> f64 {
+ self.volume_f64
}
- fn send_state(&self, state: State) {
- info!("Sending state {:?} to application", state);
- let sender = self.sender.read().unwrap();
- sender.send(MusicBotMessage::StateChange(state)).unwrap();
+ pub fn position(&self) -> Option<Duration> {
+ self.pipeline
+ .query_position::<gst::ClockTime>()
+ .and_then(|t| t.0.map(Duration::from_nanos))
}
- pub fn poll(&self) -> PollResult {
- debug!("Polling GStreamer");
- 'outer: loop {
- while let Some(msg) = self.bus.timed_pop(gst::ClockTime(None)) {
- use gst::MessageView;
-
- match msg.view() {
- MessageView::StateChanged(state) => {
- if let Some(src) = state.get_src() {
- if src.get_name() != self.pipeline.get_name() {
- continue;
- }
- }
+ pub fn currently_playing(&self) -> Option<AudioMetadata> {
+ self.currently_playing.clone()
+ }
+
+ pub fn register_bot(&self, bot: WeakAddress<MusicBot>) {
+ let pipeline_name = self.pipeline.get_name();
+ debug!(self.logger, "Setting sync handler on gstreamer bus");
- let old = state.get_old();
- let current = state.get_current();
- let pending = state.get_pending();
-
- match (old, current, pending) {
- (gst::State::Paused, gst::State::Playing, gst::State::VoidPending) => {
- self.send_state(State::Playing)
- }
- (gst::State::Playing, gst::State::Paused, gst::State::VoidPending) => {
- self.send_state(State::Paused)
- }
- (_, gst::State::Ready, gst::State::Null) => {
- self.send_state(State::Stopped)
- }
- (_, gst::State::Null, gst::State::VoidPending) => {
- self.send_state(State::Stopped)
- }
- _ => {
- debug!(
- "Pipeline transitioned from {:?} to {:?}, with {:?} pending",
- old, current, pending
- );
- }
+ let logger = self.logger.clone();
+ let handle = tokio::runtime::Handle::current();
+ self.bus.set_sync_handler(move |_, msg| {
+ use gst::MessageView;
+
+ match msg.view() {
+ MessageView::StateChanged(state) => {
+ if let Some(src) = state.get_src() {
+ if src.get_name() != pipeline_name {
+ return gst::BusSyncReply::Drop;
}
}
- MessageView::Eos(..) => {
- info!("End of stream reached");
- self.reset().unwrap();
- break 'outer;
- }
- MessageView::Warning(warn) => {
- warn!(
- "Warning from {:?}: {} ({:?})",
- warn.get_src().map(|s| s.get_path_string()),
- warn.get_error(),
- warn.get_debug()
- );
- break 'outer;
- }
- MessageView::Error(err) => {
- error!(
- "Error from {:?}: {} ({:?})",
- err.get_src().map(|s| s.get_path_string()),
- err.get_error(),
- err.get_debug()
- );
- break 'outer;
- }
- MessageView::Application(content) => {
- if let Some(s) = content.get_structure() {
- if s.get_name() == "quit" {
- self.reset().unwrap();
- return PollResult::Quit;
- }
+ let old = state.get_old();
+ let current = state.get_current();
+ let pending = state.get_pending();
+
+ match (old, current, pending) {
+ (gst::State::Paused, gst::State::Playing, gst::State::VoidPending) => {
+ send_state(&handle, &bot, State::Playing);
+ }
+ (gst::State::Playing, gst::State::Paused, gst::State::VoidPending) => {
+ send_state(&handle, &bot, State::Paused);
+ }
+ (_, gst::State::Ready, gst::State::Null) => {
+ send_state(&handle, &bot, State::Stopped);
+ }
+ (_, gst::State::Null, gst::State::VoidPending) => {
+ send_state(&handle, &bot, State::Stopped);
+ }
+ _ => {
+ debug!(
+ logger,
+ "Pipeline transitioned";
+ "from" => ?old,
+ "to" => ?current,
+ "pending" => ?pending
+ );
}
}
- _ => {
- //debug!("Unhandled message on bus: {:?}", msg)
- }
- };
+ }
+ MessageView::Eos(..) => {
+ info!(logger, "End of stream reached");
+
+ send_state(&handle, &bot, State::EndOfStream);
+ }
+ MessageView::Warning(warn) => {
+ warn!(
+ logger,
+ "Received warning from bus";
+ "source" => ?warn.get_src().map(|s| s.get_path_string()),
+ "error" => %warn.get_error(),
+ "debug" => ?warn.get_debug()
+ );
+ }
+ MessageView::Error(err) => {
+ error!(
+ logger,
+ "Received error from bus";
+ "source" => ?err.get_src().map(|s| s.get_path_string()),
+ "error" => %err.get_error(),
+ "debug" => ?err.get_debug()
+ );
+
+ send_state(&handle, &bot, State::EndOfStream);
+ }
+ _ => {
+ //debug!("Unhandled message on bus: {:?}", msg)
+ }
}
- }
- debug!("Left GStreamer message loop");
- PollResult::Continue
+ gst::BusSyncReply::Drop
+ });
}
}
+fn send_state(handle: &tokio::runtime::Handle, addr: &WeakAddress<MusicBot>, state: State) {
+ handle.spawn(addr.send(MusicBotMessage::StateChange(state)));
+}
+
#[derive(Debug)]
pub enum AudioPlayerError {
+ MissingPlugin(String),
GStreamerError(glib::error::BoolError),
StateChangeFailed,
SeekError,
diff --git a/src/bot/master.rs b/src/bot/master.rs
index 1480e17..94332ac 100644
--- a/src/bot/master.rs
+++ b/src/bot/master.rs
@@ -1,41 +1,52 @@
use std::collections::HashMap;
-use std::future::Future;
-use std::sync::{Arc, RwLock};
-use log::info;
+use async_trait::async_trait;
+use futures::future;
use rand::{rngs::SmallRng, seq::SliceRandom, SeedableRng};
use serde::{Deserialize, Serialize};
-use tokio::sync::mpsc::UnboundedSender;
-use tsclientlib::{ClientId, Connection, Identity, MessageTarget};
+use slog::{error, info, o, trace, Logger};
+use tsclientlib::{ClientId, ConnectOptions, Connection, Identity, MessageTarget};
+use xtra::{spawn::Tokio, Actor, Address, Context, Handler, Message, WeakAddress};
use crate::audio_player::AudioPlayerError;
use crate::teamspeak::TeamSpeakConnection;
use crate::Args;
-use crate::bot::{MusicBot, MusicBotArgs, MusicBotMessage};
+use crate::bot::{GetBotData, GetChannel, GetName, MusicBot, MusicBotArgs, MusicBotMessage};
pub struct MasterBot {
- config: Arc<MasterConfig>,
- music_bots: Arc<RwLock<MusicBots>>,
+ config: MasterConfig,
+ my_addr: Option<WeakAddress<Self>>,
teamspeak: TeamSpeakConnection,
- sender: Arc<RwLock<UnboundedSender<MusicBotMessage>>>,
+ available_names: Vec<String>,
+ available_ids: Vec<Identity>,
+ connected_bots: HashMap<String, Address<MusicBot>>,
+ rng: SmallRng,
+ logger: Logger,
}
-struct MusicBots {
- rng: SmallRng,
- available_names: Vec<usize>,
- available_ids: Vec<usize>,
- connected_bots: HashMap<String, Arc<MusicBot>>,
+#[derive(Debug, Serialize, Deserialize)]
+pub struct MasterArgs {
+ #[serde(default = "default_name")]
+ pub master_name: String,
+ pub address: String,
+ pub channel: Option<String>,
+ #[serde(default = "default_verbose")]
+ pub verbose: u8,
+ pub domain: String,
+ pub bind_address: String,
+ pub names: Vec<String>,
+ pub id: Option<Identity>,
+ pub ids: Option<Vec<Identity>>,
}
impl MasterBot {
- pub async fn new(args: MasterArgs) -> (Arc<Self>, impl Future) {
- let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
- let tx = Arc::new(RwLock::new(tx));
- info!("Starting in TeamSpeak mode");
+ pub async fn spawn(args: MasterArgs, logger: Logger) -> Address<Self> {
+ info!(logger, "Starting in TeamSpeak mode");
let mut con_config = Connection::build(args.address.clone())
+ .logger(logger.clone())
.version(tsclientlib::Version::Linux_3_3_2)
.name(args.master_name.clone())
.identity(args.id.expect("identity should exist"))
@@ -47,168 +58,116 @@ impl MasterBot {
con_config = con_config.channel(channel);
}
- let connection = TeamSpeakConnection::new(tx.clone(), con_config)
- .await
- .unwrap();
+ let connection = TeamSpeakConnection::new(logger.clone()).await.unwrap();
+ trace!(logger, "Created teamspeak connection");
- let config = Arc::new(MasterConfig {
+ let config = MasterConfig {
master_name: args.master_name,
address: args.address,
- names: args.names,
- ids: args.ids.expect("identies should exists"),
- local: args.local,
verbose: args.verbose,
- });
-
- let name_count = config.names.len();
- let id_count = config.ids.len();
+ };
- let music_bots = Arc::new(RwLock::new(MusicBots {
+ let bot_addr = Self {
+ config,
+ my_addr: None,
+ teamspeak: connection,
+ logger: logger.clone(),
rng: SmallRng::from_entropy(),
- available_names: (0..name_count).collect(),
- available_ids: (0..id_count).collect(),
+ available_names: args.names,
+ available_ids: args.ids.expect("identities"),
connected_bots: HashMap::new(),
- }));
+ }
+ .create(None)
+ .spawn(&mut Tokio::Global);
- let bot = Arc::new(Self {
- config,
- music_bots,
- teamspeak: connection,
- sender: tx.clone(),
- });
-
- let cbot = bot.clone();
- let msg_loop = async move {
- 'outer: loop {
- while let Some(msg) = rx.recv().await {
- match msg {
- MusicBotMessage::Quit(reason) => {
- let mut cteamspeak = cbot.teamspeak.clone();
- cteamspeak.disconnect(&reason).await;
- break 'outer;
- }
- MusicBotMessage::ClientDisconnected { id, .. } => {
- if id == cbot.my_id().await {
- // TODO Reconnect since quit was not called
- break 'outer;
- }
- }
- _ => cbot.on_message(msg).await.unwrap(),
- }
- }
- }
- };
+ bot_addr.send(Connect(con_config)).await.unwrap().unwrap();
+ trace!(logger, "Spawned master bot actor");
- (bot, msg_loop)
+ bot_addr
}
- async fn build_bot_args_for(&self, id: ClientId) -> Result<MusicBotArgs, BotCreationError> {
- let mut cteamspeak = self.teamspeak.clone();
- let channel = match cteamspeak.channel_of_user(id).await {
+ async fn bot_args_for_client(
+ &mut self,
+ user_id: ClientId,
+ ) -> Result<MusicBotArgs, BotCreationError> {
+ let channel = match self.teamspeak.channel_of_user(user_id).await {
Some(channel) => channel,
None => return Err(BotCreationError::UnfoundUser),
};
- if channel == cteamspeak.my_channel().await {
+ if channel == self.teamspeak.current_channel().await.unwrap() {
return Err(BotCreationError::MasterChannel(
self.config.master_name.clone(),
));
}
- let MusicBots {
- ref mut rng,
- ref mut available_names,
- ref mut available_ids,
- ref connected_bots,
- } = &mut *self.music_bots.write().expect("RwLock was not poisoned");
-
- for bot in connected_bots.values() {
- if bot.my_channel().await == channel {
- return Err(BotCreationError::MultipleBots(bot.name().to_owned()));
+ for bot in self.connected_bots.values() {
+ if bot.send(GetChannel).await.unwrap() == Some(channel) {
+ return Err(BotCreationError::MultipleBots(
+ bot.send(GetName).await.unwrap(),
+ ));
}
}
- let channel_path = cteamspeak
- .channel_path_of_user(id)
+ let channel_path = self
+ .teamspeak
+ .channel_path_of_user(user_id)
.await
.expect("can find poke sender");
- available_names.shuffle(rng);
- let name_index = match available_names.pop() {
+ self.available_names.shuffle(&mut self.rng);
+ let name = match self.available_names.pop() {
Some(v) => v,
None => {
return Err(BotCreationError::OutOfNames);
}
};
- let name = self.config.names[name_index].clone();
- available_ids.shuffle(rng);
- let id_index = match available_ids.pop() {
+ self.available_ids.shuffle(&mut self.rng);
+ let identity = match self.available_ids.pop() {
Some(v) => v,
None => {
return Err(BotCreationError::OutOfIdentities);
}
};
- let id = self.config.ids[id_index].clone();
-
- let cmusic_bots = self.music_bots.clone();
- let disconnect_cb = Box::new(move |n, name_index, id_index| {
- let mut music_bots = cmusic_bots.write().expect("RwLock was not poisoned");
- music_bots.connected_bots.remove(&n);
- music_bots.available_names.push(name_index);
- music_bots.available_ids.push(id_index);
- });
-
- info!("Connecting to {} on {}", channel_path, self.config.address);
-
Ok(MusicBotArgs {
- name,
- name_index,
- id_index,
- local: self.config.local,
+ name: name.clone(),
+ master: self.my_addr.clone(),
address: self.config.address.clone(),
- id,
+ identity,
+ local: false,
channel: channel_path,
verbose: self.config.verbose,
- disconnect_cb,
+ logger: self.logger.new(o!("musicbot" => name)),
})
}
- async fn spawn_bot_for(&self, id: ClientId) {
- match self.build_bot_args_for(id).await {
+ async fn spawn_bot_for_client(&mut self, id: ClientId) {
+ match self.bot_args_for_client(id).await {
Ok(bot_args) => {
- let (bot, fut) = MusicBot::new(bot_args).await;
- tokio::spawn(fut);
- let mut music_bots = self.music_bots.write().expect("RwLock was not poisoned");
- music_bots
- .connected_bots
- .insert(bot.name().to_string(), bot);
- }
- Err(e) => {
- let mut cteamspeak = self.teamspeak.clone();
- cteamspeak.send_message_to_user(id, e.to_string()).await
+ let name = bot_args.name.clone();
+ let bot = MusicBot::spawn(bot_args).await;
+ self.connected_bots.insert(name, bot);
}
+ Err(e) => self.teamspeak.send_message_to_user(id, e.to_string()).await,
}
}
- async fn on_message(&self, message: MusicBotMessage) -> Result<(), AudioPlayerError> {
+ async fn on_message(&mut self, message: MusicBotMessage) -> Result<(), AudioPlayerError> {
match message {
MusicBotMessage::TextMessage(message) => {
if let MessageTarget::Poke(who) = message.target {
- info!("Poked by {}, creating bot for their channel", who);
- self.spawn_bot_for(who).await;
+ info!(
+ self.logger,
+ "Poked, creating bot"; "user" => %who
+ );
+ self.spawn_bot_for_client(who).await;
}
}
- MusicBotMessage::ChannelAdded(id) => {
- let mut cteamspeak = self.teamspeak.clone();
- cteamspeak.subscribe(id).await;
- }
MusicBotMessage::ClientAdded(id) => {
- let mut cteamspeak = self.teamspeak.clone();
-
- if id == cteamspeak.my_id().await {
- cteamspeak
+ if id == self.teamspeak.my_id().await {
+ self.teamspeak
.set_description(String::from("Poke me if you want a music bot!"))
.await;
}
@@ -219,41 +178,17 @@ impl MasterBot {
Ok(())
}
- async fn my_id(&self) -> ClientId {
- let mut cteamspeak = self.teamspeak.clone();
+ pub async fn bot_data(&self, name: String) -> Option<crate::web_server::BotData> {
+ let bot = self.connected_bots.get(&name)?;
- cteamspeak.my_id().await
+ bot.send(GetBotData).await.ok()
}
- pub fn bot_data(&self, name: String) -> Option<crate::web_server::BotData> {
- let music_bots = self.music_bots.read().unwrap();
- let bot = music_bots.connected_bots.get(&name)?;
-
- Some(crate::web_server::BotData {
- name,
- state: bot.state(),
- volume: bot.volume(),
- position: bot.position(),
- currently_playing: bot.currently_playing(),
- playlist: bot.playlist_to_vec(),
- })
- }
-
- pub fn bot_datas(&self) -> Vec<crate::web_server::BotData> {
- let music_bots = self.music_bots.read().unwrap();
-
- let len = music_bots.connected_bots.len();
+ pub async fn bot_datas(&self) -> Vec<crate::web_server::BotData> {
+ let len = self.connected_bots.len();
let mut result = Vec::with_capacity(len);
- for (name, bot) in &music_bots.connected_bots {
- let bot_data = crate::web_server::BotData {
- name: name.clone(),
- state: bot.state(),
- volume: bot.volume(),
- position: bot.position(),
- currently_playing: bot.currently_playing(),
- playlist: bot.playlist_to_vec(),
- };
-
+ for bot in self.connected_bots.values() {
+ let bot_data = bot.send(GetBotData).await.unwrap();
result.push(bot_data);
}
@@ -261,24 +196,96 @@ impl MasterBot {
}
pub fn bot_names(&self) -> Vec<String> {
- let music_bots = self.music_bots.read().unwrap();
-
- let len = music_bots.connected_bots.len();
+ let len = self.connected_bots.len();
let mut result = Vec::with_capacity(len);
- for name in music_bots.connected_bots.keys() {
+ for name in self.connected_bots.keys() {
result.push(name.clone());
}
result
}
- pub fn quit(&self, reason: String) {
- let music_bots = self.music_bots.read().unwrap();
- for bot in music_bots.connected_bots.values() {
- bot.quit(reason.clone())
+ fn on_bot_disconnect(&mut self, name: String, id: Identity) {
+ self.connected_bots.remove(&name);
+ self.available_names.push(name);
+ self.available_ids.push(id);
+ }
+
+ pub async fn quit(&mut self, reason: String) -> Result<(), tsclientlib::Error> {
+ let futures = self
+ .connected_bots
+ .values()
+ .map(|b| b.send(Quit(reason.clone())));
+ for res in future::join_all(futures).await {
+ if let Err(e) = res {
+ error!(self.logger, "Failed to shut down bot"; "error" => %e);
+ }
}
- let sender = self.sender.read().unwrap();
- sender.send(MusicBotMessage::Quit(reason)).unwrap();
+ self.teamspeak.disconnect(&reason).await
+ }
+}
+
+#[async_trait]
+impl Actor for MasterBot {
+ async fn started(&mut self, ctx: &mut Context<Self>) {
+ self.my_addr = Some(ctx.address().unwrap().downgrade());
+ }
+}
+
+pub struct Connect(pub ConnectOptions);
+impl Message for Connect {
+ type Result = Result<(), tsclientlib::Error>;
+}
+
+#[async_trait]
+impl Handler<Connect> for MasterBot {
+ async fn handle(
+ &mut self,
+ opt: Connect,
+ ctx: &mut Context<Self>,
+ ) -> Result<(), tsclientlib::Error> {
+ let addr = ctx.address().unwrap();
+ self.teamspeak.connect_for_bot(opt.0, addr.downgrade())?;
+ Ok(())
+ }
+}
+
+pub struct Quit(pub String);
+impl Message for Quit {
+ type Result = Result<(), tsclientlib::Error>;
+}
+
+#[async_trait]
+impl Handler<Quit> for MasterBot {
+ async fn handle(&mut self, q: Quit, _: &mut Context<Self>) -> Result<(), tsclientlib::Error> {
+ self.quit(q.0).await
+ }
+}
+
+pub struct BotDisonnected {
+ pub name: String,
+ pub identity: Identity,
+}
+
+impl Message for BotDisonnected {
+ type Result = ();
+}
+
+#[async_trait]
+impl Handler<BotDisonnected> for MasterBot {
+ async fn handle(&mut self, dc: BotDisonnected, _: &mut Context<Self>) {
+ self.on_bot_disconnect(dc.name, dc.identity);
+ }
+}
+
+#[async_trait]
+impl Handler<MusicBotMessage> for MasterBot {
+ async fn handle(
+ &mut self,
+ msg: MusicBotMessage,
+ _: &mut Context<Self>,
+ ) -> Result<(), AudioPlayerError> {
+ self.on_message(msg).await
}
}
@@ -313,31 +320,10 @@ impl std::fmt::Display for BotCreationError {
}
}
-#[derive(Debug, Serialize, Deserialize)]
-pub struct MasterArgs {
- #[serde(default = "default_name")]
- pub master_name: String,
- #[serde(default = "default_local")]
- pub local: bool,
- pub address: String,
- pub channel: Option<String>,
- #[serde(default = "default_verbose")]
- pub verbose: u8,
- pub domain: String,
- pub bind_address: String,
- pub names: Vec<String>,
- pub id: Option<Identity>,
- pub ids: Option<Vec<Identity>>,
-}
-
fn default_name() -> String {
String::from("PokeBot")
}
-fn default_local() -> bool {
- false
-}
-
fn default_verbose() -> u8 {
0
}
@@ -345,7 +331,6 @@ fn default_verbose() -> u8 {
impl MasterArgs {
pub fn merge(self, args: Args) -> Self {
let address = args.address.unwrap_or(self.address);
- let local = args.local || self.local;
let channel = args.master_channel.or(self.channel);
let verbose = if args.verbose > 0 {
args.verbose
@@ -357,7 +342,6 @@ impl MasterArgs {
master_name: self.master_name,
names: self.names,
ids: self.ids,
- local,
address,
domain: self.domain,
bind_address: self.bind_address,
@@ -371,8 +355,5 @@ impl MasterArgs {
pub struct MasterConfig {
pub master_name: String,
pub address: String,
- pub names: Vec<String>,
- pub ids: Vec<Identity>,
- pub local: bool,
pub verbose: u8,
}
diff --git a/src/bot/music.rs b/src/bot/music.rs
index 90305d0..a57b66c 100644
--- a/src/bot/music.rs
+++ b/src/bot/music.rs
@@ -1,25 +1,22 @@
-use std::future::Future;
-use std::io::BufRead;
-use std::sync::{Arc, RwLock};
-use std::thread;
-use std::time::Duration;
+use async_trait::async_trait;
-use log::{debug, info};
use serde::Serialize;
+use slog::{debug, info, Logger};
use structopt::StructOpt;
-use tokio::sync::mpsc::UnboundedSender;
use tsclientlib::{data, ChannelId, ClientId, Connection, Identity, Invoker, MessageTarget};
+use xtra::{spawn::Tokio, Actor, Address, Context, Handler, Message, WeakAddress};
-use crate::audio_player::{AudioPlayer, AudioPlayerError, PollResult};
+use crate::audio_player::{AudioPlayer, AudioPlayerError};
+use crate::bot::{BotDisonnected, Connect, MasterBot, Quit};
use crate::command::Command;
use crate::command::VolumeChange;
use crate::playlist::Playlist;
use crate::teamspeak as ts;
-use crate::youtube_dl::AudioMetadata;
+use crate::youtube_dl::{self, AudioMetadata};
use ts::TeamSpeakConnection;
#[derive(Debug)]
-pub struct Message {
+pub struct ChatMessage {
pub target: MessageTarget,
pub invoker: Invoker,
pub text: String,
@@ -33,6 +30,10 @@ pub enum State {
EndOfStream,
}
+impl Message for State {
+ type Result = ();
+}
+
impl std::fmt::Display for State {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
match self {
@@ -47,7 +48,7 @@ impl std::fmt::Display for State {
#[derive(Debug)]
pub enum MusicBotMessage {
- TextMessage(Message),
+ TextMessage(ChatMessage),
ClientChannel {
client: ClientId,
old_channel: ChannelId,
@@ -59,112 +60,95 @@ pub enum MusicBotMessage {
client: Box<data::Client>,
},
StateChange(State),
- Quit(String),
+}
+
+impl Message for MusicBotMessage {
+ type Result = Result<(), AudioPlayerError>;
}
pub struct MusicBot {
name: String,
- player: Arc<AudioPlayer>,
+ identity: Identity,
+ player: AudioPlayer,
teamspeak: Option<TeamSpeakConnection>,
- playlist: Arc<RwLock<Playlist>>,
- state: Arc<RwLock<State>>,
+ master: Option<WeakAddress<MasterBot>>,
+ playlist: Playlist,
+ state: State,
+ logger: Logger,
}
pub struct MusicBotArgs {
pub name: String,
- pub name_index: usize,
- pub id_index: usize,
+ pub master: Option<WeakAddress<MasterBot>>,
pub local: bool,
pub address: String,
- pub id: Identity,
+ pub identity: Identity,
pub channel: String,
pub verbose: u8,
- pub disconnect_cb: Box<dyn FnMut(String, usize, usize) + Send + Sync>,
+ pub logger: Logger,
}
impl MusicBot {
- pub async fn new(args: MusicBotArgs) -> (Arc<Self>, impl Future<Output = ()>) {
- let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
- let tx = Arc::new(RwLock::new(tx));
- let (player, connection) = if args.local {
- info!("Starting in CLI mode");
- let audio_player = AudioPlayer::new(tx.clone(), None).unwrap();
-
- (audio_player, None)
- } else {
- info!("Starting in TeamSpeak mode");
-
- let con_config = Connection::build(args.address)
- .version(tsclientlib::Version::Linux_3_3_2)
- .name(format!("🎵 {}", args.name))
- .identity(args.id)
- .log_commands(args.verbose >= 1)
- .log_packets(args.verbose >= 2)
- .log_udp_packets(args.verbose >= 3)
- .channel(args.channel);
-
- let connection = TeamSpeakConnection::new(tx.clone(), con_config)
- .await
- .unwrap();
- let mut cconnection = connection.clone();
- let audio_player = AudioPlayer::new(
- tx.clone(),
- Some(Box::new(move |samples| {
- let mut rt = tokio::runtime::Runtime::new().unwrap();
- rt.block_on(cconnection.send_audio_packet(samples));
- })),
- )
- .unwrap();
-
- (audio_player, Some(connection))
- };
-
+ pub async fn spawn(args: MusicBotArgs) -> Address<Self> {
+ let mut player = AudioPlayer::new(args.logger.clone()).unwrap();
player.change_volume(VolumeChange::Absolute(0.5)).unwrap();
- let player = Arc::new(player);
- let playlist = Arc::new(RwLock::new(Playlist::new()));
- spawn_gstreamer_thread(player.clone(), tx.clone());
+ let playlist = Playlist::new(args.logger.clone());
- if args.local {
- spawn_stdin_reader(tx);
- }
+ let teamspeak = if args.local {
+ info!(args.logger, "Starting in CLI mode");
+ player.setup_with_audio_callback(None).unwrap();
- let bot = Arc::new(Self {
+ None
+ } else {
+ Some(TeamSpeakConnection::new(args.logger.clone()).await.unwrap())
+ };
+ let bot = Self {
name: args.name.clone(),
+ master: args.master,
+ identity: args.identity.clone(),
player,
- teamspeak: connection,
+ teamspeak,
playlist,
- state: Arc::new(RwLock::new(State::EndOfStream)),
- });
-
- let cbot = bot.clone();
- let mut disconnect_cb = args.disconnect_cb;
- let name = args.name;
- let name_index = args.name_index;
- let id_index = args.id_index;
- let msg_loop = async move {
- 'outer: loop {
- while let Some(msg) = rx.recv().await {
- if let MusicBotMessage::Quit(reason) = msg {
- if let Some(ts) = &cbot.teamspeak {
- let mut ts = ts.clone();
- ts.disconnect(&reason).await;
- }
- disconnect_cb(name, name_index, id_index);
- break 'outer;
- }
- cbot.on_message(msg).await.unwrap();
- }
- }
- debug!("Left message loop");
+ state: State::EndOfStream,
+ logger: args.logger.clone(),
};
- bot.update_name(State::EndOfStream).await;
+ let bot_addr = bot.create(None).spawn(&mut Tokio::Global);
+
+ info!(
+ args.logger,
+ "Connecting";
+ "name" => &args.name,
+ "channel" => &args.channel,
+ "address" => &args.address,
+ );
+
+ let opt = Connection::build(args.address)
+ .logger(args.logger.clone())
+ .version(tsclientlib::Version::Linux_3_3_2)
+ .name(format!("🎵 {}", args.name))
+ .identity(args.identity)
+ .log_commands(args.verbose >= 1)
+ .log_packets(args.verbose >= 2)
+ .log_udp_packets(args.verbose >= 3)
+ .channel(args.channel);
+ bot_addr.send(Connect(opt)).await.unwrap().unwrap();
+ bot_addr
+ .send(MusicBotMessage::StateChange(State::EndOfStream))
+ .await
+ .unwrap()
+ .unwrap();
+
+ if args.local {
+ debug!(args.logger, "Spawning stdin reader thread");
+ spawn_stdin_reader(bot_addr.downgrade());
+ }
- (bot, msg_loop)
+ bot_addr
}
- async fn start_playing_audio(&self, metadata: AudioMetadata) {
+ async fn start_playing_audio(&mut self, metadata: AudioMetadata) {
let duration = if let Some(duration) = metadata.duration {
format!("({})", ts::bold(&humantime::format_duration(duration)))
} else {
@@ -184,25 +168,16 @@ impl MusicBot {
self.player.play().unwrap();
}
- pub async fn add_audio(&self, url: String, user: String) {
- match crate::youtube_dl::get_audio_download_from_url(url).await {
+ pub async fn add_audio(&mut self, url: String, user: String) {
+ match youtube_dl::get_audio_download_from_url(url, &self.logger).await {
Ok(mut metadata) => {
metadata.added_by = user;
- info!("Found audio url: {}", metadata.url);
+ info!(self.logger, "Found source"; "url" => &metadata.url);
- // RWLockGuard can not be kept around or the compiler complains that
- // it might cross the await boundary
- self.playlist
- .write()
- .expect("RwLock was not poisoned")
- .push(metadata.clone());
+ self.playlist.push(metadata.clone());
if !self.player.is_started() {
- let entry = self
- .playlist
- .write()
- .expect("RwLock was not poisoned")
- .pop();
+ let entry = self.playlist.pop();
if let Some(request) = entry {
self.start_playing_audio(request).await;
}
@@ -222,7 +197,7 @@ impl MusicBot {
}
}
Err(e) => {
- info!("Failed to find audio url: {}", e);
+ info!(self.logger, "Failed to find audio url"; "error" => &e);
self.send_message(format!("Failed to find url: {}", e))
.await;
@@ -235,74 +210,50 @@ impl MusicBot {
}
pub fn state(&self) -> State {
- *self.state.read().expect("RwLock was not poisoned")
+ self.state
}
- pub fn volume(&self) -> f64 {
+ pub async fn volume(&self) -> f64 {
self.player.volume()
}
- pub fn position(&self) -> Option<Duration> {
- self.player.position()
- }
-
- pub fn currently_playing(&self) -> Option<AudioMetadata> {
- self.player.currently_playing()
- }
+ pub async fn current_channel(&mut self) -> Option<ChannelId> {
+ let ts = self.teamspeak.as_mut().expect("current_channel needs ts");
- pub fn playlist_to_vec(&self) -> Vec<AudioMetadata> {
- self.playlist.read().unwrap().to_vec()
+ ts.current_channel().await
}
- pub async fn my_channel(&self) -> ChannelId {
- let ts = self.teamspeak.as_ref().expect("my_channel needs ts");
-
- let mut ts = ts.clone();
- ts.my_channel().await
- }
+ async fn user_count(&mut self, channel: ChannelId) -> u32 {
+ let ts = self.teamspeak.as_mut().expect("user_count needs ts");
- async fn user_count(&self, channel: ChannelId) -> u32 {
- let ts = self.teamspeak.as_ref().expect("user_count needs ts");
-
- let mut ts = ts.clone();
ts.user_count(channel).await
}
- async fn send_message(&self, text: String) {
- debug!("Sending message to TeamSpeak: {}", text);
+ async fn send_message(&mut self, text: String) {
+ debug!(self.logger, "Sending message to TeamSpeak"; "message" => &text);
- if let Some(ts) = &self.teamspeak {
- let mut ts = ts.clone();
+ if let Some(ts) = &mut self.teamspeak {
ts.send_message_to_channel(text).await;
}
}
- async fn set_nickname(&self, name: String) {
- info!("Setting TeamSpeak nickname: {}", name);
+ async fn set_nickname(&mut self, name: String) {
+ info!(self.logger, "Setting TeamSpeak nickname"; "name" => &name);
- if let Some(ts) = &self.teamspeak {
- let mut ts = ts.clone();
+ if let Some(ts) = &mut self.teamspeak {
ts.set_nickname(name).await;
}
}
- async fn set_description(&self, desc: String) {
- info!("Setting TeamSpeak description: {}", desc);
+ async fn set_description(&mut self, desc: String) {
+ info!(self.logger, "Setting TeamSpeak description"; "description" => &desc);
- if let Some(ts) = &self.teamspeak {
- let mut ts = ts.clone();
+ if let Some(ts) = &mut self.teamspeak {
ts.set_description(desc).await;
}
}
- async fn subscribe(&self, id: ChannelId) {
- if let Some(ts) = &self.teamspeak {
- let mut ts = ts.clone();
- ts.subscribe(id).await;
- }
- }
-
- async fn on_text(&self, message: Message) -> Result<(), AudioPlayerError> {
+ async fn on_text(&mut self, message: ChatMessage) -> Result<(), AudioPlayerError> {
let msg = message.text;
if msg.starts_with('!') {
let tokens = msg[1..].split_whitespace().collect::<Vec<_>>();
@@ -319,13 +270,15 @@ impl MusicBot {
Ok(())
}
- async fn on_command(&self, command: Command, invoker: Invoker) -> Result<(), AudioPlayerError> {
+ async fn on_command(
+ &mut self,
+ command: Command,
+ invoker: Invoker,
+ ) -> Result<(), AudioPlayerError> {
match command {
Command::Play => {
- let playlist = self.playlist.read().expect("RwLock was not poisoned");
-
if !self.player.is_started() {
- if !playlist.is_empty() {
+ if !self.playlist.is_empty() {
self.player.stop_current()?;
}
} else {
@@ -357,35 +310,32 @@ impl MusicBot {
}
}
Command::Next => {
- let playlist = self.playlist.read().expect("RwLock was not poisoned");
- if !playlist.is_empty() {
- info!("Skipping to next track");
+ if !self.playlist.is_empty() {
+ info!(self.logger, "Skipping to next track");
self.player.stop_current()?;
} else {
- info!("Playlist empty, cannot skip");
+ info!(self.logger, "Playlist empty, cannot skip");
self.player.reset()?;
}
}
Command::Clear => {
- self.playlist
- .write()
- .expect("RwLock was not poisoned")
- .clear();
+ self.send_message(String::from("Cleared playlist")).await;
+ self.playlist.clear();
}
Command::Volume { volume } => {
self.player.change_volume(volume)?;
self.update_name(self.state()).await;
}
Command::Leave => {
- self.quit(String::from("Leaving"));
+ self.quit(String::from("Leaving"), true).await.unwrap();
}
}
Ok(())
}
- async fn update_name(&self, state: State) {
- let volume = (self.volume() * 100.0).round();
+ async fn update_name(&mut self, state: State) {
+ let volume = (self.volume().await * 100.0).round();
let name = match state {
State::EndOfStream => format!("🎵 {} ({}%)", self.name, volume),
_ => format!("🎵 {} - {} ({}%)", self.name, state, volume),
@@ -393,43 +343,39 @@ impl MusicBot {
self.set_nickname(name).await;
}
- async fn on_state(&self, state: State) -> Result<(), AudioPlayerError> {
- let current_state = *self.state.read().unwrap();
- if current_state != state {
- match state {
+ async fn on_state(&mut self, new_state: State) -> Result<(), AudioPlayerError> {
+ if self.state != new_state {
+ match new_state {
State::EndOfStream => {
- let next_track = self
- .playlist
- .write()
- .expect("RwLock was not poisoned")
- .pop();
+ self.player.reset()?;
+ let next_track = self.playlist.pop();
if let Some(request) = next_track {
- info!("Advancing playlist");
+ info!(self.logger, "Advancing playlist");
self.start_playing_audio(request).await;
} else {
- self.update_name(state).await;
+ self.update_name(new_state).await;
self.set_description(String::new()).await;
}
}
State::Stopped => {
- if current_state != State::EndOfStream {
- self.update_name(state).await;
+ if self.state != State::EndOfStream {
+ self.update_name(new_state).await;
self.set_description(String::new()).await;
}
}
- _ => self.update_name(state).await,
+ _ => self.update_name(new_state).await,
}
}
- if !(current_state == State::EndOfStream && state == State::Stopped) {
- *self.state.write().unwrap() = state;
+ if !(self.state == State::EndOfStream && new_state == State::Stopped) {
+ self.state = new_state;
}
Ok(())
}
- async fn on_message(&self, message: MusicBotMessage) -> Result<(), AudioPlayerError> {
+ async fn on_message(&mut self, message: MusicBotMessage) -> Result<(), AudioPlayerError> {
match message {
MusicBotMessage::TextMessage(message) => {
if MessageTarget::Channel == message.target {
@@ -446,9 +392,6 @@ impl MusicBot {
let old_channel = client.channel;
self.on_client_left_channel(old_channel).await;
}
- MusicBotMessage::ChannelAdded(id) => {
- self.subscribe(id).await;
- }
MusicBotMessage::StateChange(state) => {
self.on_state(state).await?;
}
@@ -458,60 +401,163 @@ impl MusicBot {
Ok(())
}
- async fn on_client_left_channel(&self, old_channel: ChannelId) {
- let my_channel = self.my_channel().await;
- if old_channel == my_channel && self.user_count(my_channel).await <= 1 {
- self.quit(String::from("Channel is empty"));
+ // FIXME logs an error if this music bot is the one leaving
+ async fn on_client_left_channel(&mut self, old_channel: ChannelId) {
+ let current_channel = match self.current_channel().await {
+ Some(c) => c,
+ None => {
+ return;
+ }
+ };
+ if old_channel == current_channel && self.user_count(current_channel).await <= 1 {
+ self.quit(String::from("Channel is empty"), true)
+ .await
+ .unwrap();
+ }
+ }
+
+ pub async fn quit(
+ &mut self,
+ reason: String,
+ inform_master: bool,
+ ) -> Result<(), tsclientlib::Error> {
+ // FIXME logs errors if the bot is playing something because it tries to
+ // change its name and description
+ self.player.reset().unwrap();
+
+ let ts = self.teamspeak.as_mut().unwrap();
+ ts.disconnect(&reason).await?;
+
+ if inform_master {
+ if let Some(master) = &self.master {
+ master
+ .send(BotDisonnected {
+ name: self.name.clone(),
+ identity: self.identity.clone(),
+ })
+ .await
+ .unwrap();
+ }
+ }
+
+ Ok(())
+ }
+}
+
+#[async_trait]
+impl Actor for MusicBot {
+ async fn started(&mut self, ctx: &mut Context<Self>) {
+ let addr = ctx.address().unwrap().downgrade();
+ self.player.register_bot(addr);
+ }
+}
+
+#[async_trait]
+impl Handler<Connect> for MusicBot {
+ async fn handle(
+ &mut self,
+ opt: Connect,
+ ctx: &mut Context<Self>,
+ ) -> Result<(), tsclientlib::Error> {
+ let addr = ctx.address().unwrap().downgrade();
+ self.teamspeak
+ .as_mut()
+ .unwrap()
+ .connect_for_bot(opt.0, addr)?;
+
+ let mut connection = self.teamspeak.as_ref().unwrap().clone();
+ let handle = tokio::runtime::Handle::current();
+ self.player
+ .setup_with_audio_callback(Some(Box::new(move |samples| {
+ handle.block_on(connection.send_audio_packet(samples));
+ })))
+ .unwrap();
+
+ Ok(())
+ }
+}
+
+pub struct GetName;
+impl Message for GetName {
+ type Result = String;
+}
+
+#[async_trait]
+impl Handler<GetName> for MusicBot {
+ async fn handle(&mut self, _: GetName, _: &mut Context<Self>) -> String {
+ self.name().to_owned()
+ }
+}
+
+pub struct GetBotData;
+impl Message for GetBotData {
+ type Result = crate::web_server::BotData;
+}
+
+#[async_trait]
+impl Handler<GetBotData> for MusicBot {
+ async fn handle(&mut self, _: GetBotData, _: &mut Context<Self>) -> crate::web_server::BotData {
+ crate::web_server::BotData {
+ name: self.name.clone(),
+ playlist: self.playlist.to_vec(),
+ currently_playing: self.player.currently_playing(),
+ position: self.player.position(),
+ state: self.state(),
+ volume: self.volume().await,
}
}
+}
- pub fn quit(&self, reason: String) {
- self.player.quit(reason);
+pub struct GetChannel;
+impl Message for GetChannel {
+ type Result = Option<ChannelId>;
+}
+
+#[async_trait]
+impl Handler<GetChannel> for MusicBot {
+ async fn handle(&mut self, _: GetChannel, _: &mut Context<Self>) -> Option<ChannelId> {
+ self.current_channel().await
}
}
-fn spawn_stdin_reader(tx: Arc<RwLock<UnboundedSender<MusicBotMessage>>>) {
- debug!("Spawning stdin reader thread");
- thread::Builder::new()
- .name(String::from("stdin reader"))
- .spawn(move || {
- let stdin = ::std::io::stdin();
- let lock = stdin.lock();
- for line in lock.lines() {
- let line = line.unwrap();
-
- let message = MusicBotMessage::TextMessage(Message {
- target: MessageTarget::Channel,
- invoker: Invoker {
- name: String::from("stdin"),
- id: ClientId(0),
- uid: None,
- },
- text: line,
- });
-
- let tx = tx.read().unwrap();
- tx.send(message).unwrap();
- }
- })
- .expect("Failed to spawn stdin reader thread");
+#[async_trait]
+impl Handler<Quit> for MusicBot {
+ async fn handle(&mut self, q: Quit, _: &mut Context<Self>) -> Result<(), tsclientlib::Error> {
+ self.quit(q.0, false).await
+ }
}
-fn spawn_gstreamer_thread(
- player: Arc<AudioPlayer>,
- tx: Arc<RwLock<UnboundedSender<MusicBotMessage>>>,
-) {
- thread::Builder::new()
- .name(String::from("gstreamer polling"))
- .spawn(move || loop {
- if player.poll() == PollResult::Quit {
- break;
- }
+#[async_trait]
+impl Handler<MusicBotMessage> for MusicBot {
+ async fn handle(
+ &mut self,
+ msg: MusicBotMessage,
+ _: &mut Context<Self>,
+ ) -> Result<(), AudioPlayerError> {
+ self.on_message(msg).await
+ }
+}
- tx.read()
- .unwrap()
- .send(MusicBotMessage::StateChange(State::EndOfStream))
- .unwrap();
- })
- .expect("Failed to spawn gstreamer thread");
+fn spawn_stdin_reader(addr: WeakAddress<MusicBot>) {
+ use tokio::io::AsyncBufReadExt;
+
+ tokio::task::spawn(async move {
+ let stdin = tokio::io::stdin();
+ let reader = tokio::io::BufReader::new(stdin);
+ let mut lines = reader.lines();
+
+ while let Some(line) = lines.next_line().await.unwrap() {
+ let message = MusicBotMessage::TextMessage(ChatMessage {
+ target: MessageTarget::Channel,
+ invoker: Invoker {
+ name: String::from("stdin"),
+ id: ClientId(0),
+ uid: None,
+ },
+ text: line,
+ });
+
+ addr.send(message).await.unwrap().unwrap();
+ }
+ });
}
diff --git a/src/log_bridge.rs b/src/log_bridge.rs
new file mode 100644
index 0000000..35bcb01
--- /dev/null
+++ b/src/log_bridge.rs
@@ -0,0 +1,101 @@
+// TODO Temporary file until we have a better logging setup for slog
+
+use slog::{Drain, KV};
+use std::fmt::{self, Arguments, Write};
+
+pub struct LogBridge<T>(pub T);
+
+impl<T: log::Log> Drain for LogBridge<T> {
+ type Ok = ();
+ type Err = slog::Error;
+
+ fn log(&self, record: &slog::Record, kvs: &slog::OwnedKVList) -> Result<(), Self::Err> {
+ let mut target = record.tag();
+ if target.is_empty() {
+ target = record.module();
+ }
+
+ let lazy = LazyLog::new(record, kvs);
+
+ self.0.log(
+ &log::Record::builder()
+ .args(format_args!("{}", lazy))
+ .level(level_to_log(record.level()))
+ .target(target)
+ .module_path_static(Some(record.module()))
+ .file_static(Some(record.file()))
+ .line(Some(record.line()))
+ .build(),
+ );
+
+ Ok(())
+ }
+
+ fn is_enabled(&self, level: slog::Level) -> bool {
+ let meta = log::Metadata::builder().level(level_to_log(level)).build();
+
+ self.0.enabled(&meta)
+ }
+}
+
+fn level_to_log(level: slog::Level) -> log::Level {
+ match level {
+ slog::Level::Critical | slog::Level::Error => log::Level::Error,
+ slog::Level::Warning => log::Level::Warn,
+ slog::Level::Info => log::Level::Info,
+ slog::Level::Debug => log::Level::Debug,
+ slog::Level::Trace => log::Level::Trace,
+ }
+}
+
+struct LazyLog<'a> {
+ record: &'a slog::Record<'a>,
+ kvs: &'a slog::OwnedKVList,
+}
+
+impl<'a> LazyLog<'a> {
+ fn new(record: &'a slog::Record, kvs: &'a slog::OwnedKVList) -> Self {
+ LazyLog { record, kvs }
+ }
+}
+
+impl<'a> fmt::Display for LazyLog<'a> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ write!(f, "{}", self.record.msg())?;
+
+ let mut ser = StringSerializer::new();
+
+ self.kvs
+ .serialize(self.record, &mut ser)
+ .map_err(|_| fmt::Error)?;
+ self.record
+ .kv()
+ .serialize(self.record, &mut ser)
+ .map_err(|_| fmt::Error)?;
+
+ write!(f, "{}", ser.finish())
+ }
+}
+
+struct StringSerializer {
+ inner: String,
+}
+
+impl StringSerializer {
+ fn new() -> Self {
+ StringSerializer {
+ inner: String::new(),
+ }
+ }
+
+ fn finish(self) -> String {
+ self.inner
+ }
+}
+
+impl slog::Serializer for StringSerializer {
+ fn emit_arguments(&mut self, key: slog::Key, value: &Arguments) -> slog::Result {
+ write!(self.inner, ", {}: {}", key, value)?;
+ Ok(())
+ }
+}
diff --git a/src/main.rs b/src/main.rs
index f755db0..51b1e38 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -2,22 +2,25 @@ use std::fs::File;
use std::io::{Read, Write};
use std::path::PathBuf;
use std::thread;
-use std::time::Duration;
-use log::{debug, error, info};
+use slog::{debug, error, info, o, Drain, Logger};
use structopt::clap::AppSettings;
use structopt::StructOpt;
+#[cfg(unix)]
+use tokio::signal::unix::*;
use tsclientlib::Identity;
mod audio_player;
mod bot;
mod command;
+mod log_bridge;
mod playlist;
mod teamspeak;
mod web_server;
mod youtube_dl;
-use bot::{MasterArgs, MasterBot, MusicBot, MusicBotArgs};
+use bot::{MasterArgs, MasterBot, MusicBot, MusicBotArgs, Quit};
+use log_bridge::LogBridge;
#[derive(StructOpt, Debug)]
#[structopt(global_settings = &[AppSettings::ColoredHelp])]
@@ -51,6 +54,10 @@ pub struct Args {
help = "The channel the master bot should connect to"
)]
master_channel: Option<String>,
+ // 0. Print nothing
+ // 1. Print command string
+ // 2. Print packets
+ // 3. Print udp packets
#[structopt(
short = "v",
long = "verbose",
@@ -58,25 +65,44 @@ pub struct Args {
parse(from_occurrences)
)]
verbose: u8,
- // 0. Print nothing
- // 1. Print command string
- // 2. Print packets
- // 3. Print udp packets
}
#[tokio::main]
async fn main() {
- if let Err(e) = run().await {
- println!("Error: {}", e);
+ let root_logger = {
+ let config = log4rs::load_config_file("log4rs.yml", Default::default()).unwrap();
+ let drain = LogBridge(log4rs::Logger::new(config)).fuse();
+ // slog_async adds a channel because log4rs if not unwind safe
+ let drain = slog_async::Async::new(drain).build().fuse();
+
+ Logger::root(drain, o!())
+ };
+
+ let scope_guard = slog_scope::set_global_logger(root_logger.clone());
+ // On SIGTERM the logger resets for some reason which makes the bot panic
+ // if it tries to log anything
+ scope_guard.cancel_reset();
+
+ slog_stdlog::init().unwrap();
+
+ if let Err(e) = run(root_logger.clone()).await {
+ error!(root_logger, "{}", e);
}
}
-async fn run() -> Result<(), Box<dyn std::error::Error>> {
- log4rs::init_file("log4rs.yml", Default::default()).unwrap();
-
+async fn run(root_logger: Logger) -> Result<(), Box<dyn std::error::Error>> {
// Parse command line options
let args = Args::from_args();
+ // Set up signal handlers
+ let ctrl_c = tokio::task::spawn(tokio::signal::ctrl_c());
+ #[cfg(unix)]
+ let (sighup, sigterm, sigquit) = (
+ tokio::task::spawn(hangup()),
+ tokio::task::spawn(terminate()),
+ tokio::task::spawn(quit()),
+ );
+
let mut file = File::open(&args.config_path)?;
let mut toml = String::new();
file.read_to_string(&mut toml)?;
@@ -107,14 +133,14 @@ async fn run() -> Result<(), Box<dyn std::error::Error>> {
if let Some(level) = args.wanted_level {
if let Some(id) = &mut config.id {
- info!("Upgrading master identity");
+ info!(root_logger, "Upgrading master identity");
id.upgrade_level(level).expect("can upgrade level");
}
if let Some(ids) = &mut config.ids {
let len = ids.len();
for (i, id) in ids.iter_mut().enumerate() {
- info!("Upgrading bot identity {}/{}", i + 1, len);
+ info!(root_logger, "Upgrading bot identity"; "current" => i + 1, "amount" => len);
id.upgrade_level(level).expect("can upgrade level");
}
}
@@ -127,53 +153,101 @@ async fn run() -> Result<(), Box<dyn std::error::Error>> {
}
if config.id.is_none() || config.ids.is_none() {
- error!("Failed to find required identites, try running with `-g`");
+ error!(
+ root_logger,
+ "Failed to find required identites, try running with `-g`"
+ );
return Ok(());
}
+ let local = args.local;
let bot_args = config.merge(args);
- info!("Starting PokeBot!");
- debug!("Received CLI arguments: {:?}", std::env::args());
+ info!(root_logger, "Starting PokeBot!");
+ debug!(root_logger, "Received CLI arguments"; "args" => ?std::env::args());
- if bot_args.local {
+ if local {
let name = bot_args.names[0].clone();
- let id = bot_args.ids.expect("identies should exists")[0].clone();
-
- let disconnect_cb = Box::new(move |_, _, _| {});
+ let identity = bot_args.ids.expect("identies should exists")[0].clone();
let bot_args = MusicBotArgs {
name,
- name_index: 0,
- id_index: 0,
+ master: None,
local: true,
address: bot_args.address.clone(),
- id,
+ identity,
channel: String::from("local"),
verbose: bot_args.verbose,
- disconnect_cb,
+ logger: root_logger,
};
- MusicBot::new(bot_args).await.1.await;
+ MusicBot::spawn(bot_args).await;
+
+ ctrl_c.await??;
} else {
let domain = bot_args.domain.clone();
let bind_address = bot_args.bind_address.clone();
- let (bot, fut) = MasterBot::new(bot_args).await;
-
- thread::spawn(|| {
- let web_args = web_server::WebServerArgs {
- domain,
- bind_address,
- bot,
- };
- if let Err(e) = web_server::start(web_args) {
- error!("Error in web server: {}", e);
+ let bot_name = bot_args.master_name.clone();
+ let bot_logger = root_logger.new(o!("master" => bot_name.clone()));
+ let bot = MasterBot::spawn(bot_args, bot_logger).await;
+
+ let web_args = web_server::WebServerArgs {
+ domain,
+ bind_address,
+ bot: bot.downgrade(),
+ };
+ spawn_web_server(web_args, root_logger.new(o!("webserver" => bot_name)));
+
+ #[cfg(unix)]
+ tokio::select! {
+ res = ctrl_c => {
+ res??;
+ info!(root_logger, "Received signal, shutting down"; "signal" => "SIGINT");
}
- });
+ _ = sigterm => {
+ info!(root_logger, "Received signal, shutting down"; "signal" => "SIGTERM");
+ }
+ _ = sighup => {
+ info!(root_logger, "Received signal, shutting down"; "signal" => "SIGHUP");
+ }
+ _ = sigquit => {
+ info!(root_logger, "Received signal, shutting down"; "signal" => "SIGQUIT");
+ }
+ };
+
+ #[cfg(windows)]
+ ctrl_c.await??;
- fut.await;
- // Keep tokio running while the bot disconnects
- tokio::time::delay_for(Duration::from_secs(1)).await;
+ bot.send(Quit(String::from("Stopping")))
+ .await
+ .unwrap()
+ .unwrap();
}
Ok(())
}
+
+pub fn spawn_web_server(args: web_server::WebServerArgs, logger: Logger) {
+ thread::spawn(move || {
+ if let Err(e) = web_server::start(args, logger.clone()) {
+ error!(logger, "Error in web server"; "error" => %e);
+ }
+ });
+}
+
+#[cfg(unix)]
+pub async fn terminate() -> std::io::Result<()> {
+ signal(SignalKind::terminate())?.recv().await;
+ Ok(())
+}
+
+#[cfg(unix)]
+pub async fn hangup() -> std::io::Result<()> {
+ signal(SignalKind::hangup())?.recv().await;
+ Ok(())
+}
+
+#[cfg(unix)]
+pub async fn quit() -> std::io::Result<()> {
+ signal(SignalKind::quit())?.recv().await;
+ Ok(())
+}
diff --git a/src/playlist.rs b/src/playlist.rs
index 445f8a5..31fcfc0 100644
--- a/src/playlist.rs
+++ b/src/playlist.rs
@@ -1,29 +1,35 @@
use std::collections::VecDeque;
-use log::info;
+use slog::{info, Logger};
use crate::youtube_dl::AudioMetadata;
pub struct Playlist {
data: VecDeque<AudioMetadata>,
+ logger: Logger,
}
impl Playlist {
- pub fn new() -> Self {
+ pub fn new(logger: Logger) -> Self {
Self {
data: VecDeque::new(),
+ logger,
}
}
pub fn push(&mut self, data: AudioMetadata) {
- info!("Adding {:?} to playlist", &data.title);
+ info!(self.logger, "Adding to playlist"; "title" => &data.title);
self.data.push_front(data)
}
pub fn pop(&mut self) -> Option<AudioMetadata> {
let res = self.data.pop_back();
- info!("Popping {:?} from playlist", res.as_ref().map(|r| &r.title));
+ info!(
+ self.logger,
+ "Popping from playlist";
+ "title" => res.as_ref().map(|r| &r.title)
+ );
res
}
@@ -45,6 +51,6 @@ impl Playlist {
pub fn clear(&mut self) {
self.data.clear();
- info!("Cleared playlist")
+ info!(self.logger, "Cleared playlist")
}
}
diff --git a/src/teamspeak/mod.rs b/src/teamspeak/mod.rs
index beb3f44..7a68d38 100644
--- a/src/teamspeak/mod.rs
+++ b/src/teamspeak/mod.rs
@@ -1,7 +1,5 @@
-use std::sync::{Arc, RwLock};
-
use futures::stream::StreamExt;
-use tokio::sync::mpsc::UnboundedSender;
+use xtra::{Actor, Handler, WeakAddress};
use tsclientlib::data::exts::{M2BClientEditExt, M2BClientUpdateExt};
use tsclientlib::{
@@ -10,9 +8,9 @@ use tsclientlib::{
ChannelId, ClientId, ConnectOptions, DisconnectOptions, MessageTarget, OutCommandExt, Reason,
};
-use log::{debug, error};
+use slog::{debug, error, info, trace, Logger};
-use crate::bot::{Message, MusicBotMessage};
+use crate::bot::{ChatMessage, MusicBotMessage};
mod bbcode;
@@ -20,7 +18,8 @@ pub use bbcode::*;
#[derive(Clone)]
pub struct TeamSpeakConnection {
- handle: SyncConnectionHandle,
+ handle: Option<SyncConnectionHandle>,
+ logger: Logger,
}
fn get_message(event: &Event) -> Option<MusicBotMessage> {
@@ -31,7 +30,7 @@ fn get_message(event: &Event) -> Option<MusicBotMessage> {
target,
invoker: sender,
message: msg,
- } => Some(MusicBotMessage::TextMessage(Message {
+ } => Some(MusicBotMessage::TextMessage(ChatMessage {
target: *target,
invoker: sender.clone(),
text: msg.clone(),
@@ -86,50 +85,82 @@ fn get_message(event: &Event) -> Option<MusicBotMessage> {
}
impl TeamSpeakConnection {
- pub async fn new(
- tx: Arc<RwLock<UnboundedSender<MusicBotMessage>>>,
+ pub async fn new(logger: Logger) -> Result<TeamSpeakConnection, tsclientlib::Error> {
+ Ok(TeamSpeakConnection {
+ handle: None,
+ logger,
+ })
+ }
+
+ pub fn connect_for_bot<T: Actor + Handler<MusicBotMessage>>(
+ &mut self,
options: ConnectOptions,
- ) -> Result<TeamSpeakConnection, tsclientlib::Error> {
+ bot: WeakAddress<T>,
+ ) -> Result<(), tsclientlib::Error> {
+ info!(self.logger, "Starting TeamSpeak connection");
+
let conn = options.connect()?;
- let conn = SyncConnection::from(conn);
- let mut handle = conn.get_handle();
-
- tokio::spawn(conn.for_each(move |i| {
- let tx = tx.clone();
- async move {
- match i {
- Ok(SyncStreamItem::ConEvents(events)) => {
+ let mut conn = SyncConnection::from(conn);
+ let handle = conn.get_handle();
+ self.handle = Some(handle);
+
+ let ev_logger = self.logger.clone();
+ tokio::spawn(async move {
+ while let Some(item) = conn.next().await {
+ use SyncStreamItem::*;
+
+ match item {
+ Ok(ConEvents(events)) => {
for event in &events {
if let Some(msg) = get_message(event) {
- let tx = tx.read().expect("RwLock was not poisoned");
- // Ignore the result because the receiver might get dropped first.
- let _ = tx.send(msg);
+ tokio::spawn(bot.send(msg));
}
}
}
- Err(e) => error!("Error occured during event reading: {}", e),
- Ok(SyncStreamItem::DisconnectedTemporarily) => debug!("Temporary disconnect!"),
- _ => (),
+ Err(e) => error!(ev_logger, "Error occured during event reading: {}", e),
+ Ok(DisconnectedTemporarily(r)) => {
+ debug!(ev_logger, "Temporary disconnect"; "reason" => ?r)
+ }
+ Ok(Audio(_)) => {
+ trace!(ev_logger, "Audio received");
+ }
+ Ok(IdentityLevelIncreasing(_)) => {
+ trace!(ev_logger, "Identity level increasing");
+ }
+ Ok(IdentityLevelIncreased) => {
+ trace!(ev_logger, "Identity level increased");
+ }
+ Ok(NetworkStatsUpdated) => {
+ trace!(ev_logger, "Network stats updated");
+ }
}
}
- }));
-
- handle.wait_until_connected().await?;
-
- let mut chandle = handle.clone();
- chandle
- .with_connection(|mut conn| {
- conn.get_state()
- .expect("is connected")
- .server
- .set_subscribed(true)
- .send(&mut conn)
- .unwrap()
- })
- .await
- .unwrap();
-
- Ok(TeamSpeakConnection { handle })
+ });
+
+ let mut handle = self.handle.clone();
+ tokio::spawn(async move {
+ handle
+ .as_mut()
+ .expect("connect_for_bot was called")
+ .wait_until_connected()
+ .await
+ .unwrap();
+ handle
+ .as_mut()
+ .expect("connect_for_bot was called")
+ .with_connection(|mut conn| {
+ conn.get_state()
+ .expect("can get state")
+ .server
+ .set_subscribed(true)
+ .send(&mut conn)
+ })
+ .await
+ .and_then(|v| v)
+ .unwrap();
+ });
+
+ Ok(())
}
pub async fn send_audio_packet(&mut self, samples: &[u8]) {
@@ -140,22 +171,25 @@ impl TeamSpeakConnection {
data: samples,
});
- self.handle
- .with_connection(|conn| {
- if let Err(e) = conn
- .get_tsproto_client_mut()
+ if let Err(e) = self
+ .handle
+ .as_mut()
+ .expect("connect_for_bot was called")
+ .with_connection(move |conn| {
+ conn.get_tsproto_client_mut()
.expect("can get tsproto client")
.send_packet(packet)
- {
- error!("Failed to send voice packet: {}", e);
- }
})
.await
- .unwrap();
+ {
+ error!(self.logger, "Failed to send voice packet: {}", e);
+ }
}
pub async fn channel_of_user(&mut self, id: ClientId) -> Option<ChannelId> {
self.handle
+ .as_mut()
+ .expect("connect_for_bot was called")
.with_connection(move |conn| {
conn.get_state()
.expect("can get state")
@@ -164,30 +198,28 @@ impl TeamSpeakConnection {
.map(|c| c.channel)
})
.await
- .unwrap()
+ .map_err(|e| error!(self.logger, "Failed to get channel of user"; "error" => %e))
+ .ok()
+ .and_then(|v| v)
}
pub async fn channel_path_of_user(&mut self, id: ClientId) -> Option<String> {
self.handle
+ .as_mut()
+ .expect("connect_for_bot was called")
.with_connection(move |conn| {
let state = conn.get_state().expect("can get state");
let channel_id = state.clients.get(&id)?.channel;
- let mut channel = state
- .channels
- .get(&channel_id)
- .expect("can find user channel");
+ let mut channel = state.channels.get(&channel_id)?;
let mut names = vec![&channel.name[..]];
// Channel 0 is the root channel
while channel.parent != ChannelId(0) {
names.push("/");
- channel = state
- .channels
- .get(&channel.parent)
- .expect("can find user channel");
+ channel = state.channels.get(&channel.parent)?;
names.push(&channel.name);
}
@@ -199,11 +231,15 @@ impl TeamSpeakConnection {
Some(path)
})
.await
- .unwrap()
+ .map_err(|e| error!(self.logger, "Failed to get channel path of user"; "error" => %e))
+ .ok()
+ .and_then(|v| v)
}
- pub async fn my_channel(&mut self) -> ChannelId {
+ pub async fn current_channel(&mut self) -> Option<ChannelId> {
self.handle
+ .as_mut()
+ .expect("connect_for_bot was called")
.with_connection(move |conn| {
let state = conn.get_state().expect("can get state");
state
@@ -213,11 +249,14 @@ impl TeamSpeakConnection {
.channel
})
.await
- .unwrap()
+ .map_err(|e| error!(self.logger, "Failed to get channel"; "error" => %e))
+ .ok()
}
pub async fn my_id(&mut self) -> ClientId {
self.handle
+ .as_mut()
+ .expect("connect_for_bot was called")
.with_connection(move |conn| conn.get_state().expect("can get state").own_client)
.await
.unwrap()
@@ -225,6 +264,8 @@ impl TeamSpeakConnection {
pub async fn user_count(&mut self, channel: ChannelId) -> u32 {
self.handle
+ .as_mut()
+ .expect("connect_for_bot was called")
.with_connection(move |conn| {
let state = conn.get_state().expect("can get state");
let mut count = 0;
@@ -241,88 +282,90 @@ impl TeamSpeakConnection {
}
pub async fn set_nickname(&mut self, name: String) {
- self.handle
+ if let Err(e) = self
+ .handle
+ .as_mut()
+ .expect("connect_for_bot was called")
.with_connection(move |mut conn| {
conn.get_state()
.expect("can get state")
.client_update()
.set_name(&name)
.send(&mut conn)
- .map_err(|e| error!("Failed to set nickname: {}", e))
})
.await
- .unwrap()
- .unwrap();
+ .and_then(|v| v)
+ {
+ error!(self.logger, "Failed to set nickname: {}", e);
+ }
}
pub async fn set_description(&mut self, desc: String) {
- self.handle
+ if let Err(e) = self
+ .handle
+ .as_mut()
+ .expect("connect_for_bot was called")
.with_connection(move |mut conn| {
let state = conn.get_state().expect("can get state");
- let _ = state
+ state
.clients
.get(&state.own_client)
.expect("can get myself")
.edit()
.set_description(&desc)
.send(&mut conn)
- .map_err(|e| error!("Failed to change description: {}", e));
})
.await
- .unwrap()
+ .and_then(|v| v)
+ {
+ error!(self.logger, "Failed to change description: {}", e);
+ }
}
pub async fn send_message_to_channel(&mut self, text: String) {
- self.handle
+ if let Err(e) = self
+ .handle
+ .as_mut()
+ .expect("connect_for_bot was called")
.with_connection(move |mut conn| {
- let _ = conn
- .get_state()
+ conn.get_state()
.expect("can get state")
.send_message(MessageTarget::Channel, &text)
.send(&mut conn)
- .map_err(|e| error!("Failed to send message: {}", e));
})
.await
- .unwrap()
+ .and_then(|v| v)
+ {
+ error!(self.logger, "Failed to send message: {}", e);
+ }
}
pub async fn send_message_to_user(&mut self, client: ClientId, text: String) {
- self.handle
+ if let Err(e) = self
+ .handle
+ .as_mut()
+ .expect("connect_for_bot was called")
.with_connection(move |mut conn| {
- let _ = conn
- .get_state()
+ conn.get_state()
.expect("can get state")
.send_message(MessageTarget::Client(client), &text)
.send(&mut conn)
- .map_err(|e| error!("Failed to send message: {}", e));
})
.await
- .unwrap()
+ .and_then(|v| v)
+ {
+ error!(self.logger, "Failed to send message: {}", e);
+ }
}
- pub async fn subscribe(&mut self, id: ChannelId) {
- self.handle
- .with_connection(move |mut conn| {
- let channel = match conn.get_state().expect("can get state").channels.get(&id) {
- Some(c) => c,
- None => {
- error!("Failed to find channel to subscribe to");
- return;
- }
- };
-
- if let Err(e) = channel.set_subscribed(true).send(&mut conn) {
- error!("Failed to send subscribe packet: {}", e);
- }
- })
- .await
- .unwrap()
- }
-
- pub async fn disconnect(&mut self, reason: &str) {
+ pub async fn disconnect(&mut self, reason: &str) -> Result<(), tsclientlib::Error> {
let opt = DisconnectOptions::new()
.reason(Reason::Clientdisconnect)
.message(reason);
- self.handle.disconnect(opt).await.unwrap();
+ self.handle
+ .as_mut()
+ .expect("connect_for_bot was called")
+ .disconnect(opt)
+ .await
}
}
diff --git a/src/web_server.rs b/src/web_server.rs
index d731fae..8e5d446 100644
--- a/src/web_server.rs
+++ b/src/web_server.rs
@@ -1,38 +1,38 @@
-use std::sync::Arc;
use std::time::Duration;
-use actix::{Actor, Addr};
-use actix_web::{get, middleware::Logger, post, web, App, HttpServer, Responder};
-use askama::Template;
-use askama_actix::TemplateIntoResponse;
+use actix_slog::StructuredLogger;
+use actix_web::{get, post, web, App, HttpServer, Responder};
+use askama_actix::{Template, TemplateIntoResponse};
use serde::{Deserialize, Serialize};
+use slog::Logger;
+use xtra::WeakAddress;
use crate::bot::MasterBot;
use crate::youtube_dl::AudioMetadata;
mod api;
-mod bot_executor;
+mod bot_data;
mod default;
mod front_end_cookie;
mod tmtu;
-pub use bot_executor::*;
+pub use bot_data::*;
use front_end_cookie::FrontEnd;
pub struct WebServerArgs {
pub domain: String,
pub bind_address: String,
- pub bot: Arc<MasterBot>,
+ pub bot: WeakAddress<MasterBot>,
}
#[actix_rt::main]
-pub async fn start(args: WebServerArgs) -> std::io::Result<()> {
- let cbot = args.bot.clone();
- let bot_addr: Addr<BotExecutor> = BotExecutor(cbot.clone()).start();
+pub async fn start(args: WebServerArgs, logger: Logger) -> std::io::Result<()> {
+ let bot = args.bot;
+ let bind_address = args.bind_address;
HttpServer::new(move || {
App::new()
- .data(bot_addr.clone())
- .wrap(Logger::default())
+ .data(bot.clone())
+ .wrap(StructuredLogger::new(logger.clone()))
.service(index)
.service(get_bot)
.service(post_front_end)
@@ -44,12 +44,10 @@ pub async fn start(args: WebServerArgs) -> std::io::Result<()> {
.service(web::scope("/docs").service(get_api_docs))
.service(actix_files::Files::new("/static", "web_server/static/"))
})
- .bind(args.bind_address)?
+ .bind(bind_address)?
.run()
.await?;
- args.bot.quit(String::from("Stopping"));
-
Ok(())
}
@@ -75,7 +73,7 @@ pub struct BotData {
}
#[get("/")]
-async fn index(bot: web::Data<Addr<BotExecutor>>, front: FrontEnd) -> impl Responder {
+async fn index(bot: web::Data<WeakAddress<MasterBot>>, front: FrontEnd) -> impl Responder {
match front {
FrontEnd::Default => default::index(bot).await,
FrontEnd::Tmtu => tmtu::index(bot).await,
@@ -84,7 +82,7 @@ async fn index(bot: web::Data<Addr<BotExecutor>>, front: FrontEnd) -> impl Respo
#[get("/bot/{name}")]
async fn get_bot(
- bot: web::Data<Addr<BotExecutor>>,
+ bot: web::Data<WeakAddress<MasterBot>>,
name: web::Path<String>,
front: FrontEnd,
) -> impl Responder {
diff --git a/src/web_server/api.rs b/src/web_server/api.rs
index 4deedad..b1d50c4 100644
--- a/src/web_server/api.rs
+++ b/src/web_server/api.rs
@@ -1,22 +1,23 @@
-use actix::Addr;
use actix_web::{get, web, HttpResponse, Responder, ResponseError};
use derive_more::Display;
use serde::Serialize;
+use xtra::WeakAddress;
-use crate::web_server::{BotDataListRequest, BotDataRequest, BotExecutor};
+use crate::web_server::{BotDataListRequest, BotDataRequest};
+use crate::MasterBot;
#[get("/bots")]
-pub async fn get_bot_list(bot: web::Data<Addr<BotExecutor>>) -> impl Responder {
- let bot_datas = match bot.send(BotDataListRequest).await.unwrap() {
- Ok(data) => data,
- Err(_) => Vec::with_capacity(0),
- };
+pub async fn get_bot_list(bot: web::Data<WeakAddress<MasterBot>>) -> impl Responder {
+ let bot_datas = bot.send(BotDataListRequest).await.unwrap();
web::Json(bot_datas)
}
#[get("/bots/{name}")]
-pub async fn get_bot(bot: web::Data<Addr<BotExecutor>>, name: web::Path<String>) -> impl Responder {
+pub async fn get_bot(
+ bot: web::Data<WeakAddress<MasterBot>>,
+ name: web::Path<String>,
+) -> impl Responder {
if let Some(bot_data) = bot.send(BotDataRequest(name.into_inner())).await.unwrap() {
Ok(web::Json(bot_data))
} else {
diff --git a/src/web_server/bot_data.rs b/src/web_server/bot_data.rs
new file mode 100644
index 0000000..af0c5e1
--- /dev/null
+++ b/src/web_server/bot_data.rs
@@ -0,0 +1,47 @@
+use async_trait::async_trait;
+
+use xtra::{Context, Handler, Message};
+
+use crate::bot::MasterBot;
+use crate::web_server::BotData;
+
+pub struct BotNameListRequest;
+
+impl Message for BotNameListRequest {
+ type Result = Vec<String>;
+}
+
+#[async_trait]
+impl Handler<BotNameListRequest> for MasterBot {
+ async fn handle(&mut self, _: BotNameListRequest, _: &mut Context<Self>) -> Vec<String> {
+ self.bot_names()
+ }
+}
+
+pub struct BotDataListRequest;
+
+impl Message for BotDataListRequest {
+ type Result = Vec<BotData>;
+}
+
+#[async_trait]
+impl Handler<BotDataListRequest> for MasterBot {
+ async fn handle(&mut self, _: BotDataListRequest, _: &mut Context<Self>) -> Vec<BotData> {
+ self.bot_datas().await
+ }
+}
+
+pub struct BotDataRequest(pub String);
+
+impl Message for BotDataRequest {
+ type Result = Option<BotData>;
+}
+
+#[async_trait]
+impl Handler<BotDataRequest> for MasterBot {
+ async fn handle(&mut self, r: BotDataRequest, _: &mut Context<Self>) -> Option<BotData> {
+ let name = r.0;
+
+ self.bot_data(name).await
+ }
+}
diff --git a/src/web_server/bot_executor.rs b/src/web_server/bot_executor.rs
deleted file mode 100644
index 0d3e7b7..0000000
--- a/src/web_server/bot_executor.rs
+++ /dev/null
@@ -1,63 +0,0 @@
-use std::sync::Arc;
-
-use actix::{Actor, Context, Handler, Message};
-
-use crate::bot::MasterBot;
-use crate::web_server::BotData;
-
-pub struct BotExecutor(pub Arc<MasterBot>);
-
-impl Actor for BotExecutor {
- type Context = Context<Self>;
-}
-
-pub struct BotNameListRequest;
-
-impl Message for BotNameListRequest {
- // A plain Vec does not work for some reason
- type Result = Result<Vec<String>, ()>;
-}
-
-impl Handler<BotNameListRequest> for BotExecutor {
- type Result = Result<Vec<String>, ()>;
-
- fn handle(&mut self, _: BotNameListRequest, _: &mut Self::Context) -> Self::Result {
- let bot = &self.0;
-
- Ok(bot.bot_names())
- }
-}
-
-pub struct BotDataListRequest;
-
-impl Message for BotDataListRequest {
- // A plain Vec does not work for some reason
- type Result = Result<Vec<BotData>, ()>;
-}
-
-impl Handler<BotDataListRequest> for BotExecutor {
- type Result = Result<Vec<BotData>, ()>;
-
- fn handle(&mut self, _: BotDataListRequest, _: &mut Self::Context) -> Self::Result {
- let bot = &self.0;
-
- Ok(bot.bot_datas())
- }
-}
-
-pub struct BotDataRequest(pub String);
-
-impl Message for BotDataRequest {
- type Result = Option<BotData>;
-}
-
-impl Handler<BotDataRequest> for BotExecutor {
- type Result = Option<BotData>;
-
- fn handle(&mut self, r: BotDataRequest, _: &mut Self::Context) -> Self::Result {
- let name = r.0;
- let bot = &self.0;
-
- bot.bot_data(name)
- }
-}
diff --git a/src/web_server/default.rs b/src/web_server/default.rs
index 542dade..6b15784 100644
--- a/src/web_server/default.rs
+++ b/src/web_server/default.rs
@@ -1,9 +1,9 @@
-use actix::Addr;
use actix_web::{http::header, web, Error, HttpResponse};
-use askama::Template;
-use askama_actix::TemplateIntoResponse;
+use askama_actix::{Template, TemplateIntoResponse};
+use xtra::WeakAddress;
-use crate::web_server::{filters, BotData, BotDataRequest, BotExecutor, BotNameListRequest};
+use crate::web_server::{filters, BotData, BotDataRequest, BotNameListRequest};
+use crate::MasterBot;
#[derive(Template)]
#[template(path = "index.htm")]
@@ -12,8 +12,8 @@ struct OverviewTemplate<'a> {
bot: Option<&'a BotData>,
}
-pub async fn index(bot: web::Data<Addr<BotExecutor>>) -> Result<HttpResponse, Error> {
- let bot_names = bot.send(BotNameListRequest).await.unwrap().unwrap();
+pub async fn index(bot: web::Data<WeakAddress<MasterBot>>) -> Result<HttpResponse, Error> {
+ let bot_names = bot.send(BotNameListRequest).await.unwrap();
OverviewTemplate {
bot_names: &bot_names,
@@ -23,10 +23,10 @@ pub async fn index(bot: web::Data<Addr<BotExecutor>>) -> Result<HttpResponse, Er
}
pub async fn get_bot(
- bot: web::Data<Addr<BotExecutor>>,
+ bot: web::Data<WeakAddress<MasterBot>>,
name: String,
) -> Result<HttpResponse, Error> {
- let bot_names = bot.send(BotNameListRequest).await.unwrap().unwrap();
+ let bot_names = bot.send(BotNameListRequest).await.unwrap();
if let Some(bot) = bot.send(BotDataRequest(name)).await.unwrap() {
OverviewTemplate {
@@ -35,7 +35,6 @@ pub async fn get_bot(
}
.into_response()
} else {
- // TODO to 404 or not to 404
Ok(HttpResponse::Found().header(header::LOCATION, "/").finish())
}
}
diff --git a/src/web_server/tmtu.rs b/src/web_server/tmtu.rs
index 33a14af..e9eea98 100644
--- a/src/web_server/tmtu.rs
+++ b/src/web_server/tmtu.rs
@@ -1,9 +1,9 @@
-use actix::Addr;
use actix_web::{http::header, web, Error, HttpResponse};
-use askama::Template;
-use askama_actix::TemplateIntoResponse;
+use askama_actix::{Template, TemplateIntoResponse};
+use xtra::WeakAddress;
-use crate::web_server::{filters, BotData, BotDataRequest, BotExecutor, BotNameListRequest};
+use crate::web_server::{filters, BotData, BotDataRequest, BotNameListRequest};
+use crate::MasterBot;
#[derive(Template)]
#[template(path = "tmtu/index.htm")]
@@ -12,8 +12,8 @@ struct TmtuTemplate {
bot: Option<BotData>,
}
-pub async fn index(bot: web::Data<Addr<BotExecutor>>) -> Result<HttpResponse, Error> {
- let bot_names = bot.send(BotNameListRequest).await.unwrap().unwrap();
+pub async fn index(bot: web::Data<WeakAddress<MasterBot>>) -> Result<HttpResponse, Error> {
+ let bot_names = bot.send(BotNameListRequest).await.unwrap();
TmtuTemplate {
bot_names,
@@ -23,10 +23,10 @@ pub async fn index(bot: web::Data<Addr<BotExecutor>>) -> Result<HttpResponse, Er
}
pub async fn get_bot(
- bot: web::Data<Addr<BotExecutor>>,
+ bot: web::Data<WeakAddress<MasterBot>>,
name: String,
) -> Result<HttpResponse, Error> {
- let bot_names = bot.send(BotNameListRequest).await.unwrap().unwrap();
+ let bot_names = bot.send(BotNameListRequest).await.unwrap();
if let Some(bot) = bot.send(BotDataRequest(name)).await.unwrap() {
TmtuTemplate {
@@ -35,7 +35,6 @@ pub async fn get_bot(
}
.into_response()
} else {
- // TODO to 404 or not to 404
Ok(HttpResponse::Found().header(header::LOCATION, "/").finish())
}
}
diff --git a/src/youtube_dl.rs b/src/youtube_dl.rs
index 496d0b4..ea10107 100644
--- a/src/youtube_dl.rs
+++ b/src/youtube_dl.rs
@@ -5,7 +5,7 @@ use tokio::process::Command;
use serde::{Deserialize, Serialize};
-use log::debug;
+use slog::{debug, Logger};
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct AudioMetadata {
@@ -28,13 +28,16 @@ where
Ok(dur.map(Duration::from_secs_f64))
}
-pub async fn get_audio_download_from_url(uri: String) -> Result<AudioMetadata, String> {
+pub async fn get_audio_download_from_url(
+ url: String,
+ logger: &Logger,
+) -> Result<AudioMetadata, String> {
//youtube-dl sometimes just fails, so we give it a second try
- let ytdl_output = match run_youtube_dl(&uri).await {
+ let ytdl_output = match run_youtube_dl(&url, &logger).await {
Ok(o) => o,
Err(e) => {
if e.contains("Unable to extract video data") {
- run_youtube_dl(&uri).await?
+ run_youtube_dl(&url, &logger).await?
} else {
return Err(e);
}
@@ -46,14 +49,14 @@ pub async fn get_audio_download_from_url(uri: String) -> Result<AudioMetadata, S
Ok(output)
}
-async fn run_youtube_dl(url: &str) -> Result<String, String> {
+async fn run_youtube_dl(url: &str, logger: &Logger) -> Result<String, String> {
let ytdl_args = ["--no-playlist", "-f", "bestaudio/best", "-j", &url];
let mut cmd = Command::new("youtube-dl");
cmd.args(&ytdl_args);
cmd.stdin(Stdio::null());
- debug!("yt-dl command: {:?}", cmd);
+ debug!(logger, "running yt-dl"; "command" => ?cmd);
let ytdl_output = cmd.output().await.unwrap();
if !ytdl_output.status.success() {