aboutsummaryrefslogtreecommitdiffstats
path: root/src/bot/master.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/bot/master.rs')
-rw-r--r--src/bot/master.rs363
1 files changed, 172 insertions, 191 deletions
diff --git a/src/bot/master.rs b/src/bot/master.rs
index 1480e17..94332ac 100644
--- a/src/bot/master.rs
+++ b/src/bot/master.rs
@@ -1,41 +1,52 @@
use std::collections::HashMap;
-use std::future::Future;
-use std::sync::{Arc, RwLock};
-use log::info;
+use async_trait::async_trait;
+use futures::future;
use rand::{rngs::SmallRng, seq::SliceRandom, SeedableRng};
use serde::{Deserialize, Serialize};
-use tokio::sync::mpsc::UnboundedSender;
-use tsclientlib::{ClientId, Connection, Identity, MessageTarget};
+use slog::{error, info, o, trace, Logger};
+use tsclientlib::{ClientId, ConnectOptions, Connection, Identity, MessageTarget};
+use xtra::{spawn::Tokio, Actor, Address, Context, Handler, Message, WeakAddress};
use crate::audio_player::AudioPlayerError;
use crate::teamspeak::TeamSpeakConnection;
use crate::Args;
-use crate::bot::{MusicBot, MusicBotArgs, MusicBotMessage};
+use crate::bot::{GetBotData, GetChannel, GetName, MusicBot, MusicBotArgs, MusicBotMessage};
pub struct MasterBot {
- config: Arc<MasterConfig>,
- music_bots: Arc<RwLock<MusicBots>>,
+ config: MasterConfig,
+ my_addr: Option<WeakAddress<Self>>,
teamspeak: TeamSpeakConnection,
- sender: Arc<RwLock<UnboundedSender<MusicBotMessage>>>,
+ available_names: Vec<String>,
+ available_ids: Vec<Identity>,
+ connected_bots: HashMap<String, Address<MusicBot>>,
+ rng: SmallRng,
+ logger: Logger,
}
-struct MusicBots {
- rng: SmallRng,
- available_names: Vec<usize>,
- available_ids: Vec<usize>,
- connected_bots: HashMap<String, Arc<MusicBot>>,
+#[derive(Debug, Serialize, Deserialize)]
+pub struct MasterArgs {
+ #[serde(default = "default_name")]
+ pub master_name: String,
+ pub address: String,
+ pub channel: Option<String>,
+ #[serde(default = "default_verbose")]
+ pub verbose: u8,
+ pub domain: String,
+ pub bind_address: String,
+ pub names: Vec<String>,
+ pub id: Option<Identity>,
+ pub ids: Option<Vec<Identity>>,
}
impl MasterBot {
- pub async fn new(args: MasterArgs) -> (Arc<Self>, impl Future) {
- let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
- let tx = Arc::new(RwLock::new(tx));
- info!("Starting in TeamSpeak mode");
+ pub async fn spawn(args: MasterArgs, logger: Logger) -> Address<Self> {
+ info!(logger, "Starting in TeamSpeak mode");
let mut con_config = Connection::build(args.address.clone())
+ .logger(logger.clone())
.version(tsclientlib::Version::Linux_3_3_2)
.name(args.master_name.clone())
.identity(args.id.expect("identity should exist"))
@@ -47,168 +58,116 @@ impl MasterBot {
con_config = con_config.channel(channel);
}
- let connection = TeamSpeakConnection::new(tx.clone(), con_config)
- .await
- .unwrap();
+ let connection = TeamSpeakConnection::new(logger.clone()).await.unwrap();
+ trace!(logger, "Created teamspeak connection");
- let config = Arc::new(MasterConfig {
+ let config = MasterConfig {
master_name: args.master_name,
address: args.address,
- names: args.names,
- ids: args.ids.expect("identies should exists"),
- local: args.local,
verbose: args.verbose,
- });
-
- let name_count = config.names.len();
- let id_count = config.ids.len();
+ };
- let music_bots = Arc::new(RwLock::new(MusicBots {
+ let bot_addr = Self {
+ config,
+ my_addr: None,
+ teamspeak: connection,
+ logger: logger.clone(),
rng: SmallRng::from_entropy(),
- available_names: (0..name_count).collect(),
- available_ids: (0..id_count).collect(),
+ available_names: args.names,
+ available_ids: args.ids.expect("identities"),
connected_bots: HashMap::new(),
- }));
+ }
+ .create(None)
+ .spawn(&mut Tokio::Global);
- let bot = Arc::new(Self {
- config,
- music_bots,
- teamspeak: connection,
- sender: tx.clone(),
- });
-
- let cbot = bot.clone();
- let msg_loop = async move {
- 'outer: loop {
- while let Some(msg) = rx.recv().await {
- match msg {
- MusicBotMessage::Quit(reason) => {
- let mut cteamspeak = cbot.teamspeak.clone();
- cteamspeak.disconnect(&reason).await;
- break 'outer;
- }
- MusicBotMessage::ClientDisconnected { id, .. } => {
- if id == cbot.my_id().await {
- // TODO Reconnect since quit was not called
- break 'outer;
- }
- }
- _ => cbot.on_message(msg).await.unwrap(),
- }
- }
- }
- };
+ bot_addr.send(Connect(con_config)).await.unwrap().unwrap();
+ trace!(logger, "Spawned master bot actor");
- (bot, msg_loop)
+ bot_addr
}
- async fn build_bot_args_for(&self, id: ClientId) -> Result<MusicBotArgs, BotCreationError> {
- let mut cteamspeak = self.teamspeak.clone();
- let channel = match cteamspeak.channel_of_user(id).await {
+ async fn bot_args_for_client(
+ &mut self,
+ user_id: ClientId,
+ ) -> Result<MusicBotArgs, BotCreationError> {
+ let channel = match self.teamspeak.channel_of_user(user_id).await {
Some(channel) => channel,
None => return Err(BotCreationError::UnfoundUser),
};
- if channel == cteamspeak.my_channel().await {
+ if channel == self.teamspeak.current_channel().await.unwrap() {
return Err(BotCreationError::MasterChannel(
self.config.master_name.clone(),
));
}
- let MusicBots {
- ref mut rng,
- ref mut available_names,
- ref mut available_ids,
- ref connected_bots,
- } = &mut *self.music_bots.write().expect("RwLock was not poisoned");
-
- for bot in connected_bots.values() {
- if bot.my_channel().await == channel {
- return Err(BotCreationError::MultipleBots(bot.name().to_owned()));
+ for bot in self.connected_bots.values() {
+ if bot.send(GetChannel).await.unwrap() == Some(channel) {
+ return Err(BotCreationError::MultipleBots(
+ bot.send(GetName).await.unwrap(),
+ ));
}
}
- let channel_path = cteamspeak
- .channel_path_of_user(id)
+ let channel_path = self
+ .teamspeak
+ .channel_path_of_user(user_id)
.await
.expect("can find poke sender");
- available_names.shuffle(rng);
- let name_index = match available_names.pop() {
+ self.available_names.shuffle(&mut self.rng);
+ let name = match self.available_names.pop() {
Some(v) => v,
None => {
return Err(BotCreationError::OutOfNames);
}
};
- let name = self.config.names[name_index].clone();
- available_ids.shuffle(rng);
- let id_index = match available_ids.pop() {
+ self.available_ids.shuffle(&mut self.rng);
+ let identity = match self.available_ids.pop() {
Some(v) => v,
None => {
return Err(BotCreationError::OutOfIdentities);
}
};
- let id = self.config.ids[id_index].clone();
-
- let cmusic_bots = self.music_bots.clone();
- let disconnect_cb = Box::new(move |n, name_index, id_index| {
- let mut music_bots = cmusic_bots.write().expect("RwLock was not poisoned");
- music_bots.connected_bots.remove(&n);
- music_bots.available_names.push(name_index);
- music_bots.available_ids.push(id_index);
- });
-
- info!("Connecting to {} on {}", channel_path, self.config.address);
-
Ok(MusicBotArgs {
- name,
- name_index,
- id_index,
- local: self.config.local,
+ name: name.clone(),
+ master: self.my_addr.clone(),
address: self.config.address.clone(),
- id,
+ identity,
+ local: false,
channel: channel_path,
verbose: self.config.verbose,
- disconnect_cb,
+ logger: self.logger.new(o!("musicbot" => name)),
})
}
- async fn spawn_bot_for(&self, id: ClientId) {
- match self.build_bot_args_for(id).await {
+ async fn spawn_bot_for_client(&mut self, id: ClientId) {
+ match self.bot_args_for_client(id).await {
Ok(bot_args) => {
- let (bot, fut) = MusicBot::new(bot_args).await;
- tokio::spawn(fut);
- let mut music_bots = self.music_bots.write().expect("RwLock was not poisoned");
- music_bots
- .connected_bots
- .insert(bot.name().to_string(), bot);
- }
- Err(e) => {
- let mut cteamspeak = self.teamspeak.clone();
- cteamspeak.send_message_to_user(id, e.to_string()).await
+ let name = bot_args.name.clone();
+ let bot = MusicBot::spawn(bot_args).await;
+ self.connected_bots.insert(name, bot);
}
+ Err(e) => self.teamspeak.send_message_to_user(id, e.to_string()).await,
}
}
- async fn on_message(&self, message: MusicBotMessage) -> Result<(), AudioPlayerError> {
+ async fn on_message(&mut self, message: MusicBotMessage) -> Result<(), AudioPlayerError> {
match message {
MusicBotMessage::TextMessage(message) => {
if let MessageTarget::Poke(who) = message.target {
- info!("Poked by {}, creating bot for their channel", who);
- self.spawn_bot_for(who).await;
+ info!(
+ self.logger,
+ "Poked, creating bot"; "user" => %who
+ );
+ self.spawn_bot_for_client(who).await;
}
}
- MusicBotMessage::ChannelAdded(id) => {
- let mut cteamspeak = self.teamspeak.clone();
- cteamspeak.subscribe(id).await;
- }
MusicBotMessage::ClientAdded(id) => {
- let mut cteamspeak = self.teamspeak.clone();
-
- if id == cteamspeak.my_id().await {
- cteamspeak
+ if id == self.teamspeak.my_id().await {
+ self.teamspeak
.set_description(String::from("Poke me if you want a music bot!"))
.await;
}
@@ -219,41 +178,17 @@ impl MasterBot {
Ok(())
}
- async fn my_id(&self) -> ClientId {
- let mut cteamspeak = self.teamspeak.clone();
+ pub async fn bot_data(&self, name: String) -> Option<crate::web_server::BotData> {
+ let bot = self.connected_bots.get(&name)?;
- cteamspeak.my_id().await
+ bot.send(GetBotData).await.ok()
}
- pub fn bot_data(&self, name: String) -> Option<crate::web_server::BotData> {
- let music_bots = self.music_bots.read().unwrap();
- let bot = music_bots.connected_bots.get(&name)?;
-
- Some(crate::web_server::BotData {
- name,
- state: bot.state(),
- volume: bot.volume(),
- position: bot.position(),
- currently_playing: bot.currently_playing(),
- playlist: bot.playlist_to_vec(),
- })
- }
-
- pub fn bot_datas(&self) -> Vec<crate::web_server::BotData> {
- let music_bots = self.music_bots.read().unwrap();
-
- let len = music_bots.connected_bots.len();
+ pub async fn bot_datas(&self) -> Vec<crate::web_server::BotData> {
+ let len = self.connected_bots.len();
let mut result = Vec::with_capacity(len);
- for (name, bot) in &music_bots.connected_bots {
- let bot_data = crate::web_server::BotData {
- name: name.clone(),
- state: bot.state(),
- volume: bot.volume(),
- position: bot.position(),
- currently_playing: bot.currently_playing(),
- playlist: bot.playlist_to_vec(),
- };
-
+ for bot in self.connected_bots.values() {
+ let bot_data = bot.send(GetBotData).await.unwrap();
result.push(bot_data);
}
@@ -261,24 +196,96 @@ impl MasterBot {
}
pub fn bot_names(&self) -> Vec<String> {
- let music_bots = self.music_bots.read().unwrap();
-
- let len = music_bots.connected_bots.len();
+ let len = self.connected_bots.len();
let mut result = Vec::with_capacity(len);
- for name in music_bots.connected_bots.keys() {
+ for name in self.connected_bots.keys() {
result.push(name.clone());
}
result
}
- pub fn quit(&self, reason: String) {
- let music_bots = self.music_bots.read().unwrap();
- for bot in music_bots.connected_bots.values() {
- bot.quit(reason.clone())
+ fn on_bot_disconnect(&mut self, name: String, id: Identity) {
+ self.connected_bots.remove(&name);
+ self.available_names.push(name);
+ self.available_ids.push(id);
+ }
+
+ pub async fn quit(&mut self, reason: String) -> Result<(), tsclientlib::Error> {
+ let futures = self
+ .connected_bots
+ .values()
+ .map(|b| b.send(Quit(reason.clone())));
+ for res in future::join_all(futures).await {
+ if let Err(e) = res {
+ error!(self.logger, "Failed to shut down bot"; "error" => %e);
+ }
}
- let sender = self.sender.read().unwrap();
- sender.send(MusicBotMessage::Quit(reason)).unwrap();
+ self.teamspeak.disconnect(&reason).await
+ }
+}
+
+#[async_trait]
+impl Actor for MasterBot {
+ async fn started(&mut self, ctx: &mut Context<Self>) {
+ self.my_addr = Some(ctx.address().unwrap().downgrade());
+ }
+}
+
+pub struct Connect(pub ConnectOptions);
+impl Message for Connect {
+ type Result = Result<(), tsclientlib::Error>;
+}
+
+#[async_trait]
+impl Handler<Connect> for MasterBot {
+ async fn handle(
+ &mut self,
+ opt: Connect,
+ ctx: &mut Context<Self>,
+ ) -> Result<(), tsclientlib::Error> {
+ let addr = ctx.address().unwrap();
+ self.teamspeak.connect_for_bot(opt.0, addr.downgrade())?;
+ Ok(())
+ }
+}
+
+pub struct Quit(pub String);
+impl Message for Quit {
+ type Result = Result<(), tsclientlib::Error>;
+}
+
+#[async_trait]
+impl Handler<Quit> for MasterBot {
+ async fn handle(&mut self, q: Quit, _: &mut Context<Self>) -> Result<(), tsclientlib::Error> {
+ self.quit(q.0).await
+ }
+}
+
+pub struct BotDisonnected {
+ pub name: String,
+ pub identity: Identity,
+}
+
+impl Message for BotDisonnected {
+ type Result = ();
+}
+
+#[async_trait]
+impl Handler<BotDisonnected> for MasterBot {
+ async fn handle(&mut self, dc: BotDisonnected, _: &mut Context<Self>) {
+ self.on_bot_disconnect(dc.name, dc.identity);
+ }
+}
+
+#[async_trait]
+impl Handler<MusicBotMessage> for MasterBot {
+ async fn handle(
+ &mut self,
+ msg: MusicBotMessage,
+ _: &mut Context<Self>,
+ ) -> Result<(), AudioPlayerError> {
+ self.on_message(msg).await
}
}
@@ -313,31 +320,10 @@ impl std::fmt::Display for BotCreationError {
}
}
-#[derive(Debug, Serialize, Deserialize)]
-pub struct MasterArgs {
- #[serde(default = "default_name")]
- pub master_name: String,
- #[serde(default = "default_local")]
- pub local: bool,
- pub address: String,
- pub channel: Option<String>,
- #[serde(default = "default_verbose")]
- pub verbose: u8,
- pub domain: String,
- pub bind_address: String,
- pub names: Vec<String>,
- pub id: Option<Identity>,
- pub ids: Option<Vec<Identity>>,
-}
-
fn default_name() -> String {
String::from("PokeBot")
}
-fn default_local() -> bool {
- false
-}
-
fn default_verbose() -> u8 {
0
}
@@ -345,7 +331,6 @@ fn default_verbose() -> u8 {
impl MasterArgs {
pub fn merge(self, args: Args) -> Self {
let address = args.address.unwrap_or(self.address);
- let local = args.local || self.local;
let channel = args.master_channel.or(self.channel);
let verbose = if args.verbose > 0 {
args.verbose
@@ -357,7 +342,6 @@ impl MasterArgs {
master_name: self.master_name,
names: self.names,
ids: self.ids,
- local,
address,
domain: self.domain,
bind_address: self.bind_address,
@@ -371,8 +355,5 @@ impl MasterArgs {
pub struct MasterConfig {
pub master_name: String,
pub address: String,
- pub names: Vec<String>,
- pub ids: Vec<Identity>,
- pub local: bool,
pub verbose: u8,
}