aboutsummaryrefslogtreecommitdiffstats
path: root/src/bot
diff options
context:
space:
mode:
authorJokler <jokler@protonmail.com>2020-10-14 00:19:27 +0200
committerJokler <jokler@protonmail.com>2020-10-15 01:45:29 +0200
commit4e1c2b9f04073294ecb8402486c20d9c01721598 (patch)
tree93fe1d75477ae3d1c8466611a2cedd7bed316aa2 /src/bot
parent23671b51b4e207574a63bce820acbf43169e2b6c (diff)
downloadpokebot-4e1c2b9f04073294ecb8402486c20d9c01721598.tar.gz
pokebot-4e1c2b9f04073294ecb8402486c20d9c01721598.zip
Replace channels&locks with actors & log with slog
Diffstat (limited to 'src/bot')
-rw-r--r--src/bot/master.rs363
-rw-r--r--src/bot/music.rs500
2 files changed, 445 insertions, 418 deletions
diff --git a/src/bot/master.rs b/src/bot/master.rs
index 1480e17..94332ac 100644
--- a/src/bot/master.rs
+++ b/src/bot/master.rs
@@ -1,41 +1,52 @@
use std::collections::HashMap;
-use std::future::Future;
-use std::sync::{Arc, RwLock};
-use log::info;
+use async_trait::async_trait;
+use futures::future;
use rand::{rngs::SmallRng, seq::SliceRandom, SeedableRng};
use serde::{Deserialize, Serialize};
-use tokio::sync::mpsc::UnboundedSender;
-use tsclientlib::{ClientId, Connection, Identity, MessageTarget};
+use slog::{error, info, o, trace, Logger};
+use tsclientlib::{ClientId, ConnectOptions, Connection, Identity, MessageTarget};
+use xtra::{spawn::Tokio, Actor, Address, Context, Handler, Message, WeakAddress};
use crate::audio_player::AudioPlayerError;
use crate::teamspeak::TeamSpeakConnection;
use crate::Args;
-use crate::bot::{MusicBot, MusicBotArgs, MusicBotMessage};
+use crate::bot::{GetBotData, GetChannel, GetName, MusicBot, MusicBotArgs, MusicBotMessage};
pub struct MasterBot {
- config: Arc<MasterConfig>,
- music_bots: Arc<RwLock<MusicBots>>,
+ config: MasterConfig,
+ my_addr: Option<WeakAddress<Self>>,
teamspeak: TeamSpeakConnection,
- sender: Arc<RwLock<UnboundedSender<MusicBotMessage>>>,
+ available_names: Vec<String>,
+ available_ids: Vec<Identity>,
+ connected_bots: HashMap<String, Address<MusicBot>>,
+ rng: SmallRng,
+ logger: Logger,
}
-struct MusicBots {
- rng: SmallRng,
- available_names: Vec<usize>,
- available_ids: Vec<usize>,
- connected_bots: HashMap<String, Arc<MusicBot>>,
+#[derive(Debug, Serialize, Deserialize)]
+pub struct MasterArgs {
+ #[serde(default = "default_name")]
+ pub master_name: String,
+ pub address: String,
+ pub channel: Option<String>,
+ #[serde(default = "default_verbose")]
+ pub verbose: u8,
+ pub domain: String,
+ pub bind_address: String,
+ pub names: Vec<String>,
+ pub id: Option<Identity>,
+ pub ids: Option<Vec<Identity>>,
}
impl MasterBot {
- pub async fn new(args: MasterArgs) -> (Arc<Self>, impl Future) {
- let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
- let tx = Arc::new(RwLock::new(tx));
- info!("Starting in TeamSpeak mode");
+ pub async fn spawn(args: MasterArgs, logger: Logger) -> Address<Self> {
+ info!(logger, "Starting in TeamSpeak mode");
let mut con_config = Connection::build(args.address.clone())
+ .logger(logger.clone())
.version(tsclientlib::Version::Linux_3_3_2)
.name(args.master_name.clone())
.identity(args.id.expect("identity should exist"))
@@ -47,168 +58,116 @@ impl MasterBot {
con_config = con_config.channel(channel);
}
- let connection = TeamSpeakConnection::new(tx.clone(), con_config)
- .await
- .unwrap();
+ let connection = TeamSpeakConnection::new(logger.clone()).await.unwrap();
+ trace!(logger, "Created teamspeak connection");
- let config = Arc::new(MasterConfig {
+ let config = MasterConfig {
master_name: args.master_name,
address: args.address,
- names: args.names,
- ids: args.ids.expect("identies should exists"),
- local: args.local,
verbose: args.verbose,
- });
-
- let name_count = config.names.len();
- let id_count = config.ids.len();
+ };
- let music_bots = Arc::new(RwLock::new(MusicBots {
+ let bot_addr = Self {
+ config,
+ my_addr: None,
+ teamspeak: connection,
+ logger: logger.clone(),
rng: SmallRng::from_entropy(),
- available_names: (0..name_count).collect(),
- available_ids: (0..id_count).collect(),
+ available_names: args.names,
+ available_ids: args.ids.expect("identities"),
connected_bots: HashMap::new(),
- }));
+ }
+ .create(None)
+ .spawn(&mut Tokio::Global);
- let bot = Arc::new(Self {
- config,
- music_bots,
- teamspeak: connection,
- sender: tx.clone(),
- });
-
- let cbot = bot.clone();
- let msg_loop = async move {
- 'outer: loop {
- while let Some(msg) = rx.recv().await {
- match msg {
- MusicBotMessage::Quit(reason) => {
- let mut cteamspeak = cbot.teamspeak.clone();
- cteamspeak.disconnect(&reason).await;
- break 'outer;
- }
- MusicBotMessage::ClientDisconnected { id, .. } => {
- if id == cbot.my_id().await {
- // TODO Reconnect since quit was not called
- break 'outer;
- }
- }
- _ => cbot.on_message(msg).await.unwrap(),
- }
- }
- }
- };
+ bot_addr.send(Connect(con_config)).await.unwrap().unwrap();
+ trace!(logger, "Spawned master bot actor");
- (bot, msg_loop)
+ bot_addr
}
- async fn build_bot_args_for(&self, id: ClientId) -> Result<MusicBotArgs, BotCreationError> {
- let mut cteamspeak = self.teamspeak.clone();
- let channel = match cteamspeak.channel_of_user(id).await {
+ async fn bot_args_for_client(
+ &mut self,
+ user_id: ClientId,
+ ) -> Result<MusicBotArgs, BotCreationError> {
+ let channel = match self.teamspeak.channel_of_user(user_id).await {
Some(channel) => channel,
None => return Err(BotCreationError::UnfoundUser),
};
- if channel == cteamspeak.my_channel().await {
+ if channel == self.teamspeak.current_channel().await.unwrap() {
return Err(BotCreationError::MasterChannel(
self.config.master_name.clone(),
));
}
- let MusicBots {
- ref mut rng,
- ref mut available_names,
- ref mut available_ids,
- ref connected_bots,
- } = &mut *self.music_bots.write().expect("RwLock was not poisoned");
-
- for bot in connected_bots.values() {
- if bot.my_channel().await == channel {
- return Err(BotCreationError::MultipleBots(bot.name().to_owned()));
+ for bot in self.connected_bots.values() {
+ if bot.send(GetChannel).await.unwrap() == Some(channel) {
+ return Err(BotCreationError::MultipleBots(
+ bot.send(GetName).await.unwrap(),
+ ));
}
}
- let channel_path = cteamspeak
- .channel_path_of_user(id)
+ let channel_path = self
+ .teamspeak
+ .channel_path_of_user(user_id)
.await
.expect("can find poke sender");
- available_names.shuffle(rng);
- let name_index = match available_names.pop() {
+ self.available_names.shuffle(&mut self.rng);
+ let name = match self.available_names.pop() {
Some(v) => v,
None => {
return Err(BotCreationError::OutOfNames);
}
};
- let name = self.config.names[name_index].clone();
- available_ids.shuffle(rng);
- let id_index = match available_ids.pop() {
+ self.available_ids.shuffle(&mut self.rng);
+ let identity = match self.available_ids.pop() {
Some(v) => v,
None => {
return Err(BotCreationError::OutOfIdentities);
}
};
- let id = self.config.ids[id_index].clone();
-
- let cmusic_bots = self.music_bots.clone();
- let disconnect_cb = Box::new(move |n, name_index, id_index| {
- let mut music_bots = cmusic_bots.write().expect("RwLock was not poisoned");
- music_bots.connected_bots.remove(&n);
- music_bots.available_names.push(name_index);
- music_bots.available_ids.push(id_index);
- });
-
- info!("Connecting to {} on {}", channel_path, self.config.address);
-
Ok(MusicBotArgs {
- name,
- name_index,
- id_index,
- local: self.config.local,
+ name: name.clone(),
+ master: self.my_addr.clone(),
address: self.config.address.clone(),
- id,
+ identity,
+ local: false,
channel: channel_path,
verbose: self.config.verbose,
- disconnect_cb,
+ logger: self.logger.new(o!("musicbot" => name)),
})
}
- async fn spawn_bot_for(&self, id: ClientId) {
- match self.build_bot_args_for(id).await {
+ async fn spawn_bot_for_client(&mut self, id: ClientId) {
+ match self.bot_args_for_client(id).await {
Ok(bot_args) => {
- let (bot, fut) = MusicBot::new(bot_args).await;
- tokio::spawn(fut);
- let mut music_bots = self.music_bots.write().expect("RwLock was not poisoned");
- music_bots
- .connected_bots
- .insert(bot.name().to_string(), bot);
- }
- Err(e) => {
- let mut cteamspeak = self.teamspeak.clone();
- cteamspeak.send_message_to_user(id, e.to_string()).await
+ let name = bot_args.name.clone();
+ let bot = MusicBot::spawn(bot_args).await;
+ self.connected_bots.insert(name, bot);
}
+ Err(e) => self.teamspeak.send_message_to_user(id, e.to_string()).await,
}
}
- async fn on_message(&self, message: MusicBotMessage) -> Result<(), AudioPlayerError> {
+ async fn on_message(&mut self, message: MusicBotMessage) -> Result<(), AudioPlayerError> {
match message {
MusicBotMessage::TextMessage(message) => {
if let MessageTarget::Poke(who) = message.target {
- info!("Poked by {}, creating bot for their channel", who);
- self.spawn_bot_for(who).await;
+ info!(
+ self.logger,
+ "Poked, creating bot"; "user" => %who
+ );
+ self.spawn_bot_for_client(who).await;
}
}
- MusicBotMessage::ChannelAdded(id) => {
- let mut cteamspeak = self.teamspeak.clone();
- cteamspeak.subscribe(id).await;
- }
MusicBotMessage::ClientAdded(id) => {
- let mut cteamspeak = self.teamspeak.clone();
-
- if id == cteamspeak.my_id().await {
- cteamspeak
+ if id == self.teamspeak.my_id().await {
+ self.teamspeak
.set_description(String::from("Poke me if you want a music bot!"))
.await;
}
@@ -219,41 +178,17 @@ impl MasterBot {
Ok(())
}
- async fn my_id(&self) -> ClientId {
- let mut cteamspeak = self.teamspeak.clone();
+ pub async fn bot_data(&self, name: String) -> Option<crate::web_server::BotData> {
+ let bot = self.connected_bots.get(&name)?;
- cteamspeak.my_id().await
+ bot.send(GetBotData).await.ok()
}
- pub fn bot_data(&self, name: String) -> Option<crate::web_server::BotData> {
- let music_bots = self.music_bots.read().unwrap();
- let bot = music_bots.connected_bots.get(&name)?;
-
- Some(crate::web_server::BotData {
- name,
- state: bot.state(),
- volume: bot.volume(),
- position: bot.position(),
- currently_playing: bot.currently_playing(),
- playlist: bot.playlist_to_vec(),
- })
- }
-
- pub fn bot_datas(&self) -> Vec<crate::web_server::BotData> {
- let music_bots = self.music_bots.read().unwrap();
-
- let len = music_bots.connected_bots.len();
+ pub async fn bot_datas(&self) -> Vec<crate::web_server::BotData> {
+ let len = self.connected_bots.len();
let mut result = Vec::with_capacity(len);
- for (name, bot) in &music_bots.connected_bots {
- let bot_data = crate::web_server::BotData {
- name: name.clone(),
- state: bot.state(),
- volume: bot.volume(),
- position: bot.position(),
- currently_playing: bot.currently_playing(),
- playlist: bot.playlist_to_vec(),
- };
-
+ for bot in self.connected_bots.values() {
+ let bot_data = bot.send(GetBotData).await.unwrap();
result.push(bot_data);
}
@@ -261,24 +196,96 @@ impl MasterBot {
}
pub fn bot_names(&self) -> Vec<String> {
- let music_bots = self.music_bots.read().unwrap();
-
- let len = music_bots.connected_bots.len();
+ let len = self.connected_bots.len();
let mut result = Vec::with_capacity(len);
- for name in music_bots.connected_bots.keys() {
+ for name in self.connected_bots.keys() {
result.push(name.clone());
}
result
}
- pub fn quit(&self, reason: String) {
- let music_bots = self.music_bots.read().unwrap();
- for bot in music_bots.connected_bots.values() {
- bot.quit(reason.clone())
+ fn on_bot_disconnect(&mut self, name: String, id: Identity) {
+ self.connected_bots.remove(&name);
+ self.available_names.push(name);
+ self.available_ids.push(id);
+ }
+
+ pub async fn quit(&mut self, reason: String) -> Result<(), tsclientlib::Error> {
+ let futures = self
+ .connected_bots
+ .values()
+ .map(|b| b.send(Quit(reason.clone())));
+ for res in future::join_all(futures).await {
+ if let Err(e) = res {
+ error!(self.logger, "Failed to shut down bot"; "error" => %e);
+ }
}
- let sender = self.sender.read().unwrap();
- sender.send(MusicBotMessage::Quit(reason)).unwrap();
+ self.teamspeak.disconnect(&reason).await
+ }
+}
+
+#[async_trait]
+impl Actor for MasterBot {
+ async fn started(&mut self, ctx: &mut Context<Self>) {
+ self.my_addr = Some(ctx.address().unwrap().downgrade());
+ }
+}
+
+pub struct Connect(pub ConnectOptions);
+impl Message for Connect {
+ type Result = Result<(), tsclientlib::Error>;
+}
+
+#[async_trait]
+impl Handler<Connect> for MasterBot {
+ async fn handle(
+ &mut self,
+ opt: Connect,
+ ctx: &mut Context<Self>,
+ ) -> Result<(), tsclientlib::Error> {
+ let addr = ctx.address().unwrap();
+ self.teamspeak.connect_for_bot(opt.0, addr.downgrade())?;
+ Ok(())
+ }
+}
+
+pub struct Quit(pub String);
+impl Message for Quit {
+ type Result = Result<(), tsclientlib::Error>;
+}
+
+#[async_trait]
+impl Handler<Quit> for MasterBot {
+ async fn handle(&mut self, q: Quit, _: &mut Context<Self>) -> Result<(), tsclientlib::Error> {
+ self.quit(q.0).await
+ }
+}
+
+pub struct BotDisonnected {
+ pub name: String,
+ pub identity: Identity,
+}
+
+impl Message for BotDisonnected {
+ type Result = ();
+}
+
+#[async_trait]
+impl Handler<BotDisonnected> for MasterBot {
+ async fn handle(&mut self, dc: BotDisonnected, _: &mut Context<Self>) {
+ self.on_bot_disconnect(dc.name, dc.identity);
+ }
+}
+
+#[async_trait]
+impl Handler<MusicBotMessage> for MasterBot {
+ async fn handle(
+ &mut self,
+ msg: MusicBotMessage,
+ _: &mut Context<Self>,
+ ) -> Result<(), AudioPlayerError> {
+ self.on_message(msg).await
}
}
@@ -313,31 +320,10 @@ impl std::fmt::Display for BotCreationError {
}
}
-#[derive(Debug, Serialize, Deserialize)]
-pub struct MasterArgs {
- #[serde(default = "default_name")]
- pub master_name: String,
- #[serde(default = "default_local")]
- pub local: bool,
- pub address: String,
- pub channel: Option<String>,
- #[serde(default = "default_verbose")]
- pub verbose: u8,
- pub domain: String,
- pub bind_address: String,
- pub names: Vec<String>,
- pub id: Option<Identity>,
- pub ids: Option<Vec<Identity>>,
-}
-
fn default_name() -> String {
String::from("PokeBot")
}
-fn default_local() -> bool {
- false
-}
-
fn default_verbose() -> u8 {
0
}
@@ -345,7 +331,6 @@ fn default_verbose() -> u8 {
impl MasterArgs {
pub fn merge(self, args: Args) -> Self {
let address = args.address.unwrap_or(self.address);
- let local = args.local || self.local;
let channel = args.master_channel.or(self.channel);
let verbose = if args.verbose > 0 {
args.verbose
@@ -357,7 +342,6 @@ impl MasterArgs {
master_name: self.master_name,
names: self.names,
ids: self.ids,
- local,
address,
domain: self.domain,
bind_address: self.bind_address,
@@ -371,8 +355,5 @@ impl MasterArgs {
pub struct MasterConfig {
pub master_name: String,
pub address: String,
- pub names: Vec<String>,
- pub ids: Vec<Identity>,
- pub local: bool,
pub verbose: u8,
}
diff --git a/src/bot/music.rs b/src/bot/music.rs
index 90305d0..a57b66c 100644
--- a/src/bot/music.rs
+++ b/src/bot/music.rs
@@ -1,25 +1,22 @@
-use std::future::Future;
-use std::io::BufRead;
-use std::sync::{Arc, RwLock};
-use std::thread;
-use std::time::Duration;
+use async_trait::async_trait;
-use log::{debug, info};
use serde::Serialize;
+use slog::{debug, info, Logger};
use structopt::StructOpt;
-use tokio::sync::mpsc::UnboundedSender;
use tsclientlib::{data, ChannelId, ClientId, Connection, Identity, Invoker, MessageTarget};
+use xtra::{spawn::Tokio, Actor, Address, Context, Handler, Message, WeakAddress};
-use crate::audio_player::{AudioPlayer, AudioPlayerError, PollResult};
+use crate::audio_player::{AudioPlayer, AudioPlayerError};
+use crate::bot::{BotDisonnected, Connect, MasterBot, Quit};
use crate::command::Command;
use crate::command::VolumeChange;
use crate::playlist::Playlist;
use crate::teamspeak as ts;
-use crate::youtube_dl::AudioMetadata;
+use crate::youtube_dl::{self, AudioMetadata};
use ts::TeamSpeakConnection;
#[derive(Debug)]
-pub struct Message {
+pub struct ChatMessage {
pub target: MessageTarget,
pub invoker: Invoker,
pub text: String,
@@ -33,6 +30,10 @@ pub enum State {
EndOfStream,
}
+impl Message for State {
+ type Result = ();
+}
+
impl std::fmt::Display for State {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
match self {
@@ -47,7 +48,7 @@ impl std::fmt::Display for State {
#[derive(Debug)]
pub enum MusicBotMessage {
- TextMessage(Message),
+ TextMessage(ChatMessage),
ClientChannel {
client: ClientId,
old_channel: ChannelId,
@@ -59,112 +60,95 @@ pub enum MusicBotMessage {
client: Box<data::Client>,
},
StateChange(State),
- Quit(String),
+}
+
+impl Message for MusicBotMessage {
+ type Result = Result<(), AudioPlayerError>;
}
pub struct MusicBot {
name: String,
- player: Arc<AudioPlayer>,
+ identity: Identity,
+ player: AudioPlayer,
teamspeak: Option<TeamSpeakConnection>,
- playlist: Arc<RwLock<Playlist>>,
- state: Arc<RwLock<State>>,
+ master: Option<WeakAddress<MasterBot>>,
+ playlist: Playlist,
+ state: State,
+ logger: Logger,
}
pub struct MusicBotArgs {
pub name: String,
- pub name_index: usize,
- pub id_index: usize,
+ pub master: Option<WeakAddress<MasterBot>>,
pub local: bool,
pub address: String,
- pub id: Identity,
+ pub identity: Identity,
pub channel: String,
pub verbose: u8,
- pub disconnect_cb: Box<dyn FnMut(String, usize, usize) + Send + Sync>,
+ pub logger: Logger,
}
impl MusicBot {
- pub async fn new(args: MusicBotArgs) -> (Arc<Self>, impl Future<Output = ()>) {
- let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
- let tx = Arc::new(RwLock::new(tx));
- let (player, connection) = if args.local {
- info!("Starting in CLI mode");
- let audio_player = AudioPlayer::new(tx.clone(), None).unwrap();
-
- (audio_player, None)
- } else {
- info!("Starting in TeamSpeak mode");
-
- let con_config = Connection::build(args.address)
- .version(tsclientlib::Version::Linux_3_3_2)
- .name(format!("🎵 {}", args.name))
- .identity(args.id)
- .log_commands(args.verbose >= 1)
- .log_packets(args.verbose >= 2)
- .log_udp_packets(args.verbose >= 3)
- .channel(args.channel);
-
- let connection = TeamSpeakConnection::new(tx.clone(), con_config)
- .await
- .unwrap();
- let mut cconnection = connection.clone();
- let audio_player = AudioPlayer::new(
- tx.clone(),
- Some(Box::new(move |samples| {
- let mut rt = tokio::runtime::Runtime::new().unwrap();
- rt.block_on(cconnection.send_audio_packet(samples));
- })),
- )
- .unwrap();
-
- (audio_player, Some(connection))
- };
-
+ pub async fn spawn(args: MusicBotArgs) -> Address<Self> {
+ let mut player = AudioPlayer::new(args.logger.clone()).unwrap();
player.change_volume(VolumeChange::Absolute(0.5)).unwrap();
- let player = Arc::new(player);
- let playlist = Arc::new(RwLock::new(Playlist::new()));
- spawn_gstreamer_thread(player.clone(), tx.clone());
+ let playlist = Playlist::new(args.logger.clone());
- if args.local {
- spawn_stdin_reader(tx);
- }
+ let teamspeak = if args.local {
+ info!(args.logger, "Starting in CLI mode");
+ player.setup_with_audio_callback(None).unwrap();
- let bot = Arc::new(Self {
+ None
+ } else {
+ Some(TeamSpeakConnection::new(args.logger.clone()).await.unwrap())
+ };
+ let bot = Self {
name: args.name.clone(),
+ master: args.master,
+ identity: args.identity.clone(),
player,
- teamspeak: connection,
+ teamspeak,
playlist,
- state: Arc::new(RwLock::new(State::EndOfStream)),
- });
-
- let cbot = bot.clone();
- let mut disconnect_cb = args.disconnect_cb;
- let name = args.name;
- let name_index = args.name_index;
- let id_index = args.id_index;
- let msg_loop = async move {
- 'outer: loop {
- while let Some(msg) = rx.recv().await {
- if let MusicBotMessage::Quit(reason) = msg {
- if let Some(ts) = &cbot.teamspeak {
- let mut ts = ts.clone();
- ts.disconnect(&reason).await;
- }
- disconnect_cb(name, name_index, id_index);
- break 'outer;
- }
- cbot.on_message(msg).await.unwrap();
- }
- }
- debug!("Left message loop");
+ state: State::EndOfStream,
+ logger: args.logger.clone(),
};
- bot.update_name(State::EndOfStream).await;
+ let bot_addr = bot.create(None).spawn(&mut Tokio::Global);
+
+ info!(
+ args.logger,
+ "Connecting";
+ "name" => &args.name,
+ "channel" => &args.channel,
+ "address" => &args.address,
+ );
+
+ let opt = Connection::build(args.address)
+ .logger(args.logger.clone())
+ .version(tsclientlib::Version::Linux_3_3_2)
+ .name(format!("🎵 {}", args.name))
+ .identity(args.identity)
+ .log_commands(args.verbose >= 1)
+ .log_packets(args.verbose >= 2)
+ .log_udp_packets(args.verbose >= 3)
+ .channel(args.channel);
+ bot_addr.send(Connect(opt)).await.unwrap().unwrap();
+ bot_addr
+ .send(MusicBotMessage::StateChange(State::EndOfStream))
+ .await
+ .unwrap()
+ .unwrap();
+
+ if args.local {
+ debug!(args.logger, "Spawning stdin reader thread");
+ spawn_stdin_reader(bot_addr.downgrade());
+ }
- (bot, msg_loop)
+ bot_addr
}
- async fn start_playing_audio(&self, metadata: AudioMetadata) {
+ async fn start_playing_audio(&mut self, metadata: AudioMetadata) {
let duration = if let Some(duration) = metadata.duration {
format!("({})", ts::bold(&humantime::format_duration(duration)))
} else {
@@ -184,25 +168,16 @@ impl MusicBot {
self.player.play().unwrap();
}
- pub async fn add_audio(&self, url: String, user: String) {
- match crate::youtube_dl::get_audio_download_from_url(url).await {
+ pub async fn add_audio(&mut self, url: String, user: String) {
+ match youtube_dl::get_audio_download_from_url(url, &self.logger).await {
Ok(mut metadata) => {
metadata.added_by = user;
- info!("Found audio url: {}", metadata.url);
+ info!(self.logger, "Found source"; "url" => &metadata.url);
- // RWLockGuard can not be kept around or the compiler complains that
- // it might cross the await boundary
- self.playlist
- .write()
- .expect("RwLock was not poisoned")
- .push(metadata.clone());
+ self.playlist.push(metadata.clone());
if !self.player.is_started() {
- let entry = self
- .playlist
- .write()
- .expect("RwLock was not poisoned")
- .pop();
+ let entry = self.playlist.pop();
if let Some(request) = entry {
self.start_playing_audio(request).await;
}
@@ -222,7 +197,7 @@ impl MusicBot {
}
}
Err(e) => {
- info!("Failed to find audio url: {}", e);
+ info!(self.logger, "Failed to find audio url"; "error" => &e);
self.send_message(format!("Failed to find url: {}", e))
.await;
@@ -235,74 +210,50 @@ impl MusicBot {
}
pub fn state(&self) -> State {
- *self.state.read().expect("RwLock was not poisoned")
+ self.state
}
- pub fn volume(&self) -> f64 {
+ pub async fn volume(&self) -> f64 {
self.player.volume()
}
- pub fn position(&self) -> Option<Duration> {
- self.player.position()
- }
-
- pub fn currently_playing(&self) -> Option<AudioMetadata> {
- self.player.currently_playing()
- }
+ pub async fn current_channel(&mut self) -> Option<ChannelId> {
+ let ts = self.teamspeak.as_mut().expect("current_channel needs ts");
- pub fn playlist_to_vec(&self) -> Vec<AudioMetadata> {
- self.playlist.read().unwrap().to_vec()
+ ts.current_channel().await
}
- pub async fn my_channel(&self) -> ChannelId {
- let ts = self.teamspeak.as_ref().expect("my_channel needs ts");
-
- let mut ts = ts.clone();
- ts.my_channel().await
- }
+ async fn user_count(&mut self, channel: ChannelId) -> u32 {
+ let ts = self.teamspeak.as_mut().expect("user_count needs ts");
- async fn user_count(&self, channel: ChannelId) -> u32 {
- let ts = self.teamspeak.as_ref().expect("user_count needs ts");
-
- let mut ts = ts.clone();
ts.user_count(channel).await
}
- async fn send_message(&self, text: String) {
- debug!("Sending message to TeamSpeak: {}", text);
+ async fn send_message(&mut self, text: String) {
+ debug!(self.logger, "Sending message to TeamSpeak"; "message" => &text);
- if let Some(ts) = &self.teamspeak {
- let mut ts = ts.clone();
+ if let Some(ts) = &mut self.teamspeak {
ts.send_message_to_channel(text).await;
}
}
- async fn set_nickname(&self, name: String) {
- info!("Setting TeamSpeak nickname: {}", name);
+ async fn set_nickname(&mut self, name: String) {
+ info!(self.logger, "Setting TeamSpeak nickname"; "name" => &name);
- if let Some(ts) = &self.teamspeak {
- let mut ts = ts.clone();
+ if let Some(ts) = &mut self.teamspeak {
ts.set_nickname(name).await;
}
}
- async fn set_description(&self, desc: String) {
- info!("Setting TeamSpeak description: {}", desc);
+ async fn set_description(&mut self, desc: String) {
+ info!(self.logger, "Setting TeamSpeak description"; "description" => &desc);
- if let Some(ts) = &self.teamspeak {
- let mut ts = ts.clone();
+ if let Some(ts) = &mut self.teamspeak {
ts.set_description(desc).await;
}
}
- async fn subscribe(&self, id: ChannelId) {
- if let Some(ts) = &self.teamspeak {
- let mut ts = ts.clone();
- ts.subscribe(id).await;
- }
- }
-
- async fn on_text(&self, message: Message) -> Result<(), AudioPlayerError> {
+ async fn on_text(&mut self, message: ChatMessage) -> Result<(), AudioPlayerError> {
let msg = message.text;
if msg.starts_with('!') {
let tokens = msg[1..].split_whitespace().collect::<Vec<_>>();
@@ -319,13 +270,15 @@ impl MusicBot {
Ok(())
}
- async fn on_command(&self, command: Command, invoker: Invoker) -> Result<(), AudioPlayerError> {
+ async fn on_command(
+ &mut self,
+ command: Command,
+ invoker: Invoker,
+ ) -> Result<(), AudioPlayerError> {
match command {
Command::Play => {
- let playlist = self.playlist.read().expect("RwLock was not poisoned");
-
if !self.player.is_started() {
- if !playlist.is_empty() {
+ if !self.playlist.is_empty() {
self.player.stop_current()?;
}
} else {
@@ -357,35 +310,32 @@ impl MusicBot {
}
}
Command::Next => {
- let playlist = self.playlist.read().expect("RwLock was not poisoned");
- if !playlist.is_empty() {
- info!("Skipping to next track");
+ if !self.playlist.is_empty() {
+ info!(self.logger, "Skipping to next track");
self.player.stop_current()?;
} else {
- info!("Playlist empty, cannot skip");
+ info!(self.logger, "Playlist empty, cannot skip");
self.player.reset()?;
}
}
Command::Clear => {
- self.playlist
- .write()
- .expect("RwLock was not poisoned")
- .clear();
+ self.send_message(String::from("Cleared playlist")).await;
+ self.playlist.clear();
}
Command::Volume { volume } => {
self.player.change_volume(volume)?;
self.update_name(self.state()).await;
}
Command::Leave => {
- self.quit(String::from("Leaving"));
+ self.quit(String::from("Leaving"), true).await.unwrap();
}
}
Ok(())
}
- async fn update_name(&self, state: State) {
- let volume = (self.volume() * 100.0).round();
+ async fn update_name(&mut self, state: State) {
+ let volume = (self.volume().await * 100.0).round();
let name = match state {
State::EndOfStream => format!("🎵 {} ({}%)", self.name, volume),
_ => format!("🎵 {} - {} ({}%)", self.name, state, volume),
@@ -393,43 +343,39 @@ impl MusicBot {
self.set_nickname(name).await;
}
- async fn on_state(&self, state: State) -> Result<(), AudioPlayerError> {
- let current_state = *self.state.read().unwrap();
- if current_state != state {
- match state {
+ async fn on_state(&mut self, new_state: State) -> Result<(), AudioPlayerError> {
+ if self.state != new_state {
+ match new_state {
State::EndOfStream => {
- let next_track = self
- .playlist
- .write()
- .expect("RwLock was not poisoned")
- .pop();
+ self.player.reset()?;
+ let next_track = self.playlist.pop();
if let Some(request) = next_track {
- info!("Advancing playlist");
+ info!(self.logger, "Advancing playlist");
self.start_playing_audio(request).await;
} else {
- self.update_name(state).await;
+ self.update_name(new_state).await;
self.set_description(String::new()).await;
}
}
State::Stopped => {
- if current_state != State::EndOfStream {
- self.update_name(state).await;
+ if self.state != State::EndOfStream {
+ self.update_name(new_state).await;
self.set_description(String::new()).await;
}
}
- _ => self.update_name(state).await,
+ _ => self.update_name(new_state).await,
}
}
- if !(current_state == State::EndOfStream && state == State::Stopped) {
- *self.state.write().unwrap() = state;
+ if !(self.state == State::EndOfStream && new_state == State::Stopped) {
+ self.state = new_state;
}
Ok(())
}
- async fn on_message(&self, message: MusicBotMessage) -> Result<(), AudioPlayerError> {
+ async fn on_message(&mut self, message: MusicBotMessage) -> Result<(), AudioPlayerError> {
match message {
MusicBotMessage::TextMessage(message) => {
if MessageTarget::Channel == message.target {
@@ -446,9 +392,6 @@ impl MusicBot {
let old_channel = client.channel;
self.on_client_left_channel(old_channel).await;
}
- MusicBotMessage::ChannelAdded(id) => {
- self.subscribe(id).await;
- }
MusicBotMessage::StateChange(state) => {
self.on_state(state).await?;
}
@@ -458,60 +401,163 @@ impl MusicBot {
Ok(())
}
- async fn on_client_left_channel(&self, old_channel: ChannelId) {
- let my_channel = self.my_channel().await;
- if old_channel == my_channel && self.user_count(my_channel).await <= 1 {
- self.quit(String::from("Channel is empty"));
+ // FIXME logs an error if this music bot is the one leaving
+ async fn on_client_left_channel(&mut self, old_channel: ChannelId) {
+ let current_channel = match self.current_channel().await {
+ Some(c) => c,
+ None => {
+ return;
+ }
+ };
+ if old_channel == current_channel && self.user_count(current_channel).await <= 1 {
+ self.quit(String::from("Channel is empty"), true)
+ .await
+ .unwrap();
+ }
+ }
+
+ pub async fn quit(
+ &mut self,
+ reason: String,
+ inform_master: bool,
+ ) -> Result<(), tsclientlib::Error> {
+ // FIXME logs errors if the bot is playing something because it tries to
+ // change its name and description
+ self.player.reset().unwrap();
+
+ let ts = self.teamspeak.as_mut().unwrap();
+ ts.disconnect(&reason).await?;
+
+ if inform_master {
+ if let Some(master) = &self.master {
+ master
+ .send(BotDisonnected {
+ name: self.name.clone(),
+ identity: self.identity.clone(),
+ })
+ .await
+ .unwrap();
+ }
+ }
+
+ Ok(())
+ }
+}
+
+#[async_trait]
+impl Actor for MusicBot {
+ async fn started(&mut self, ctx: &mut Context<Self>) {
+ let addr = ctx.address().unwrap().downgrade();
+ self.player.register_bot(addr);
+ }
+}
+
+#[async_trait]
+impl Handler<Connect> for MusicBot {
+ async fn handle(
+ &mut self,
+ opt: Connect,
+ ctx: &mut Context<Self>,
+ ) -> Result<(), tsclientlib::Error> {
+ let addr = ctx.address().unwrap().downgrade();
+ self.teamspeak
+ .as_mut()
+ .unwrap()
+ .connect_for_bot(opt.0, addr)?;
+
+ let mut connection = self.teamspeak.as_ref().unwrap().clone();
+ let handle = tokio::runtime::Handle::current();
+ self.player
+ .setup_with_audio_callback(Some(Box::new(move |samples| {
+ handle.block_on(connection.send_audio_packet(samples));
+ })))
+ .unwrap();
+
+ Ok(())
+ }
+}
+
+pub struct GetName;
+impl Message for GetName {
+ type Result = String;
+}
+
+#[async_trait]
+impl Handler<GetName> for MusicBot {
+ async fn handle(&mut self, _: GetName, _: &mut Context<Self>) -> String {
+ self.name().to_owned()
+ }
+}
+
+pub struct GetBotData;
+impl Message for GetBotData {
+ type Result = crate::web_server::BotData;
+}
+
+#[async_trait]
+impl Handler<GetBotData> for MusicBot {
+ async fn handle(&mut self, _: GetBotData, _: &mut Context<Self>) -> crate::web_server::BotData {
+ crate::web_server::BotData {
+ name: self.name.clone(),
+ playlist: self.playlist.to_vec(),
+ currently_playing: self.player.currently_playing(),
+ position: self.player.position(),
+ state: self.state(),
+ volume: self.volume().await,
}
}
+}
- pub fn quit(&self, reason: String) {
- self.player.quit(reason);
+pub struct GetChannel;
+impl Message for GetChannel {
+ type Result = Option<ChannelId>;
+}
+
+#[async_trait]
+impl Handler<GetChannel> for MusicBot {
+ async fn handle(&mut self, _: GetChannel, _: &mut Context<Self>) -> Option<ChannelId> {
+ self.current_channel().await
}
}
-fn spawn_stdin_reader(tx: Arc<RwLock<UnboundedSender<MusicBotMessage>>>) {
- debug!("Spawning stdin reader thread");
- thread::Builder::new()
- .name(String::from("stdin reader"))
- .spawn(move || {
- let stdin = ::std::io::stdin();
- let lock = stdin.lock();
- for line in lock.lines() {
- let line = line.unwrap();
-
- let message = MusicBotMessage::TextMessage(Message {
- target: MessageTarget::Channel,
- invoker: Invoker {
- name: String::from("stdin"),
- id: ClientId(0),
- uid: None,
- },
- text: line,
- });
-
- let tx = tx.read().unwrap();
- tx.send(message).unwrap();
- }
- })
- .expect("Failed to spawn stdin reader thread");
+#[async_trait]
+impl Handler<Quit> for MusicBot {
+ async fn handle(&mut self, q: Quit, _: &mut Context<Self>) -> Result<(), tsclientlib::Error> {
+ self.quit(q.0, false).await
+ }
}
-fn spawn_gstreamer_thread(
- player: Arc<AudioPlayer>,
- tx: Arc<RwLock<UnboundedSender<MusicBotMessage>>>,
-) {
- thread::Builder::new()
- .name(String::from("gstreamer polling"))
- .spawn(move || loop {
- if player.poll() == PollResult::Quit {
- break;
- }
+#[async_trait]
+impl Handler<MusicBotMessage> for MusicBot {
+ async fn handle(
+ &mut self,
+ msg: MusicBotMessage,
+ _: &mut Context<Self>,
+ ) -> Result<(), AudioPlayerError> {
+ self.on_message(msg).await
+ }
+}
- tx.read()
- .unwrap()
- .send(MusicBotMessage::StateChange(State::EndOfStream))
- .unwrap();
- })
- .expect("Failed to spawn gstreamer thread");
+fn spawn_stdin_reader(addr: WeakAddress<MusicBot>) {
+ use tokio::io::AsyncBufReadExt;
+
+ tokio::task::spawn(async move {
+ let stdin = tokio::io::stdin();
+ let reader = tokio::io::BufReader::new(stdin);
+ let mut lines = reader.lines();
+
+ while let Some(line) = lines.next_line().await.unwrap() {
+ let message = MusicBotMessage::TextMessage(ChatMessage {
+ target: MessageTarget::Channel,
+ invoker: Invoker {
+ name: String::from("stdin"),
+ id: ClientId(0),
+ uid: None,
+ },
+ text: line,
+ });
+
+ addr.send(message).await.unwrap().unwrap();
+ }
+ });
}