diff options
| author | Jokler <jokler@protonmail.com> | 2020-10-14 00:19:27 +0200 |
|---|---|---|
| committer | Jokler <jokler@protonmail.com> | 2020-10-15 01:45:29 +0200 |
| commit | 4e1c2b9f04073294ecb8402486c20d9c01721598 (patch) | |
| tree | 93fe1d75477ae3d1c8466611a2cedd7bed316aa2 /src | |
| parent | 23671b51b4e207574a63bce820acbf43169e2b6c (diff) | |
| download | pokebot-4e1c2b9f04073294ecb8402486c20d9c01721598.tar.gz pokebot-4e1c2b9f04073294ecb8402486c20d9c01721598.zip | |
Replace channels&locks with actors & log with slog
Diffstat (limited to 'src')
| -rw-r--r-- | src/audio_player.rs | 328 | ||||
| -rw-r--r-- | src/bot/master.rs | 363 | ||||
| -rw-r--r-- | src/bot/music.rs | 500 | ||||
| -rw-r--r-- | src/log_bridge.rs | 101 | ||||
| -rw-r--r-- | src/main.rs | 154 | ||||
| -rw-r--r-- | src/playlist.rs | 16 | ||||
| -rw-r--r-- | src/teamspeak/mod.rs | 245 | ||||
| -rw-r--r-- | src/web_server.rs | 34 | ||||
| -rw-r--r-- | src/web_server/api.rs | 17 | ||||
| -rw-r--r-- | src/web_server/bot_data.rs | 47 | ||||
| -rw-r--r-- | src/web_server/bot_executor.rs | 63 | ||||
| -rw-r--r-- | src/web_server/default.rs | 17 | ||||
| -rw-r--r-- | src/web_server/tmtu.rs | 17 | ||||
| -rw-r--r-- | src/youtube_dl.rs | 15 |
14 files changed, 1067 insertions, 850 deletions
diff --git a/src/audio_player.rs b/src/audio_player.rs index 1f6649f..23581f9 100644 --- a/src/audio_player.rs +++ b/src/audio_player.rs @@ -7,36 +7,31 @@ use gstreamer as gst; use gstreamer_app::{AppSink, AppSinkCallbacks}; use gstreamer_audio::{StreamVolume, StreamVolumeFormat}; -use crate::bot::{MusicBotMessage, State}; use glib::BoolError; -use log::{debug, error, info, warn}; -use std::sync::{Arc, RwLock}; -use tokio::sync::mpsc::UnboundedSender; +use slog::{debug, error, info, warn, Logger}; +use xtra::WeakAddress; +use crate::bot::{MusicBot, MusicBotMessage, State}; use crate::command::{Seek, VolumeChange}; use crate::youtube_dl::AudioMetadata; static GST_INIT: Once = Once::new(); -#[derive(PartialEq, Eq, Debug, Clone, Copy)] -pub enum PollResult { - Continue, - Quit, -} - pub struct AudioPlayer { pipeline: gst::Pipeline, bus: gst::Bus, http_src: gst::Element, - volume_f64: RwLock<f64>, + volume_f64: f64, volume: gst::Element, - sender: Arc<RwLock<UnboundedSender<MusicBotMessage>>>, - currently_playing: RwLock<Option<AudioMetadata>>, + currently_playing: Option<AudioMetadata>, + + logger: Logger, } fn make_element(factoryname: &str, display_name: &str) -> Result<gst::Element, AudioPlayerError> { - Ok(gst::ElementFactory::make(factoryname, Some(display_name))?) + Ok(gst::ElementFactory::make(factoryname, Some(display_name)) + .map_err(|_| AudioPlayerError::MissingPlugin(factoryname.to_string()))?) } fn link_elements(a: &gst::Element, b: &gst::Element) -> Result<(), AudioPlayerError> { @@ -49,11 +44,12 @@ fn add_decode_bin_new_pad_callback( decode_bin: &gst::Element, audio_bin: gst::Bin, ghost_pad: gst::GhostPad, + logger: Logger, ) { decode_bin.connect_pad_added(move |_, new_pad| { - debug!("New pad received on decode bin"); + debug!(logger, "New pad received on decode bin"); let name = if let Some(caps) = new_pad.get_current_caps() { - debug!("Pad caps: {}", caps.to_string()); + debug!(logger, "Found caps"; "caps" => caps.to_string()); if let Some(structure) = caps.get_structure(0) { Some(structure.get_name().to_string()) } else { @@ -68,7 +64,7 @@ fn add_decode_bin_new_pad_callback( peer.unlink(&ghost_pad).unwrap(); } - info!("Found raw audio, linking audio bin"); + info!(logger, "Found raw audio, linking audio bin"); new_pad.link(&ghost_pad).unwrap(); audio_bin.sync_state_with_parent().unwrap(); @@ -77,27 +73,15 @@ fn add_decode_bin_new_pad_callback( } impl AudioPlayer { - pub fn new( - sender: Arc<RwLock<UnboundedSender<MusicBotMessage>>>, - callback: Option<Box<dyn FnMut(&[u8]) + Send>>, - ) -> Result<Self, AudioPlayerError> { + pub fn new(logger: Logger) -> Result<Self, AudioPlayerError> { GST_INIT.call_once(|| gst::init().unwrap()); - info!("Creating audio player"); + info!(logger, "Creating audio player"); let pipeline = gst::Pipeline::new(Some("TeamSpeak Audio Player")); let bus = pipeline.get_bus().unwrap(); let http_src = make_element("souphttpsrc", "http source")?; - let decode_bin = make_element("decodebin", "decode bin")?; - pipeline.add_many(&[&http_src, &decode_bin])?; - - link_elements(&http_src, &decode_bin)?; - - let (audio_bin, volume, ghost_pad) = Self::create_audio_bin(callback)?; - - add_decode_bin_new_pad_callback(&decode_bin, audio_bin.clone(), ghost_pad); - - pipeline.add(&audio_bin)?; + let volume = make_element("volume", "volume")?; // The documentation says that we have to make sure to handle // all messages if auto flushing is deactivated. @@ -111,26 +95,31 @@ impl AudioPlayer { pipeline, bus, http_src, + logger, - volume_f64: RwLock::new(0.0), + volume_f64: 0.0, volume, - sender, - currently_playing: RwLock::new(None), + currently_playing: None, }) } - fn create_audio_bin( + pub fn setup_with_audio_callback( + &self, callback: Option<Box<dyn FnMut(&[u8]) + Send>>, - ) -> Result<(gst::Bin, gst::Element, gst::GhostPad), AudioPlayerError> { + ) -> Result<(), AudioPlayerError> { + let decode_bin = make_element("decodebin", "decode bin")?; + self.pipeline.add_many(&[&self.http_src, &decode_bin])?; + + link_elements(&self.http_src, &decode_bin)?; + let audio_bin = gst::Bin::new(Some("audio bin")); let queue = make_element("queue", "audio queue")?; let convert = make_element("audioconvert", "audio converter")?; - let volume = make_element("volume", "volume")?; let resample = make_element("audioresample", "audio resampler")?; let pads = queue.get_sink_pads(); let queue_sink_pad = pads.first().unwrap(); - audio_bin.add_many(&[&queue, &convert, &volume, &resample])?; + audio_bin.add_many(&[&queue, &convert, &self.volume, &resample])?; if let Some(mut callback) = callback { let opus_enc = make_element("opusenc", "opus encoder")?; @@ -160,49 +149,64 @@ impl AudioPlayer { audio_bin.add_many(&[&opus_enc, &sink])?; - gst::Element::link_many(&[&queue, &convert, &volume, &resample, &opus_enc, &sink])?; + gst::Element::link_many(&[ + &queue, + &convert, + &self.volume, + &resample, + &opus_enc, + &sink, + ])?; } else { let sink = make_element("autoaudiosink", "auto audio sink")?; audio_bin.add_many(&[&sink])?; - gst::Element::link_many(&[&queue, &convert, &volume, &resample, &sink])?; + gst::Element::link_many(&[&queue, &convert, &self.volume, &resample, &sink])?; }; let ghost_pad = GhostPad::with_target(Some("audio bin sink"), queue_sink_pad).unwrap(); ghost_pad.set_active(true)?; audio_bin.add_pad(&ghost_pad)?; - Ok((audio_bin, volume, ghost_pad)) + add_decode_bin_new_pad_callback( + &decode_bin, + audio_bin.clone(), + ghost_pad, + self.logger.clone(), + ); + + self.pipeline.add(&audio_bin)?; + + Ok(()) } - pub fn set_metadata(&self, data: AudioMetadata) -> Result<(), AudioPlayerError> { + pub fn set_metadata(&mut self, data: AudioMetadata) -> Result<(), AudioPlayerError> { self.set_source_url(data.url.clone())?; - let mut currently_playing = self.currently_playing.write().unwrap(); - *currently_playing = Some(data); + self.currently_playing = Some(data); Ok(()) } fn set_source_url(&self, location: String) -> Result<(), AudioPlayerError> { - info!("Setting location URI: {}", location); + info!(self.logger, "Setting source"; "url" => &location); self.http_src.set_property("location", &location)?; Ok(()) } - pub fn change_volume(&self, volume: VolumeChange) -> Result<(), AudioPlayerError> { + pub fn change_volume(&mut self, volume: VolumeChange) -> Result<(), AudioPlayerError> { let new_volume = match volume { - VolumeChange::Positive(vol) => self.volume() + vol, - VolumeChange::Negative(vol) => self.volume() - vol, + VolumeChange::Positive(vol) => self.volume_f64 + vol, + VolumeChange::Negative(vol) => self.volume_f64 - vol, VolumeChange::Absolute(vol) => vol, }; let new_volume = new_volume.max(0.0).min(1.0); - *self.volume_f64.write().unwrap() = new_volume; + self.volume_f64 = new_volume; let db = 50.0 * new_volume.log10(); - info!("Setting volume: {} -> {} dB", new_volume, db); + info!(self.logger, "Setting volume"; "volume" => new_volume, "db" => db); let linear = StreamVolume::convert_volume(StreamVolumeFormat::Db, StreamVolumeFormat::Linear, db); @@ -212,36 +216,10 @@ impl AudioPlayer { Ok(()) } - pub fn is_started(&self) -> bool { - let (_, current, pending) = self.pipeline.get_state(gst::ClockTime(None)); - - match (current, pending) { - (gst::State::Null, gst::State::VoidPending) => false, - (_, gst::State::Null) => false, - (gst::State::Ready, gst::State::VoidPending) => false, - _ => true, - } - } - - pub fn volume(&self) -> f64 { - *self.volume_f64.read().unwrap() - } - - pub fn position(&self) -> Option<Duration> { - self.pipeline - .query_position::<gst::ClockTime>() - .and_then(|t| t.0.map(Duration::from_nanos)) - } - - pub fn currently_playing(&self) -> Option<AudioMetadata> { - self.currently_playing.read().unwrap().clone() - } + pub fn reset(&mut self) -> Result<(), AudioPlayerError> { + info!(self.logger, "Setting pipeline state"; "to" => "null"); - pub fn reset(&self) -> Result<(), AudioPlayerError> { - info!("Setting pipeline state to null"); - - let mut currently_playing = self.currently_playing.write().unwrap(); - *currently_playing = None; + self.currently_playing = None; self.pipeline.set_state(gst::State::Null)?; @@ -249,7 +227,7 @@ impl AudioPlayer { } pub fn play(&self) -> Result<(), AudioPlayerError> { - info!("Setting pipeline state to playing"); + info!(self.logger, "Setting pipeline state"; "to" => "playing"); self.pipeline.set_state(gst::State::Playing)?; @@ -257,7 +235,7 @@ impl AudioPlayer { } pub fn pause(&self) -> Result<(), AudioPlayerError> { - info!("Setting pipeline state to paused"); + info!(self.logger, "Setting pipeline state"; "to" => "paused"); self.pipeline.set_state(gst::State::Paused)?; @@ -289,7 +267,7 @@ impl AudioPlayer { }; let time = humantime::format_duration(absolute); - info!("Seeking to {}", time); + info!(self.logger, "Seeking"; "time" => %time); self.pipeline.seek_simple( gst::SeekFlags::FLUSH, @@ -300,121 +278,125 @@ impl AudioPlayer { } pub fn stop_current(&self) -> Result<(), AudioPlayerError> { - info!("Stopping pipeline, sending EOS"); + info!(self.logger, "Stopping pipeline, sending EOS"); self.bus.post(&gst::message::Eos::new())?; Ok(()) } - pub fn quit(&self, reason: String) { - info!("Quitting audio player"); - - if self - .bus - .post(&gst::message::Application::new(gst::Structure::new_empty( - "quit", - ))) - .is_err() - { - warn!("Tried to send \"quit\" app event on flushing bus."); + pub fn is_started(&self) -> bool { + let (_, current, pending) = self.pipeline.get_state(gst::ClockTime(None)); + + match (current, pending) { + (gst::State::Null, gst::State::VoidPending) => false, + (_, gst::State::Null) => false, + (gst::State::Ready, gst::State::VoidPending) => false, + _ => true, } + } - let sender = self.sender.read().unwrap(); - sender.send(MusicBotMessage::Quit(reason)).unwrap(); + pub fn volume(&self) -> f64 { + self.volume_f64 } - fn send_state(&self, state: State) { - info!("Sending state {:?} to application", state); - let sender = self.sender.read().unwrap(); - sender.send(MusicBotMessage::StateChange(state)).unwrap(); + pub fn position(&self) -> Option<Duration> { + self.pipeline + .query_position::<gst::ClockTime>() + .and_then(|t| t.0.map(Duration::from_nanos)) } - pub fn poll(&self) -> PollResult { - debug!("Polling GStreamer"); - 'outer: loop { - while let Some(msg) = self.bus.timed_pop(gst::ClockTime(None)) { - use gst::MessageView; - - match msg.view() { - MessageView::StateChanged(state) => { - if let Some(src) = state.get_src() { - if src.get_name() != self.pipeline.get_name() { - continue; - } - } + pub fn currently_playing(&self) -> Option<AudioMetadata> { + self.currently_playing.clone() + } + + pub fn register_bot(&self, bot: WeakAddress<MusicBot>) { + let pipeline_name = self.pipeline.get_name(); + debug!(self.logger, "Setting sync handler on gstreamer bus"); - let old = state.get_old(); - let current = state.get_current(); - let pending = state.get_pending(); - - match (old, current, pending) { - (gst::State::Paused, gst::State::Playing, gst::State::VoidPending) => { - self.send_state(State::Playing) - } - (gst::State::Playing, gst::State::Paused, gst::State::VoidPending) => { - self.send_state(State::Paused) - } - (_, gst::State::Ready, gst::State::Null) => { - self.send_state(State::Stopped) - } - (_, gst::State::Null, gst::State::VoidPending) => { - self.send_state(State::Stopped) - } - _ => { - debug!( - "Pipeline transitioned from {:?} to {:?}, with {:?} pending", - old, current, pending - ); - } + let logger = self.logger.clone(); + let handle = tokio::runtime::Handle::current(); + self.bus.set_sync_handler(move |_, msg| { + use gst::MessageView; + + match msg.view() { + MessageView::StateChanged(state) => { + if let Some(src) = state.get_src() { + if src.get_name() != pipeline_name { + return gst::BusSyncReply::Drop; } } - MessageView::Eos(..) => { - info!("End of stream reached"); - self.reset().unwrap(); - break 'outer; - } - MessageView::Warning(warn) => { - warn!( - "Warning from {:?}: {} ({:?})", - warn.get_src().map(|s| s.get_path_string()), - warn.get_error(), - warn.get_debug() - ); - break 'outer; - } - MessageView::Error(err) => { - error!( - "Error from {:?}: {} ({:?})", - err.get_src().map(|s| s.get_path_string()), - err.get_error(), - err.get_debug() - ); - break 'outer; - } - MessageView::Application(content) => { - if let Some(s) = content.get_structure() { - if s.get_name() == "quit" { - self.reset().unwrap(); - return PollResult::Quit; - } + let old = state.get_old(); + let current = state.get_current(); + let pending = state.get_pending(); + + match (old, current, pending) { + (gst::State::Paused, gst::State::Playing, gst::State::VoidPending) => { + send_state(&handle, &bot, State::Playing); + } + (gst::State::Playing, gst::State::Paused, gst::State::VoidPending) => { + send_state(&handle, &bot, State::Paused); + } + (_, gst::State::Ready, gst::State::Null) => { + send_state(&handle, &bot, State::Stopped); + } + (_, gst::State::Null, gst::State::VoidPending) => { + send_state(&handle, &bot, State::Stopped); + } + _ => { + debug!( + logger, + "Pipeline transitioned"; + "from" => ?old, + "to" => ?current, + "pending" => ?pending + ); } } - _ => { - //debug!("Unhandled message on bus: {:?}", msg) - } - }; + } + MessageView::Eos(..) => { + info!(logger, "End of stream reached"); + + send_state(&handle, &bot, State::EndOfStream); + } + MessageView::Warning(warn) => { + warn!( + logger, + "Received warning from bus"; + "source" => ?warn.get_src().map(|s| s.get_path_string()), + "error" => %warn.get_error(), + "debug" => ?warn.get_debug() + ); + } + MessageView::Error(err) => { + error!( + logger, + "Received error from bus"; + "source" => ?err.get_src().map(|s| s.get_path_string()), + "error" => %err.get_error(), + "debug" => ?err.get_debug() + ); + + send_state(&handle, &bot, State::EndOfStream); + } + _ => { + //debug!("Unhandled message on bus: {:?}", msg) + } } - } - debug!("Left GStreamer message loop"); - PollResult::Continue + gst::BusSyncReply::Drop + }); } } +fn send_state(handle: &tokio::runtime::Handle, addr: &WeakAddress<MusicBot>, state: State) { + handle.spawn(addr.send(MusicBotMessage::StateChange(state))); +} + #[derive(Debug)] pub enum AudioPlayerError { + MissingPlugin(String), GStreamerError(glib::error::BoolError), StateChangeFailed, SeekError, diff --git a/src/bot/master.rs b/src/bot/master.rs index 1480e17..94332ac 100644 --- a/src/bot/master.rs +++ b/src/bot/master.rs @@ -1,41 +1,52 @@ use std::collections::HashMap; -use std::future::Future; -use std::sync::{Arc, RwLock}; -use log::info; +use async_trait::async_trait; +use futures::future; use rand::{rngs::SmallRng, seq::SliceRandom, SeedableRng}; use serde::{Deserialize, Serialize}; -use tokio::sync::mpsc::UnboundedSender; -use tsclientlib::{ClientId, Connection, Identity, MessageTarget}; +use slog::{error, info, o, trace, Logger}; +use tsclientlib::{ClientId, ConnectOptions, Connection, Identity, MessageTarget}; +use xtra::{spawn::Tokio, Actor, Address, Context, Handler, Message, WeakAddress}; use crate::audio_player::AudioPlayerError; use crate::teamspeak::TeamSpeakConnection; use crate::Args; -use crate::bot::{MusicBot, MusicBotArgs, MusicBotMessage}; +use crate::bot::{GetBotData, GetChannel, GetName, MusicBot, MusicBotArgs, MusicBotMessage}; pub struct MasterBot { - config: Arc<MasterConfig>, - music_bots: Arc<RwLock<MusicBots>>, + config: MasterConfig, + my_addr: Option<WeakAddress<Self>>, teamspeak: TeamSpeakConnection, - sender: Arc<RwLock<UnboundedSender<MusicBotMessage>>>, + available_names: Vec<String>, + available_ids: Vec<Identity>, + connected_bots: HashMap<String, Address<MusicBot>>, + rng: SmallRng, + logger: Logger, } -struct MusicBots { - rng: SmallRng, - available_names: Vec<usize>, - available_ids: Vec<usize>, - connected_bots: HashMap<String, Arc<MusicBot>>, +#[derive(Debug, Serialize, Deserialize)] +pub struct MasterArgs { + #[serde(default = "default_name")] + pub master_name: String, + pub address: String, + pub channel: Option<String>, + #[serde(default = "default_verbose")] + pub verbose: u8, + pub domain: String, + pub bind_address: String, + pub names: Vec<String>, + pub id: Option<Identity>, + pub ids: Option<Vec<Identity>>, } impl MasterBot { - pub async fn new(args: MasterArgs) -> (Arc<Self>, impl Future) { - let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); - let tx = Arc::new(RwLock::new(tx)); - info!("Starting in TeamSpeak mode"); + pub async fn spawn(args: MasterArgs, logger: Logger) -> Address<Self> { + info!(logger, "Starting in TeamSpeak mode"); let mut con_config = Connection::build(args.address.clone()) + .logger(logger.clone()) .version(tsclientlib::Version::Linux_3_3_2) .name(args.master_name.clone()) .identity(args.id.expect("identity should exist")) @@ -47,168 +58,116 @@ impl MasterBot { con_config = con_config.channel(channel); } - let connection = TeamSpeakConnection::new(tx.clone(), con_config) - .await - .unwrap(); + let connection = TeamSpeakConnection::new(logger.clone()).await.unwrap(); + trace!(logger, "Created teamspeak connection"); - let config = Arc::new(MasterConfig { + let config = MasterConfig { master_name: args.master_name, address: args.address, - names: args.names, - ids: args.ids.expect("identies should exists"), - local: args.local, verbose: args.verbose, - }); - - let name_count = config.names.len(); - let id_count = config.ids.len(); + }; - let music_bots = Arc::new(RwLock::new(MusicBots { + let bot_addr = Self { + config, + my_addr: None, + teamspeak: connection, + logger: logger.clone(), rng: SmallRng::from_entropy(), - available_names: (0..name_count).collect(), - available_ids: (0..id_count).collect(), + available_names: args.names, + available_ids: args.ids.expect("identities"), connected_bots: HashMap::new(), - })); + } + .create(None) + .spawn(&mut Tokio::Global); - let bot = Arc::new(Self { - config, - music_bots, - teamspeak: connection, - sender: tx.clone(), - }); - - let cbot = bot.clone(); - let msg_loop = async move { - 'outer: loop { - while let Some(msg) = rx.recv().await { - match msg { - MusicBotMessage::Quit(reason) => { - let mut cteamspeak = cbot.teamspeak.clone(); - cteamspeak.disconnect(&reason).await; - break 'outer; - } - MusicBotMessage::ClientDisconnected { id, .. } => { - if id == cbot.my_id().await { - // TODO Reconnect since quit was not called - break 'outer; - } - } - _ => cbot.on_message(msg).await.unwrap(), - } - } - } - }; + bot_addr.send(Connect(con_config)).await.unwrap().unwrap(); + trace!(logger, "Spawned master bot actor"); - (bot, msg_loop) + bot_addr } - async fn build_bot_args_for(&self, id: ClientId) -> Result<MusicBotArgs, BotCreationError> { - let mut cteamspeak = self.teamspeak.clone(); - let channel = match cteamspeak.channel_of_user(id).await { + async fn bot_args_for_client( + &mut self, + user_id: ClientId, + ) -> Result<MusicBotArgs, BotCreationError> { + let channel = match self.teamspeak.channel_of_user(user_id).await { Some(channel) => channel, None => return Err(BotCreationError::UnfoundUser), }; - if channel == cteamspeak.my_channel().await { + if channel == self.teamspeak.current_channel().await.unwrap() { return Err(BotCreationError::MasterChannel( self.config.master_name.clone(), )); } - let MusicBots { - ref mut rng, - ref mut available_names, - ref mut available_ids, - ref connected_bots, - } = &mut *self.music_bots.write().expect("RwLock was not poisoned"); - - for bot in connected_bots.values() { - if bot.my_channel().await == channel { - return Err(BotCreationError::MultipleBots(bot.name().to_owned())); + for bot in self.connected_bots.values() { + if bot.send(GetChannel).await.unwrap() == Some(channel) { + return Err(BotCreationError::MultipleBots( + bot.send(GetName).await.unwrap(), + )); } } - let channel_path = cteamspeak - .channel_path_of_user(id) + let channel_path = self + .teamspeak + .channel_path_of_user(user_id) .await .expect("can find poke sender"); - available_names.shuffle(rng); - let name_index = match available_names.pop() { + self.available_names.shuffle(&mut self.rng); + let name = match self.available_names.pop() { Some(v) => v, None => { return Err(BotCreationError::OutOfNames); } }; - let name = self.config.names[name_index].clone(); - available_ids.shuffle(rng); - let id_index = match available_ids.pop() { + self.available_ids.shuffle(&mut self.rng); + let identity = match self.available_ids.pop() { Some(v) => v, None => { return Err(BotCreationError::OutOfIdentities); } }; - let id = self.config.ids[id_index].clone(); - - let cmusic_bots = self.music_bots.clone(); - let disconnect_cb = Box::new(move |n, name_index, id_index| { - let mut music_bots = cmusic_bots.write().expect("RwLock was not poisoned"); - music_bots.connected_bots.remove(&n); - music_bots.available_names.push(name_index); - music_bots.available_ids.push(id_index); - }); - - info!("Connecting to {} on {}", channel_path, self.config.address); - Ok(MusicBotArgs { - name, - name_index, - id_index, - local: self.config.local, + name: name.clone(), + master: self.my_addr.clone(), address: self.config.address.clone(), - id, + identity, + local: false, channel: channel_path, verbose: self.config.verbose, - disconnect_cb, + logger: self.logger.new(o!("musicbot" => name)), }) } - async fn spawn_bot_for(&self, id: ClientId) { - match self.build_bot_args_for(id).await { + async fn spawn_bot_for_client(&mut self, id: ClientId) { + match self.bot_args_for_client(id).await { Ok(bot_args) => { - let (bot, fut) = MusicBot::new(bot_args).await; - tokio::spawn(fut); - let mut music_bots = self.music_bots.write().expect("RwLock was not poisoned"); - music_bots - .connected_bots - .insert(bot.name().to_string(), bot); - } - Err(e) => { - let mut cteamspeak = self.teamspeak.clone(); - cteamspeak.send_message_to_user(id, e.to_string()).await + let name = bot_args.name.clone(); + let bot = MusicBot::spawn(bot_args).await; + self.connected_bots.insert(name, bot); } + Err(e) => self.teamspeak.send_message_to_user(id, e.to_string()).await, } } - async fn on_message(&self, message: MusicBotMessage) -> Result<(), AudioPlayerError> { + async fn on_message(&mut self, message: MusicBotMessage) -> Result<(), AudioPlayerError> { match message { MusicBotMessage::TextMessage(message) => { if let MessageTarget::Poke(who) = message.target { - info!("Poked by {}, creating bot for their channel", who); - self.spawn_bot_for(who).await; + info!( + self.logger, + "Poked, creating bot"; "user" => %who + ); + self.spawn_bot_for_client(who).await; } } - MusicBotMessage::ChannelAdded(id) => { - let mut cteamspeak = self.teamspeak.clone(); - cteamspeak.subscribe(id).await; - } MusicBotMessage::ClientAdded(id) => { - let mut cteamspeak = self.teamspeak.clone(); - - if id == cteamspeak.my_id().await { - cteamspeak + if id == self.teamspeak.my_id().await { + self.teamspeak .set_description(String::from("Poke me if you want a music bot!")) .await; } @@ -219,41 +178,17 @@ impl MasterBot { Ok(()) } - async fn my_id(&self) -> ClientId { - let mut cteamspeak = self.teamspeak.clone(); + pub async fn bot_data(&self, name: String) -> Option<crate::web_server::BotData> { + let bot = self.connected_bots.get(&name)?; - cteamspeak.my_id().await + bot.send(GetBotData).await.ok() } - pub fn bot_data(&self, name: String) -> Option<crate::web_server::BotData> { - let music_bots = self.music_bots.read().unwrap(); - let bot = music_bots.connected_bots.get(&name)?; - - Some(crate::web_server::BotData { - name, - state: bot.state(), - volume: bot.volume(), - position: bot.position(), - currently_playing: bot.currently_playing(), - playlist: bot.playlist_to_vec(), - }) - } - - pub fn bot_datas(&self) -> Vec<crate::web_server::BotData> { - let music_bots = self.music_bots.read().unwrap(); - - let len = music_bots.connected_bots.len(); + pub async fn bot_datas(&self) -> Vec<crate::web_server::BotData> { + let len = self.connected_bots.len(); let mut result = Vec::with_capacity(len); - for (name, bot) in &music_bots.connected_bots { - let bot_data = crate::web_server::BotData { - name: name.clone(), - state: bot.state(), - volume: bot.volume(), - position: bot.position(), - currently_playing: bot.currently_playing(), - playlist: bot.playlist_to_vec(), - }; - + for bot in self.connected_bots.values() { + let bot_data = bot.send(GetBotData).await.unwrap(); result.push(bot_data); } @@ -261,24 +196,96 @@ impl MasterBot { } pub fn bot_names(&self) -> Vec<String> { - let music_bots = self.music_bots.read().unwrap(); - - let len = music_bots.connected_bots.len(); + let len = self.connected_bots.len(); let mut result = Vec::with_capacity(len); - for name in music_bots.connected_bots.keys() { + for name in self.connected_bots.keys() { result.push(name.clone()); } result } - pub fn quit(&self, reason: String) { - let music_bots = self.music_bots.read().unwrap(); - for bot in music_bots.connected_bots.values() { - bot.quit(reason.clone()) + fn on_bot_disconnect(&mut self, name: String, id: Identity) { + self.connected_bots.remove(&name); + self.available_names.push(name); + self.available_ids.push(id); + } + + pub async fn quit(&mut self, reason: String) -> Result<(), tsclientlib::Error> { + let futures = self + .connected_bots + .values() + .map(|b| b.send(Quit(reason.clone()))); + for res in future::join_all(futures).await { + if let Err(e) = res { + error!(self.logger, "Failed to shut down bot"; "error" => %e); + } } - let sender = self.sender.read().unwrap(); - sender.send(MusicBotMessage::Quit(reason)).unwrap(); + self.teamspeak.disconnect(&reason).await + } +} + +#[async_trait] +impl Actor for MasterBot { + async fn started(&mut self, ctx: &mut Context<Self>) { + self.my_addr = Some(ctx.address().unwrap().downgrade()); + } +} + +pub struct Connect(pub ConnectOptions); +impl Message for Connect { + type Result = Result<(), tsclientlib::Error>; +} + +#[async_trait] +impl Handler<Connect> for MasterBot { + async fn handle( + &mut self, + opt: Connect, + ctx: &mut Context<Self>, + ) -> Result<(), tsclientlib::Error> { + let addr = ctx.address().unwrap(); + self.teamspeak.connect_for_bot(opt.0, addr.downgrade())?; + Ok(()) + } +} + +pub struct Quit(pub String); +impl Message for Quit { + type Result = Result<(), tsclientlib::Error>; +} + +#[async_trait] +impl Handler<Quit> for MasterBot { + async fn handle(&mut self, q: Quit, _: &mut Context<Self>) -> Result<(), tsclientlib::Error> { + self.quit(q.0).await + } +} + +pub struct BotDisonnected { + pub name: String, + pub identity: Identity, +} + +impl Message for BotDisonnected { + type Result = (); +} + +#[async_trait] +impl Handler<BotDisonnected> for MasterBot { + async fn handle(&mut self, dc: BotDisonnected, _: &mut Context<Self>) { + self.on_bot_disconnect(dc.name, dc.identity); + } +} + +#[async_trait] +impl Handler<MusicBotMessage> for MasterBot { + async fn handle( + &mut self, + msg: MusicBotMessage, + _: &mut Context<Self>, + ) -> Result<(), AudioPlayerError> { + self.on_message(msg).await } } @@ -313,31 +320,10 @@ impl std::fmt::Display for BotCreationError { } } -#[derive(Debug, Serialize, Deserialize)] -pub struct MasterArgs { - #[serde(default = "default_name")] - pub master_name: String, - #[serde(default = "default_local")] - pub local: bool, - pub address: String, - pub channel: Option<String>, - #[serde(default = "default_verbose")] - pub verbose: u8, - pub domain: String, - pub bind_address: String, - pub names: Vec<String>, - pub id: Option<Identity>, - pub ids: Option<Vec<Identity>>, -} - fn default_name() -> String { String::from("PokeBot") } -fn default_local() -> bool { - false -} - fn default_verbose() -> u8 { 0 } @@ -345,7 +331,6 @@ fn default_verbose() -> u8 { impl MasterArgs { pub fn merge(self, args: Args) -> Self { let address = args.address.unwrap_or(self.address); - let local = args.local || self.local; let channel = args.master_channel.or(self.channel); let verbose = if args.verbose > 0 { args.verbose @@ -357,7 +342,6 @@ impl MasterArgs { master_name: self.master_name, names: self.names, ids: self.ids, - local, address, domain: self.domain, bind_address: self.bind_address, @@ -371,8 +355,5 @@ impl MasterArgs { pub struct MasterConfig { pub master_name: String, pub address: String, - pub names: Vec<String>, - pub ids: Vec<Identity>, - pub local: bool, pub verbose: u8, } diff --git a/src/bot/music.rs b/src/bot/music.rs index 90305d0..a57b66c 100644 --- a/src/bot/music.rs +++ b/src/bot/music.rs @@ -1,25 +1,22 @@ -use std::future::Future; -use std::io::BufRead; -use std::sync::{Arc, RwLock}; -use std::thread; -use std::time::Duration; +use async_trait::async_trait; -use log::{debug, info}; use serde::Serialize; +use slog::{debug, info, Logger}; use structopt::StructOpt; -use tokio::sync::mpsc::UnboundedSender; use tsclientlib::{data, ChannelId, ClientId, Connection, Identity, Invoker, MessageTarget}; +use xtra::{spawn::Tokio, Actor, Address, Context, Handler, Message, WeakAddress}; -use crate::audio_player::{AudioPlayer, AudioPlayerError, PollResult}; +use crate::audio_player::{AudioPlayer, AudioPlayerError}; +use crate::bot::{BotDisonnected, Connect, MasterBot, Quit}; use crate::command::Command; use crate::command::VolumeChange; use crate::playlist::Playlist; use crate::teamspeak as ts; -use crate::youtube_dl::AudioMetadata; +use crate::youtube_dl::{self, AudioMetadata}; use ts::TeamSpeakConnection; #[derive(Debug)] -pub struct Message { +pub struct ChatMessage { pub target: MessageTarget, pub invoker: Invoker, pub text: String, @@ -33,6 +30,10 @@ pub enum State { EndOfStream, } +impl Message for State { + type Result = (); +} + impl std::fmt::Display for State { fn fmt(&self, fmt: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> { match self { @@ -47,7 +48,7 @@ impl std::fmt::Display for State { #[derive(Debug)] pub enum MusicBotMessage { - TextMessage(Message), + TextMessage(ChatMessage), ClientChannel { client: ClientId, old_channel: ChannelId, @@ -59,112 +60,95 @@ pub enum MusicBotMessage { client: Box<data::Client>, }, StateChange(State), - Quit(String), +} + +impl Message for MusicBotMessage { + type Result = Result<(), AudioPlayerError>; } pub struct MusicBot { name: String, - player: Arc<AudioPlayer>, + identity: Identity, + player: AudioPlayer, teamspeak: Option<TeamSpeakConnection>, - playlist: Arc<RwLock<Playlist>>, - state: Arc<RwLock<State>>, + master: Option<WeakAddress<MasterBot>>, + playlist: Playlist, + state: State, + logger: Logger, } pub struct MusicBotArgs { pub name: String, - pub name_index: usize, - pub id_index: usize, + pub master: Option<WeakAddress<MasterBot>>, pub local: bool, pub address: String, - pub id: Identity, + pub identity: Identity, pub channel: String, pub verbose: u8, - pub disconnect_cb: Box<dyn FnMut(String, usize, usize) + Send + Sync>, + pub logger: Logger, } impl MusicBot { - pub async fn new(args: MusicBotArgs) -> (Arc<Self>, impl Future<Output = ()>) { - let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); - let tx = Arc::new(RwLock::new(tx)); - let (player, connection) = if args.local { - info!("Starting in CLI mode"); - let audio_player = AudioPlayer::new(tx.clone(), None).unwrap(); - - (audio_player, None) - } else { - info!("Starting in TeamSpeak mode"); - - let con_config = Connection::build(args.address) - .version(tsclientlib::Version::Linux_3_3_2) - .name(format!("🎵 {}", args.name)) - .identity(args.id) - .log_commands(args.verbose >= 1) - .log_packets(args.verbose >= 2) - .log_udp_packets(args.verbose >= 3) - .channel(args.channel); - - let connection = TeamSpeakConnection::new(tx.clone(), con_config) - .await - .unwrap(); - let mut cconnection = connection.clone(); - let audio_player = AudioPlayer::new( - tx.clone(), - Some(Box::new(move |samples| { - let mut rt = tokio::runtime::Runtime::new().unwrap(); - rt.block_on(cconnection.send_audio_packet(samples)); - })), - ) - .unwrap(); - - (audio_player, Some(connection)) - }; - + pub async fn spawn(args: MusicBotArgs) -> Address<Self> { + let mut player = AudioPlayer::new(args.logger.clone()).unwrap(); player.change_volume(VolumeChange::Absolute(0.5)).unwrap(); - let player = Arc::new(player); - let playlist = Arc::new(RwLock::new(Playlist::new())); - spawn_gstreamer_thread(player.clone(), tx.clone()); + let playlist = Playlist::new(args.logger.clone()); - if args.local { - spawn_stdin_reader(tx); - } + let teamspeak = if args.local { + info!(args.logger, "Starting in CLI mode"); + player.setup_with_audio_callback(None).unwrap(); - let bot = Arc::new(Self { + None + } else { + Some(TeamSpeakConnection::new(args.logger.clone()).await.unwrap()) + }; + let bot = Self { name: args.name.clone(), + master: args.master, + identity: args.identity.clone(), player, - teamspeak: connection, + teamspeak, playlist, - state: Arc::new(RwLock::new(State::EndOfStream)), - }); - - let cbot = bot.clone(); - let mut disconnect_cb = args.disconnect_cb; - let name = args.name; - let name_index = args.name_index; - let id_index = args.id_index; - let msg_loop = async move { - 'outer: loop { - while let Some(msg) = rx.recv().await { - if let MusicBotMessage::Quit(reason) = msg { - if let Some(ts) = &cbot.teamspeak { - let mut ts = ts.clone(); - ts.disconnect(&reason).await; - } - disconnect_cb(name, name_index, id_index); - break 'outer; - } - cbot.on_message(msg).await.unwrap(); - } - } - debug!("Left message loop"); + state: State::EndOfStream, + logger: args.logger.clone(), }; - bot.update_name(State::EndOfStream).await; + let bot_addr = bot.create(None).spawn(&mut Tokio::Global); + + info!( + args.logger, + "Connecting"; + "name" => &args.name, + "channel" => &args.channel, + "address" => &args.address, + ); + + let opt = Connection::build(args.address) + .logger(args.logger.clone()) + .version(tsclientlib::Version::Linux_3_3_2) + .name(format!("🎵 {}", args.name)) + .identity(args.identity) + .log_commands(args.verbose >= 1) + .log_packets(args.verbose >= 2) + .log_udp_packets(args.verbose >= 3) + .channel(args.channel); + bot_addr.send(Connect(opt)).await.unwrap().unwrap(); + bot_addr + .send(MusicBotMessage::StateChange(State::EndOfStream)) + .await + .unwrap() + .unwrap(); + + if args.local { + debug!(args.logger, "Spawning stdin reader thread"); + spawn_stdin_reader(bot_addr.downgrade()); + } - (bot, msg_loop) + bot_addr } - async fn start_playing_audio(&self, metadata: AudioMetadata) { + async fn start_playing_audio(&mut self, metadata: AudioMetadata) { let duration = if let Some(duration) = metadata.duration { format!("({})", ts::bold(&humantime::format_duration(duration))) } else { @@ -184,25 +168,16 @@ impl MusicBot { self.player.play().unwrap(); } - pub async fn add_audio(&self, url: String, user: String) { - match crate::youtube_dl::get_audio_download_from_url(url).await { + pub async fn add_audio(&mut self, url: String, user: String) { + match youtube_dl::get_audio_download_from_url(url, &self.logger).await { Ok(mut metadata) => { metadata.added_by = user; - info!("Found audio url: {}", metadata.url); + info!(self.logger, "Found source"; "url" => &metadata.url); - // RWLockGuard can not be kept around or the compiler complains that - // it might cross the await boundary - self.playlist - .write() - .expect("RwLock was not poisoned") - .push(metadata.clone()); + self.playlist.push(metadata.clone()); if !self.player.is_started() { - let entry = self - .playlist - .write() - .expect("RwLock was not poisoned") - .pop(); + let entry = self.playlist.pop(); if let Some(request) = entry { self.start_playing_audio(request).await; } @@ -222,7 +197,7 @@ impl MusicBot { } } Err(e) => { - info!("Failed to find audio url: {}", e); + info!(self.logger, "Failed to find audio url"; "error" => &e); self.send_message(format!("Failed to find url: {}", e)) .await; @@ -235,74 +210,50 @@ impl MusicBot { } pub fn state(&self) -> State { - *self.state.read().expect("RwLock was not poisoned") + self.state } - pub fn volume(&self) -> f64 { + pub async fn volume(&self) -> f64 { self.player.volume() } - pub fn position(&self) -> Option<Duration> { - self.player.position() - } - - pub fn currently_playing(&self) -> Option<AudioMetadata> { - self.player.currently_playing() - } + pub async fn current_channel(&mut self) -> Option<ChannelId> { + let ts = self.teamspeak.as_mut().expect("current_channel needs ts"); - pub fn playlist_to_vec(&self) -> Vec<AudioMetadata> { - self.playlist.read().unwrap().to_vec() + ts.current_channel().await } - pub async fn my_channel(&self) -> ChannelId { - let ts = self.teamspeak.as_ref().expect("my_channel needs ts"); - - let mut ts = ts.clone(); - ts.my_channel().await - } + async fn user_count(&mut self, channel: ChannelId) -> u32 { + let ts = self.teamspeak.as_mut().expect("user_count needs ts"); - async fn user_count(&self, channel: ChannelId) -> u32 { - let ts = self.teamspeak.as_ref().expect("user_count needs ts"); - - let mut ts = ts.clone(); ts.user_count(channel).await } - async fn send_message(&self, text: String) { - debug!("Sending message to TeamSpeak: {}", text); + async fn send_message(&mut self, text: String) { + debug!(self.logger, "Sending message to TeamSpeak"; "message" => &text); - if let Some(ts) = &self.teamspeak { - let mut ts = ts.clone(); + if let Some(ts) = &mut self.teamspeak { ts.send_message_to_channel(text).await; } } - async fn set_nickname(&self, name: String) { - info!("Setting TeamSpeak nickname: {}", name); + async fn set_nickname(&mut self, name: String) { + info!(self.logger, "Setting TeamSpeak nickname"; "name" => &name); - if let Some(ts) = &self.teamspeak { - let mut ts = ts.clone(); + if let Some(ts) = &mut self.teamspeak { ts.set_nickname(name).await; } } - async fn set_description(&self, desc: String) { - info!("Setting TeamSpeak description: {}", desc); + async fn set_description(&mut self, desc: String) { + info!(self.logger, "Setting TeamSpeak description"; "description" => &desc); - if let Some(ts) = &self.teamspeak { - let mut ts = ts.clone(); + if let Some(ts) = &mut self.teamspeak { ts.set_description(desc).await; } } - async fn subscribe(&self, id: ChannelId) { - if let Some(ts) = &self.teamspeak { - let mut ts = ts.clone(); - ts.subscribe(id).await; - } - } - - async fn on_text(&self, message: Message) -> Result<(), AudioPlayerError> { + async fn on_text(&mut self, message: ChatMessage) -> Result<(), AudioPlayerError> { let msg = message.text; if msg.starts_with('!') { let tokens = msg[1..].split_whitespace().collect::<Vec<_>>(); @@ -319,13 +270,15 @@ impl MusicBot { Ok(()) } - async fn on_command(&self, command: Command, invoker: Invoker) -> Result<(), AudioPlayerError> { + async fn on_command( + &mut self, + command: Command, + invoker: Invoker, + ) -> Result<(), AudioPlayerError> { match command { Command::Play => { - let playlist = self.playlist.read().expect("RwLock was not poisoned"); - if !self.player.is_started() { - if !playlist.is_empty() { + if !self.playlist.is_empty() { self.player.stop_current()?; } } else { @@ -357,35 +310,32 @@ impl MusicBot { } } Command::Next => { - let playlist = self.playlist.read().expect("RwLock was not poisoned"); - if !playlist.is_empty() { - info!("Skipping to next track"); + if !self.playlist.is_empty() { + info!(self.logger, "Skipping to next track"); self.player.stop_current()?; } else { - info!("Playlist empty, cannot skip"); + info!(self.logger, "Playlist empty, cannot skip"); self.player.reset()?; } } Command::Clear => { - self.playlist - .write() - .expect("RwLock was not poisoned") - .clear(); + self.send_message(String::from("Cleared playlist")).await; + self.playlist.clear(); } Command::Volume { volume } => { self.player.change_volume(volume)?; self.update_name(self.state()).await; } Command::Leave => { - self.quit(String::from("Leaving")); + self.quit(String::from("Leaving"), true).await.unwrap(); } } Ok(()) } - async fn update_name(&self, state: State) { - let volume = (self.volume() * 100.0).round(); + async fn update_name(&mut self, state: State) { + let volume = (self.volume().await * 100.0).round(); let name = match state { State::EndOfStream => format!("🎵 {} ({}%)", self.name, volume), _ => format!("🎵 {} - {} ({}%)", self.name, state, volume), @@ -393,43 +343,39 @@ impl MusicBot { self.set_nickname(name).await; } - async fn on_state(&self, state: State) -> Result<(), AudioPlayerError> { - let current_state = *self.state.read().unwrap(); - if current_state != state { - match state { + async fn on_state(&mut self, new_state: State) -> Result<(), AudioPlayerError> { + if self.state != new_state { + match new_state { State::EndOfStream => { - let next_track = self - .playlist - .write() - .expect("RwLock was not poisoned") - .pop(); + self.player.reset()?; + let next_track = self.playlist.pop(); if let Some(request) = next_track { - info!("Advancing playlist"); + info!(self.logger, "Advancing playlist"); self.start_playing_audio(request).await; } else { - self.update_name(state).await; + self.update_name(new_state).await; self.set_description(String::new()).await; } } State::Stopped => { - if current_state != State::EndOfStream { - self.update_name(state).await; + if self.state != State::EndOfStream { + self.update_name(new_state).await; self.set_description(String::new()).await; } } - _ => self.update_name(state).await, + _ => self.update_name(new_state).await, } } - if !(current_state == State::EndOfStream && state == State::Stopped) { - *self.state.write().unwrap() = state; + if !(self.state == State::EndOfStream && new_state == State::Stopped) { + self.state = new_state; } Ok(()) } - async fn on_message(&self, message: MusicBotMessage) -> Result<(), AudioPlayerError> { + async fn on_message(&mut self, message: MusicBotMessage) -> Result<(), AudioPlayerError> { match message { MusicBotMessage::TextMessage(message) => { if MessageTarget::Channel == message.target { @@ -446,9 +392,6 @@ impl MusicBot { let old_channel = client.channel; self.on_client_left_channel(old_channel).await; } - MusicBotMessage::ChannelAdded(id) => { - self.subscribe(id).await; - } MusicBotMessage::StateChange(state) => { self.on_state(state).await?; } @@ -458,60 +401,163 @@ impl MusicBot { Ok(()) } - async fn on_client_left_channel(&self, old_channel: ChannelId) { - let my_channel = self.my_channel().await; - if old_channel == my_channel && self.user_count(my_channel).await <= 1 { - self.quit(String::from("Channel is empty")); + // FIXME logs an error if this music bot is the one leaving + async fn on_client_left_channel(&mut self, old_channel: ChannelId) { + let current_channel = match self.current_channel().await { + Some(c) => c, + None => { + return; + } + }; + if old_channel == current_channel && self.user_count(current_channel).await <= 1 { + self.quit(String::from("Channel is empty"), true) + .await + .unwrap(); + } + } + + pub async fn quit( + &mut self, + reason: String, + inform_master: bool, + ) -> Result<(), tsclientlib::Error> { + // FIXME logs errors if the bot is playing something because it tries to + // change its name and description + self.player.reset().unwrap(); + + let ts = self.teamspeak.as_mut().unwrap(); + ts.disconnect(&reason).await?; + + if inform_master { + if let Some(master) = &self.master { + master + .send(BotDisonnected { + name: self.name.clone(), + identity: self.identity.clone(), + }) + .await + .unwrap(); + } + } + + Ok(()) + } +} + +#[async_trait] +impl Actor for MusicBot { + async fn started(&mut self, ctx: &mut Context<Self>) { + let addr = ctx.address().unwrap().downgrade(); + self.player.register_bot(addr); + } +} + +#[async_trait] +impl Handler<Connect> for MusicBot { + async fn handle( + &mut self, + opt: Connect, + ctx: &mut Context<Self>, + ) -> Result<(), tsclientlib::Error> { + let addr = ctx.address().unwrap().downgrade(); + self.teamspeak + .as_mut() + .unwrap() + .connect_for_bot(opt.0, addr)?; + + let mut connection = self.teamspeak.as_ref().unwrap().clone(); + let handle = tokio::runtime::Handle::current(); + self.player + .setup_with_audio_callback(Some(Box::new(move |samples| { + handle.block_on(connection.send_audio_packet(samples)); + }))) + .unwrap(); + + Ok(()) + } +} + +pub struct GetName; +impl Message for GetName { + type Result = String; +} + +#[async_trait] +impl Handler<GetName> for MusicBot { + async fn handle(&mut self, _: GetName, _: &mut Context<Self>) -> String { + self.name().to_owned() + } +} + +pub struct GetBotData; +impl Message for GetBotData { + type Result = crate::web_server::BotData; +} + +#[async_trait] +impl Handler<GetBotData> for MusicBot { + async fn handle(&mut self, _: GetBotData, _: &mut Context<Self>) -> crate::web_server::BotData { + crate::web_server::BotData { + name: self.name.clone(), + playlist: self.playlist.to_vec(), + currently_playing: self.player.currently_playing(), + position: self.player.position(), + state: self.state(), + volume: self.volume().await, } } +} - pub fn quit(&self, reason: String) { - self.player.quit(reason); +pub struct GetChannel; +impl Message for GetChannel { + type Result = Option<ChannelId>; +} + +#[async_trait] +impl Handler<GetChannel> for MusicBot { + async fn handle(&mut self, _: GetChannel, _: &mut Context<Self>) -> Option<ChannelId> { + self.current_channel().await } } -fn spawn_stdin_reader(tx: Arc<RwLock<UnboundedSender<MusicBotMessage>>>) { - debug!("Spawning stdin reader thread"); - thread::Builder::new() - .name(String::from("stdin reader")) - .spawn(move || { - let stdin = ::std::io::stdin(); - let lock = stdin.lock(); - for line in lock.lines() { - let line = line.unwrap(); - - let message = MusicBotMessage::TextMessage(Message { - target: MessageTarget::Channel, - invoker: Invoker { - name: String::from("stdin"), - id: ClientId(0), - uid: None, - }, - text: line, - }); - - let tx = tx.read().unwrap(); - tx.send(message).unwrap(); - } - }) - .expect("Failed to spawn stdin reader thread"); +#[async_trait] +impl Handler<Quit> for MusicBot { + async fn handle(&mut self, q: Quit, _: &mut Context<Self>) -> Result<(), tsclientlib::Error> { + self.quit(q.0, false).await + } } -fn spawn_gstreamer_thread( - player: Arc<AudioPlayer>, - tx: Arc<RwLock<UnboundedSender<MusicBotMessage>>>, -) { - thread::Builder::new() - .name(String::from("gstreamer polling")) - .spawn(move || loop { - if player.poll() == PollResult::Quit { - break; - } +#[async_trait] +impl Handler<MusicBotMessage> for MusicBot { + async fn handle( + &mut self, + msg: MusicBotMessage, + _: &mut Context<Self>, + ) -> Result<(), AudioPlayerError> { + self.on_message(msg).await + } +} - tx.read() - .unwrap() - .send(MusicBotMessage::StateChange(State::EndOfStream)) - .unwrap(); - }) - .expect("Failed to spawn gstreamer thread"); +fn spawn_stdin_reader(addr: WeakAddress<MusicBot>) { + use tokio::io::AsyncBufReadExt; + + tokio::task::spawn(async move { + let stdin = tokio::io::stdin(); + let reader = tokio::io::BufReader::new(stdin); + let mut lines = reader.lines(); + + while let Some(line) = lines.next_line().await.unwrap() { + let message = MusicBotMessage::TextMessage(ChatMessage { + target: MessageTarget::Channel, + invoker: Invoker { + name: String::from("stdin"), + id: ClientId(0), + uid: None, + }, + text: line, + }); + + addr.send(message).await.unwrap().unwrap(); + } + }); } diff --git a/src/log_bridge.rs b/src/log_bridge.rs new file mode 100644 index 0000000..35bcb01 --- /dev/null +++ b/src/log_bridge.rs @@ -0,0 +1,101 @@ +// TODO Temporary file until we have a better logging setup for slog + +use slog::{Drain, KV}; +use std::fmt::{self, Arguments, Write}; + +pub struct LogBridge<T>(pub T); + +impl<T: log::Log> Drain for LogBridge<T> { + type Ok = (); + type Err = slog::Error; + + fn log(&self, record: &slog::Record, kvs: &slog::OwnedKVList) -> Result<(), Self::Err> { + let mut target = record.tag(); + if target.is_empty() { + target = record.module(); + } + + let lazy = LazyLog::new(record, kvs); + + self.0.log( + &log::Record::builder() + .args(format_args!("{}", lazy)) + .level(level_to_log(record.level())) + .target(target) + .module_path_static(Some(record.module())) + .file_static(Some(record.file())) + .line(Some(record.line())) + .build(), + ); + + Ok(()) + } + + fn is_enabled(&self, level: slog::Level) -> bool { + let meta = log::Metadata::builder().level(level_to_log(level)).build(); + + self.0.enabled(&meta) + } +} + +fn level_to_log(level: slog::Level) -> log::Level { + match level { + slog::Level::Critical | slog::Level::Error => log::Level::Error, + slog::Level::Warning => log::Level::Warn, + slog::Level::Info => log::Level::Info, + slog::Level::Debug => log::Level::Debug, + slog::Level::Trace => log::Level::Trace, + } +} + +struct LazyLog<'a> { + record: &'a slog::Record<'a>, + kvs: &'a slog::OwnedKVList, +} + +impl<'a> LazyLog<'a> { + fn new(record: &'a slog::Record, kvs: &'a slog::OwnedKVList) -> Self { + LazyLog { record, kvs } + } +} + +impl<'a> fmt::Display for LazyLog<'a> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", self.record.msg())?; + + let mut ser = StringSerializer::new(); + + self.kvs + .serialize(self.record, &mut ser) + .map_err(|_| fmt::Error)?; + self.record + .kv() + .serialize(self.record, &mut ser) + .map_err(|_| fmt::Error)?; + + write!(f, "{}", ser.finish()) + } +} + +struct StringSerializer { + inner: String, +} + +impl StringSerializer { + fn new() -> Self { + StringSerializer { + inner: String::new(), + } + } + + fn finish(self) -> String { + self.inner + } +} + +impl slog::Serializer for StringSerializer { + fn emit_arguments(&mut self, key: slog::Key, value: &Arguments) -> slog::Result { + write!(self.inner, ", {}: {}", key, value)?; + Ok(()) + } +} diff --git a/src/main.rs b/src/main.rs index f755db0..51b1e38 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,22 +2,25 @@ use std::fs::File; use std::io::{Read, Write}; use std::path::PathBuf; use std::thread; -use std::time::Duration; -use log::{debug, error, info}; +use slog::{debug, error, info, o, Drain, Logger}; use structopt::clap::AppSettings; use structopt::StructOpt; +#[cfg(unix)] +use tokio::signal::unix::*; use tsclientlib::Identity; mod audio_player; mod bot; mod command; +mod log_bridge; mod playlist; mod teamspeak; mod web_server; mod youtube_dl; -use bot::{MasterArgs, MasterBot, MusicBot, MusicBotArgs}; +use bot::{MasterArgs, MasterBot, MusicBot, MusicBotArgs, Quit}; +use log_bridge::LogBridge; #[derive(StructOpt, Debug)] #[structopt(global_settings = &[AppSettings::ColoredHelp])] @@ -51,6 +54,10 @@ pub struct Args { help = "The channel the master bot should connect to" )] master_channel: Option<String>, + // 0. Print nothing + // 1. Print command string + // 2. Print packets + // 3. Print udp packets #[structopt( short = "v", long = "verbose", @@ -58,25 +65,44 @@ pub struct Args { parse(from_occurrences) )] verbose: u8, - // 0. Print nothing - // 1. Print command string - // 2. Print packets - // 3. Print udp packets } #[tokio::main] async fn main() { - if let Err(e) = run().await { - println!("Error: {}", e); + let root_logger = { + let config = log4rs::load_config_file("log4rs.yml", Default::default()).unwrap(); + let drain = LogBridge(log4rs::Logger::new(config)).fuse(); + // slog_async adds a channel because log4rs if not unwind safe + let drain = slog_async::Async::new(drain).build().fuse(); + + Logger::root(drain, o!()) + }; + + let scope_guard = slog_scope::set_global_logger(root_logger.clone()); + // On SIGTERM the logger resets for some reason which makes the bot panic + // if it tries to log anything + scope_guard.cancel_reset(); + + slog_stdlog::init().unwrap(); + + if let Err(e) = run(root_logger.clone()).await { + error!(root_logger, "{}", e); } } -async fn run() -> Result<(), Box<dyn std::error::Error>> { - log4rs::init_file("log4rs.yml", Default::default()).unwrap(); - +async fn run(root_logger: Logger) -> Result<(), Box<dyn std::error::Error>> { // Parse command line options let args = Args::from_args(); + // Set up signal handlers + let ctrl_c = tokio::task::spawn(tokio::signal::ctrl_c()); + #[cfg(unix)] + let (sighup, sigterm, sigquit) = ( + tokio::task::spawn(hangup()), + tokio::task::spawn(terminate()), + tokio::task::spawn(quit()), + ); + let mut file = File::open(&args.config_path)?; let mut toml = String::new(); file.read_to_string(&mut toml)?; @@ -107,14 +133,14 @@ async fn run() -> Result<(), Box<dyn std::error::Error>> { if let Some(level) = args.wanted_level { if let Some(id) = &mut config.id { - info!("Upgrading master identity"); + info!(root_logger, "Upgrading master identity"); id.upgrade_level(level).expect("can upgrade level"); } if let Some(ids) = &mut config.ids { let len = ids.len(); for (i, id) in ids.iter_mut().enumerate() { - info!("Upgrading bot identity {}/{}", i + 1, len); + info!(root_logger, "Upgrading bot identity"; "current" => i + 1, "amount" => len); id.upgrade_level(level).expect("can upgrade level"); } } @@ -127,53 +153,101 @@ async fn run() -> Result<(), Box<dyn std::error::Error>> { } if config.id.is_none() || config.ids.is_none() { - error!("Failed to find required identites, try running with `-g`"); + error!( + root_logger, + "Failed to find required identites, try running with `-g`" + ); return Ok(()); } + let local = args.local; let bot_args = config.merge(args); - info!("Starting PokeBot!"); - debug!("Received CLI arguments: {:?}", std::env::args()); + info!(root_logger, "Starting PokeBot!"); + debug!(root_logger, "Received CLI arguments"; "args" => ?std::env::args()); - if bot_args.local { + if local { let name = bot_args.names[0].clone(); - let id = bot_args.ids.expect("identies should exists")[0].clone(); - - let disconnect_cb = Box::new(move |_, _, _| {}); + let identity = bot_args.ids.expect("identies should exists")[0].clone(); let bot_args = MusicBotArgs { name, - name_index: 0, - id_index: 0, + master: None, local: true, address: bot_args.address.clone(), - id, + identity, channel: String::from("local"), verbose: bot_args.verbose, - disconnect_cb, + logger: root_logger, }; - MusicBot::new(bot_args).await.1.await; + MusicBot::spawn(bot_args).await; + + ctrl_c.await??; } else { let domain = bot_args.domain.clone(); let bind_address = bot_args.bind_address.clone(); - let (bot, fut) = MasterBot::new(bot_args).await; - - thread::spawn(|| { - let web_args = web_server::WebServerArgs { - domain, - bind_address, - bot, - }; - if let Err(e) = web_server::start(web_args) { - error!("Error in web server: {}", e); + let bot_name = bot_args.master_name.clone(); + let bot_logger = root_logger.new(o!("master" => bot_name.clone())); + let bot = MasterBot::spawn(bot_args, bot_logger).await; + + let web_args = web_server::WebServerArgs { + domain, + bind_address, + bot: bot.downgrade(), + }; + spawn_web_server(web_args, root_logger.new(o!("webserver" => bot_name))); + + #[cfg(unix)] + tokio::select! { + res = ctrl_c => { + res??; + info!(root_logger, "Received signal, shutting down"; "signal" => "SIGINT"); } - }); + _ = sigterm => { + info!(root_logger, "Received signal, shutting down"; "signal" => "SIGTERM"); + } + _ = sighup => { + info!(root_logger, "Received signal, shutting down"; "signal" => "SIGHUP"); + } + _ = sigquit => { + info!(root_logger, "Received signal, shutting down"; "signal" => "SIGQUIT"); + } + }; + + #[cfg(windows)] + ctrl_c.await??; - fut.await; - // Keep tokio running while the bot disconnects - tokio::time::delay_for(Duration::from_secs(1)).await; + bot.send(Quit(String::from("Stopping"))) + .await + .unwrap() + .unwrap(); } Ok(()) } + +pub fn spawn_web_server(args: web_server::WebServerArgs, logger: Logger) { + thread::spawn(move || { + if let Err(e) = web_server::start(args, logger.clone()) { + error!(logger, "Error in web server"; "error" => %e); + } + }); +} + +#[cfg(unix)] +pub async fn terminate() -> std::io::Result<()> { + signal(SignalKind::terminate())?.recv().await; + Ok(()) +} + +#[cfg(unix)] +pub async fn hangup() -> std::io::Result<()> { + signal(SignalKind::hangup())?.recv().await; + Ok(()) +} + +#[cfg(unix)] +pub async fn quit() -> std::io::Result<()> { + signal(SignalKind::quit())?.recv().await; + Ok(()) +} diff --git a/src/playlist.rs b/src/playlist.rs index 445f8a5..31fcfc0 100644 --- a/src/playlist.rs +++ b/src/playlist.rs @@ -1,29 +1,35 @@ use std::collections::VecDeque; -use log::info; +use slog::{info, Logger}; use crate::youtube_dl::AudioMetadata; pub struct Playlist { data: VecDeque<AudioMetadata>, + logger: Logger, } impl Playlist { - pub fn new() -> Self { + pub fn new(logger: Logger) -> Self { Self { data: VecDeque::new(), + logger, } } pub fn push(&mut self, data: AudioMetadata) { - info!("Adding {:?} to playlist", &data.title); + info!(self.logger, "Adding to playlist"; "title" => &data.title); self.data.push_front(data) } pub fn pop(&mut self) -> Option<AudioMetadata> { let res = self.data.pop_back(); - info!("Popping {:?} from playlist", res.as_ref().map(|r| &r.title)); + info!( + self.logger, + "Popping from playlist"; + "title" => res.as_ref().map(|r| &r.title) + ); res } @@ -45,6 +51,6 @@ impl Playlist { pub fn clear(&mut self) { self.data.clear(); - info!("Cleared playlist") + info!(self.logger, "Cleared playlist") } } diff --git a/src/teamspeak/mod.rs b/src/teamspeak/mod.rs index beb3f44..7a68d38 100644 --- a/src/teamspeak/mod.rs +++ b/src/teamspeak/mod.rs @@ -1,7 +1,5 @@ -use std::sync::{Arc, RwLock}; - use futures::stream::StreamExt; -use tokio::sync::mpsc::UnboundedSender; +use xtra::{Actor, Handler, WeakAddress}; use tsclientlib::data::exts::{M2BClientEditExt, M2BClientUpdateExt}; use tsclientlib::{ @@ -10,9 +8,9 @@ use tsclientlib::{ ChannelId, ClientId, ConnectOptions, DisconnectOptions, MessageTarget, OutCommandExt, Reason, }; -use log::{debug, error}; +use slog::{debug, error, info, trace, Logger}; -use crate::bot::{Message, MusicBotMessage}; +use crate::bot::{ChatMessage, MusicBotMessage}; mod bbcode; @@ -20,7 +18,8 @@ pub use bbcode::*; #[derive(Clone)] pub struct TeamSpeakConnection { - handle: SyncConnectionHandle, + handle: Option<SyncConnectionHandle>, + logger: Logger, } fn get_message(event: &Event) -> Option<MusicBotMessage> { @@ -31,7 +30,7 @@ fn get_message(event: &Event) -> Option<MusicBotMessage> { target, invoker: sender, message: msg, - } => Some(MusicBotMessage::TextMessage(Message { + } => Some(MusicBotMessage::TextMessage(ChatMessage { target: *target, invoker: sender.clone(), text: msg.clone(), @@ -86,50 +85,82 @@ fn get_message(event: &Event) -> Option<MusicBotMessage> { } impl TeamSpeakConnection { - pub async fn new( - tx: Arc<RwLock<UnboundedSender<MusicBotMessage>>>, + pub async fn new(logger: Logger) -> Result<TeamSpeakConnection, tsclientlib::Error> { + Ok(TeamSpeakConnection { + handle: None, + logger, + }) + } + + pub fn connect_for_bot<T: Actor + Handler<MusicBotMessage>>( + &mut self, options: ConnectOptions, - ) -> Result<TeamSpeakConnection, tsclientlib::Error> { + bot: WeakAddress<T>, + ) -> Result<(), tsclientlib::Error> { + info!(self.logger, "Starting TeamSpeak connection"); + let conn = options.connect()?; - let conn = SyncConnection::from(conn); - let mut handle = conn.get_handle(); - - tokio::spawn(conn.for_each(move |i| { - let tx = tx.clone(); - async move { - match i { - Ok(SyncStreamItem::ConEvents(events)) => { + let mut conn = SyncConnection::from(conn); + let handle = conn.get_handle(); + self.handle = Some(handle); + + let ev_logger = self.logger.clone(); + tokio::spawn(async move { + while let Some(item) = conn.next().await { + use SyncStreamItem::*; + + match item { + Ok(ConEvents(events)) => { for event in &events { if let Some(msg) = get_message(event) { - let tx = tx.read().expect("RwLock was not poisoned"); - // Ignore the result because the receiver might get dropped first. - let _ = tx.send(msg); + tokio::spawn(bot.send(msg)); } } } - Err(e) => error!("Error occured during event reading: {}", e), - Ok(SyncStreamItem::DisconnectedTemporarily) => debug!("Temporary disconnect!"), - _ => (), + Err(e) => error!(ev_logger, "Error occured during event reading: {}", e), + Ok(DisconnectedTemporarily(r)) => { + debug!(ev_logger, "Temporary disconnect"; "reason" => ?r) + } + Ok(Audio(_)) => { + trace!(ev_logger, "Audio received"); + } + Ok(IdentityLevelIncreasing(_)) => { + trace!(ev_logger, "Identity level increasing"); + } + Ok(IdentityLevelIncreased) => { + trace!(ev_logger, "Identity level increased"); + } + Ok(NetworkStatsUpdated) => { + trace!(ev_logger, "Network stats updated"); + } } } - })); - - handle.wait_until_connected().await?; - - let mut chandle = handle.clone(); - chandle - .with_connection(|mut conn| { - conn.get_state() - .expect("is connected") - .server - .set_subscribed(true) - .send(&mut conn) - .unwrap() - }) - .await - .unwrap(); - - Ok(TeamSpeakConnection { handle }) + }); + + let mut handle = self.handle.clone(); + tokio::spawn(async move { + handle + .as_mut() + .expect("connect_for_bot was called") + .wait_until_connected() + .await + .unwrap(); + handle + .as_mut() + .expect("connect_for_bot was called") + .with_connection(|mut conn| { + conn.get_state() + .expect("can get state") + .server + .set_subscribed(true) + .send(&mut conn) + }) + .await + .and_then(|v| v) + .unwrap(); + }); + + Ok(()) } pub async fn send_audio_packet(&mut self, samples: &[u8]) { @@ -140,22 +171,25 @@ impl TeamSpeakConnection { data: samples, }); - self.handle - .with_connection(|conn| { - if let Err(e) = conn - .get_tsproto_client_mut() + if let Err(e) = self + .handle + .as_mut() + .expect("connect_for_bot was called") + .with_connection(move |conn| { + conn.get_tsproto_client_mut() .expect("can get tsproto client") .send_packet(packet) - { - error!("Failed to send voice packet: {}", e); - } }) .await - .unwrap(); + { + error!(self.logger, "Failed to send voice packet: {}", e); + } } pub async fn channel_of_user(&mut self, id: ClientId) -> Option<ChannelId> { self.handle + .as_mut() + .expect("connect_for_bot was called") .with_connection(move |conn| { conn.get_state() .expect("can get state") @@ -164,30 +198,28 @@ impl TeamSpeakConnection { .map(|c| c.channel) }) .await - .unwrap() + .map_err(|e| error!(self.logger, "Failed to get channel of user"; "error" => %e)) + .ok() + .and_then(|v| v) } pub async fn channel_path_of_user(&mut self, id: ClientId) -> Option<String> { self.handle + .as_mut() + .expect("connect_for_bot was called") .with_connection(move |conn| { let state = conn.get_state().expect("can get state"); let channel_id = state.clients.get(&id)?.channel; - let mut channel = state - .channels - .get(&channel_id) - .expect("can find user channel"); + let mut channel = state.channels.get(&channel_id)?; let mut names = vec![&channel.name[..]]; // Channel 0 is the root channel while channel.parent != ChannelId(0) { names.push("/"); - channel = state - .channels - .get(&channel.parent) - .expect("can find user channel"); + channel = state.channels.get(&channel.parent)?; names.push(&channel.name); } @@ -199,11 +231,15 @@ impl TeamSpeakConnection { Some(path) }) .await - .unwrap() + .map_err(|e| error!(self.logger, "Failed to get channel path of user"; "error" => %e)) + .ok() + .and_then(|v| v) } - pub async fn my_channel(&mut self) -> ChannelId { + pub async fn current_channel(&mut self) -> Option<ChannelId> { self.handle + .as_mut() + .expect("connect_for_bot was called") .with_connection(move |conn| { let state = conn.get_state().expect("can get state"); state @@ -213,11 +249,14 @@ impl TeamSpeakConnection { .channel }) .await - .unwrap() + .map_err(|e| error!(self.logger, "Failed to get channel"; "error" => %e)) + .ok() } pub async fn my_id(&mut self) -> ClientId { self.handle + .as_mut() + .expect("connect_for_bot was called") .with_connection(move |conn| conn.get_state().expect("can get state").own_client) .await .unwrap() @@ -225,6 +264,8 @@ impl TeamSpeakConnection { pub async fn user_count(&mut self, channel: ChannelId) -> u32 { self.handle + .as_mut() + .expect("connect_for_bot was called") .with_connection(move |conn| { let state = conn.get_state().expect("can get state"); let mut count = 0; @@ -241,88 +282,90 @@ impl TeamSpeakConnection { } pub async fn set_nickname(&mut self, name: String) { - self.handle + if let Err(e) = self + .handle + .as_mut() + .expect("connect_for_bot was called") .with_connection(move |mut conn| { conn.get_state() .expect("can get state") .client_update() .set_name(&name) .send(&mut conn) - .map_err(|e| error!("Failed to set nickname: {}", e)) }) .await - .unwrap() - .unwrap(); + .and_then(|v| v) + { + error!(self.logger, "Failed to set nickname: {}", e); + } } pub async fn set_description(&mut self, desc: String) { - self.handle + if let Err(e) = self + .handle + .as_mut() + .expect("connect_for_bot was called") .with_connection(move |mut conn| { let state = conn.get_state().expect("can get state"); - let _ = state + state .clients .get(&state.own_client) .expect("can get myself") .edit() .set_description(&desc) .send(&mut conn) - .map_err(|e| error!("Failed to change description: {}", e)); }) .await - .unwrap() + .and_then(|v| v) + { + error!(self.logger, "Failed to change description: {}", e); + } } pub async fn send_message_to_channel(&mut self, text: String) { - self.handle + if let Err(e) = self + .handle + .as_mut() + .expect("connect_for_bot was called") .with_connection(move |mut conn| { - let _ = conn - .get_state() + conn.get_state() .expect("can get state") .send_message(MessageTarget::Channel, &text) .send(&mut conn) - .map_err(|e| error!("Failed to send message: {}", e)); }) .await - .unwrap() + .and_then(|v| v) + { + error!(self.logger, "Failed to send message: {}", e); + } } pub async fn send_message_to_user(&mut self, client: ClientId, text: String) { - self.handle + if let Err(e) = self + .handle + .as_mut() + .expect("connect_for_bot was called") .with_connection(move |mut conn| { - let _ = conn - .get_state() + conn.get_state() .expect("can get state") .send_message(MessageTarget::Client(client), &text) .send(&mut conn) - .map_err(|e| error!("Failed to send message: {}", e)); }) .await - .unwrap() + .and_then(|v| v) + { + error!(self.logger, "Failed to send message: {}", e); + } } - pub async fn subscribe(&mut self, id: ChannelId) { - self.handle - .with_connection(move |mut conn| { - let channel = match conn.get_state().expect("can get state").channels.get(&id) { - Some(c) => c, - None => { - error!("Failed to find channel to subscribe to"); - return; - } - }; - - if let Err(e) = channel.set_subscribed(true).send(&mut conn) { - error!("Failed to send subscribe packet: {}", e); - } - }) - .await - .unwrap() - } - - pub async fn disconnect(&mut self, reason: &str) { + pub async fn disconnect(&mut self, reason: &str) -> Result<(), tsclientlib::Error> { let opt = DisconnectOptions::new() .reason(Reason::Clientdisconnect) .message(reason); - self.handle.disconnect(opt).await.unwrap(); + self.handle + .as_mut() + .expect("connect_for_bot was called") + .disconnect(opt) + .await } } diff --git a/src/web_server.rs b/src/web_server.rs index d731fae..8e5d446 100644 --- a/src/web_server.rs +++ b/src/web_server.rs @@ -1,38 +1,38 @@ -use std::sync::Arc; use std::time::Duration; -use actix::{Actor, Addr}; -use actix_web::{get, middleware::Logger, post, web, App, HttpServer, Responder}; -use askama::Template; -use askama_actix::TemplateIntoResponse; +use actix_slog::StructuredLogger; +use actix_web::{get, post, web, App, HttpServer, Responder}; +use askama_actix::{Template, TemplateIntoResponse}; use serde::{Deserialize, Serialize}; +use slog::Logger; +use xtra::WeakAddress; use crate::bot::MasterBot; use crate::youtube_dl::AudioMetadata; mod api; -mod bot_executor; +mod bot_data; mod default; mod front_end_cookie; mod tmtu; -pub use bot_executor::*; +pub use bot_data::*; use front_end_cookie::FrontEnd; pub struct WebServerArgs { pub domain: String, pub bind_address: String, - pub bot: Arc<MasterBot>, + pub bot: WeakAddress<MasterBot>, } #[actix_rt::main] -pub async fn start(args: WebServerArgs) -> std::io::Result<()> { - let cbot = args.bot.clone(); - let bot_addr: Addr<BotExecutor> = BotExecutor(cbot.clone()).start(); +pub async fn start(args: WebServerArgs, logger: Logger) -> std::io::Result<()> { + let bot = args.bot; + let bind_address = args.bind_address; HttpServer::new(move || { App::new() - .data(bot_addr.clone()) - .wrap(Logger::default()) + .data(bot.clone()) + .wrap(StructuredLogger::new(logger.clone())) .service(index) .service(get_bot) .service(post_front_end) @@ -44,12 +44,10 @@ pub async fn start(args: WebServerArgs) -> std::io::Result<()> { .service(web::scope("/docs").service(get_api_docs)) .service(actix_files::Files::new("/static", "web_server/static/")) }) - .bind(args.bind_address)? + .bind(bind_address)? .run() .await?; - args.bot.quit(String::from("Stopping")); - Ok(()) } @@ -75,7 +73,7 @@ pub struct BotData { } #[get("/")] -async fn index(bot: web::Data<Addr<BotExecutor>>, front: FrontEnd) -> impl Responder { +async fn index(bot: web::Data<WeakAddress<MasterBot>>, front: FrontEnd) -> impl Responder { match front { FrontEnd::Default => default::index(bot).await, FrontEnd::Tmtu => tmtu::index(bot).await, @@ -84,7 +82,7 @@ async fn index(bot: web::Data<Addr<BotExecutor>>, front: FrontEnd) -> impl Respo #[get("/bot/{name}")] async fn get_bot( - bot: web::Data<Addr<BotExecutor>>, + bot: web::Data<WeakAddress<MasterBot>>, name: web::Path<String>, front: FrontEnd, ) -> impl Responder { diff --git a/src/web_server/api.rs b/src/web_server/api.rs index 4deedad..b1d50c4 100644 --- a/src/web_server/api.rs +++ b/src/web_server/api.rs @@ -1,22 +1,23 @@ -use actix::Addr; use actix_web::{get, web, HttpResponse, Responder, ResponseError}; use derive_more::Display; use serde::Serialize; +use xtra::WeakAddress; -use crate::web_server::{BotDataListRequest, BotDataRequest, BotExecutor}; +use crate::web_server::{BotDataListRequest, BotDataRequest}; +use crate::MasterBot; #[get("/bots")] -pub async fn get_bot_list(bot: web::Data<Addr<BotExecutor>>) -> impl Responder { - let bot_datas = match bot.send(BotDataListRequest).await.unwrap() { - Ok(data) => data, - Err(_) => Vec::with_capacity(0), - }; +pub async fn get_bot_list(bot: web::Data<WeakAddress<MasterBot>>) -> impl Responder { + let bot_datas = bot.send(BotDataListRequest).await.unwrap(); web::Json(bot_datas) } #[get("/bots/{name}")] -pub async fn get_bot(bot: web::Data<Addr<BotExecutor>>, name: web::Path<String>) -> impl Responder { +pub async fn get_bot( + bot: web::Data<WeakAddress<MasterBot>>, + name: web::Path<String>, +) -> impl Responder { if let Some(bot_data) = bot.send(BotDataRequest(name.into_inner())).await.unwrap() { Ok(web::Json(bot_data)) } else { diff --git a/src/web_server/bot_data.rs b/src/web_server/bot_data.rs new file mode 100644 index 0000000..af0c5e1 --- /dev/null +++ b/src/web_server/bot_data.rs @@ -0,0 +1,47 @@ +use async_trait::async_trait; + +use xtra::{Context, Handler, Message}; + +use crate::bot::MasterBot; +use crate::web_server::BotData; + +pub struct BotNameListRequest; + +impl Message for BotNameListRequest { + type Result = Vec<String>; +} + +#[async_trait] +impl Handler<BotNameListRequest> for MasterBot { + async fn handle(&mut self, _: BotNameListRequest, _: &mut Context<Self>) -> Vec<String> { + self.bot_names() + } +} + +pub struct BotDataListRequest; + +impl Message for BotDataListRequest { + type Result = Vec<BotData>; +} + +#[async_trait] +impl Handler<BotDataListRequest> for MasterBot { + async fn handle(&mut self, _: BotDataListRequest, _: &mut Context<Self>) -> Vec<BotData> { + self.bot_datas().await + } +} + +pub struct BotDataRequest(pub String); + +impl Message for BotDataRequest { + type Result = Option<BotData>; +} + +#[async_trait] +impl Handler<BotDataRequest> for MasterBot { + async fn handle(&mut self, r: BotDataRequest, _: &mut Context<Self>) -> Option<BotData> { + let name = r.0; + + self.bot_data(name).await + } +} diff --git a/src/web_server/bot_executor.rs b/src/web_server/bot_executor.rs deleted file mode 100644 index 0d3e7b7..0000000 --- a/src/web_server/bot_executor.rs +++ /dev/null @@ -1,63 +0,0 @@ -use std::sync::Arc; - -use actix::{Actor, Context, Handler, Message}; - -use crate::bot::MasterBot; -use crate::web_server::BotData; - -pub struct BotExecutor(pub Arc<MasterBot>); - -impl Actor for BotExecutor { - type Context = Context<Self>; -} - -pub struct BotNameListRequest; - -impl Message for BotNameListRequest { - // A plain Vec does not work for some reason - type Result = Result<Vec<String>, ()>; -} - -impl Handler<BotNameListRequest> for BotExecutor { - type Result = Result<Vec<String>, ()>; - - fn handle(&mut self, _: BotNameListRequest, _: &mut Self::Context) -> Self::Result { - let bot = &self.0; - - Ok(bot.bot_names()) - } -} - -pub struct BotDataListRequest; - -impl Message for BotDataListRequest { - // A plain Vec does not work for some reason - type Result = Result<Vec<BotData>, ()>; -} - -impl Handler<BotDataListRequest> for BotExecutor { - type Result = Result<Vec<BotData>, ()>; - - fn handle(&mut self, _: BotDataListRequest, _: &mut Self::Context) -> Self::Result { - let bot = &self.0; - - Ok(bot.bot_datas()) - } -} - -pub struct BotDataRequest(pub String); - -impl Message for BotDataRequest { - type Result = Option<BotData>; -} - -impl Handler<BotDataRequest> for BotExecutor { - type Result = Option<BotData>; - - fn handle(&mut self, r: BotDataRequest, _: &mut Self::Context) -> Self::Result { - let name = r.0; - let bot = &self.0; - - bot.bot_data(name) - } -} diff --git a/src/web_server/default.rs b/src/web_server/default.rs index 542dade..6b15784 100644 --- a/src/web_server/default.rs +++ b/src/web_server/default.rs @@ -1,9 +1,9 @@ -use actix::Addr; use actix_web::{http::header, web, Error, HttpResponse}; -use askama::Template; -use askama_actix::TemplateIntoResponse; +use askama_actix::{Template, TemplateIntoResponse}; +use xtra::WeakAddress; -use crate::web_server::{filters, BotData, BotDataRequest, BotExecutor, BotNameListRequest}; +use crate::web_server::{filters, BotData, BotDataRequest, BotNameListRequest}; +use crate::MasterBot; #[derive(Template)] #[template(path = "index.htm")] @@ -12,8 +12,8 @@ struct OverviewTemplate<'a> { bot: Option<&'a BotData>, } -pub async fn index(bot: web::Data<Addr<BotExecutor>>) -> Result<HttpResponse, Error> { - let bot_names = bot.send(BotNameListRequest).await.unwrap().unwrap(); +pub async fn index(bot: web::Data<WeakAddress<MasterBot>>) -> Result<HttpResponse, Error> { + let bot_names = bot.send(BotNameListRequest).await.unwrap(); OverviewTemplate { bot_names: &bot_names, @@ -23,10 +23,10 @@ pub async fn index(bot: web::Data<Addr<BotExecutor>>) -> Result<HttpResponse, Er } pub async fn get_bot( - bot: web::Data<Addr<BotExecutor>>, + bot: web::Data<WeakAddress<MasterBot>>, name: String, ) -> Result<HttpResponse, Error> { - let bot_names = bot.send(BotNameListRequest).await.unwrap().unwrap(); + let bot_names = bot.send(BotNameListRequest).await.unwrap(); if let Some(bot) = bot.send(BotDataRequest(name)).await.unwrap() { OverviewTemplate { @@ -35,7 +35,6 @@ pub async fn get_bot( } .into_response() } else { - // TODO to 404 or not to 404 Ok(HttpResponse::Found().header(header::LOCATION, "/").finish()) } } diff --git a/src/web_server/tmtu.rs b/src/web_server/tmtu.rs index 33a14af..e9eea98 100644 --- a/src/web_server/tmtu.rs +++ b/src/web_server/tmtu.rs @@ -1,9 +1,9 @@ -use actix::Addr; use actix_web::{http::header, web, Error, HttpResponse}; -use askama::Template; -use askama_actix::TemplateIntoResponse; +use askama_actix::{Template, TemplateIntoResponse}; +use xtra::WeakAddress; -use crate::web_server::{filters, BotData, BotDataRequest, BotExecutor, BotNameListRequest}; +use crate::web_server::{filters, BotData, BotDataRequest, BotNameListRequest}; +use crate::MasterBot; #[derive(Template)] #[template(path = "tmtu/index.htm")] @@ -12,8 +12,8 @@ struct TmtuTemplate { bot: Option<BotData>, } -pub async fn index(bot: web::Data<Addr<BotExecutor>>) -> Result<HttpResponse, Error> { - let bot_names = bot.send(BotNameListRequest).await.unwrap().unwrap(); +pub async fn index(bot: web::Data<WeakAddress<MasterBot>>) -> Result<HttpResponse, Error> { + let bot_names = bot.send(BotNameListRequest).await.unwrap(); TmtuTemplate { bot_names, @@ -23,10 +23,10 @@ pub async fn index(bot: web::Data<Addr<BotExecutor>>) -> Result<HttpResponse, Er } pub async fn get_bot( - bot: web::Data<Addr<BotExecutor>>, + bot: web::Data<WeakAddress<MasterBot>>, name: String, ) -> Result<HttpResponse, Error> { - let bot_names = bot.send(BotNameListRequest).await.unwrap().unwrap(); + let bot_names = bot.send(BotNameListRequest).await.unwrap(); if let Some(bot) = bot.send(BotDataRequest(name)).await.unwrap() { TmtuTemplate { @@ -35,7 +35,6 @@ pub async fn get_bot( } .into_response() } else { - // TODO to 404 or not to 404 Ok(HttpResponse::Found().header(header::LOCATION, "/").finish()) } } diff --git a/src/youtube_dl.rs b/src/youtube_dl.rs index 496d0b4..ea10107 100644 --- a/src/youtube_dl.rs +++ b/src/youtube_dl.rs @@ -5,7 +5,7 @@ use tokio::process::Command; use serde::{Deserialize, Serialize}; -use log::debug; +use slog::{debug, Logger}; #[derive(Serialize, Deserialize, Clone, Debug)] pub struct AudioMetadata { @@ -28,13 +28,16 @@ where Ok(dur.map(Duration::from_secs_f64)) } -pub async fn get_audio_download_from_url(uri: String) -> Result<AudioMetadata, String> { +pub async fn get_audio_download_from_url( + url: String, + logger: &Logger, +) -> Result<AudioMetadata, String> { //youtube-dl sometimes just fails, so we give it a second try - let ytdl_output = match run_youtube_dl(&uri).await { + let ytdl_output = match run_youtube_dl(&url, &logger).await { Ok(o) => o, Err(e) => { if e.contains("Unable to extract video data") { - run_youtube_dl(&uri).await? + run_youtube_dl(&url, &logger).await? } else { return Err(e); } @@ -46,14 +49,14 @@ pub async fn get_audio_download_from_url(uri: String) -> Result<AudioMetadata, S Ok(output) } -async fn run_youtube_dl(url: &str) -> Result<String, String> { +async fn run_youtube_dl(url: &str, logger: &Logger) -> Result<String, String> { let ytdl_args = ["--no-playlist", "-f", "bestaudio/best", "-j", &url]; let mut cmd = Command::new("youtube-dl"); cmd.args(&ytdl_args); cmd.stdin(Stdio::null()); - debug!("yt-dl command: {:?}", cmd); + debug!(logger, "running yt-dl"; "command" => ?cmd); let ytdl_output = cmd.output().await.unwrap(); if !ytdl_output.status.success() { |
