aboutsummaryrefslogtreecommitdiffstats
path: root/src/teamspeak/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/teamspeak/mod.rs')
-rw-r--r--src/teamspeak/mod.rs245
1 files changed, 144 insertions, 101 deletions
diff --git a/src/teamspeak/mod.rs b/src/teamspeak/mod.rs
index beb3f44..7a68d38 100644
--- a/src/teamspeak/mod.rs
+++ b/src/teamspeak/mod.rs
@@ -1,7 +1,5 @@
-use std::sync::{Arc, RwLock};
-
use futures::stream::StreamExt;
-use tokio::sync::mpsc::UnboundedSender;
+use xtra::{Actor, Handler, WeakAddress};
use tsclientlib::data::exts::{M2BClientEditExt, M2BClientUpdateExt};
use tsclientlib::{
@@ -10,9 +8,9 @@ use tsclientlib::{
ChannelId, ClientId, ConnectOptions, DisconnectOptions, MessageTarget, OutCommandExt, Reason,
};
-use log::{debug, error};
+use slog::{debug, error, info, trace, Logger};
-use crate::bot::{Message, MusicBotMessage};
+use crate::bot::{ChatMessage, MusicBotMessage};
mod bbcode;
@@ -20,7 +18,8 @@ pub use bbcode::*;
#[derive(Clone)]
pub struct TeamSpeakConnection {
- handle: SyncConnectionHandle,
+ handle: Option<SyncConnectionHandle>,
+ logger: Logger,
}
fn get_message(event: &Event) -> Option<MusicBotMessage> {
@@ -31,7 +30,7 @@ fn get_message(event: &Event) -> Option<MusicBotMessage> {
target,
invoker: sender,
message: msg,
- } => Some(MusicBotMessage::TextMessage(Message {
+ } => Some(MusicBotMessage::TextMessage(ChatMessage {
target: *target,
invoker: sender.clone(),
text: msg.clone(),
@@ -86,50 +85,82 @@ fn get_message(event: &Event) -> Option<MusicBotMessage> {
}
impl TeamSpeakConnection {
- pub async fn new(
- tx: Arc<RwLock<UnboundedSender<MusicBotMessage>>>,
+ pub async fn new(logger: Logger) -> Result<TeamSpeakConnection, tsclientlib::Error> {
+ Ok(TeamSpeakConnection {
+ handle: None,
+ logger,
+ })
+ }
+
+ pub fn connect_for_bot<T: Actor + Handler<MusicBotMessage>>(
+ &mut self,
options: ConnectOptions,
- ) -> Result<TeamSpeakConnection, tsclientlib::Error> {
+ bot: WeakAddress<T>,
+ ) -> Result<(), tsclientlib::Error> {
+ info!(self.logger, "Starting TeamSpeak connection");
+
let conn = options.connect()?;
- let conn = SyncConnection::from(conn);
- let mut handle = conn.get_handle();
-
- tokio::spawn(conn.for_each(move |i| {
- let tx = tx.clone();
- async move {
- match i {
- Ok(SyncStreamItem::ConEvents(events)) => {
+ let mut conn = SyncConnection::from(conn);
+ let handle = conn.get_handle();
+ self.handle = Some(handle);
+
+ let ev_logger = self.logger.clone();
+ tokio::spawn(async move {
+ while let Some(item) = conn.next().await {
+ use SyncStreamItem::*;
+
+ match item {
+ Ok(ConEvents(events)) => {
for event in &events {
if let Some(msg) = get_message(event) {
- let tx = tx.read().expect("RwLock was not poisoned");
- // Ignore the result because the receiver might get dropped first.
- let _ = tx.send(msg);
+ tokio::spawn(bot.send(msg));
}
}
}
- Err(e) => error!("Error occured during event reading: {}", e),
- Ok(SyncStreamItem::DisconnectedTemporarily) => debug!("Temporary disconnect!"),
- _ => (),
+ Err(e) => error!(ev_logger, "Error occured during event reading: {}", e),
+ Ok(DisconnectedTemporarily(r)) => {
+ debug!(ev_logger, "Temporary disconnect"; "reason" => ?r)
+ }
+ Ok(Audio(_)) => {
+ trace!(ev_logger, "Audio received");
+ }
+ Ok(IdentityLevelIncreasing(_)) => {
+ trace!(ev_logger, "Identity level increasing");
+ }
+ Ok(IdentityLevelIncreased) => {
+ trace!(ev_logger, "Identity level increased");
+ }
+ Ok(NetworkStatsUpdated) => {
+ trace!(ev_logger, "Network stats updated");
+ }
}
}
- }));
-
- handle.wait_until_connected().await?;
-
- let mut chandle = handle.clone();
- chandle
- .with_connection(|mut conn| {
- conn.get_state()
- .expect("is connected")
- .server
- .set_subscribed(true)
- .send(&mut conn)
- .unwrap()
- })
- .await
- .unwrap();
-
- Ok(TeamSpeakConnection { handle })
+ });
+
+ let mut handle = self.handle.clone();
+ tokio::spawn(async move {
+ handle
+ .as_mut()
+ .expect("connect_for_bot was called")
+ .wait_until_connected()
+ .await
+ .unwrap();
+ handle
+ .as_mut()
+ .expect("connect_for_bot was called")
+ .with_connection(|mut conn| {
+ conn.get_state()
+ .expect("can get state")
+ .server
+ .set_subscribed(true)
+ .send(&mut conn)
+ })
+ .await
+ .and_then(|v| v)
+ .unwrap();
+ });
+
+ Ok(())
}
pub async fn send_audio_packet(&mut self, samples: &[u8]) {
@@ -140,22 +171,25 @@ impl TeamSpeakConnection {
data: samples,
});
- self.handle
- .with_connection(|conn| {
- if let Err(e) = conn
- .get_tsproto_client_mut()
+ if let Err(e) = self
+ .handle
+ .as_mut()
+ .expect("connect_for_bot was called")
+ .with_connection(move |conn| {
+ conn.get_tsproto_client_mut()
.expect("can get tsproto client")
.send_packet(packet)
- {
- error!("Failed to send voice packet: {}", e);
- }
})
.await
- .unwrap();
+ {
+ error!(self.logger, "Failed to send voice packet: {}", e);
+ }
}
pub async fn channel_of_user(&mut self, id: ClientId) -> Option<ChannelId> {
self.handle
+ .as_mut()
+ .expect("connect_for_bot was called")
.with_connection(move |conn| {
conn.get_state()
.expect("can get state")
@@ -164,30 +198,28 @@ impl TeamSpeakConnection {
.map(|c| c.channel)
})
.await
- .unwrap()
+ .map_err(|e| error!(self.logger, "Failed to get channel of user"; "error" => %e))
+ .ok()
+ .and_then(|v| v)
}
pub async fn channel_path_of_user(&mut self, id: ClientId) -> Option<String> {
self.handle
+ .as_mut()
+ .expect("connect_for_bot was called")
.with_connection(move |conn| {
let state = conn.get_state().expect("can get state");
let channel_id = state.clients.get(&id)?.channel;
- let mut channel = state
- .channels
- .get(&channel_id)
- .expect("can find user channel");
+ let mut channel = state.channels.get(&channel_id)?;
let mut names = vec![&channel.name[..]];
// Channel 0 is the root channel
while channel.parent != ChannelId(0) {
names.push("/");
- channel = state
- .channels
- .get(&channel.parent)
- .expect("can find user channel");
+ channel = state.channels.get(&channel.parent)?;
names.push(&channel.name);
}
@@ -199,11 +231,15 @@ impl TeamSpeakConnection {
Some(path)
})
.await
- .unwrap()
+ .map_err(|e| error!(self.logger, "Failed to get channel path of user"; "error" => %e))
+ .ok()
+ .and_then(|v| v)
}
- pub async fn my_channel(&mut self) -> ChannelId {
+ pub async fn current_channel(&mut self) -> Option<ChannelId> {
self.handle
+ .as_mut()
+ .expect("connect_for_bot was called")
.with_connection(move |conn| {
let state = conn.get_state().expect("can get state");
state
@@ -213,11 +249,14 @@ impl TeamSpeakConnection {
.channel
})
.await
- .unwrap()
+ .map_err(|e| error!(self.logger, "Failed to get channel"; "error" => %e))
+ .ok()
}
pub async fn my_id(&mut self) -> ClientId {
self.handle
+ .as_mut()
+ .expect("connect_for_bot was called")
.with_connection(move |conn| conn.get_state().expect("can get state").own_client)
.await
.unwrap()
@@ -225,6 +264,8 @@ impl TeamSpeakConnection {
pub async fn user_count(&mut self, channel: ChannelId) -> u32 {
self.handle
+ .as_mut()
+ .expect("connect_for_bot was called")
.with_connection(move |conn| {
let state = conn.get_state().expect("can get state");
let mut count = 0;
@@ -241,88 +282,90 @@ impl TeamSpeakConnection {
}
pub async fn set_nickname(&mut self, name: String) {
- self.handle
+ if let Err(e) = self
+ .handle
+ .as_mut()
+ .expect("connect_for_bot was called")
.with_connection(move |mut conn| {
conn.get_state()
.expect("can get state")
.client_update()
.set_name(&name)
.send(&mut conn)
- .map_err(|e| error!("Failed to set nickname: {}", e))
})
.await
- .unwrap()
- .unwrap();
+ .and_then(|v| v)
+ {
+ error!(self.logger, "Failed to set nickname: {}", e);
+ }
}
pub async fn set_description(&mut self, desc: String) {
- self.handle
+ if let Err(e) = self
+ .handle
+ .as_mut()
+ .expect("connect_for_bot was called")
.with_connection(move |mut conn| {
let state = conn.get_state().expect("can get state");
- let _ = state
+ state
.clients
.get(&state.own_client)
.expect("can get myself")
.edit()
.set_description(&desc)
.send(&mut conn)
- .map_err(|e| error!("Failed to change description: {}", e));
})
.await
- .unwrap()
+ .and_then(|v| v)
+ {
+ error!(self.logger, "Failed to change description: {}", e);
+ }
}
pub async fn send_message_to_channel(&mut self, text: String) {
- self.handle
+ if let Err(e) = self
+ .handle
+ .as_mut()
+ .expect("connect_for_bot was called")
.with_connection(move |mut conn| {
- let _ = conn
- .get_state()
+ conn.get_state()
.expect("can get state")
.send_message(MessageTarget::Channel, &text)
.send(&mut conn)
- .map_err(|e| error!("Failed to send message: {}", e));
})
.await
- .unwrap()
+ .and_then(|v| v)
+ {
+ error!(self.logger, "Failed to send message: {}", e);
+ }
}
pub async fn send_message_to_user(&mut self, client: ClientId, text: String) {
- self.handle
+ if let Err(e) = self
+ .handle
+ .as_mut()
+ .expect("connect_for_bot was called")
.with_connection(move |mut conn| {
- let _ = conn
- .get_state()
+ conn.get_state()
.expect("can get state")
.send_message(MessageTarget::Client(client), &text)
.send(&mut conn)
- .map_err(|e| error!("Failed to send message: {}", e));
})
.await
- .unwrap()
+ .and_then(|v| v)
+ {
+ error!(self.logger, "Failed to send message: {}", e);
+ }
}
- pub async fn subscribe(&mut self, id: ChannelId) {
- self.handle
- .with_connection(move |mut conn| {
- let channel = match conn.get_state().expect("can get state").channels.get(&id) {
- Some(c) => c,
- None => {
- error!("Failed to find channel to subscribe to");
- return;
- }
- };
-
- if let Err(e) = channel.set_subscribed(true).send(&mut conn) {
- error!("Failed to send subscribe packet: {}", e);
- }
- })
- .await
- .unwrap()
- }
-
- pub async fn disconnect(&mut self, reason: &str) {
+ pub async fn disconnect(&mut self, reason: &str) -> Result<(), tsclientlib::Error> {
let opt = DisconnectOptions::new()
.reason(Reason::Clientdisconnect)
.message(reason);
- self.handle.disconnect(opt).await.unwrap();
+ self.handle
+ .as_mut()
+ .expect("connect_for_bot was called")
+ .disconnect(opt)
+ .await
}
}