aboutsummaryrefslogtreecommitdiffstats
path: root/src/teamspeak/mod.rs
diff options
context:
space:
mode:
authorJokler <jokler@protonmail.com>2020-09-29 15:18:47 +0200
committerGitHub <noreply@github.com>2020-09-29 15:18:47 +0200
commite44a251fe0e1b82c859515768e483f19b1b5aaf3 (patch)
tree6092b3db497ee0a795f70db695ff2adb3c16e5ee /src/teamspeak/mod.rs
parent130cde033795382b70a312846a8f2704a15d11e3 (diff)
parentbbe3e1fffc94e7e87237a331de7b09253b0aa3fb (diff)
downloadpokebot-e44a251fe0e1b82c859515768e483f19b1b5aaf3.tar.gz
pokebot-e44a251fe0e1b82c859515768e483f19b1b5aaf3.zip
Merge pull request #59 from Mavulp/update-dependencies
Upgrade dependencies & use tokio 0.2 exclusively
Diffstat (limited to 'src/teamspeak/mod.rs')
-rw-r--r--src/teamspeak/mod.rs330
1 files changed, 204 insertions, 126 deletions
diff --git a/src/teamspeak/mod.rs b/src/teamspeak/mod.rs
index fc10116..59a9d57 100644
--- a/src/teamspeak/mod.rs
+++ b/src/teamspeak/mod.rs
@@ -1,16 +1,17 @@
use std::sync::{Arc, RwLock};
-use futures::compat::Future01CompatExt;
-use futures01::{future::Future, sink::Sink};
-use tokio02::sync::mpsc::UnboundedSender;
+use futures::stream::StreamExt;
+use tokio::sync::mpsc::UnboundedSender;
-use tsclientlib::Event::ConEvents;
+use tsclientlib::data::exts::{M2BClientEditExt, M2BClientUpdateExt};
use tsclientlib::{
- events::Event, ChannelId, ClientId, ConnectOptions, Connection, DisconnectOptions,
- MessageTarget, Reason,
+ events::Event,
+ sync::{SyncConnection, SyncConnectionHandle, SyncStreamItem},
+ ChannelId, ClientId, ConnectOptions, Connection, DisconnectOptions, MessageTarget,
+ OutCommandExt, Reason,
};
-use log::error;
+use log::{debug, error};
use crate::bot::{Message, MusicBotMessage};
@@ -18,9 +19,9 @@ mod bbcode;
pub use bbcode::*;
+#[derive(Clone)]
pub struct TeamSpeakConnection {
- id: ClientId,
- conn: Connection,
+ handle: SyncConnectionHandle,
}
fn get_message(event: &Event) -> Option<MusicBotMessage> {
@@ -28,7 +29,7 @@ fn get_message(event: &Event) -> Option<MusicBotMessage> {
match event {
Event::Message {
- from: target,
+ target,
invoker: sender,
message: msg,
} => Some(MusicBotMessage::TextMessage(Message {
@@ -39,14 +40,17 @@ fn get_message(event: &Event) -> Option<MusicBotMessage> {
Event::PropertyAdded {
id: property,
invoker: _,
+ extra: _,
} => match property {
- PropertyId::Channel(id) => Some(MusicBotMessage::ChannelCreated(*id)),
+ PropertyId::Channel(id) => Some(MusicBotMessage::ChannelAdded(*id)),
+ PropertyId::Client(id) => Some(MusicBotMessage::ClientAdded(*id)),
_ => None,
},
Event::PropertyChanged {
id: property,
old: from,
invoker: _,
+ extra: _,
} => match property {
PropertyId::ClientChannel(client) => {
if let PropertyValue::ChannelId(from) = from {
@@ -64,6 +68,7 @@ fn get_message(event: &Event) -> Option<MusicBotMessage> {
id: property,
old: client,
invoker: _,
+ extra: _,
} => match property {
PropertyId::Client(id) => {
if let PropertyValue::Client(client) = client {
@@ -86,30 +91,49 @@ impl TeamSpeakConnection {
tx: Arc<RwLock<UnboundedSender<MusicBotMessage>>>,
options: ConnectOptions,
) -> Result<TeamSpeakConnection, tsclientlib::Error> {
- let conn = Connection::new(options).compat().await?;
- let packet = conn.lock().server.set_subscribed(true);
- conn.send_packet(packet).compat().await.unwrap();
-
- conn.add_event_listener(
- String::from("listener"),
- Box::new(move |e| {
- if let ConEvents(_conn, events) = e {
- for event in *events {
- if let Some(msg) = get_message(event) {
- let tx = tx.read().expect("RwLock was not poisoned");
- // Ignore the result because the receiver might get dropped first.
- let _ = tx.send(msg);
+ let conn = Connection::new(options)?;
+ let conn = SyncConnection::from(conn);
+ let mut handle = conn.get_handle();
+
+ tokio::spawn(conn.for_each(move |i| {
+ let tx = tx.clone();
+ async move {
+ match i {
+ Ok(SyncStreamItem::ConEvents(events)) => {
+ for event in &events {
+ if let Some(msg) = get_message(event) {
+ let tx = tx.read().expect("RwLock was not poisoned");
+ // Ignore the result because the receiver might get dropped first.
+ let _ = tx.send(msg);
+ }
}
}
+ Err(e) => error!("Error occured during event reading: {}", e),
+ Ok(SyncStreamItem::DisconnectedTemporarily) => debug!("Temporary disconnect!"),
+ _ => (),
}
- }),
- );
-
- let id = conn.lock().own_client;
- Ok(TeamSpeakConnection { conn, id })
+ }
+ }));
+
+ handle.wait_until_connected().await?;
+
+ let mut chandle = handle.clone();
+ chandle
+ .with_connection(|mut conn| {
+ conn.get_state()
+ .expect("is connected")
+ .server
+ .set_subscribed(true)
+ .send(&mut conn)
+ .unwrap()
+ })
+ .await
+ .unwrap();
+
+ Ok(TeamSpeakConnection { handle })
}
- pub fn send_audio_packet(&self, samples: &[u8]) {
+ pub async fn send_audio_packet(&mut self, samples: &[u8]) {
let packet =
tsproto_packets::packets::OutAudio::new(&tsproto_packets::packets::AudioData::C2S {
id: 0,
@@ -117,133 +141,187 @@ impl TeamSpeakConnection {
data: samples,
});
- let send_packet = self
- .conn
- .get_packet_sink()
- .send(packet)
- .map(|_| ())
- .map_err(|_| error!("Failed to send voice packet"));
-
- tokio::run(send_packet);
+ self.handle
+ .with_connection(|conn| {
+ if let Err(e) = conn
+ .get_tsproto_client_mut()
+ .expect("can get tsproto client")
+ .send_packet(packet)
+ {
+ error!("Failed to send voice packet: {}", e);
+ }
+ })
+ .await
+ .unwrap();
}
- pub fn channel_of_user(&self, id: ClientId) -> Option<ChannelId> {
- Some(self.conn.lock().clients.get(&id)?.channel)
+ pub async fn channel_of_user(&mut self, id: ClientId) -> Option<ChannelId> {
+ self.handle
+ .with_connection(move |conn| {
+ conn.get_state()
+ .expect("can get state")
+ .clients
+ .get(&id)
+ .map(|c| c.channel)
+ })
+ .await
+ .unwrap()
}
- pub fn channel_path_of_user(&self, id: ClientId) -> Option<String> {
- let conn = self.conn.lock();
+ pub async fn channel_path_of_user(&mut self, id: ClientId) -> Option<String> {
+ self.handle
+ .with_connection(move |conn| {
+ let state = conn.get_state().expect("can get state");
- let channel_id = conn.clients.get(&id)?.channel;
+ let channel_id = state.clients.get(&id)?.channel;
- let mut channel = conn
- .channels
- .get(&channel_id)
- .expect("can find user channel");
+ let mut channel = state
+ .channels
+ .get(&channel_id)
+ .expect("can find user channel");
- let mut names = vec![&channel.name[..]];
+ let mut names = vec![&channel.name[..]];
- // Channel 0 is the root channel
- while channel.parent != ChannelId(0) {
- names.push("/");
- channel = conn
- .channels
- .get(&channel.parent)
- .expect("can find user channel");
- names.push(&channel.name);
- }
+ // Channel 0 is the root channel
+ while channel.parent != ChannelId(0) {
+ names.push("/");
+ channel = state
+ .channels
+ .get(&channel.parent)
+ .expect("can find user channel");
+ names.push(&channel.name);
+ }
- let mut path = String::new();
- while let Some(name) = names.pop() {
- path.push_str(name);
- }
+ let mut path = String::new();
+ while let Some(name) = names.pop() {
+ path.push_str(name);
+ }
- Some(path)
+ Some(path)
+ })
+ .await
+ .unwrap()
}
- pub fn my_channel(&self) -> ChannelId {
- let conn = self.conn.lock();
- conn.clients
- .get(&conn.own_client)
- .expect("can find myself")
- .channel
+ pub async fn my_channel(&mut self) -> ChannelId {
+ self.handle
+ .with_connection(move |conn| {
+ let state = conn.get_state().expect("can get state");
+ state
+ .clients
+ .get(&state.own_client)
+ .expect("can find myself")
+ .channel
+ })
+ .await
+ .unwrap()
}
- pub fn my_id(&self) -> ClientId {
- self.id
+ pub async fn my_id(&mut self) -> ClientId {
+ self.handle
+ .with_connection(move |conn| conn.get_state().expect("can get state").own_client)
+ .await
+ .unwrap()
}
- pub fn user_count(&self, channel: ChannelId) -> u32 {
- let conn = self.conn.lock();
- let mut count = 0;
- for client in conn.clients.values() {
- if client.channel == channel {
- count += 1;
- }
- }
+ pub async fn user_count(&mut self, channel: ChannelId) -> u32 {
+ self.handle
+ .with_connection(move |conn| {
+ let state = conn.get_state().expect("can get state");
+ let mut count = 0;
+ for client in state.clients.values() {
+ if client.channel == channel {
+ count += 1;
+ }
+ }
- count
+ count
+ })
+ .await
+ .unwrap()
}
- pub fn set_nickname(&self, name: &str) {
- tokio::spawn(
- self.conn
- .lock()
- .to_mut()
- .set_name(name)
- .map_err(|e| error!("Failed to set nickname: {}", e)),
- );
+ pub async fn set_nickname(&mut self, name: String) {
+ self.handle
+ .with_connection(move |mut conn| {
+ conn.get_state()
+ .expect("can get state")
+ .client_update()
+ .set_name(&name)
+ .send(&mut conn)
+ .map_err(|e| error!("Failed to set nickname: {}", e))
+ })
+ .await
+ .unwrap()
+ .unwrap();
}
- pub fn set_description(&self, desc: &str) {
- tokio::spawn(
- self.conn
- .lock()
- .to_mut()
- .get_client(&self.conn.lock().own_client)
- .expect("can get myself")
- .set_description(desc)
- .map_err(|e| error!("Failed to change description: {}", e)),
- );
+ pub async fn set_description(&mut self, desc: String) {
+ self.handle
+ .with_connection(move |mut conn| {
+ let state = conn.get_state().expect("can get state");
+ let _ = state
+ .clients
+ .get(&state.own_client)
+ .expect("can get myself")
+ .edit()
+ .set_description(&desc)
+ .send(&mut conn)
+ .map_err(|e| error!("Failed to change description: {}", e));
+ })
+ .await
+ .unwrap()
}
- pub fn send_message_to_channel(&self, text: &str) {
- tokio::spawn(
- self.conn
- .lock()
- .to_mut()
- .send_message(MessageTarget::Channel, text)
- .map_err(|e| error!("Failed to send message: {}", e)),
- );
+ pub async fn send_message_to_channel(&mut self, text: String) {
+ self.handle
+ .with_connection(move |mut conn| {
+ let _ = conn
+ .get_state()
+ .expect("can get state")
+ .send_message(MessageTarget::Channel, &text)
+ .send(&mut conn)
+ .map_err(|e| error!("Failed to send message: {}", e));
+ })
+ .await
+ .unwrap()
}
- pub fn send_message_to_user(&self, client: ClientId, text: &str) {
- tokio::spawn(
- self.conn
- .lock()
- .to_mut()
- .send_message(MessageTarget::Client(client), text)
- .map_err(|e| error!("Failed to send message: {}", e)),
- );
+ pub async fn send_message_to_user(&mut self, client: ClientId, text: String) {
+ self.handle
+ .with_connection(move |mut conn| {
+ let _ = conn
+ .get_state()
+ .expect("can get state")
+ .send_message(MessageTarget::Client(client), &text)
+ .send(&mut conn)
+ .map_err(|e| error!("Failed to send message: {}", e));
+ })
+ .await
+ .unwrap()
}
- pub fn subscribe_all(&self) {
- let packet = self.conn.lock().to_mut().server.set_subscribed(true);
- tokio::spawn(
- self.conn
- .send_packet(packet)
- .map_err(|e| error!("Failed to send subscribe packet: {}", e)),
- );
+ pub async fn subscribe_all(&mut self) {
+ self.handle
+ .with_connection(move |mut conn| {
+ if let Err(e) = conn
+ .get_state()
+ .expect("can get state")
+ .server
+ .set_subscribed(true)
+ .send(&mut conn)
+ {
+ error!("Failed to send subscribe packet: {}", e);
+ }
+ })
+ .await
+ .unwrap()
}
- pub fn disconnect(&self, reason: &str) {
+ pub async fn disconnect(&mut self, reason: &str) {
let opt = DisconnectOptions::new()
.reason(Reason::Clientdisconnect)
.message(reason);
- tokio::spawn(
- self.conn
- .disconnect(opt)
- .map_err(|e| error!("Failed to send message: {}", e)),
- );
+ self.handle.disconnect(opt).await.unwrap();
}
}