aboutsummaryrefslogtreecommitdiffstats
path: root/src/bot/music.rs
diff options
context:
space:
mode:
authorJokler <jokler@protonmail.com>2020-10-15 13:11:54 +0000
committerGitHub <noreply@github.com>2020-10-15 13:11:54 +0000
commit43974717fee9a98701c6efa2e7221cdbfe7e537e (patch)
tree93fe1d75477ae3d1c8466611a2cedd7bed316aa2 /src/bot/music.rs
parent23671b51b4e207574a63bce820acbf43169e2b6c (diff)
parent4e1c2b9f04073294ecb8402486c20d9c01721598 (diff)
downloadpokebot-43974717fee9a98701c6efa2e7221cdbfe7e537e.tar.gz
pokebot-43974717fee9a98701c6efa2e7221cdbfe7e537e.zip
Merge pull request #70 from Mavulp/actor-bots
Replace channels&locks with actors & log with slog
Diffstat (limited to 'src/bot/music.rs')
-rw-r--r--src/bot/music.rs500
1 files changed, 273 insertions, 227 deletions
diff --git a/src/bot/music.rs b/src/bot/music.rs
index 90305d0..a57b66c 100644
--- a/src/bot/music.rs
+++ b/src/bot/music.rs
@@ -1,25 +1,22 @@
-use std::future::Future;
-use std::io::BufRead;
-use std::sync::{Arc, RwLock};
-use std::thread;
-use std::time::Duration;
+use async_trait::async_trait;
-use log::{debug, info};
use serde::Serialize;
+use slog::{debug, info, Logger};
use structopt::StructOpt;
-use tokio::sync::mpsc::UnboundedSender;
use tsclientlib::{data, ChannelId, ClientId, Connection, Identity, Invoker, MessageTarget};
+use xtra::{spawn::Tokio, Actor, Address, Context, Handler, Message, WeakAddress};
-use crate::audio_player::{AudioPlayer, AudioPlayerError, PollResult};
+use crate::audio_player::{AudioPlayer, AudioPlayerError};
+use crate::bot::{BotDisonnected, Connect, MasterBot, Quit};
use crate::command::Command;
use crate::command::VolumeChange;
use crate::playlist::Playlist;
use crate::teamspeak as ts;
-use crate::youtube_dl::AudioMetadata;
+use crate::youtube_dl::{self, AudioMetadata};
use ts::TeamSpeakConnection;
#[derive(Debug)]
-pub struct Message {
+pub struct ChatMessage {
pub target: MessageTarget,
pub invoker: Invoker,
pub text: String,
@@ -33,6 +30,10 @@ pub enum State {
EndOfStream,
}
+impl Message for State {
+ type Result = ();
+}
+
impl std::fmt::Display for State {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
match self {
@@ -47,7 +48,7 @@ impl std::fmt::Display for State {
#[derive(Debug)]
pub enum MusicBotMessage {
- TextMessage(Message),
+ TextMessage(ChatMessage),
ClientChannel {
client: ClientId,
old_channel: ChannelId,
@@ -59,112 +60,95 @@ pub enum MusicBotMessage {
client: Box<data::Client>,
},
StateChange(State),
- Quit(String),
+}
+
+impl Message for MusicBotMessage {
+ type Result = Result<(), AudioPlayerError>;
}
pub struct MusicBot {
name: String,
- player: Arc<AudioPlayer>,
+ identity: Identity,
+ player: AudioPlayer,
teamspeak: Option<TeamSpeakConnection>,
- playlist: Arc<RwLock<Playlist>>,
- state: Arc<RwLock<State>>,
+ master: Option<WeakAddress<MasterBot>>,
+ playlist: Playlist,
+ state: State,
+ logger: Logger,
}
pub struct MusicBotArgs {
pub name: String,
- pub name_index: usize,
- pub id_index: usize,
+ pub master: Option<WeakAddress<MasterBot>>,
pub local: bool,
pub address: String,
- pub id: Identity,
+ pub identity: Identity,
pub channel: String,
pub verbose: u8,
- pub disconnect_cb: Box<dyn FnMut(String, usize, usize) + Send + Sync>,
+ pub logger: Logger,
}
impl MusicBot {
- pub async fn new(args: MusicBotArgs) -> (Arc<Self>, impl Future<Output = ()>) {
- let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
- let tx = Arc::new(RwLock::new(tx));
- let (player, connection) = if args.local {
- info!("Starting in CLI mode");
- let audio_player = AudioPlayer::new(tx.clone(), None).unwrap();
-
- (audio_player, None)
- } else {
- info!("Starting in TeamSpeak mode");
-
- let con_config = Connection::build(args.address)
- .version(tsclientlib::Version::Linux_3_3_2)
- .name(format!("🎵 {}", args.name))
- .identity(args.id)
- .log_commands(args.verbose >= 1)
- .log_packets(args.verbose >= 2)
- .log_udp_packets(args.verbose >= 3)
- .channel(args.channel);
-
- let connection = TeamSpeakConnection::new(tx.clone(), con_config)
- .await
- .unwrap();
- let mut cconnection = connection.clone();
- let audio_player = AudioPlayer::new(
- tx.clone(),
- Some(Box::new(move |samples| {
- let mut rt = tokio::runtime::Runtime::new().unwrap();
- rt.block_on(cconnection.send_audio_packet(samples));
- })),
- )
- .unwrap();
-
- (audio_player, Some(connection))
- };
-
+ pub async fn spawn(args: MusicBotArgs) -> Address<Self> {
+ let mut player = AudioPlayer::new(args.logger.clone()).unwrap();
player.change_volume(VolumeChange::Absolute(0.5)).unwrap();
- let player = Arc::new(player);
- let playlist = Arc::new(RwLock::new(Playlist::new()));
- spawn_gstreamer_thread(player.clone(), tx.clone());
+ let playlist = Playlist::new(args.logger.clone());
- if args.local {
- spawn_stdin_reader(tx);
- }
+ let teamspeak = if args.local {
+ info!(args.logger, "Starting in CLI mode");
+ player.setup_with_audio_callback(None).unwrap();
- let bot = Arc::new(Self {
+ None
+ } else {
+ Some(TeamSpeakConnection::new(args.logger.clone()).await.unwrap())
+ };
+ let bot = Self {
name: args.name.clone(),
+ master: args.master,
+ identity: args.identity.clone(),
player,
- teamspeak: connection,
+ teamspeak,
playlist,
- state: Arc::new(RwLock::new(State::EndOfStream)),
- });
-
- let cbot = bot.clone();
- let mut disconnect_cb = args.disconnect_cb;
- let name = args.name;
- let name_index = args.name_index;
- let id_index = args.id_index;
- let msg_loop = async move {
- 'outer: loop {
- while let Some(msg) = rx.recv().await {
- if let MusicBotMessage::Quit(reason) = msg {
- if let Some(ts) = &cbot.teamspeak {
- let mut ts = ts.clone();
- ts.disconnect(&reason).await;
- }
- disconnect_cb(name, name_index, id_index);
- break 'outer;
- }
- cbot.on_message(msg).await.unwrap();
- }
- }
- debug!("Left message loop");
+ state: State::EndOfStream,
+ logger: args.logger.clone(),
};
- bot.update_name(State::EndOfStream).await;
+ let bot_addr = bot.create(None).spawn(&mut Tokio::Global);
+
+ info!(
+ args.logger,
+ "Connecting";
+ "name" => &args.name,
+ "channel" => &args.channel,
+ "address" => &args.address,
+ );
+
+ let opt = Connection::build(args.address)
+ .logger(args.logger.clone())
+ .version(tsclientlib::Version::Linux_3_3_2)
+ .name(format!("🎵 {}", args.name))
+ .identity(args.identity)
+ .log_commands(args.verbose >= 1)
+ .log_packets(args.verbose >= 2)
+ .log_udp_packets(args.verbose >= 3)
+ .channel(args.channel);
+ bot_addr.send(Connect(opt)).await.unwrap().unwrap();
+ bot_addr
+ .send(MusicBotMessage::StateChange(State::EndOfStream))
+ .await
+ .unwrap()
+ .unwrap();
+
+ if args.local {
+ debug!(args.logger, "Spawning stdin reader thread");
+ spawn_stdin_reader(bot_addr.downgrade());
+ }
- (bot, msg_loop)
+ bot_addr
}
- async fn start_playing_audio(&self, metadata: AudioMetadata) {
+ async fn start_playing_audio(&mut self, metadata: AudioMetadata) {
let duration = if let Some(duration) = metadata.duration {
format!("({})", ts::bold(&humantime::format_duration(duration)))
} else {
@@ -184,25 +168,16 @@ impl MusicBot {
self.player.play().unwrap();
}
- pub async fn add_audio(&self, url: String, user: String) {
- match crate::youtube_dl::get_audio_download_from_url(url).await {
+ pub async fn add_audio(&mut self, url: String, user: String) {
+ match youtube_dl::get_audio_download_from_url(url, &self.logger).await {
Ok(mut metadata) => {
metadata.added_by = user;
- info!("Found audio url: {}", metadata.url);
+ info!(self.logger, "Found source"; "url" => &metadata.url);
- // RWLockGuard can not be kept around or the compiler complains that
- // it might cross the await boundary
- self.playlist
- .write()
- .expect("RwLock was not poisoned")
- .push(metadata.clone());
+ self.playlist.push(metadata.clone());
if !self.player.is_started() {
- let entry = self
- .playlist
- .write()
- .expect("RwLock was not poisoned")
- .pop();
+ let entry = self.playlist.pop();
if let Some(request) = entry {
self.start_playing_audio(request).await;
}
@@ -222,7 +197,7 @@ impl MusicBot {
}
}
Err(e) => {
- info!("Failed to find audio url: {}", e);
+ info!(self.logger, "Failed to find audio url"; "error" => &e);
self.send_message(format!("Failed to find url: {}", e))
.await;
@@ -235,74 +210,50 @@ impl MusicBot {
}
pub fn state(&self) -> State {
- *self.state.read().expect("RwLock was not poisoned")
+ self.state
}
- pub fn volume(&self) -> f64 {
+ pub async fn volume(&self) -> f64 {
self.player.volume()
}
- pub fn position(&self) -> Option<Duration> {
- self.player.position()
- }
-
- pub fn currently_playing(&self) -> Option<AudioMetadata> {
- self.player.currently_playing()
- }
+ pub async fn current_channel(&mut self) -> Option<ChannelId> {
+ let ts = self.teamspeak.as_mut().expect("current_channel needs ts");
- pub fn playlist_to_vec(&self) -> Vec<AudioMetadata> {
- self.playlist.read().unwrap().to_vec()
+ ts.current_channel().await
}
- pub async fn my_channel(&self) -> ChannelId {
- let ts = self.teamspeak.as_ref().expect("my_channel needs ts");
-
- let mut ts = ts.clone();
- ts.my_channel().await
- }
+ async fn user_count(&mut self, channel: ChannelId) -> u32 {
+ let ts = self.teamspeak.as_mut().expect("user_count needs ts");
- async fn user_count(&self, channel: ChannelId) -> u32 {
- let ts = self.teamspeak.as_ref().expect("user_count needs ts");
-
- let mut ts = ts.clone();
ts.user_count(channel).await
}
- async fn send_message(&self, text: String) {
- debug!("Sending message to TeamSpeak: {}", text);
+ async fn send_message(&mut self, text: String) {
+ debug!(self.logger, "Sending message to TeamSpeak"; "message" => &text);
- if let Some(ts) = &self.teamspeak {
- let mut ts = ts.clone();
+ if let Some(ts) = &mut self.teamspeak {
ts.send_message_to_channel(text).await;
}
}
- async fn set_nickname(&self, name: String) {
- info!("Setting TeamSpeak nickname: {}", name);
+ async fn set_nickname(&mut self, name: String) {
+ info!(self.logger, "Setting TeamSpeak nickname"; "name" => &name);
- if let Some(ts) = &self.teamspeak {
- let mut ts = ts.clone();
+ if let Some(ts) = &mut self.teamspeak {
ts.set_nickname(name).await;
}
}
- async fn set_description(&self, desc: String) {
- info!("Setting TeamSpeak description: {}", desc);
+ async fn set_description(&mut self, desc: String) {
+ info!(self.logger, "Setting TeamSpeak description"; "description" => &desc);
- if let Some(ts) = &self.teamspeak {
- let mut ts = ts.clone();
+ if let Some(ts) = &mut self.teamspeak {
ts.set_description(desc).await;
}
}
- async fn subscribe(&self, id: ChannelId) {
- if let Some(ts) = &self.teamspeak {
- let mut ts = ts.clone();
- ts.subscribe(id).await;
- }
- }
-
- async fn on_text(&self, message: Message) -> Result<(), AudioPlayerError> {
+ async fn on_text(&mut self, message: ChatMessage) -> Result<(), AudioPlayerError> {
let msg = message.text;
if msg.starts_with('!') {
let tokens = msg[1..].split_whitespace().collect::<Vec<_>>();
@@ -319,13 +270,15 @@ impl MusicBot {
Ok(())
}
- async fn on_command(&self, command: Command, invoker: Invoker) -> Result<(), AudioPlayerError> {
+ async fn on_command(
+ &mut self,
+ command: Command,
+ invoker: Invoker,
+ ) -> Result<(), AudioPlayerError> {
match command {
Command::Play => {
- let playlist = self.playlist.read().expect("RwLock was not poisoned");
-
if !self.player.is_started() {
- if !playlist.is_empty() {
+ if !self.playlist.is_empty() {
self.player.stop_current()?;
}
} else {
@@ -357,35 +310,32 @@ impl MusicBot {
}
}
Command::Next => {
- let playlist = self.playlist.read().expect("RwLock was not poisoned");
- if !playlist.is_empty() {
- info!("Skipping to next track");
+ if !self.playlist.is_empty() {
+ info!(self.logger, "Skipping to next track");
self.player.stop_current()?;
} else {
- info!("Playlist empty, cannot skip");
+ info!(self.logger, "Playlist empty, cannot skip");
self.player.reset()?;
}
}
Command::Clear => {
- self.playlist
- .write()
- .expect("RwLock was not poisoned")
- .clear();
+ self.send_message(String::from("Cleared playlist")).await;
+ self.playlist.clear();
}
Command::Volume { volume } => {
self.player.change_volume(volume)?;
self.update_name(self.state()).await;
}
Command::Leave => {
- self.quit(String::from("Leaving"));
+ self.quit(String::from("Leaving"), true).await.unwrap();
}
}
Ok(())
}
- async fn update_name(&self, state: State) {
- let volume = (self.volume() * 100.0).round();
+ async fn update_name(&mut self, state: State) {
+ let volume = (self.volume().await * 100.0).round();
let name = match state {
State::EndOfStream => format!("🎵 {} ({}%)", self.name, volume),
_ => format!("🎵 {} - {} ({}%)", self.name, state, volume),
@@ -393,43 +343,39 @@ impl MusicBot {
self.set_nickname(name).await;
}
- async fn on_state(&self, state: State) -> Result<(), AudioPlayerError> {
- let current_state = *self.state.read().unwrap();
- if current_state != state {
- match state {
+ async fn on_state(&mut self, new_state: State) -> Result<(), AudioPlayerError> {
+ if self.state != new_state {
+ match new_state {
State::EndOfStream => {
- let next_track = self
- .playlist
- .write()
- .expect("RwLock was not poisoned")
- .pop();
+ self.player.reset()?;
+ let next_track = self.playlist.pop();
if let Some(request) = next_track {
- info!("Advancing playlist");
+ info!(self.logger, "Advancing playlist");
self.start_playing_audio(request).await;
} else {
- self.update_name(state).await;
+ self.update_name(new_state).await;
self.set_description(String::new()).await;
}
}
State::Stopped => {
- if current_state != State::EndOfStream {
- self.update_name(state).await;
+ if self.state != State::EndOfStream {
+ self.update_name(new_state).await;
self.set_description(String::new()).await;
}
}
- _ => self.update_name(state).await,
+ _ => self.update_name(new_state).await,
}
}
- if !(current_state == State::EndOfStream && state == State::Stopped) {
- *self.state.write().unwrap() = state;
+ if !(self.state == State::EndOfStream && new_state == State::Stopped) {
+ self.state = new_state;
}
Ok(())
}
- async fn on_message(&self, message: MusicBotMessage) -> Result<(), AudioPlayerError> {
+ async fn on_message(&mut self, message: MusicBotMessage) -> Result<(), AudioPlayerError> {
match message {
MusicBotMessage::TextMessage(message) => {
if MessageTarget::Channel == message.target {
@@ -446,9 +392,6 @@ impl MusicBot {
let old_channel = client.channel;
self.on_client_left_channel(old_channel).await;
}
- MusicBotMessage::ChannelAdded(id) => {
- self.subscribe(id).await;
- }
MusicBotMessage::StateChange(state) => {
self.on_state(state).await?;
}
@@ -458,60 +401,163 @@ impl MusicBot {
Ok(())
}
- async fn on_client_left_channel(&self, old_channel: ChannelId) {
- let my_channel = self.my_channel().await;
- if old_channel == my_channel && self.user_count(my_channel).await <= 1 {
- self.quit(String::from("Channel is empty"));
+ // FIXME logs an error if this music bot is the one leaving
+ async fn on_client_left_channel(&mut self, old_channel: ChannelId) {
+ let current_channel = match self.current_channel().await {
+ Some(c) => c,
+ None => {
+ return;
+ }
+ };
+ if old_channel == current_channel && self.user_count(current_channel).await <= 1 {
+ self.quit(String::from("Channel is empty"), true)
+ .await
+ .unwrap();
+ }
+ }
+
+ pub async fn quit(
+ &mut self,
+ reason: String,
+ inform_master: bool,
+ ) -> Result<(), tsclientlib::Error> {
+ // FIXME logs errors if the bot is playing something because it tries to
+ // change its name and description
+ self.player.reset().unwrap();
+
+ let ts = self.teamspeak.as_mut().unwrap();
+ ts.disconnect(&reason).await?;
+
+ if inform_master {
+ if let Some(master) = &self.master {
+ master
+ .send(BotDisonnected {
+ name: self.name.clone(),
+ identity: self.identity.clone(),
+ })
+ .await
+ .unwrap();
+ }
+ }
+
+ Ok(())
+ }
+}
+
+#[async_trait]
+impl Actor for MusicBot {
+ async fn started(&mut self, ctx: &mut Context<Self>) {
+ let addr = ctx.address().unwrap().downgrade();
+ self.player.register_bot(addr);
+ }
+}
+
+#[async_trait]
+impl Handler<Connect> for MusicBot {
+ async fn handle(
+ &mut self,
+ opt: Connect,
+ ctx: &mut Context<Self>,
+ ) -> Result<(), tsclientlib::Error> {
+ let addr = ctx.address().unwrap().downgrade();
+ self.teamspeak
+ .as_mut()
+ .unwrap()
+ .connect_for_bot(opt.0, addr)?;
+
+ let mut connection = self.teamspeak.as_ref().unwrap().clone();
+ let handle = tokio::runtime::Handle::current();
+ self.player
+ .setup_with_audio_callback(Some(Box::new(move |samples| {
+ handle.block_on(connection.send_audio_packet(samples));
+ })))
+ .unwrap();
+
+ Ok(())
+ }
+}
+
+pub struct GetName;
+impl Message for GetName {
+ type Result = String;
+}
+
+#[async_trait]
+impl Handler<GetName> for MusicBot {
+ async fn handle(&mut self, _: GetName, _: &mut Context<Self>) -> String {
+ self.name().to_owned()
+ }
+}
+
+pub struct GetBotData;
+impl Message for GetBotData {
+ type Result = crate::web_server::BotData;
+}
+
+#[async_trait]
+impl Handler<GetBotData> for MusicBot {
+ async fn handle(&mut self, _: GetBotData, _: &mut Context<Self>) -> crate::web_server::BotData {
+ crate::web_server::BotData {
+ name: self.name.clone(),
+ playlist: self.playlist.to_vec(),
+ currently_playing: self.player.currently_playing(),
+ position: self.player.position(),
+ state: self.state(),
+ volume: self.volume().await,
}
}
+}
- pub fn quit(&self, reason: String) {
- self.player.quit(reason);
+pub struct GetChannel;
+impl Message for GetChannel {
+ type Result = Option<ChannelId>;
+}
+
+#[async_trait]
+impl Handler<GetChannel> for MusicBot {
+ async fn handle(&mut self, _: GetChannel, _: &mut Context<Self>) -> Option<ChannelId> {
+ self.current_channel().await
}
}
-fn spawn_stdin_reader(tx: Arc<RwLock<UnboundedSender<MusicBotMessage>>>) {
- debug!("Spawning stdin reader thread");
- thread::Builder::new()
- .name(String::from("stdin reader"))
- .spawn(move || {
- let stdin = ::std::io::stdin();
- let lock = stdin.lock();
- for line in lock.lines() {
- let line = line.unwrap();
-
- let message = MusicBotMessage::TextMessage(Message {
- target: MessageTarget::Channel,
- invoker: Invoker {
- name: String::from("stdin"),
- id: ClientId(0),
- uid: None,
- },
- text: line,
- });
-
- let tx = tx.read().unwrap();
- tx.send(message).unwrap();
- }
- })
- .expect("Failed to spawn stdin reader thread");
+#[async_trait]
+impl Handler<Quit> for MusicBot {
+ async fn handle(&mut self, q: Quit, _: &mut Context<Self>) -> Result<(), tsclientlib::Error> {
+ self.quit(q.0, false).await
+ }
}
-fn spawn_gstreamer_thread(
- player: Arc<AudioPlayer>,
- tx: Arc<RwLock<UnboundedSender<MusicBotMessage>>>,
-) {
- thread::Builder::new()
- .name(String::from("gstreamer polling"))
- .spawn(move || loop {
- if player.poll() == PollResult::Quit {
- break;
- }
+#[async_trait]
+impl Handler<MusicBotMessage> for MusicBot {
+ async fn handle(
+ &mut self,
+ msg: MusicBotMessage,
+ _: &mut Context<Self>,
+ ) -> Result<(), AudioPlayerError> {
+ self.on_message(msg).await
+ }
+}
- tx.read()
- .unwrap()
- .send(MusicBotMessage::StateChange(State::EndOfStream))
- .unwrap();
- })
- .expect("Failed to spawn gstreamer thread");
+fn spawn_stdin_reader(addr: WeakAddress<MusicBot>) {
+ use tokio::io::AsyncBufReadExt;
+
+ tokio::task::spawn(async move {
+ let stdin = tokio::io::stdin();
+ let reader = tokio::io::BufReader::new(stdin);
+ let mut lines = reader.lines();
+
+ while let Some(line) = lines.next_line().await.unwrap() {
+ let message = MusicBotMessage::TextMessage(ChatMessage {
+ target: MessageTarget::Channel,
+ invoker: Invoker {
+ name: String::from("stdin"),
+ id: ClientId(0),
+ uid: None,
+ },
+ text: line,
+ });
+
+ addr.send(message).await.unwrap().unwrap();
+ }
+ });
}