diff options
Diffstat (limited to 'src/teamspeak/mod.rs')
| -rw-r--r-- | src/teamspeak/mod.rs | 330 |
1 files changed, 204 insertions, 126 deletions
diff --git a/src/teamspeak/mod.rs b/src/teamspeak/mod.rs index fc10116..59a9d57 100644 --- a/src/teamspeak/mod.rs +++ b/src/teamspeak/mod.rs @@ -1,16 +1,17 @@ use std::sync::{Arc, RwLock}; -use futures::compat::Future01CompatExt; -use futures01::{future::Future, sink::Sink}; -use tokio02::sync::mpsc::UnboundedSender; +use futures::stream::StreamExt; +use tokio::sync::mpsc::UnboundedSender; -use tsclientlib::Event::ConEvents; +use tsclientlib::data::exts::{M2BClientEditExt, M2BClientUpdateExt}; use tsclientlib::{ - events::Event, ChannelId, ClientId, ConnectOptions, Connection, DisconnectOptions, - MessageTarget, Reason, + events::Event, + sync::{SyncConnection, SyncConnectionHandle, SyncStreamItem}, + ChannelId, ClientId, ConnectOptions, Connection, DisconnectOptions, MessageTarget, + OutCommandExt, Reason, }; -use log::error; +use log::{debug, error}; use crate::bot::{Message, MusicBotMessage}; @@ -18,9 +19,9 @@ mod bbcode; pub use bbcode::*; +#[derive(Clone)] pub struct TeamSpeakConnection { - id: ClientId, - conn: Connection, + handle: SyncConnectionHandle, } fn get_message(event: &Event) -> Option<MusicBotMessage> { @@ -28,7 +29,7 @@ fn get_message(event: &Event) -> Option<MusicBotMessage> { match event { Event::Message { - from: target, + target, invoker: sender, message: msg, } => Some(MusicBotMessage::TextMessage(Message { @@ -39,14 +40,17 @@ fn get_message(event: &Event) -> Option<MusicBotMessage> { Event::PropertyAdded { id: property, invoker: _, + extra: _, } => match property { - PropertyId::Channel(id) => Some(MusicBotMessage::ChannelCreated(*id)), + PropertyId::Channel(id) => Some(MusicBotMessage::ChannelAdded(*id)), + PropertyId::Client(id) => Some(MusicBotMessage::ClientAdded(*id)), _ => None, }, Event::PropertyChanged { id: property, old: from, invoker: _, + extra: _, } => match property { PropertyId::ClientChannel(client) => { if let PropertyValue::ChannelId(from) = from { @@ -64,6 +68,7 @@ fn get_message(event: &Event) -> Option<MusicBotMessage> { id: property, old: client, invoker: _, + extra: _, } => match property { PropertyId::Client(id) => { if let PropertyValue::Client(client) = client { @@ -86,30 +91,49 @@ impl TeamSpeakConnection { tx: Arc<RwLock<UnboundedSender<MusicBotMessage>>>, options: ConnectOptions, ) -> Result<TeamSpeakConnection, tsclientlib::Error> { - let conn = Connection::new(options).compat().await?; - let packet = conn.lock().server.set_subscribed(true); - conn.send_packet(packet).compat().await.unwrap(); - - conn.add_event_listener( - String::from("listener"), - Box::new(move |e| { - if let ConEvents(_conn, events) = e { - for event in *events { - if let Some(msg) = get_message(event) { - let tx = tx.read().expect("RwLock was not poisoned"); - // Ignore the result because the receiver might get dropped first. - let _ = tx.send(msg); + let conn = Connection::new(options)?; + let conn = SyncConnection::from(conn); + let mut handle = conn.get_handle(); + + tokio::spawn(conn.for_each(move |i| { + let tx = tx.clone(); + async move { + match i { + Ok(SyncStreamItem::ConEvents(events)) => { + for event in &events { + if let Some(msg) = get_message(event) { + let tx = tx.read().expect("RwLock was not poisoned"); + // Ignore the result because the receiver might get dropped first. + let _ = tx.send(msg); + } } } + Err(e) => error!("Error occured during event reading: {}", e), + Ok(SyncStreamItem::DisconnectedTemporarily) => debug!("Temporary disconnect!"), + _ => (), } - }), - ); - - let id = conn.lock().own_client; - Ok(TeamSpeakConnection { conn, id }) + } + })); + + handle.wait_until_connected().await?; + + let mut chandle = handle.clone(); + chandle + .with_connection(|mut conn| { + conn.get_state() + .expect("is connected") + .server + .set_subscribed(true) + .send(&mut conn) + .unwrap() + }) + .await + .unwrap(); + + Ok(TeamSpeakConnection { handle }) } - pub fn send_audio_packet(&self, samples: &[u8]) { + pub async fn send_audio_packet(&mut self, samples: &[u8]) { let packet = tsproto_packets::packets::OutAudio::new(&tsproto_packets::packets::AudioData::C2S { id: 0, @@ -117,133 +141,187 @@ impl TeamSpeakConnection { data: samples, }); - let send_packet = self - .conn - .get_packet_sink() - .send(packet) - .map(|_| ()) - .map_err(|_| error!("Failed to send voice packet")); - - tokio::run(send_packet); + self.handle + .with_connection(|conn| { + if let Err(e) = conn + .get_tsproto_client_mut() + .expect("can get tsproto client") + .send_packet(packet) + { + error!("Failed to send voice packet: {}", e); + } + }) + .await + .unwrap(); } - pub fn channel_of_user(&self, id: ClientId) -> Option<ChannelId> { - Some(self.conn.lock().clients.get(&id)?.channel) + pub async fn channel_of_user(&mut self, id: ClientId) -> Option<ChannelId> { + self.handle + .with_connection(move |conn| { + conn.get_state() + .expect("can get state") + .clients + .get(&id) + .map(|c| c.channel) + }) + .await + .unwrap() } - pub fn channel_path_of_user(&self, id: ClientId) -> Option<String> { - let conn = self.conn.lock(); + pub async fn channel_path_of_user(&mut self, id: ClientId) -> Option<String> { + self.handle + .with_connection(move |conn| { + let state = conn.get_state().expect("can get state"); - let channel_id = conn.clients.get(&id)?.channel; + let channel_id = state.clients.get(&id)?.channel; - let mut channel = conn - .channels - .get(&channel_id) - .expect("can find user channel"); + let mut channel = state + .channels + .get(&channel_id) + .expect("can find user channel"); - let mut names = vec![&channel.name[..]]; + let mut names = vec![&channel.name[..]]; - // Channel 0 is the root channel - while channel.parent != ChannelId(0) { - names.push("/"); - channel = conn - .channels - .get(&channel.parent) - .expect("can find user channel"); - names.push(&channel.name); - } + // Channel 0 is the root channel + while channel.parent != ChannelId(0) { + names.push("/"); + channel = state + .channels + .get(&channel.parent) + .expect("can find user channel"); + names.push(&channel.name); + } - let mut path = String::new(); - while let Some(name) = names.pop() { - path.push_str(name); - } + let mut path = String::new(); + while let Some(name) = names.pop() { + path.push_str(name); + } - Some(path) + Some(path) + }) + .await + .unwrap() } - pub fn my_channel(&self) -> ChannelId { - let conn = self.conn.lock(); - conn.clients - .get(&conn.own_client) - .expect("can find myself") - .channel + pub async fn my_channel(&mut self) -> ChannelId { + self.handle + .with_connection(move |conn| { + let state = conn.get_state().expect("can get state"); + state + .clients + .get(&state.own_client) + .expect("can find myself") + .channel + }) + .await + .unwrap() } - pub fn my_id(&self) -> ClientId { - self.id + pub async fn my_id(&mut self) -> ClientId { + self.handle + .with_connection(move |conn| conn.get_state().expect("can get state").own_client) + .await + .unwrap() } - pub fn user_count(&self, channel: ChannelId) -> u32 { - let conn = self.conn.lock(); - let mut count = 0; - for client in conn.clients.values() { - if client.channel == channel { - count += 1; - } - } + pub async fn user_count(&mut self, channel: ChannelId) -> u32 { + self.handle + .with_connection(move |conn| { + let state = conn.get_state().expect("can get state"); + let mut count = 0; + for client in state.clients.values() { + if client.channel == channel { + count += 1; + } + } - count + count + }) + .await + .unwrap() } - pub fn set_nickname(&self, name: &str) { - tokio::spawn( - self.conn - .lock() - .to_mut() - .set_name(name) - .map_err(|e| error!("Failed to set nickname: {}", e)), - ); + pub async fn set_nickname(&mut self, name: String) { + self.handle + .with_connection(move |mut conn| { + conn.get_state() + .expect("can get state") + .client_update() + .set_name(&name) + .send(&mut conn) + .map_err(|e| error!("Failed to set nickname: {}", e)) + }) + .await + .unwrap() + .unwrap(); } - pub fn set_description(&self, desc: &str) { - tokio::spawn( - self.conn - .lock() - .to_mut() - .get_client(&self.conn.lock().own_client) - .expect("can get myself") - .set_description(desc) - .map_err(|e| error!("Failed to change description: {}", e)), - ); + pub async fn set_description(&mut self, desc: String) { + self.handle + .with_connection(move |mut conn| { + let state = conn.get_state().expect("can get state"); + let _ = state + .clients + .get(&state.own_client) + .expect("can get myself") + .edit() + .set_description(&desc) + .send(&mut conn) + .map_err(|e| error!("Failed to change description: {}", e)); + }) + .await + .unwrap() } - pub fn send_message_to_channel(&self, text: &str) { - tokio::spawn( - self.conn - .lock() - .to_mut() - .send_message(MessageTarget::Channel, text) - .map_err(|e| error!("Failed to send message: {}", e)), - ); + pub async fn send_message_to_channel(&mut self, text: String) { + self.handle + .with_connection(move |mut conn| { + let _ = conn + .get_state() + .expect("can get state") + .send_message(MessageTarget::Channel, &text) + .send(&mut conn) + .map_err(|e| error!("Failed to send message: {}", e)); + }) + .await + .unwrap() } - pub fn send_message_to_user(&self, client: ClientId, text: &str) { - tokio::spawn( - self.conn - .lock() - .to_mut() - .send_message(MessageTarget::Client(client), text) - .map_err(|e| error!("Failed to send message: {}", e)), - ); + pub async fn send_message_to_user(&mut self, client: ClientId, text: String) { + self.handle + .with_connection(move |mut conn| { + let _ = conn + .get_state() + .expect("can get state") + .send_message(MessageTarget::Client(client), &text) + .send(&mut conn) + .map_err(|e| error!("Failed to send message: {}", e)); + }) + .await + .unwrap() } - pub fn subscribe_all(&self) { - let packet = self.conn.lock().to_mut().server.set_subscribed(true); - tokio::spawn( - self.conn - .send_packet(packet) - .map_err(|e| error!("Failed to send subscribe packet: {}", e)), - ); + pub async fn subscribe_all(&mut self) { + self.handle + .with_connection(move |mut conn| { + if let Err(e) = conn + .get_state() + .expect("can get state") + .server + .set_subscribed(true) + .send(&mut conn) + { + error!("Failed to send subscribe packet: {}", e); + } + }) + .await + .unwrap() } - pub fn disconnect(&self, reason: &str) { + pub async fn disconnect(&mut self, reason: &str) { let opt = DisconnectOptions::new() .reason(Reason::Clientdisconnect) .message(reason); - tokio::spawn( - self.conn - .disconnect(opt) - .map_err(|e| error!("Failed to send message: {}", e)), - ); + self.handle.disconnect(opt).await.unwrap(); } } |
