diff options
| author | Jokler <jokler@protonmail.com> | 2020-07-27 01:17:30 +0200 |
|---|---|---|
| committer | Jokler <jokler@protonmail.com> | 2020-09-01 18:52:46 +0200 |
| commit | bbe3e1fffc94e7e87237a331de7b09253b0aa3fb (patch) | |
| tree | 6092b3db497ee0a795f70db695ff2adb3c16e5ee /src | |
| parent | 130cde033795382b70a312846a8f2704a15d11e3 (diff) | |
| download | pokebot-bbe3e1fffc94e7e87237a331de7b09253b0aa3fb.tar.gz pokebot-bbe3e1fffc94e7e87237a331de7b09253b0aa3fb.zip | |
Upgrade dependencies & use tokio 0.2 exclusively
Diffstat (limited to 'src')
| -rw-r--r-- | src/audio_player.rs | 12 | ||||
| -rw-r--r-- | src/bot/master.rs | 68 | ||||
| -rw-r--r-- | src/bot/music.rs | 176 | ||||
| -rw-r--r-- | src/command.rs | 4 | ||||
| -rw-r--r-- | src/main.rs | 99 | ||||
| -rw-r--r-- | src/teamspeak/mod.rs | 330 | ||||
| -rw-r--r-- | src/web_server.rs | 6 | ||||
| -rw-r--r-- | src/web_server/bot_executor.rs | 4 | ||||
| -rw-r--r-- | src/web_server/default.rs | 2 | ||||
| -rw-r--r-- | src/web_server/tmtu.rs | 2 | ||||
| -rw-r--r-- | src/youtube_dl.rs | 7 |
11 files changed, 404 insertions, 306 deletions
diff --git a/src/audio_player.rs b/src/audio_player.rs index 79c54ef..1f6649f 100644 --- a/src/audio_player.rs +++ b/src/audio_player.rs @@ -11,7 +11,7 @@ use crate::bot::{MusicBotMessage, State}; use glib::BoolError; use log::{debug, error, info, warn}; use std::sync::{Arc, RwLock}; -use tokio02::sync::mpsc::UnboundedSender; +use tokio::sync::mpsc::UnboundedSender; use crate::command::{Seek, VolumeChange}; use crate::youtube_dl::AudioMetadata; @@ -144,7 +144,7 @@ impl AudioPlayer { "audio/x-opus", &[("channels", &(2i32)), ("rate", &(48_000i32))], ))); - let callbacks = AppSinkCallbacks::new() + let callbacks = AppSinkCallbacks::builder() .new_sample(move |sink| { let sample = sink.pull_sample().map_err(|_| gst::FlowError::Eos)?; let buffer = sample.get_buffer().ok_or(gst::FlowError::Error)?; @@ -169,7 +169,7 @@ impl AudioPlayer { gst::Element::link_many(&[&queue, &convert, &volume, &resample, &sink])?; }; - let ghost_pad = GhostPad::new(Some("audio bin sink"), queue_sink_pad).unwrap(); + let ghost_pad = GhostPad::with_target(Some("audio bin sink"), queue_sink_pad).unwrap(); ghost_pad.set_active(true)?; audio_bin.add_pad(&ghost_pad)?; @@ -302,7 +302,7 @@ impl AudioPlayer { pub fn stop_current(&self) -> Result<(), AudioPlayerError> { info!("Stopping pipeline, sending EOS"); - self.bus.post(&gst::Message::new_eos().build())?; + self.bus.post(&gst::message::Eos::new())?; Ok(()) } @@ -312,7 +312,9 @@ impl AudioPlayer { if self .bus - .post(&gst::Message::new_application(gst::Structure::new_empty("quit")).build()) + .post(&gst::message::Application::new(gst::Structure::new_empty( + "quit", + ))) .is_err() { warn!("Tried to send \"quit\" app event on flushing bus."); diff --git a/src/bot/master.rs b/src/bot/master.rs index dad2bed..fe3c3fe 100644 --- a/src/bot/master.rs +++ b/src/bot/master.rs @@ -2,12 +2,10 @@ use std::collections::HashMap; use std::future::Future; use std::sync::{Arc, RwLock}; -use futures::future::{FutureExt, TryFutureExt}; -use futures01::future::Future as Future01; use log::info; use rand::{rngs::SmallRng, seq::SliceRandom, SeedableRng}; use serde::{Deserialize, Serialize}; -use tokio02::sync::mpsc::UnboundedSender; +use tokio::sync::mpsc::UnboundedSender; use tsclientlib::{ClientId, ConnectOptions, Identity, MessageTarget}; use crate::audio_player::AudioPlayerError; @@ -20,7 +18,7 @@ use crate::bot::{MusicBot, MusicBotArgs, MusicBotMessage}; pub struct MasterBot { config: Arc<MasterConfig>, music_bots: Arc<RwLock<MusicBots>>, - teamspeak: Arc<TeamSpeakConnection>, + teamspeak: TeamSpeakConnection, sender: Arc<RwLock<UnboundedSender<MusicBotMessage>>>, } @@ -33,7 +31,7 @@ struct MusicBots { impl MasterBot { pub async fn new(args: MasterArgs) -> (Arc<Self>, impl Future) { - let (tx, mut rx) = tokio02::sync::mpsc::unbounded_channel(); + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); let tx = Arc::new(RwLock::new(tx)); info!("Starting in TeamSpeak mode"); @@ -49,11 +47,9 @@ impl MasterBot { con_config = con_config.channel(channel); } - let connection = Arc::new( - TeamSpeakConnection::new(tx.clone(), con_config) - .await - .unwrap(), - ); + let connection = TeamSpeakConnection::new(tx.clone(), con_config) + .await + .unwrap(); let config = Arc::new(MasterConfig { master_name: args.master_name, @@ -81,24 +77,22 @@ impl MasterBot { sender: tx.clone(), }); - bot.teamspeak - .set_description("Poke me if you want a music bot!"); - let cbot = bot.clone(); let msg_loop = async move { 'outer: loop { while let Some(msg) = rx.recv().await { match msg { MusicBotMessage::Quit(reason) => { - cbot.teamspeak.disconnect(&reason); + let mut cteamspeak = cbot.teamspeak.clone(); + cteamspeak.disconnect(&reason).await; break 'outer; } MusicBotMessage::ClientDisconnected { id, .. } => { - if id == cbot.my_id() { + if id == cbot.my_id().await { // TODO Reconnect since quit was not called break 'outer; } - }, + } _ => cbot.on_message(msg).await.unwrap(), } } @@ -108,13 +102,14 @@ impl MasterBot { (bot, msg_loop) } - fn build_bot_args_for(&self, id: ClientId) -> Result<MusicBotArgs, BotCreationError> { - let channel = match self.teamspeak.channel_of_user(id) { + async fn build_bot_args_for(&self, id: ClientId) -> Result<MusicBotArgs, BotCreationError> { + let mut cteamspeak = self.teamspeak.clone(); + let channel = match cteamspeak.channel_of_user(id).await { Some(channel) => channel, None => return Err(BotCreationError::UnfoundUser), }; - if channel == self.teamspeak.my_channel() { + if channel == cteamspeak.my_channel().await { return Err(BotCreationError::MasterChannel( self.config.master_name.clone(), )); @@ -128,14 +123,14 @@ impl MasterBot { } = &mut *self.music_bots.write().expect("RwLock was not poisoned"); for bot in connected_bots.values() { - if bot.my_channel() == channel { + if bot.my_channel().await == channel { return Err(BotCreationError::MultipleBots(bot.name().to_owned())); } } - let channel_path = self - .teamspeak + let channel_path = cteamspeak .channel_path_of_user(id) + .await .expect("can find poke sender"); available_names.shuffle(rng); @@ -181,16 +176,19 @@ impl MasterBot { } async fn spawn_bot_for(&self, id: ClientId) { - match self.build_bot_args_for(id) { + match self.build_bot_args_for(id).await { Ok(bot_args) => { let (bot, fut) = MusicBot::new(bot_args).await; - tokio::spawn(fut.unit_error().boxed().compat().map(|_| ())); + tokio::spawn(fut); let mut music_bots = self.music_bots.write().expect("RwLock was not poisoned"); music_bots .connected_bots .insert(bot.name().to_string(), bot); } - Err(e) => self.teamspeak.send_message_to_user(id, &e.to_string()), + Err(e) => { + let mut cteamspeak = self.teamspeak.clone(); + cteamspeak.send_message_to_user(id, e.to_string()).await + } } } @@ -202,9 +200,19 @@ impl MasterBot { self.spawn_bot_for(who).await; } } - MusicBotMessage::ChannelCreated(_) => { + MusicBotMessage::ChannelAdded(_) => { // TODO Only subscribe to one channel - self.teamspeak.subscribe_all(); + let mut cteamspeak = self.teamspeak.clone(); + cteamspeak.subscribe_all().await; + } + MusicBotMessage::ClientAdded(id) => { + let mut cteamspeak = self.teamspeak.clone(); + + if id == cteamspeak.my_id().await { + cteamspeak + .set_description(String::from("Poke me if you want a music bot!")) + .await; + } } _ => (), } @@ -212,8 +220,10 @@ impl MasterBot { Ok(()) } - fn my_id(&self) -> ClientId { - self.teamspeak.my_id() + async fn my_id(&self) -> ClientId { + let mut cteamspeak = self.teamspeak.clone(); + + cteamspeak.my_id().await } pub fn bot_data(&self, name: String) -> Option<crate::web_server::BotData> { diff --git a/src/bot/music.rs b/src/bot/music.rs index 71e7b58..656a169 100644 --- a/src/bot/music.rs +++ b/src/bot/music.rs @@ -7,7 +7,7 @@ use std::time::Duration; use log::{debug, info}; use serde::Serialize; use structopt::StructOpt; -use tokio02::sync::mpsc::UnboundedSender; +use tokio::sync::mpsc::UnboundedSender; use tsclientlib::{data, ChannelId, ClientId, ConnectOptions, Identity, Invoker, MessageTarget}; use crate::audio_player::{AudioPlayer, AudioPlayerError, PollResult}; @@ -52,7 +52,8 @@ pub enum MusicBotMessage { client: ClientId, old_channel: ChannelId, }, - ChannelCreated(ChannelId), + ChannelAdded(ChannelId), + ClientAdded(ClientId), ClientDisconnected { id: ClientId, client: Box<data::Client>, @@ -64,7 +65,7 @@ pub enum MusicBotMessage { pub struct MusicBot { name: String, player: Arc<AudioPlayer>, - teamspeak: Option<Arc<TeamSpeakConnection>>, + teamspeak: Option<TeamSpeakConnection>, playlist: Arc<RwLock<Playlist>>, state: Arc<RwLock<State>>, } @@ -82,8 +83,8 @@ pub struct MusicBotArgs { } impl MusicBot { - pub async fn new(args: MusicBotArgs) -> (Arc<Self>, impl Future) { - let (tx, mut rx) = tokio02::sync::mpsc::unbounded_channel(); + pub async fn new(args: MusicBotArgs) -> (Arc<Self>, impl Future<Output = ()>) { + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); let tx = Arc::new(RwLock::new(tx)); let (player, connection) = if args.local { info!("Starting in CLI mode"); @@ -102,16 +103,15 @@ impl MusicBot { .log_udp_packets(args.verbose >= 3) .channel(args.channel); - let connection = Arc::new( - TeamSpeakConnection::new(tx.clone(), con_config) - .await - .unwrap(), - ); - let cconnection = connection.clone(); + let connection = TeamSpeakConnection::new(tx.clone(), con_config) + .await + .unwrap(); + let mut cconnection = connection.clone(); let audio_player = AudioPlayer::new( tx.clone(), Some(Box::new(move |samples| { - cconnection.send_audio_packet(samples); + let mut rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(cconnection.send_audio_packet(samples)); })), ) .unwrap(); @@ -146,7 +146,10 @@ impl MusicBot { 'outer: loop { while let Some(msg) = rx.recv().await { if let MusicBotMessage::Quit(reason) = msg { - cbot.with_teamspeak(|ts| ts.disconnect(&reason)); + if let Some(ts) = &cbot.teamspeak { + let mut ts = ts.clone(); + ts.disconnect(&reason).await; + } disconnect_cb(name, name_index, id_index); break 'outer; } @@ -156,31 +159,26 @@ impl MusicBot { debug!("Left message loop"); }; - bot.update_name(State::EndOfStream); + bot.update_name(State::EndOfStream).await; (bot, msg_loop) } - #[inline(always)] - fn with_teamspeak<F: Fn(&TeamSpeakConnection)>(&self, func: F) { - if let Some(ts) = &self.teamspeak { - func(&ts); - } - } - - fn start_playing_audio(&self, metadata: AudioMetadata) { + async fn start_playing_audio(&self, metadata: AudioMetadata) { let duration = if let Some(duration) = metadata.duration { format!("({})", ts::bold(&humantime::format_duration(duration))) } else { format!("") }; - self.send_message(&format!( + self.send_message(format!( "Playing {} {}", ts::underline(&metadata.title), duration - )); - self.set_description(&format!("Currently playing '{}'", metadata.title)); + )) + .await; + self.set_description(format!("Currently playing '{}'", metadata.title)) + .await; self.player.reset().unwrap(); self.player.set_metadata(metadata).unwrap(); self.player.play().unwrap(); @@ -192,12 +190,21 @@ impl MusicBot { metadata.added_by = user; info!("Found audio url: {}", metadata.url); - let mut playlist = self.playlist.write().expect("RwLock was not poisoned"); - playlist.push(metadata.clone()); + // RWLockGuard can not be kept around or the compiler complains that + // it might cross the await boundary + self.playlist + .write() + .expect("RwLock was not poisoned") + .push(metadata.clone()); if !self.player.is_started() { - if let Some(request) = playlist.pop() { - self.start_playing_audio(request); + let entry = self + .playlist + .write() + .expect("RwLock was not poisoned") + .pop(); + if let Some(request) = entry { + self.start_playing_audio(request).await; } } else { let duration = if let Some(duration) = metadata.duration { @@ -206,17 +213,19 @@ impl MusicBot { format!("") }; - self.send_message(&format!( + self.send_message(format!( "Added {}{} to playlist", ts::underline(&metadata.title), duration - )); + )) + .await; } } Err(e) => { info!("Failed to find audio url: {}", e); - self.send_message(&format!("Failed to find url: {}", e)); + self.send_message(format!("Failed to find url: {}", e)) + .await; } } } @@ -245,40 +254,52 @@ impl MusicBot { self.playlist.read().unwrap().to_vec() } - pub fn my_channel(&self) -> ChannelId { - self.teamspeak - .as_ref() - .map(|ts| ts.my_channel()) - .expect("my_channel needs ts") + pub async fn my_channel(&self) -> ChannelId { + let ts = self.teamspeak.as_ref().expect("my_channel needs ts"); + + let mut ts = ts.clone(); + ts.my_channel().await } - fn user_count(&self, channel: ChannelId) -> u32 { - self.teamspeak - .as_ref() - .map(|ts| ts.user_count(channel)) - .expect("user_count needs ts") + async fn user_count(&self, channel: ChannelId) -> u32 { + let ts = self.teamspeak.as_ref().expect("user_count needs ts"); + + let mut ts = ts.clone(); + ts.user_count(channel).await } - fn send_message(&self, text: &str) { + async fn send_message(&self, text: String) { debug!("Sending message to TeamSpeak: {}", text); - self.with_teamspeak(|ts| ts.send_message_to_channel(text)); + if let Some(ts) = &self.teamspeak { + let mut ts = ts.clone(); + ts.send_message_to_channel(text).await; + } } - fn set_nickname(&self, name: &str) { + async fn set_nickname(&self, name: String) { info!("Setting TeamSpeak nickname: {}", name); - self.with_teamspeak(|ts| ts.set_nickname(name)); + if let Some(ts) = &self.teamspeak { + let mut ts = ts.clone(); + ts.set_nickname(name).await; + } } - fn set_description(&self, desc: &str) { + async fn set_description(&self, desc: String) { info!("Setting TeamSpeak description: {}", desc); - self.with_teamspeak(|ts| ts.set_description(desc)); + if let Some(ts) = &self.teamspeak { + let mut ts = ts.clone(); + ts.set_description(desc).await; + } } - fn subscribe_all(&self) { - self.with_teamspeak(|ts| ts.subscribe_all()); + async fn subscribe_all(&self) { + if let Some(ts) = &self.teamspeak { + let mut ts = ts.clone(); + ts.subscribe_all().await; + } } async fn on_text(&self, message: Message) -> Result<(), AudioPlayerError> { @@ -289,7 +310,7 @@ impl MusicBot { match Command::from_iter_safe(&tokens) { Ok(args) => self.on_command(args, message.invoker).await?, Err(e) if e.kind == structopt::clap::ErrorKind::HelpDisplayed => { - self.send_message(&format!("\n{}", e.message)); + self.send_message(format!("\n{}", e.message)).await; } _ => (), } @@ -329,9 +350,10 @@ impl MusicBot { } Command::Seek { amount } => { if let Ok(time) = self.player.seek(amount) { - self.send_message(&format!("New position: {}", ts::bold(&time))); + self.send_message(format!("New position: {}", ts::bold(&time))) + .await; } else { - self.send_message("Failed to seek"); + self.send_message(String::from("Failed to seek")).await; } } Command::Next => { @@ -352,7 +374,7 @@ impl MusicBot { } Command::Volume { volume } => { self.player.change_volume(volume)?; - self.update_name(self.state()); + self.update_name(self.state()).await; } Command::Leave => { self.quit(String::from("Leaving")); @@ -362,18 +384,18 @@ impl MusicBot { Ok(()) } - fn update_name(&self, state: State) { + async fn update_name(&self, state: State) { let volume = (self.volume() * 100.0).round(); let name = match state { State::EndOfStream => format!("🎵 {} ({}%)", self.name, volume), _ => format!("🎵 {} - {} ({}%)", self.name, state, volume), }; - self.set_nickname(&name); + self.set_nickname(name).await; } - fn on_state(&self, state: State) -> Result<(), AudioPlayerError> { - let mut current_state = self.state.write().unwrap(); - if *current_state != state { + async fn on_state(&self, state: State) -> Result<(), AudioPlayerError> { + let current_state = *self.state.read().unwrap(); + if current_state != state { match state { State::EndOfStream => { let next_track = self @@ -384,24 +406,24 @@ impl MusicBot { if let Some(request) = next_track { info!("Advancing playlist"); - self.start_playing_audio(request); + self.start_playing_audio(request).await; } else { - self.update_name(state); - self.set_description(""); + self.update_name(state).await; + self.set_description(String::new()).await; } } State::Stopped => { - if *current_state != State::EndOfStream { - self.update_name(state); - self.set_description(""); + if current_state != State::EndOfStream { + self.update_name(state).await; + self.set_description(String::new()).await; } } - _ => self.update_name(state), + _ => self.update_name(state).await, } } - if !(*current_state == State::EndOfStream && state == State::Stopped) { - *current_state = state; + if !(current_state == State::EndOfStream && state == State::Stopped) { + *self.state.write().unwrap() = state; } Ok(()) @@ -418,28 +440,28 @@ impl MusicBot { client: _, old_channel, } => { - self.on_client_left_channel(old_channel); + self.on_client_left_channel(old_channel).await; } MusicBotMessage::ClientDisconnected { id: _, client } => { let old_channel = client.channel; - self.on_client_left_channel(old_channel); + self.on_client_left_channel(old_channel).await; } - MusicBotMessage::ChannelCreated(_) => { + MusicBotMessage::ChannelAdded(_) => { // TODO Only subscribe to one channel - self.subscribe_all(); + self.subscribe_all().await; } MusicBotMessage::StateChange(state) => { - self.on_state(state)?; + self.on_state(state).await?; } - MusicBotMessage::Quit(_) => (), + _ => (), } Ok(()) } - fn on_client_left_channel(&self, old_channel: ChannelId) { - let my_channel = self.my_channel(); - if old_channel == my_channel && self.user_count(my_channel) <= 1 { + async fn on_client_left_channel(&self, old_channel: ChannelId) { + let my_channel = self.my_channel().await; + if old_channel == my_channel && self.user_count(my_channel).await <= 1 { self.quit(String::from("Channel is empty")); } } diff --git a/src/command.rs b/src/command.rs index 999ee37..bdac3be 100644 --- a/src/command.rs +++ b/src/command.rs @@ -7,12 +7,12 @@ use structopt::StructOpt; #[structopt( rename_all = "kebab-case", template = "{subcommands}", - raw(global_settings = "&[VersionlessSubcommands, + global_settings = &[VersionlessSubcommands, DisableHelpFlags, DisableVersion, ColorNever, NoBinaryName, - AllowLeadingHyphen]",) + AllowLeadingHyphen], )] pub enum Command { /// Adds url to playlist diff --git a/src/main.rs b/src/main.rs index c8c93a4..f755db0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,10 +2,8 @@ use std::fs::File; use std::io::{Read, Write}; use std::path::PathBuf; use std::thread; -use std::time::{Duration, Instant}; +use std::time::Duration; -use futures::compat::Future01CompatExt; -use futures::future::{FutureExt, TryFutureExt}; use log::{debug, error, info}; use structopt::clap::AppSettings; use structopt::StructOpt; @@ -22,7 +20,7 @@ mod youtube_dl; use bot::{MasterArgs, MasterBot, MusicBot, MusicBotArgs}; #[derive(StructOpt, Debug)] -#[structopt(raw(global_settings = "&[AppSettings::ColoredHelp]"))] +#[structopt(global_settings = &[AppSettings::ColoredHelp])] pub struct Args { #[structopt(short = "l", long = "local", help = "Run locally in text mode")] local: bool, @@ -66,13 +64,14 @@ pub struct Args { // 3. Print udp packets } -fn main() { - if let Err(e) = run() { +#[tokio::main] +async fn main() { + if let Err(e) = run().await { println!("Error: {}", e); } } -fn run() -> Result<(), Box<dyn std::error::Error>> { +async fn run() -> Result<(), Box<dyn std::error::Error>> { log4rs::init_file("log4rs.yml", Default::default()).unwrap(); // Parse command line options @@ -137,56 +136,44 @@ fn run() -> Result<(), Box<dyn std::error::Error>> { info!("Starting PokeBot!"); debug!("Received CLI arguments: {:?}", std::env::args()); - tokio::runtime::Runtime::new()? - .block_on( - async { - if bot_args.local { - let name = bot_args.names[0].clone(); - let id = bot_args.ids.expect("identies should exists")[0].clone(); - - let disconnect_cb = Box::new(move |_, _, _| {}); - - let bot_args = MusicBotArgs { - name, - name_index: 0, - id_index: 0, - local: true, - address: bot_args.address.clone(), - id, - channel: String::from("local"), - verbose: bot_args.verbose, - disconnect_cb, - }; - MusicBot::new(bot_args).await.1.await; - } else { - let domain = bot_args.domain.clone(); - let bind_address = bot_args.bind_address.clone(); - let (bot, fut) = MasterBot::new(bot_args).await; - - thread::spawn(|| { - let web_args = web_server::WebServerArgs { - domain, - bind_address, - bot, - }; - if let Err(e) = web_server::start(web_args) { - error!("Error in web server: {}", e); - } - }); - - fut.await; - // Keep tokio running while the bot disconnects - tokio::timer::Delay::new(Instant::now() + Duration::from_secs(1)) - .compat() - .await - .expect("Failed to wait for delay"); - } + if bot_args.local { + let name = bot_args.names[0].clone(); + let id = bot_args.ids.expect("identies should exists")[0].clone(); + + let disconnect_cb = Box::new(move |_, _, _| {}); + + let bot_args = MusicBotArgs { + name, + name_index: 0, + id_index: 0, + local: true, + address: bot_args.address.clone(), + id, + channel: String::from("local"), + verbose: bot_args.verbose, + disconnect_cb, + }; + MusicBot::new(bot_args).await.1.await; + } else { + let domain = bot_args.domain.clone(); + let bind_address = bot_args.bind_address.clone(); + let (bot, fut) = MasterBot::new(bot_args).await; + + thread::spawn(|| { + let web_args = web_server::WebServerArgs { + domain, + bind_address, + bot, + }; + if let Err(e) = web_server::start(web_args) { + error!("Error in web server: {}", e); } - .unit_error() - .boxed() - .compat(), - ) - .expect("Runtime exited on an error"); + }); + + fut.await; + // Keep tokio running while the bot disconnects + tokio::time::delay_for(Duration::from_secs(1)).await; + } Ok(()) } diff --git a/src/teamspeak/mod.rs b/src/teamspeak/mod.rs index fc10116..59a9d57 100644 --- a/src/teamspeak/mod.rs +++ b/src/teamspeak/mod.rs @@ -1,16 +1,17 @@ use std::sync::{Arc, RwLock}; -use futures::compat::Future01CompatExt; -use futures01::{future::Future, sink::Sink}; -use tokio02::sync::mpsc::UnboundedSender; +use futures::stream::StreamExt; +use tokio::sync::mpsc::UnboundedSender; -use tsclientlib::Event::ConEvents; +use tsclientlib::data::exts::{M2BClientEditExt, M2BClientUpdateExt}; use tsclientlib::{ - events::Event, ChannelId, ClientId, ConnectOptions, Connection, DisconnectOptions, - MessageTarget, Reason, + events::Event, + sync::{SyncConnection, SyncConnectionHandle, SyncStreamItem}, + ChannelId, ClientId, ConnectOptions, Connection, DisconnectOptions, MessageTarget, + OutCommandExt, Reason, }; -use log::error; +use log::{debug, error}; use crate::bot::{Message, MusicBotMessage}; @@ -18,9 +19,9 @@ mod bbcode; pub use bbcode::*; +#[derive(Clone)] pub struct TeamSpeakConnection { - id: ClientId, - conn: Connection, + handle: SyncConnectionHandle, } fn get_message(event: &Event) -> Option<MusicBotMessage> { @@ -28,7 +29,7 @@ fn get_message(event: &Event) -> Option<MusicBotMessage> { match event { Event::Message { - from: target, + target, invoker: sender, message: msg, } => Some(MusicBotMessage::TextMessage(Message { @@ -39,14 +40,17 @@ fn get_message(event: &Event) -> Option<MusicBotMessage> { Event::PropertyAdded { id: property, invoker: _, + extra: _, } => match property { - PropertyId::Channel(id) => Some(MusicBotMessage::ChannelCreated(*id)), + PropertyId::Channel(id) => Some(MusicBotMessage::ChannelAdded(*id)), + PropertyId::Client(id) => Some(MusicBotMessage::ClientAdded(*id)), _ => None, }, Event::PropertyChanged { id: property, old: from, invoker: _, + extra: _, } => match property { PropertyId::ClientChannel(client) => { if let PropertyValue::ChannelId(from) = from { @@ -64,6 +68,7 @@ fn get_message(event: &Event) -> Option<MusicBotMessage> { id: property, old: client, invoker: _, + extra: _, } => match property { PropertyId::Client(id) => { if let PropertyValue::Client(client) = client { @@ -86,30 +91,49 @@ impl TeamSpeakConnection { tx: Arc<RwLock<UnboundedSender<MusicBotMessage>>>, options: ConnectOptions, ) -> Result<TeamSpeakConnection, tsclientlib::Error> { - let conn = Connection::new(options).compat().await?; - let packet = conn.lock().server.set_subscribed(true); - conn.send_packet(packet).compat().await.unwrap(); - - conn.add_event_listener( - String::from("listener"), - Box::new(move |e| { - if let ConEvents(_conn, events) = e { - for event in *events { - if let Some(msg) = get_message(event) { - let tx = tx.read().expect("RwLock was not poisoned"); - // Ignore the result because the receiver might get dropped first. - let _ = tx.send(msg); + let conn = Connection::new(options)?; + let conn = SyncConnection::from(conn); + let mut handle = conn.get_handle(); + + tokio::spawn(conn.for_each(move |i| { + let tx = tx.clone(); + async move { + match i { + Ok(SyncStreamItem::ConEvents(events)) => { + for event in &events { + if let Some(msg) = get_message(event) { + let tx = tx.read().expect("RwLock was not poisoned"); + // Ignore the result because the receiver might get dropped first. + let _ = tx.send(msg); + } } } + Err(e) => error!("Error occured during event reading: {}", e), + Ok(SyncStreamItem::DisconnectedTemporarily) => debug!("Temporary disconnect!"), + _ => (), } - }), - ); - - let id = conn.lock().own_client; - Ok(TeamSpeakConnection { conn, id }) + } + })); + + handle.wait_until_connected().await?; + + let mut chandle = handle.clone(); + chandle + .with_connection(|mut conn| { + conn.get_state() + .expect("is connected") + .server + .set_subscribed(true) + .send(&mut conn) + .unwrap() + }) + .await + .unwrap(); + + Ok(TeamSpeakConnection { handle }) } - pub fn send_audio_packet(&self, samples: &[u8]) { + pub async fn send_audio_packet(&mut self, samples: &[u8]) { let packet = tsproto_packets::packets::OutAudio::new(&tsproto_packets::packets::AudioData::C2S { id: 0, @@ -117,133 +141,187 @@ impl TeamSpeakConnection { data: samples, }); - let send_packet = self - .conn - .get_packet_sink() - .send(packet) - .map(|_| ()) - .map_err(|_| error!("Failed to send voice packet")); - - tokio::run(send_packet); + self.handle + .with_connection(|conn| { + if let Err(e) = conn + .get_tsproto_client_mut() + .expect("can get tsproto client") + .send_packet(packet) + { + error!("Failed to send voice packet: {}", e); + } + }) + .await + .unwrap(); } - pub fn channel_of_user(&self, id: ClientId) -> Option<ChannelId> { - Some(self.conn.lock().clients.get(&id)?.channel) + pub async fn channel_of_user(&mut self, id: ClientId) -> Option<ChannelId> { + self.handle + .with_connection(move |conn| { + conn.get_state() + .expect("can get state") + .clients + .get(&id) + .map(|c| c.channel) + }) + .await + .unwrap() } - pub fn channel_path_of_user(&self, id: ClientId) -> Option<String> { - let conn = self.conn.lock(); + pub async fn channel_path_of_user(&mut self, id: ClientId) -> Option<String> { + self.handle + .with_connection(move |conn| { + let state = conn.get_state().expect("can get state"); - let channel_id = conn.clients.get(&id)?.channel; + let channel_id = state.clients.get(&id)?.channel; - let mut channel = conn - .channels - .get(&channel_id) - .expect("can find user channel"); + let mut channel = state + .channels + .get(&channel_id) + .expect("can find user channel"); - let mut names = vec![&channel.name[..]]; + let mut names = vec![&channel.name[..]]; - // Channel 0 is the root channel - while channel.parent != ChannelId(0) { - names.push("/"); - channel = conn - .channels - .get(&channel.parent) - .expect("can find user channel"); - names.push(&channel.name); - } + // Channel 0 is the root channel + while channel.parent != ChannelId(0) { + names.push("/"); + channel = state + .channels + .get(&channel.parent) + .expect("can find user channel"); + names.push(&channel.name); + } - let mut path = String::new(); - while let Some(name) = names.pop() { - path.push_str(name); - } + let mut path = String::new(); + while let Some(name) = names.pop() { + path.push_str(name); + } - Some(path) + Some(path) + }) + .await + .unwrap() } - pub fn my_channel(&self) -> ChannelId { - let conn = self.conn.lock(); - conn.clients - .get(&conn.own_client) - .expect("can find myself") - .channel + pub async fn my_channel(&mut self) -> ChannelId { + self.handle + .with_connection(move |conn| { + let state = conn.get_state().expect("can get state"); + state + .clients + .get(&state.own_client) + .expect("can find myself") + .channel + }) + .await + .unwrap() } - pub fn my_id(&self) -> ClientId { - self.id + pub async fn my_id(&mut self) -> ClientId { + self.handle + .with_connection(move |conn| conn.get_state().expect("can get state").own_client) + .await + .unwrap() } - pub fn user_count(&self, channel: ChannelId) -> u32 { - let conn = self.conn.lock(); - let mut count = 0; - for client in conn.clients.values() { - if client.channel == channel { - count += 1; - } - } + pub async fn user_count(&mut self, channel: ChannelId) -> u32 { + self.handle + .with_connection(move |conn| { + let state = conn.get_state().expect("can get state"); + let mut count = 0; + for client in state.clients.values() { + if client.channel == channel { + count += 1; + } + } - count + count + }) + .await + .unwrap() } - pub fn set_nickname(&self, name: &str) { - tokio::spawn( - self.conn - .lock() - .to_mut() - .set_name(name) - .map_err(|e| error!("Failed to set nickname: {}", e)), - ); + pub async fn set_nickname(&mut self, name: String) { + self.handle + .with_connection(move |mut conn| { + conn.get_state() + .expect("can get state") + .client_update() + .set_name(&name) + .send(&mut conn) + .map_err(|e| error!("Failed to set nickname: {}", e)) + }) + .await + .unwrap() + .unwrap(); } - pub fn set_description(&self, desc: &str) { - tokio::spawn( - self.conn - .lock() - .to_mut() - .get_client(&self.conn.lock().own_client) - .expect("can get myself") - .set_description(desc) - .map_err(|e| error!("Failed to change description: {}", e)), - ); + pub async fn set_description(&mut self, desc: String) { + self.handle + .with_connection(move |mut conn| { + let state = conn.get_state().expect("can get state"); + let _ = state + .clients + .get(&state.own_client) + .expect("can get myself") + .edit() + .set_description(&desc) + .send(&mut conn) + .map_err(|e| error!("Failed to change description: {}", e)); + }) + .await + .unwrap() } - pub fn send_message_to_channel(&self, text: &str) { - tokio::spawn( - self.conn - .lock() - .to_mut() - .send_message(MessageTarget::Channel, text) - .map_err(|e| error!("Failed to send message: {}", e)), - ); + pub async fn send_message_to_channel(&mut self, text: String) { + self.handle + .with_connection(move |mut conn| { + let _ = conn + .get_state() + .expect("can get state") + .send_message(MessageTarget::Channel, &text) + .send(&mut conn) + .map_err(|e| error!("Failed to send message: {}", e)); + }) + .await + .unwrap() } - pub fn send_message_to_user(&self, client: ClientId, text: &str) { - tokio::spawn( - self.conn - .lock() - .to_mut() - .send_message(MessageTarget::Client(client), text) - .map_err(|e| error!("Failed to send message: {}", e)), - ); + pub async fn send_message_to_user(&mut self, client: ClientId, text: String) { + self.handle + .with_connection(move |mut conn| { + let _ = conn + .get_state() + .expect("can get state") + .send_message(MessageTarget::Client(client), &text) + .send(&mut conn) + .map_err(|e| error!("Failed to send message: {}", e)); + }) + .await + .unwrap() } - pub fn subscribe_all(&self) { - let packet = self.conn.lock().to_mut().server.set_subscribed(true); - tokio::spawn( - self.conn - .send_packet(packet) - .map_err(|e| error!("Failed to send subscribe packet: {}", e)), - ); + pub async fn subscribe_all(&mut self) { + self.handle + .with_connection(move |mut conn| { + if let Err(e) = conn + .get_state() + .expect("can get state") + .server + .set_subscribed(true) + .send(&mut conn) + { + error!("Failed to send subscribe packet: {}", e); + } + }) + .await + .unwrap() } - pub fn disconnect(&self, reason: &str) { + pub async fn disconnect(&mut self, reason: &str) { let opt = DisconnectOptions::new() .reason(Reason::Clientdisconnect) .message(reason); - tokio::spawn( - self.conn - .disconnect(opt) - .map_err(|e| error!("Failed to send message: {}", e)), - ); + self.handle.disconnect(opt).await.unwrap(); } } diff --git a/src/web_server.rs b/src/web_server.rs index be373e4..d731fae 100644 --- a/src/web_server.rs +++ b/src/web_server.rs @@ -1,10 +1,10 @@ use std::sync::Arc; use std::time::Duration; -use actix::{Addr, SyncArbiter}; +use actix::{Actor, Addr}; use actix_web::{get, middleware::Logger, post, web, App, HttpServer, Responder}; -use askama::actix_web::TemplateIntoResponse; use askama::Template; +use askama_actix::TemplateIntoResponse; use serde::{Deserialize, Serialize}; use crate::bot::MasterBot; @@ -27,7 +27,7 @@ pub struct WebServerArgs { #[actix_rt::main] pub async fn start(args: WebServerArgs) -> std::io::Result<()> { let cbot = args.bot.clone(); - let bot_addr: Addr<BotExecutor> = SyncArbiter::start(4, move || BotExecutor(cbot.clone())); + let bot_addr: Addr<BotExecutor> = BotExecutor(cbot.clone()).start(); HttpServer::new(move || { App::new() diff --git a/src/web_server/bot_executor.rs b/src/web_server/bot_executor.rs index fde3c08..0d3e7b7 100644 --- a/src/web_server/bot_executor.rs +++ b/src/web_server/bot_executor.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use actix::{Actor, Handler, Message, SyncContext}; +use actix::{Actor, Context, Handler, Message}; use crate::bot::MasterBot; use crate::web_server::BotData; @@ -8,7 +8,7 @@ use crate::web_server::BotData; pub struct BotExecutor(pub Arc<MasterBot>); impl Actor for BotExecutor { - type Context = SyncContext<Self>; + type Context = Context<Self>; } pub struct BotNameListRequest; diff --git a/src/web_server/default.rs b/src/web_server/default.rs index ec86182..542dade 100644 --- a/src/web_server/default.rs +++ b/src/web_server/default.rs @@ -1,7 +1,7 @@ use actix::Addr; use actix_web::{http::header, web, Error, HttpResponse}; -use askama::actix_web::TemplateIntoResponse; use askama::Template; +use askama_actix::TemplateIntoResponse; use crate::web_server::{filters, BotData, BotDataRequest, BotExecutor, BotNameListRequest}; diff --git a/src/web_server/tmtu.rs b/src/web_server/tmtu.rs index 0645ee4..33a14af 100644 --- a/src/web_server/tmtu.rs +++ b/src/web_server/tmtu.rs @@ -1,7 +1,7 @@ use actix::Addr; use actix_web::{http::header, web, Error, HttpResponse}; -use askama::actix_web::TemplateIntoResponse; use askama::Template; +use askama_actix::TemplateIntoResponse; use crate::web_server::{filters, BotData, BotDataRequest, BotExecutor, BotNameListRequest}; diff --git a/src/youtube_dl.rs b/src/youtube_dl.rs index 1b77303..cc708af 100644 --- a/src/youtube_dl.rs +++ b/src/youtube_dl.rs @@ -1,8 +1,7 @@ use std::time::Duration; -use futures::compat::Future01CompatExt; -use std::process::{Command, Stdio}; -use tokio_process::CommandExt; +use std::process::Stdio; +use tokio::process::Command; use serde::{Deserialize, Serialize}; @@ -38,7 +37,7 @@ pub async fn get_audio_download_url(uri: String) -> Result<AudioMetadata, String debug!("yt-dl command: {:?}", cmd); - let ytdl_output = cmd.output_async().compat().await.unwrap(); + let ytdl_output = cmd.output().await.unwrap(); if !ytdl_output.status.success() { return Err(String::from_utf8(ytdl_output.stderr).unwrap()); |
