aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorJokler <jokler@protonmail.com>2020-07-27 01:17:30 +0200
committerJokler <jokler@protonmail.com>2020-09-01 18:52:46 +0200
commitbbe3e1fffc94e7e87237a331de7b09253b0aa3fb (patch)
tree6092b3db497ee0a795f70db695ff2adb3c16e5ee /src
parent130cde033795382b70a312846a8f2704a15d11e3 (diff)
downloadpokebot-bbe3e1fffc94e7e87237a331de7b09253b0aa3fb.tar.gz
pokebot-bbe3e1fffc94e7e87237a331de7b09253b0aa3fb.zip
Upgrade dependencies & use tokio 0.2 exclusively
Diffstat (limited to 'src')
-rw-r--r--src/audio_player.rs12
-rw-r--r--src/bot/master.rs68
-rw-r--r--src/bot/music.rs176
-rw-r--r--src/command.rs4
-rw-r--r--src/main.rs99
-rw-r--r--src/teamspeak/mod.rs330
-rw-r--r--src/web_server.rs6
-rw-r--r--src/web_server/bot_executor.rs4
-rw-r--r--src/web_server/default.rs2
-rw-r--r--src/web_server/tmtu.rs2
-rw-r--r--src/youtube_dl.rs7
11 files changed, 404 insertions, 306 deletions
diff --git a/src/audio_player.rs b/src/audio_player.rs
index 79c54ef..1f6649f 100644
--- a/src/audio_player.rs
+++ b/src/audio_player.rs
@@ -11,7 +11,7 @@ use crate::bot::{MusicBotMessage, State};
use glib::BoolError;
use log::{debug, error, info, warn};
use std::sync::{Arc, RwLock};
-use tokio02::sync::mpsc::UnboundedSender;
+use tokio::sync::mpsc::UnboundedSender;
use crate::command::{Seek, VolumeChange};
use crate::youtube_dl::AudioMetadata;
@@ -144,7 +144,7 @@ impl AudioPlayer {
"audio/x-opus",
&[("channels", &(2i32)), ("rate", &(48_000i32))],
)));
- let callbacks = AppSinkCallbacks::new()
+ let callbacks = AppSinkCallbacks::builder()
.new_sample(move |sink| {
let sample = sink.pull_sample().map_err(|_| gst::FlowError::Eos)?;
let buffer = sample.get_buffer().ok_or(gst::FlowError::Error)?;
@@ -169,7 +169,7 @@ impl AudioPlayer {
gst::Element::link_many(&[&queue, &convert, &volume, &resample, &sink])?;
};
- let ghost_pad = GhostPad::new(Some("audio bin sink"), queue_sink_pad).unwrap();
+ let ghost_pad = GhostPad::with_target(Some("audio bin sink"), queue_sink_pad).unwrap();
ghost_pad.set_active(true)?;
audio_bin.add_pad(&ghost_pad)?;
@@ -302,7 +302,7 @@ impl AudioPlayer {
pub fn stop_current(&self) -> Result<(), AudioPlayerError> {
info!("Stopping pipeline, sending EOS");
- self.bus.post(&gst::Message::new_eos().build())?;
+ self.bus.post(&gst::message::Eos::new())?;
Ok(())
}
@@ -312,7 +312,9 @@ impl AudioPlayer {
if self
.bus
- .post(&gst::Message::new_application(gst::Structure::new_empty("quit")).build())
+ .post(&gst::message::Application::new(gst::Structure::new_empty(
+ "quit",
+ )))
.is_err()
{
warn!("Tried to send \"quit\" app event on flushing bus.");
diff --git a/src/bot/master.rs b/src/bot/master.rs
index dad2bed..fe3c3fe 100644
--- a/src/bot/master.rs
+++ b/src/bot/master.rs
@@ -2,12 +2,10 @@ use std::collections::HashMap;
use std::future::Future;
use std::sync::{Arc, RwLock};
-use futures::future::{FutureExt, TryFutureExt};
-use futures01::future::Future as Future01;
use log::info;
use rand::{rngs::SmallRng, seq::SliceRandom, SeedableRng};
use serde::{Deserialize, Serialize};
-use tokio02::sync::mpsc::UnboundedSender;
+use tokio::sync::mpsc::UnboundedSender;
use tsclientlib::{ClientId, ConnectOptions, Identity, MessageTarget};
use crate::audio_player::AudioPlayerError;
@@ -20,7 +18,7 @@ use crate::bot::{MusicBot, MusicBotArgs, MusicBotMessage};
pub struct MasterBot {
config: Arc<MasterConfig>,
music_bots: Arc<RwLock<MusicBots>>,
- teamspeak: Arc<TeamSpeakConnection>,
+ teamspeak: TeamSpeakConnection,
sender: Arc<RwLock<UnboundedSender<MusicBotMessage>>>,
}
@@ -33,7 +31,7 @@ struct MusicBots {
impl MasterBot {
pub async fn new(args: MasterArgs) -> (Arc<Self>, impl Future) {
- let (tx, mut rx) = tokio02::sync::mpsc::unbounded_channel();
+ let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let tx = Arc::new(RwLock::new(tx));
info!("Starting in TeamSpeak mode");
@@ -49,11 +47,9 @@ impl MasterBot {
con_config = con_config.channel(channel);
}
- let connection = Arc::new(
- TeamSpeakConnection::new(tx.clone(), con_config)
- .await
- .unwrap(),
- );
+ let connection = TeamSpeakConnection::new(tx.clone(), con_config)
+ .await
+ .unwrap();
let config = Arc::new(MasterConfig {
master_name: args.master_name,
@@ -81,24 +77,22 @@ impl MasterBot {
sender: tx.clone(),
});
- bot.teamspeak
- .set_description("Poke me if you want a music bot!");
-
let cbot = bot.clone();
let msg_loop = async move {
'outer: loop {
while let Some(msg) = rx.recv().await {
match msg {
MusicBotMessage::Quit(reason) => {
- cbot.teamspeak.disconnect(&reason);
+ let mut cteamspeak = cbot.teamspeak.clone();
+ cteamspeak.disconnect(&reason).await;
break 'outer;
}
MusicBotMessage::ClientDisconnected { id, .. } => {
- if id == cbot.my_id() {
+ if id == cbot.my_id().await {
// TODO Reconnect since quit was not called
break 'outer;
}
- },
+ }
_ => cbot.on_message(msg).await.unwrap(),
}
}
@@ -108,13 +102,14 @@ impl MasterBot {
(bot, msg_loop)
}
- fn build_bot_args_for(&self, id: ClientId) -> Result<MusicBotArgs, BotCreationError> {
- let channel = match self.teamspeak.channel_of_user(id) {
+ async fn build_bot_args_for(&self, id: ClientId) -> Result<MusicBotArgs, BotCreationError> {
+ let mut cteamspeak = self.teamspeak.clone();
+ let channel = match cteamspeak.channel_of_user(id).await {
Some(channel) => channel,
None => return Err(BotCreationError::UnfoundUser),
};
- if channel == self.teamspeak.my_channel() {
+ if channel == cteamspeak.my_channel().await {
return Err(BotCreationError::MasterChannel(
self.config.master_name.clone(),
));
@@ -128,14 +123,14 @@ impl MasterBot {
} = &mut *self.music_bots.write().expect("RwLock was not poisoned");
for bot in connected_bots.values() {
- if bot.my_channel() == channel {
+ if bot.my_channel().await == channel {
return Err(BotCreationError::MultipleBots(bot.name().to_owned()));
}
}
- let channel_path = self
- .teamspeak
+ let channel_path = cteamspeak
.channel_path_of_user(id)
+ .await
.expect("can find poke sender");
available_names.shuffle(rng);
@@ -181,16 +176,19 @@ impl MasterBot {
}
async fn spawn_bot_for(&self, id: ClientId) {
- match self.build_bot_args_for(id) {
+ match self.build_bot_args_for(id).await {
Ok(bot_args) => {
let (bot, fut) = MusicBot::new(bot_args).await;
- tokio::spawn(fut.unit_error().boxed().compat().map(|_| ()));
+ tokio::spawn(fut);
let mut music_bots = self.music_bots.write().expect("RwLock was not poisoned");
music_bots
.connected_bots
.insert(bot.name().to_string(), bot);
}
- Err(e) => self.teamspeak.send_message_to_user(id, &e.to_string()),
+ Err(e) => {
+ let mut cteamspeak = self.teamspeak.clone();
+ cteamspeak.send_message_to_user(id, e.to_string()).await
+ }
}
}
@@ -202,9 +200,19 @@ impl MasterBot {
self.spawn_bot_for(who).await;
}
}
- MusicBotMessage::ChannelCreated(_) => {
+ MusicBotMessage::ChannelAdded(_) => {
// TODO Only subscribe to one channel
- self.teamspeak.subscribe_all();
+ let mut cteamspeak = self.teamspeak.clone();
+ cteamspeak.subscribe_all().await;
+ }
+ MusicBotMessage::ClientAdded(id) => {
+ let mut cteamspeak = self.teamspeak.clone();
+
+ if id == cteamspeak.my_id().await {
+ cteamspeak
+ .set_description(String::from("Poke me if you want a music bot!"))
+ .await;
+ }
}
_ => (),
}
@@ -212,8 +220,10 @@ impl MasterBot {
Ok(())
}
- fn my_id(&self) -> ClientId {
- self.teamspeak.my_id()
+ async fn my_id(&self) -> ClientId {
+ let mut cteamspeak = self.teamspeak.clone();
+
+ cteamspeak.my_id().await
}
pub fn bot_data(&self, name: String) -> Option<crate::web_server::BotData> {
diff --git a/src/bot/music.rs b/src/bot/music.rs
index 71e7b58..656a169 100644
--- a/src/bot/music.rs
+++ b/src/bot/music.rs
@@ -7,7 +7,7 @@ use std::time::Duration;
use log::{debug, info};
use serde::Serialize;
use structopt::StructOpt;
-use tokio02::sync::mpsc::UnboundedSender;
+use tokio::sync::mpsc::UnboundedSender;
use tsclientlib::{data, ChannelId, ClientId, ConnectOptions, Identity, Invoker, MessageTarget};
use crate::audio_player::{AudioPlayer, AudioPlayerError, PollResult};
@@ -52,7 +52,8 @@ pub enum MusicBotMessage {
client: ClientId,
old_channel: ChannelId,
},
- ChannelCreated(ChannelId),
+ ChannelAdded(ChannelId),
+ ClientAdded(ClientId),
ClientDisconnected {
id: ClientId,
client: Box<data::Client>,
@@ -64,7 +65,7 @@ pub enum MusicBotMessage {
pub struct MusicBot {
name: String,
player: Arc<AudioPlayer>,
- teamspeak: Option<Arc<TeamSpeakConnection>>,
+ teamspeak: Option<TeamSpeakConnection>,
playlist: Arc<RwLock<Playlist>>,
state: Arc<RwLock<State>>,
}
@@ -82,8 +83,8 @@ pub struct MusicBotArgs {
}
impl MusicBot {
- pub async fn new(args: MusicBotArgs) -> (Arc<Self>, impl Future) {
- let (tx, mut rx) = tokio02::sync::mpsc::unbounded_channel();
+ pub async fn new(args: MusicBotArgs) -> (Arc<Self>, impl Future<Output = ()>) {
+ let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let tx = Arc::new(RwLock::new(tx));
let (player, connection) = if args.local {
info!("Starting in CLI mode");
@@ -102,16 +103,15 @@ impl MusicBot {
.log_udp_packets(args.verbose >= 3)
.channel(args.channel);
- let connection = Arc::new(
- TeamSpeakConnection::new(tx.clone(), con_config)
- .await
- .unwrap(),
- );
- let cconnection = connection.clone();
+ let connection = TeamSpeakConnection::new(tx.clone(), con_config)
+ .await
+ .unwrap();
+ let mut cconnection = connection.clone();
let audio_player = AudioPlayer::new(
tx.clone(),
Some(Box::new(move |samples| {
- cconnection.send_audio_packet(samples);
+ let mut rt = tokio::runtime::Runtime::new().unwrap();
+ rt.block_on(cconnection.send_audio_packet(samples));
})),
)
.unwrap();
@@ -146,7 +146,10 @@ impl MusicBot {
'outer: loop {
while let Some(msg) = rx.recv().await {
if let MusicBotMessage::Quit(reason) = msg {
- cbot.with_teamspeak(|ts| ts.disconnect(&reason));
+ if let Some(ts) = &cbot.teamspeak {
+ let mut ts = ts.clone();
+ ts.disconnect(&reason).await;
+ }
disconnect_cb(name, name_index, id_index);
break 'outer;
}
@@ -156,31 +159,26 @@ impl MusicBot {
debug!("Left message loop");
};
- bot.update_name(State::EndOfStream);
+ bot.update_name(State::EndOfStream).await;
(bot, msg_loop)
}
- #[inline(always)]
- fn with_teamspeak<F: Fn(&TeamSpeakConnection)>(&self, func: F) {
- if let Some(ts) = &self.teamspeak {
- func(&ts);
- }
- }
-
- fn start_playing_audio(&self, metadata: AudioMetadata) {
+ async fn start_playing_audio(&self, metadata: AudioMetadata) {
let duration = if let Some(duration) = metadata.duration {
format!("({})", ts::bold(&humantime::format_duration(duration)))
} else {
format!("")
};
- self.send_message(&format!(
+ self.send_message(format!(
"Playing {} {}",
ts::underline(&metadata.title),
duration
- ));
- self.set_description(&format!("Currently playing '{}'", metadata.title));
+ ))
+ .await;
+ self.set_description(format!("Currently playing '{}'", metadata.title))
+ .await;
self.player.reset().unwrap();
self.player.set_metadata(metadata).unwrap();
self.player.play().unwrap();
@@ -192,12 +190,21 @@ impl MusicBot {
metadata.added_by = user;
info!("Found audio url: {}", metadata.url);
- let mut playlist = self.playlist.write().expect("RwLock was not poisoned");
- playlist.push(metadata.clone());
+ // RWLockGuard can not be kept around or the compiler complains that
+ // it might cross the await boundary
+ self.playlist
+ .write()
+ .expect("RwLock was not poisoned")
+ .push(metadata.clone());
if !self.player.is_started() {
- if let Some(request) = playlist.pop() {
- self.start_playing_audio(request);
+ let entry = self
+ .playlist
+ .write()
+ .expect("RwLock was not poisoned")
+ .pop();
+ if let Some(request) = entry {
+ self.start_playing_audio(request).await;
}
} else {
let duration = if let Some(duration) = metadata.duration {
@@ -206,17 +213,19 @@ impl MusicBot {
format!("")
};
- self.send_message(&format!(
+ self.send_message(format!(
"Added {}{} to playlist",
ts::underline(&metadata.title),
duration
- ));
+ ))
+ .await;
}
}
Err(e) => {
info!("Failed to find audio url: {}", e);
- self.send_message(&format!("Failed to find url: {}", e));
+ self.send_message(format!("Failed to find url: {}", e))
+ .await;
}
}
}
@@ -245,40 +254,52 @@ impl MusicBot {
self.playlist.read().unwrap().to_vec()
}
- pub fn my_channel(&self) -> ChannelId {
- self.teamspeak
- .as_ref()
- .map(|ts| ts.my_channel())
- .expect("my_channel needs ts")
+ pub async fn my_channel(&self) -> ChannelId {
+ let ts = self.teamspeak.as_ref().expect("my_channel needs ts");
+
+ let mut ts = ts.clone();
+ ts.my_channel().await
}
- fn user_count(&self, channel: ChannelId) -> u32 {
- self.teamspeak
- .as_ref()
- .map(|ts| ts.user_count(channel))
- .expect("user_count needs ts")
+ async fn user_count(&self, channel: ChannelId) -> u32 {
+ let ts = self.teamspeak.as_ref().expect("user_count needs ts");
+
+ let mut ts = ts.clone();
+ ts.user_count(channel).await
}
- fn send_message(&self, text: &str) {
+ async fn send_message(&self, text: String) {
debug!("Sending message to TeamSpeak: {}", text);
- self.with_teamspeak(|ts| ts.send_message_to_channel(text));
+ if let Some(ts) = &self.teamspeak {
+ let mut ts = ts.clone();
+ ts.send_message_to_channel(text).await;
+ }
}
- fn set_nickname(&self, name: &str) {
+ async fn set_nickname(&self, name: String) {
info!("Setting TeamSpeak nickname: {}", name);
- self.with_teamspeak(|ts| ts.set_nickname(name));
+ if let Some(ts) = &self.teamspeak {
+ let mut ts = ts.clone();
+ ts.set_nickname(name).await;
+ }
}
- fn set_description(&self, desc: &str) {
+ async fn set_description(&self, desc: String) {
info!("Setting TeamSpeak description: {}", desc);
- self.with_teamspeak(|ts| ts.set_description(desc));
+ if let Some(ts) = &self.teamspeak {
+ let mut ts = ts.clone();
+ ts.set_description(desc).await;
+ }
}
- fn subscribe_all(&self) {
- self.with_teamspeak(|ts| ts.subscribe_all());
+ async fn subscribe_all(&self) {
+ if let Some(ts) = &self.teamspeak {
+ let mut ts = ts.clone();
+ ts.subscribe_all().await;
+ }
}
async fn on_text(&self, message: Message) -> Result<(), AudioPlayerError> {
@@ -289,7 +310,7 @@ impl MusicBot {
match Command::from_iter_safe(&tokens) {
Ok(args) => self.on_command(args, message.invoker).await?,
Err(e) if e.kind == structopt::clap::ErrorKind::HelpDisplayed => {
- self.send_message(&format!("\n{}", e.message));
+ self.send_message(format!("\n{}", e.message)).await;
}
_ => (),
}
@@ -329,9 +350,10 @@ impl MusicBot {
}
Command::Seek { amount } => {
if let Ok(time) = self.player.seek(amount) {
- self.send_message(&format!("New position: {}", ts::bold(&time)));
+ self.send_message(format!("New position: {}", ts::bold(&time)))
+ .await;
} else {
- self.send_message("Failed to seek");
+ self.send_message(String::from("Failed to seek")).await;
}
}
Command::Next => {
@@ -352,7 +374,7 @@ impl MusicBot {
}
Command::Volume { volume } => {
self.player.change_volume(volume)?;
- self.update_name(self.state());
+ self.update_name(self.state()).await;
}
Command::Leave => {
self.quit(String::from("Leaving"));
@@ -362,18 +384,18 @@ impl MusicBot {
Ok(())
}
- fn update_name(&self, state: State) {
+ async fn update_name(&self, state: State) {
let volume = (self.volume() * 100.0).round();
let name = match state {
State::EndOfStream => format!("🎵 {} ({}%)", self.name, volume),
_ => format!("🎵 {} - {} ({}%)", self.name, state, volume),
};
- self.set_nickname(&name);
+ self.set_nickname(name).await;
}
- fn on_state(&self, state: State) -> Result<(), AudioPlayerError> {
- let mut current_state = self.state.write().unwrap();
- if *current_state != state {
+ async fn on_state(&self, state: State) -> Result<(), AudioPlayerError> {
+ let current_state = *self.state.read().unwrap();
+ if current_state != state {
match state {
State::EndOfStream => {
let next_track = self
@@ -384,24 +406,24 @@ impl MusicBot {
if let Some(request) = next_track {
info!("Advancing playlist");
- self.start_playing_audio(request);
+ self.start_playing_audio(request).await;
} else {
- self.update_name(state);
- self.set_description("");
+ self.update_name(state).await;
+ self.set_description(String::new()).await;
}
}
State::Stopped => {
- if *current_state != State::EndOfStream {
- self.update_name(state);
- self.set_description("");
+ if current_state != State::EndOfStream {
+ self.update_name(state).await;
+ self.set_description(String::new()).await;
}
}
- _ => self.update_name(state),
+ _ => self.update_name(state).await,
}
}
- if !(*current_state == State::EndOfStream && state == State::Stopped) {
- *current_state = state;
+ if !(current_state == State::EndOfStream && state == State::Stopped) {
+ *self.state.write().unwrap() = state;
}
Ok(())
@@ -418,28 +440,28 @@ impl MusicBot {
client: _,
old_channel,
} => {
- self.on_client_left_channel(old_channel);
+ self.on_client_left_channel(old_channel).await;
}
MusicBotMessage::ClientDisconnected { id: _, client } => {
let old_channel = client.channel;
- self.on_client_left_channel(old_channel);
+ self.on_client_left_channel(old_channel).await;
}
- MusicBotMessage::ChannelCreated(_) => {
+ MusicBotMessage::ChannelAdded(_) => {
// TODO Only subscribe to one channel
- self.subscribe_all();
+ self.subscribe_all().await;
}
MusicBotMessage::StateChange(state) => {
- self.on_state(state)?;
+ self.on_state(state).await?;
}
- MusicBotMessage::Quit(_) => (),
+ _ => (),
}
Ok(())
}
- fn on_client_left_channel(&self, old_channel: ChannelId) {
- let my_channel = self.my_channel();
- if old_channel == my_channel && self.user_count(my_channel) <= 1 {
+ async fn on_client_left_channel(&self, old_channel: ChannelId) {
+ let my_channel = self.my_channel().await;
+ if old_channel == my_channel && self.user_count(my_channel).await <= 1 {
self.quit(String::from("Channel is empty"));
}
}
diff --git a/src/command.rs b/src/command.rs
index 999ee37..bdac3be 100644
--- a/src/command.rs
+++ b/src/command.rs
@@ -7,12 +7,12 @@ use structopt::StructOpt;
#[structopt(
rename_all = "kebab-case",
template = "{subcommands}",
- raw(global_settings = "&[VersionlessSubcommands,
+ global_settings = &[VersionlessSubcommands,
DisableHelpFlags,
DisableVersion,
ColorNever,
NoBinaryName,
- AllowLeadingHyphen]",)
+ AllowLeadingHyphen],
)]
pub enum Command {
/// Adds url to playlist
diff --git a/src/main.rs b/src/main.rs
index c8c93a4..f755db0 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -2,10 +2,8 @@ use std::fs::File;
use std::io::{Read, Write};
use std::path::PathBuf;
use std::thread;
-use std::time::{Duration, Instant};
+use std::time::Duration;
-use futures::compat::Future01CompatExt;
-use futures::future::{FutureExt, TryFutureExt};
use log::{debug, error, info};
use structopt::clap::AppSettings;
use structopt::StructOpt;
@@ -22,7 +20,7 @@ mod youtube_dl;
use bot::{MasterArgs, MasterBot, MusicBot, MusicBotArgs};
#[derive(StructOpt, Debug)]
-#[structopt(raw(global_settings = "&[AppSettings::ColoredHelp]"))]
+#[structopt(global_settings = &[AppSettings::ColoredHelp])]
pub struct Args {
#[structopt(short = "l", long = "local", help = "Run locally in text mode")]
local: bool,
@@ -66,13 +64,14 @@ pub struct Args {
// 3. Print udp packets
}
-fn main() {
- if let Err(e) = run() {
+#[tokio::main]
+async fn main() {
+ if let Err(e) = run().await {
println!("Error: {}", e);
}
}
-fn run() -> Result<(), Box<dyn std::error::Error>> {
+async fn run() -> Result<(), Box<dyn std::error::Error>> {
log4rs::init_file("log4rs.yml", Default::default()).unwrap();
// Parse command line options
@@ -137,56 +136,44 @@ fn run() -> Result<(), Box<dyn std::error::Error>> {
info!("Starting PokeBot!");
debug!("Received CLI arguments: {:?}", std::env::args());
- tokio::runtime::Runtime::new()?
- .block_on(
- async {
- if bot_args.local {
- let name = bot_args.names[0].clone();
- let id = bot_args.ids.expect("identies should exists")[0].clone();
-
- let disconnect_cb = Box::new(move |_, _, _| {});
-
- let bot_args = MusicBotArgs {
- name,
- name_index: 0,
- id_index: 0,
- local: true,
- address: bot_args.address.clone(),
- id,
- channel: String::from("local"),
- verbose: bot_args.verbose,
- disconnect_cb,
- };
- MusicBot::new(bot_args).await.1.await;
- } else {
- let domain = bot_args.domain.clone();
- let bind_address = bot_args.bind_address.clone();
- let (bot, fut) = MasterBot::new(bot_args).await;
-
- thread::spawn(|| {
- let web_args = web_server::WebServerArgs {
- domain,
- bind_address,
- bot,
- };
- if let Err(e) = web_server::start(web_args) {
- error!("Error in web server: {}", e);
- }
- });
-
- fut.await;
- // Keep tokio running while the bot disconnects
- tokio::timer::Delay::new(Instant::now() + Duration::from_secs(1))
- .compat()
- .await
- .expect("Failed to wait for delay");
- }
+ if bot_args.local {
+ let name = bot_args.names[0].clone();
+ let id = bot_args.ids.expect("identies should exists")[0].clone();
+
+ let disconnect_cb = Box::new(move |_, _, _| {});
+
+ let bot_args = MusicBotArgs {
+ name,
+ name_index: 0,
+ id_index: 0,
+ local: true,
+ address: bot_args.address.clone(),
+ id,
+ channel: String::from("local"),
+ verbose: bot_args.verbose,
+ disconnect_cb,
+ };
+ MusicBot::new(bot_args).await.1.await;
+ } else {
+ let domain = bot_args.domain.clone();
+ let bind_address = bot_args.bind_address.clone();
+ let (bot, fut) = MasterBot::new(bot_args).await;
+
+ thread::spawn(|| {
+ let web_args = web_server::WebServerArgs {
+ domain,
+ bind_address,
+ bot,
+ };
+ if let Err(e) = web_server::start(web_args) {
+ error!("Error in web server: {}", e);
}
- .unit_error()
- .boxed()
- .compat(),
- )
- .expect("Runtime exited on an error");
+ });
+
+ fut.await;
+ // Keep tokio running while the bot disconnects
+ tokio::time::delay_for(Duration::from_secs(1)).await;
+ }
Ok(())
}
diff --git a/src/teamspeak/mod.rs b/src/teamspeak/mod.rs
index fc10116..59a9d57 100644
--- a/src/teamspeak/mod.rs
+++ b/src/teamspeak/mod.rs
@@ -1,16 +1,17 @@
use std::sync::{Arc, RwLock};
-use futures::compat::Future01CompatExt;
-use futures01::{future::Future, sink::Sink};
-use tokio02::sync::mpsc::UnboundedSender;
+use futures::stream::StreamExt;
+use tokio::sync::mpsc::UnboundedSender;
-use tsclientlib::Event::ConEvents;
+use tsclientlib::data::exts::{M2BClientEditExt, M2BClientUpdateExt};
use tsclientlib::{
- events::Event, ChannelId, ClientId, ConnectOptions, Connection, DisconnectOptions,
- MessageTarget, Reason,
+ events::Event,
+ sync::{SyncConnection, SyncConnectionHandle, SyncStreamItem},
+ ChannelId, ClientId, ConnectOptions, Connection, DisconnectOptions, MessageTarget,
+ OutCommandExt, Reason,
};
-use log::error;
+use log::{debug, error};
use crate::bot::{Message, MusicBotMessage};
@@ -18,9 +19,9 @@ mod bbcode;
pub use bbcode::*;
+#[derive(Clone)]
pub struct TeamSpeakConnection {
- id: ClientId,
- conn: Connection,
+ handle: SyncConnectionHandle,
}
fn get_message(event: &Event) -> Option<MusicBotMessage> {
@@ -28,7 +29,7 @@ fn get_message(event: &Event) -> Option<MusicBotMessage> {
match event {
Event::Message {
- from: target,
+ target,
invoker: sender,
message: msg,
} => Some(MusicBotMessage::TextMessage(Message {
@@ -39,14 +40,17 @@ fn get_message(event: &Event) -> Option<MusicBotMessage> {
Event::PropertyAdded {
id: property,
invoker: _,
+ extra: _,
} => match property {
- PropertyId::Channel(id) => Some(MusicBotMessage::ChannelCreated(*id)),
+ PropertyId::Channel(id) => Some(MusicBotMessage::ChannelAdded(*id)),
+ PropertyId::Client(id) => Some(MusicBotMessage::ClientAdded(*id)),
_ => None,
},
Event::PropertyChanged {
id: property,
old: from,
invoker: _,
+ extra: _,
} => match property {
PropertyId::ClientChannel(client) => {
if let PropertyValue::ChannelId(from) = from {
@@ -64,6 +68,7 @@ fn get_message(event: &Event) -> Option<MusicBotMessage> {
id: property,
old: client,
invoker: _,
+ extra: _,
} => match property {
PropertyId::Client(id) => {
if let PropertyValue::Client(client) = client {
@@ -86,30 +91,49 @@ impl TeamSpeakConnection {
tx: Arc<RwLock<UnboundedSender<MusicBotMessage>>>,
options: ConnectOptions,
) -> Result<TeamSpeakConnection, tsclientlib::Error> {
- let conn = Connection::new(options).compat().await?;
- let packet = conn.lock().server.set_subscribed(true);
- conn.send_packet(packet).compat().await.unwrap();
-
- conn.add_event_listener(
- String::from("listener"),
- Box::new(move |e| {
- if let ConEvents(_conn, events) = e {
- for event in *events {
- if let Some(msg) = get_message(event) {
- let tx = tx.read().expect("RwLock was not poisoned");
- // Ignore the result because the receiver might get dropped first.
- let _ = tx.send(msg);
+ let conn = Connection::new(options)?;
+ let conn = SyncConnection::from(conn);
+ let mut handle = conn.get_handle();
+
+ tokio::spawn(conn.for_each(move |i| {
+ let tx = tx.clone();
+ async move {
+ match i {
+ Ok(SyncStreamItem::ConEvents(events)) => {
+ for event in &events {
+ if let Some(msg) = get_message(event) {
+ let tx = tx.read().expect("RwLock was not poisoned");
+ // Ignore the result because the receiver might get dropped first.
+ let _ = tx.send(msg);
+ }
}
}
+ Err(e) => error!("Error occured during event reading: {}", e),
+ Ok(SyncStreamItem::DisconnectedTemporarily) => debug!("Temporary disconnect!"),
+ _ => (),
}
- }),
- );
-
- let id = conn.lock().own_client;
- Ok(TeamSpeakConnection { conn, id })
+ }
+ }));
+
+ handle.wait_until_connected().await?;
+
+ let mut chandle = handle.clone();
+ chandle
+ .with_connection(|mut conn| {
+ conn.get_state()
+ .expect("is connected")
+ .server
+ .set_subscribed(true)
+ .send(&mut conn)
+ .unwrap()
+ })
+ .await
+ .unwrap();
+
+ Ok(TeamSpeakConnection { handle })
}
- pub fn send_audio_packet(&self, samples: &[u8]) {
+ pub async fn send_audio_packet(&mut self, samples: &[u8]) {
let packet =
tsproto_packets::packets::OutAudio::new(&tsproto_packets::packets::AudioData::C2S {
id: 0,
@@ -117,133 +141,187 @@ impl TeamSpeakConnection {
data: samples,
});
- let send_packet = self
- .conn
- .get_packet_sink()
- .send(packet)
- .map(|_| ())
- .map_err(|_| error!("Failed to send voice packet"));
-
- tokio::run(send_packet);
+ self.handle
+ .with_connection(|conn| {
+ if let Err(e) = conn
+ .get_tsproto_client_mut()
+ .expect("can get tsproto client")
+ .send_packet(packet)
+ {
+ error!("Failed to send voice packet: {}", e);
+ }
+ })
+ .await
+ .unwrap();
}
- pub fn channel_of_user(&self, id: ClientId) -> Option<ChannelId> {
- Some(self.conn.lock().clients.get(&id)?.channel)
+ pub async fn channel_of_user(&mut self, id: ClientId) -> Option<ChannelId> {
+ self.handle
+ .with_connection(move |conn| {
+ conn.get_state()
+ .expect("can get state")
+ .clients
+ .get(&id)
+ .map(|c| c.channel)
+ })
+ .await
+ .unwrap()
}
- pub fn channel_path_of_user(&self, id: ClientId) -> Option<String> {
- let conn = self.conn.lock();
+ pub async fn channel_path_of_user(&mut self, id: ClientId) -> Option<String> {
+ self.handle
+ .with_connection(move |conn| {
+ let state = conn.get_state().expect("can get state");
- let channel_id = conn.clients.get(&id)?.channel;
+ let channel_id = state.clients.get(&id)?.channel;
- let mut channel = conn
- .channels
- .get(&channel_id)
- .expect("can find user channel");
+ let mut channel = state
+ .channels
+ .get(&channel_id)
+ .expect("can find user channel");
- let mut names = vec![&channel.name[..]];
+ let mut names = vec![&channel.name[..]];
- // Channel 0 is the root channel
- while channel.parent != ChannelId(0) {
- names.push("/");
- channel = conn
- .channels
- .get(&channel.parent)
- .expect("can find user channel");
- names.push(&channel.name);
- }
+ // Channel 0 is the root channel
+ while channel.parent != ChannelId(0) {
+ names.push("/");
+ channel = state
+ .channels
+ .get(&channel.parent)
+ .expect("can find user channel");
+ names.push(&channel.name);
+ }
- let mut path = String::new();
- while let Some(name) = names.pop() {
- path.push_str(name);
- }
+ let mut path = String::new();
+ while let Some(name) = names.pop() {
+ path.push_str(name);
+ }
- Some(path)
+ Some(path)
+ })
+ .await
+ .unwrap()
}
- pub fn my_channel(&self) -> ChannelId {
- let conn = self.conn.lock();
- conn.clients
- .get(&conn.own_client)
- .expect("can find myself")
- .channel
+ pub async fn my_channel(&mut self) -> ChannelId {
+ self.handle
+ .with_connection(move |conn| {
+ let state = conn.get_state().expect("can get state");
+ state
+ .clients
+ .get(&state.own_client)
+ .expect("can find myself")
+ .channel
+ })
+ .await
+ .unwrap()
}
- pub fn my_id(&self) -> ClientId {
- self.id
+ pub async fn my_id(&mut self) -> ClientId {
+ self.handle
+ .with_connection(move |conn| conn.get_state().expect("can get state").own_client)
+ .await
+ .unwrap()
}
- pub fn user_count(&self, channel: ChannelId) -> u32 {
- let conn = self.conn.lock();
- let mut count = 0;
- for client in conn.clients.values() {
- if client.channel == channel {
- count += 1;
- }
- }
+ pub async fn user_count(&mut self, channel: ChannelId) -> u32 {
+ self.handle
+ .with_connection(move |conn| {
+ let state = conn.get_state().expect("can get state");
+ let mut count = 0;
+ for client in state.clients.values() {
+ if client.channel == channel {
+ count += 1;
+ }
+ }
- count
+ count
+ })
+ .await
+ .unwrap()
}
- pub fn set_nickname(&self, name: &str) {
- tokio::spawn(
- self.conn
- .lock()
- .to_mut()
- .set_name(name)
- .map_err(|e| error!("Failed to set nickname: {}", e)),
- );
+ pub async fn set_nickname(&mut self, name: String) {
+ self.handle
+ .with_connection(move |mut conn| {
+ conn.get_state()
+ .expect("can get state")
+ .client_update()
+ .set_name(&name)
+ .send(&mut conn)
+ .map_err(|e| error!("Failed to set nickname: {}", e))
+ })
+ .await
+ .unwrap()
+ .unwrap();
}
- pub fn set_description(&self, desc: &str) {
- tokio::spawn(
- self.conn
- .lock()
- .to_mut()
- .get_client(&self.conn.lock().own_client)
- .expect("can get myself")
- .set_description(desc)
- .map_err(|e| error!("Failed to change description: {}", e)),
- );
+ pub async fn set_description(&mut self, desc: String) {
+ self.handle
+ .with_connection(move |mut conn| {
+ let state = conn.get_state().expect("can get state");
+ let _ = state
+ .clients
+ .get(&state.own_client)
+ .expect("can get myself")
+ .edit()
+ .set_description(&desc)
+ .send(&mut conn)
+ .map_err(|e| error!("Failed to change description: {}", e));
+ })
+ .await
+ .unwrap()
}
- pub fn send_message_to_channel(&self, text: &str) {
- tokio::spawn(
- self.conn
- .lock()
- .to_mut()
- .send_message(MessageTarget::Channel, text)
- .map_err(|e| error!("Failed to send message: {}", e)),
- );
+ pub async fn send_message_to_channel(&mut self, text: String) {
+ self.handle
+ .with_connection(move |mut conn| {
+ let _ = conn
+ .get_state()
+ .expect("can get state")
+ .send_message(MessageTarget::Channel, &text)
+ .send(&mut conn)
+ .map_err(|e| error!("Failed to send message: {}", e));
+ })
+ .await
+ .unwrap()
}
- pub fn send_message_to_user(&self, client: ClientId, text: &str) {
- tokio::spawn(
- self.conn
- .lock()
- .to_mut()
- .send_message(MessageTarget::Client(client), text)
- .map_err(|e| error!("Failed to send message: {}", e)),
- );
+ pub async fn send_message_to_user(&mut self, client: ClientId, text: String) {
+ self.handle
+ .with_connection(move |mut conn| {
+ let _ = conn
+ .get_state()
+ .expect("can get state")
+ .send_message(MessageTarget::Client(client), &text)
+ .send(&mut conn)
+ .map_err(|e| error!("Failed to send message: {}", e));
+ })
+ .await
+ .unwrap()
}
- pub fn subscribe_all(&self) {
- let packet = self.conn.lock().to_mut().server.set_subscribed(true);
- tokio::spawn(
- self.conn
- .send_packet(packet)
- .map_err(|e| error!("Failed to send subscribe packet: {}", e)),
- );
+ pub async fn subscribe_all(&mut self) {
+ self.handle
+ .with_connection(move |mut conn| {
+ if let Err(e) = conn
+ .get_state()
+ .expect("can get state")
+ .server
+ .set_subscribed(true)
+ .send(&mut conn)
+ {
+ error!("Failed to send subscribe packet: {}", e);
+ }
+ })
+ .await
+ .unwrap()
}
- pub fn disconnect(&self, reason: &str) {
+ pub async fn disconnect(&mut self, reason: &str) {
let opt = DisconnectOptions::new()
.reason(Reason::Clientdisconnect)
.message(reason);
- tokio::spawn(
- self.conn
- .disconnect(opt)
- .map_err(|e| error!("Failed to send message: {}", e)),
- );
+ self.handle.disconnect(opt).await.unwrap();
}
}
diff --git a/src/web_server.rs b/src/web_server.rs
index be373e4..d731fae 100644
--- a/src/web_server.rs
+++ b/src/web_server.rs
@@ -1,10 +1,10 @@
use std::sync::Arc;
use std::time::Duration;
-use actix::{Addr, SyncArbiter};
+use actix::{Actor, Addr};
use actix_web::{get, middleware::Logger, post, web, App, HttpServer, Responder};
-use askama::actix_web::TemplateIntoResponse;
use askama::Template;
+use askama_actix::TemplateIntoResponse;
use serde::{Deserialize, Serialize};
use crate::bot::MasterBot;
@@ -27,7 +27,7 @@ pub struct WebServerArgs {
#[actix_rt::main]
pub async fn start(args: WebServerArgs) -> std::io::Result<()> {
let cbot = args.bot.clone();
- let bot_addr: Addr<BotExecutor> = SyncArbiter::start(4, move || BotExecutor(cbot.clone()));
+ let bot_addr: Addr<BotExecutor> = BotExecutor(cbot.clone()).start();
HttpServer::new(move || {
App::new()
diff --git a/src/web_server/bot_executor.rs b/src/web_server/bot_executor.rs
index fde3c08..0d3e7b7 100644
--- a/src/web_server/bot_executor.rs
+++ b/src/web_server/bot_executor.rs
@@ -1,6 +1,6 @@
use std::sync::Arc;
-use actix::{Actor, Handler, Message, SyncContext};
+use actix::{Actor, Context, Handler, Message};
use crate::bot::MasterBot;
use crate::web_server::BotData;
@@ -8,7 +8,7 @@ use crate::web_server::BotData;
pub struct BotExecutor(pub Arc<MasterBot>);
impl Actor for BotExecutor {
- type Context = SyncContext<Self>;
+ type Context = Context<Self>;
}
pub struct BotNameListRequest;
diff --git a/src/web_server/default.rs b/src/web_server/default.rs
index ec86182..542dade 100644
--- a/src/web_server/default.rs
+++ b/src/web_server/default.rs
@@ -1,7 +1,7 @@
use actix::Addr;
use actix_web::{http::header, web, Error, HttpResponse};
-use askama::actix_web::TemplateIntoResponse;
use askama::Template;
+use askama_actix::TemplateIntoResponse;
use crate::web_server::{filters, BotData, BotDataRequest, BotExecutor, BotNameListRequest};
diff --git a/src/web_server/tmtu.rs b/src/web_server/tmtu.rs
index 0645ee4..33a14af 100644
--- a/src/web_server/tmtu.rs
+++ b/src/web_server/tmtu.rs
@@ -1,7 +1,7 @@
use actix::Addr;
use actix_web::{http::header, web, Error, HttpResponse};
-use askama::actix_web::TemplateIntoResponse;
use askama::Template;
+use askama_actix::TemplateIntoResponse;
use crate::web_server::{filters, BotData, BotDataRequest, BotExecutor, BotNameListRequest};
diff --git a/src/youtube_dl.rs b/src/youtube_dl.rs
index 1b77303..cc708af 100644
--- a/src/youtube_dl.rs
+++ b/src/youtube_dl.rs
@@ -1,8 +1,7 @@
use std::time::Duration;
-use futures::compat::Future01CompatExt;
-use std::process::{Command, Stdio};
-use tokio_process::CommandExt;
+use std::process::Stdio;
+use tokio::process::Command;
use serde::{Deserialize, Serialize};
@@ -38,7 +37,7 @@ pub async fn get_audio_download_url(uri: String) -> Result<AudioMetadata, String
debug!("yt-dl command: {:?}", cmd);
- let ytdl_output = cmd.output_async().compat().await.unwrap();
+ let ytdl_output = cmd.output().await.unwrap();
if !ytdl_output.status.success() {
return Err(String::from_utf8(ytdl_output.stderr).unwrap());