From 87973e2dd3d28293e661045cde1a5e90bc017613 Mon Sep 17 00:00:00 2001 From: Jokler Date: Fri, 24 Jan 2020 18:29:17 +0100 Subject: Initial multibot draft --- src/audio_player.rs | 73 +++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 51 insertions(+), 22 deletions(-) (limited to 'src/audio_player.rs') diff --git a/src/audio_player.rs b/src/audio_player.rs index 97a61cd..2ff7c11 100644 --- a/src/audio_player.rs +++ b/src/audio_player.rs @@ -6,7 +6,7 @@ use gstreamer as gst; use gstreamer_app::{AppSink, AppSinkCallbacks}; use gstreamer_audio::{StreamVolume, StreamVolumeFormat}; -use crate::{ApplicationMessage, State}; +use crate::bot::{MusicBotMessage, State}; use glib::BoolError; use log::{debug, error, info, warn}; use std::sync::{Arc, Mutex}; @@ -14,22 +14,10 @@ use tokio02::sync::mpsc::UnboundedSender; static GST_INIT: Once = Once::new(); -#[derive(Debug)] -pub enum AudioPlayerError { - GStreamerError(glib::error::BoolError), - StateChangeFailed, -} - -impl From for AudioPlayerError { - fn from(err: BoolError) -> Self { - AudioPlayerError::GStreamerError(err) - } -} - -impl From for AudioPlayerError { - fn from(_err: gst::StateChangeError) -> Self { - AudioPlayerError::StateChangeFailed - } +#[derive(PartialEq, Eq, Debug, Clone, Copy)] +pub enum PollResult { + Continue, + Quit, } pub struct AudioPlayer { @@ -38,7 +26,7 @@ pub struct AudioPlayer { http_src: gst::Element, volume: gst::Element, - sender: Arc>>, + sender: Arc>>, } fn make_element(factoryname: &str, display_name: &str) -> Result { @@ -87,7 +75,7 @@ fn add_decode_bin_new_pad_callback( impl AudioPlayer { pub fn new( - sender: Arc>>, + sender: Arc>>, callback: Option>, ) -> Result { GST_INIT.call_once(|| gst::init().unwrap()); @@ -239,13 +227,27 @@ impl AudioPlayer { Ok(()) } + pub fn quit(&self, reason: String) { + info!("Quitting audio player"); + + if let Err(e) = self + .bus + .post(&gst::Message::new_application(gst::Structure::new_empty("quit")).build()) + { + warn!("Failed to send \"quit\" app event: {}", e); + } + + let sender = self.sender.lock().unwrap(); + sender.send(MusicBotMessage::Quit(reason)).unwrap(); + } + fn send_state(&self, state: State) { info!("Sending state {:?} to application", state); let sender = self.sender.lock().unwrap(); - sender.send(ApplicationMessage::StateChange(state)).unwrap(); + sender.send(MusicBotMessage::StateChange(state)).unwrap(); } - pub fn poll(&self) { + pub fn poll(&self) -> PollResult { debug!("Polling GStreamer"); 'outer: loop { while let Some(msg) = self.bus.timed_pop(gst::ClockTime(None)) { @@ -308,12 +310,39 @@ impl AudioPlayer { ); break 'outer; } + MessageView::Application(content) => { + if let Some(s) = content.get_structure() { + if s.get_name() == "quit" { + return PollResult::Quit; + } + } + } _ => { - // debug!("{:?}", msg) + //debug!("{:?}", msg) } }; } } debug!("Left GStreamer message loop"); + + PollResult::Continue + } +} + +#[derive(Debug)] +pub enum AudioPlayerError { + GStreamerError(glib::error::BoolError), + StateChangeFailed, +} + +impl From for AudioPlayerError { + fn from(err: BoolError) -> Self { + AudioPlayerError::GStreamerError(err) + } +} + +impl From for AudioPlayerError { + fn from(_err: gst::StateChangeError) -> Self { + AudioPlayerError::StateChangeFailed } } -- cgit v1.2.3-70-g09d2 From 7be0a2f10f0cfeb89b2f498cfae316b35dcb0814 Mon Sep 17 00:00:00 2001 From: Jokler Date: Sat, 25 Jan 2020 16:30:37 +0100 Subject: Fix quit method not stopping audio playback --- src/audio_player.rs | 1 + src/bot/master.rs | 20 +++++++++++++++----- src/bot/music.rs | 7 +++++-- 3 files changed, 21 insertions(+), 7 deletions(-) (limited to 'src/audio_player.rs') diff --git a/src/audio_player.rs b/src/audio_player.rs index 2ff7c11..97ecfbf 100644 --- a/src/audio_player.rs +++ b/src/audio_player.rs @@ -313,6 +313,7 @@ impl AudioPlayer { MessageView::Application(content) => { if let Some(s) = content.get_structure() { if s.get_name() == "quit" { + self.reset().unwrap(); return PollResult::Quit; } } diff --git a/src/bot/master.rs b/src/bot/master.rs index ce8b25f..641938a 100644 --- a/src/bot/master.rs +++ b/src/bot/master.rs @@ -1,5 +1,6 @@ use std::future::Future; use std::sync::{Arc, Mutex}; +use std::collections::HashMap; use futures::future::{FutureExt, TryFutureExt}; use futures01::future::Future as Future01; @@ -17,7 +18,7 @@ use crate::bot::{MusicBot, MusicBotMessage, MusicBotArgs}; pub struct MasterBot { config: MasterConfig, teamspeak: Option>, - connected_bots: Arc>>>, + connected_bots: Arc>>>, } impl MasterBot { @@ -63,7 +64,7 @@ impl MasterBot { let bot = Arc::new(Self { config, teamspeak: connection, - connected_bots: Arc::new(Mutex::new(Vec::new())), + connected_bots: Arc::new(Mutex::new(HashMap::new())), }); let cbot = bot.clone(); @@ -85,22 +86,31 @@ impl MasterBot { String::from("local") }; - info!("Connecting to {} on {}", channel, self.config.address); let preset = self.config.bots[0].clone(); + let name = format!("{}({})", preset.name, self.config.name); + + let cconnected_bots = self.connected_bots.clone(); + let disconnect_cb = Box::new(move |n| { + let mut bots = cconnected_bots.lock().expect("Mutex was not poisoned"); + bots.remove(&n); + }); + + info!("Connecting to {} on {}", channel, self.config.address); let bot_args = MusicBotArgs { - name: format!("{}({})", preset.name, self.config.name), + name: name.clone(), owner: preset.owner, local: self.config.local, address: self.config.address.clone(), id: preset.id, channel, verbose: self.config.verbose, + disconnect_cb, }; let (app, fut) = MusicBot::new(bot_args).await; tokio::spawn(fut.unit_error().boxed().compat().map(|_| ())); let mut bots = self.connected_bots.lock().expect("Mutex was not poisoned"); - bots.push(app); + bots.insert(name, app); } async fn on_message(&self, message: MusicBotMessage) -> Result<(), AudioPlayerError> { diff --git a/src/bot/music.rs b/src/bot/music.rs index 3677796..4d67f88 100644 --- a/src/bot/music.rs +++ b/src/bot/music.rs @@ -44,7 +44,6 @@ pub struct MusicBot { state: Arc>, } -#[derive(Debug)] pub struct MusicBotArgs { pub name: String, pub owner: Option, @@ -53,6 +52,7 @@ pub struct MusicBotArgs { pub id: Identity, pub channel: String, pub verbose: u8, + pub disconnect_cb: Box, } impl MusicBot { @@ -104,7 +104,7 @@ impl MusicBot { } let bot = Arc::new(Self { - name: args.name, + name: args.name.clone(), player, teamspeak: connection, playlist, @@ -112,11 +112,14 @@ impl MusicBot { }); let cbot = bot.clone(); + let mut disconnect_cb = args.disconnect_cb; + let name = args.name; let msg_loop = async move { 'outer: loop { while let Some(msg) = rx.recv().await { if let MusicBotMessage::Quit(reason) = msg { cbot.with_teamspeak(|ts| ts.disconnect(&reason)); + disconnect_cb(name); break 'outer; } cbot.on_message(msg).await.unwrap(); -- cgit v1.2.3-70-g09d2