diff options
| author | Jokler <jokler@protonmail.com> | 2020-10-15 13:11:54 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-10-15 13:11:54 +0000 |
| commit | 43974717fee9a98701c6efa2e7221cdbfe7e537e (patch) | |
| tree | 93fe1d75477ae3d1c8466611a2cedd7bed316aa2 /src/teamspeak | |
| parent | 23671b51b4e207574a63bce820acbf43169e2b6c (diff) | |
| parent | 4e1c2b9f04073294ecb8402486c20d9c01721598 (diff) | |
| download | pokebot-43974717fee9a98701c6efa2e7221cdbfe7e537e.tar.gz pokebot-43974717fee9a98701c6efa2e7221cdbfe7e537e.zip | |
Merge pull request #70 from Mavulp/actor-bots
Replace channels&locks with actors & log with slog
Diffstat (limited to 'src/teamspeak')
| -rw-r--r-- | src/teamspeak/mod.rs | 245 |
1 files changed, 144 insertions, 101 deletions
diff --git a/src/teamspeak/mod.rs b/src/teamspeak/mod.rs index beb3f44..7a68d38 100644 --- a/src/teamspeak/mod.rs +++ b/src/teamspeak/mod.rs @@ -1,7 +1,5 @@ -use std::sync::{Arc, RwLock}; - use futures::stream::StreamExt; -use tokio::sync::mpsc::UnboundedSender; +use xtra::{Actor, Handler, WeakAddress}; use tsclientlib::data::exts::{M2BClientEditExt, M2BClientUpdateExt}; use tsclientlib::{ @@ -10,9 +8,9 @@ use tsclientlib::{ ChannelId, ClientId, ConnectOptions, DisconnectOptions, MessageTarget, OutCommandExt, Reason, }; -use log::{debug, error}; +use slog::{debug, error, info, trace, Logger}; -use crate::bot::{Message, MusicBotMessage}; +use crate::bot::{ChatMessage, MusicBotMessage}; mod bbcode; @@ -20,7 +18,8 @@ pub use bbcode::*; #[derive(Clone)] pub struct TeamSpeakConnection { - handle: SyncConnectionHandle, + handle: Option<SyncConnectionHandle>, + logger: Logger, } fn get_message(event: &Event) -> Option<MusicBotMessage> { @@ -31,7 +30,7 @@ fn get_message(event: &Event) -> Option<MusicBotMessage> { target, invoker: sender, message: msg, - } => Some(MusicBotMessage::TextMessage(Message { + } => Some(MusicBotMessage::TextMessage(ChatMessage { target: *target, invoker: sender.clone(), text: msg.clone(), @@ -86,50 +85,82 @@ fn get_message(event: &Event) -> Option<MusicBotMessage> { } impl TeamSpeakConnection { - pub async fn new( - tx: Arc<RwLock<UnboundedSender<MusicBotMessage>>>, + pub async fn new(logger: Logger) -> Result<TeamSpeakConnection, tsclientlib::Error> { + Ok(TeamSpeakConnection { + handle: None, + logger, + }) + } + + pub fn connect_for_bot<T: Actor + Handler<MusicBotMessage>>( + &mut self, options: ConnectOptions, - ) -> Result<TeamSpeakConnection, tsclientlib::Error> { + bot: WeakAddress<T>, + ) -> Result<(), tsclientlib::Error> { + info!(self.logger, "Starting TeamSpeak connection"); + let conn = options.connect()?; - let conn = SyncConnection::from(conn); - let mut handle = conn.get_handle(); - - tokio::spawn(conn.for_each(move |i| { - let tx = tx.clone(); - async move { - match i { - Ok(SyncStreamItem::ConEvents(events)) => { + let mut conn = SyncConnection::from(conn); + let handle = conn.get_handle(); + self.handle = Some(handle); + + let ev_logger = self.logger.clone(); + tokio::spawn(async move { + while let Some(item) = conn.next().await { + use SyncStreamItem::*; + + match item { + Ok(ConEvents(events)) => { for event in &events { if let Some(msg) = get_message(event) { - let tx = tx.read().expect("RwLock was not poisoned"); - // Ignore the result because the receiver might get dropped first. - let _ = tx.send(msg); + tokio::spawn(bot.send(msg)); } } } - Err(e) => error!("Error occured during event reading: {}", e), - Ok(SyncStreamItem::DisconnectedTemporarily) => debug!("Temporary disconnect!"), - _ => (), + Err(e) => error!(ev_logger, "Error occured during event reading: {}", e), + Ok(DisconnectedTemporarily(r)) => { + debug!(ev_logger, "Temporary disconnect"; "reason" => ?r) + } + Ok(Audio(_)) => { + trace!(ev_logger, "Audio received"); + } + Ok(IdentityLevelIncreasing(_)) => { + trace!(ev_logger, "Identity level increasing"); + } + Ok(IdentityLevelIncreased) => { + trace!(ev_logger, "Identity level increased"); + } + Ok(NetworkStatsUpdated) => { + trace!(ev_logger, "Network stats updated"); + } } } - })); - - handle.wait_until_connected().await?; - - let mut chandle = handle.clone(); - chandle - .with_connection(|mut conn| { - conn.get_state() - .expect("is connected") - .server - .set_subscribed(true) - .send(&mut conn) - .unwrap() - }) - .await - .unwrap(); - - Ok(TeamSpeakConnection { handle }) + }); + + let mut handle = self.handle.clone(); + tokio::spawn(async move { + handle + .as_mut() + .expect("connect_for_bot was called") + .wait_until_connected() + .await + .unwrap(); + handle + .as_mut() + .expect("connect_for_bot was called") + .with_connection(|mut conn| { + conn.get_state() + .expect("can get state") + .server + .set_subscribed(true) + .send(&mut conn) + }) + .await + .and_then(|v| v) + .unwrap(); + }); + + Ok(()) } pub async fn send_audio_packet(&mut self, samples: &[u8]) { @@ -140,22 +171,25 @@ impl TeamSpeakConnection { data: samples, }); - self.handle - .with_connection(|conn| { - if let Err(e) = conn - .get_tsproto_client_mut() + if let Err(e) = self + .handle + .as_mut() + .expect("connect_for_bot was called") + .with_connection(move |conn| { + conn.get_tsproto_client_mut() .expect("can get tsproto client") .send_packet(packet) - { - error!("Failed to send voice packet: {}", e); - } }) .await - .unwrap(); + { + error!(self.logger, "Failed to send voice packet: {}", e); + } } pub async fn channel_of_user(&mut self, id: ClientId) -> Option<ChannelId> { self.handle + .as_mut() + .expect("connect_for_bot was called") .with_connection(move |conn| { conn.get_state() .expect("can get state") @@ -164,30 +198,28 @@ impl TeamSpeakConnection { .map(|c| c.channel) }) .await - .unwrap() + .map_err(|e| error!(self.logger, "Failed to get channel of user"; "error" => %e)) + .ok() + .and_then(|v| v) } pub async fn channel_path_of_user(&mut self, id: ClientId) -> Option<String> { self.handle + .as_mut() + .expect("connect_for_bot was called") .with_connection(move |conn| { let state = conn.get_state().expect("can get state"); let channel_id = state.clients.get(&id)?.channel; - let mut channel = state - .channels - .get(&channel_id) - .expect("can find user channel"); + let mut channel = state.channels.get(&channel_id)?; let mut names = vec![&channel.name[..]]; // Channel 0 is the root channel while channel.parent != ChannelId(0) { names.push("/"); - channel = state - .channels - .get(&channel.parent) - .expect("can find user channel"); + channel = state.channels.get(&channel.parent)?; names.push(&channel.name); } @@ -199,11 +231,15 @@ impl TeamSpeakConnection { Some(path) }) .await - .unwrap() + .map_err(|e| error!(self.logger, "Failed to get channel path of user"; "error" => %e)) + .ok() + .and_then(|v| v) } - pub async fn my_channel(&mut self) -> ChannelId { + pub async fn current_channel(&mut self) -> Option<ChannelId> { self.handle + .as_mut() + .expect("connect_for_bot was called") .with_connection(move |conn| { let state = conn.get_state().expect("can get state"); state @@ -213,11 +249,14 @@ impl TeamSpeakConnection { .channel }) .await - .unwrap() + .map_err(|e| error!(self.logger, "Failed to get channel"; "error" => %e)) + .ok() } pub async fn my_id(&mut self) -> ClientId { self.handle + .as_mut() + .expect("connect_for_bot was called") .with_connection(move |conn| conn.get_state().expect("can get state").own_client) .await .unwrap() @@ -225,6 +264,8 @@ impl TeamSpeakConnection { pub async fn user_count(&mut self, channel: ChannelId) -> u32 { self.handle + .as_mut() + .expect("connect_for_bot was called") .with_connection(move |conn| { let state = conn.get_state().expect("can get state"); let mut count = 0; @@ -241,88 +282,90 @@ impl TeamSpeakConnection { } pub async fn set_nickname(&mut self, name: String) { - self.handle + if let Err(e) = self + .handle + .as_mut() + .expect("connect_for_bot was called") .with_connection(move |mut conn| { conn.get_state() .expect("can get state") .client_update() .set_name(&name) .send(&mut conn) - .map_err(|e| error!("Failed to set nickname: {}", e)) }) .await - .unwrap() - .unwrap(); + .and_then(|v| v) + { + error!(self.logger, "Failed to set nickname: {}", e); + } } pub async fn set_description(&mut self, desc: String) { - self.handle + if let Err(e) = self + .handle + .as_mut() + .expect("connect_for_bot was called") .with_connection(move |mut conn| { let state = conn.get_state().expect("can get state"); - let _ = state + state .clients .get(&state.own_client) .expect("can get myself") .edit() .set_description(&desc) .send(&mut conn) - .map_err(|e| error!("Failed to change description: {}", e)); }) .await - .unwrap() + .and_then(|v| v) + { + error!(self.logger, "Failed to change description: {}", e); + } } pub async fn send_message_to_channel(&mut self, text: String) { - self.handle + if let Err(e) = self + .handle + .as_mut() + .expect("connect_for_bot was called") .with_connection(move |mut conn| { - let _ = conn - .get_state() + conn.get_state() .expect("can get state") .send_message(MessageTarget::Channel, &text) .send(&mut conn) - .map_err(|e| error!("Failed to send message: {}", e)); }) .await - .unwrap() + .and_then(|v| v) + { + error!(self.logger, "Failed to send message: {}", e); + } } pub async fn send_message_to_user(&mut self, client: ClientId, text: String) { - self.handle + if let Err(e) = self + .handle + .as_mut() + .expect("connect_for_bot was called") .with_connection(move |mut conn| { - let _ = conn - .get_state() + conn.get_state() .expect("can get state") .send_message(MessageTarget::Client(client), &text) .send(&mut conn) - .map_err(|e| error!("Failed to send message: {}", e)); }) .await - .unwrap() + .and_then(|v| v) + { + error!(self.logger, "Failed to send message: {}", e); + } } - pub async fn subscribe(&mut self, id: ChannelId) { - self.handle - .with_connection(move |mut conn| { - let channel = match conn.get_state().expect("can get state").channels.get(&id) { - Some(c) => c, - None => { - error!("Failed to find channel to subscribe to"); - return; - } - }; - - if let Err(e) = channel.set_subscribed(true).send(&mut conn) { - error!("Failed to send subscribe packet: {}", e); - } - }) - .await - .unwrap() - } - - pub async fn disconnect(&mut self, reason: &str) { + pub async fn disconnect(&mut self, reason: &str) -> Result<(), tsclientlib::Error> { let opt = DisconnectOptions::new() .reason(Reason::Clientdisconnect) .message(reason); - self.handle.disconnect(opt).await.unwrap(); + self.handle + .as_mut() + .expect("connect_for_bot was called") + .disconnect(opt) + .await } } |
