summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock1
-rw-r--r--Cargo.toml2
-rw-r--r--src/audio_player.rs6
-rw-r--r--src/main.rs15
-rw-r--r--src/teamspeak.rs4
5 files changed, 17 insertions, 11 deletions
diff --git a/Cargo.lock b/Cargo.lock
index a02a6cd..13a9f68 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2697,6 +2697,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e61fe9bd8b36108a3c679bb6de916e39595a7a7f18c5e314c0014bb1b6ba13f7"
dependencies = [
"bytes 0.5.3",
+ "fnv",
"iovec",
"lazy_static",
"memchr",
diff --git a/Cargo.toml b/Cargo.toml
index 6dec758..408e79e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -18,7 +18,7 @@ tokio = "0.1"
tokio-process = "0.2.4"
tokio-signal = "0.2"
# Changes enabled features on tsclientlib to fix errors on it
-tokio2 = { package = "tokio", version = "0.2.0", features = ["tcp", "io-util"] }
+tokio02 = { package = "tokio", version = "0.2.0", features = ["tcp", "io-util", "sync"] }
futures01 = { package = "futures", version = "0.1.21" }
futures-preview = { version = "=0.3.0-alpha.19", features = ["compat"] }
diff --git a/src/audio_player.rs b/src/audio_player.rs
index c4b4b26..68fcc7b 100644
--- a/src/audio_player.rs
+++ b/src/audio_player.rs
@@ -9,8 +9,8 @@ use gstreamer_audio::{StreamVolume, StreamVolumeFormat};
use crate::{ApplicationMessage, State};
use glib::BoolError;
use log::{debug, error, info, warn};
-use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex};
+use tokio02::sync::mpsc::UnboundedSender;
static GST_INIT: Once = Once::new();
@@ -38,7 +38,7 @@ pub struct AudioPlayer {
http_src: gst::Element,
volume: gst::Element,
- sender: Arc<Mutex<Sender<ApplicationMessage>>>,
+ sender: Arc<Mutex<UnboundedSender<ApplicationMessage>>>,
}
fn make_element(factoryname: &str, display_name: &str) -> Result<gst::Element, AudioPlayerError> {
@@ -87,7 +87,7 @@ fn add_decode_bin_new_pad_callback(
impl AudioPlayer {
pub fn new(
- sender: Arc<Mutex<Sender<ApplicationMessage>>>,
+ sender: Arc<Mutex<UnboundedSender<ApplicationMessage>>>,
callback: Option<Box<dyn FnMut(&[u8]) + Send>>,
) -> Result<Self, AudioPlayerError> {
GST_INIT.call_once(|| gst::init().unwrap());
diff --git a/src/main.rs b/src/main.rs
index 0941c27..b93b815 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -8,6 +8,7 @@ use futures::future::{FutureExt, TryFutureExt};
use log::{debug, info};
use structopt::clap::AppSettings;
use structopt::StructOpt;
+use tokio02::sync::mpsc::UnboundedSender;
use tsclientlib::{ClientId, ConnectOptions, Identity, Invoker, MessageTarget};
mod audio_player;
@@ -17,7 +18,6 @@ mod youtube_dl;
use audio_player::*;
use playlist::*;
-use std::sync::mpsc::Sender;
use teamspeak::*;
#[derive(StructOpt, Debug)]
@@ -58,6 +58,7 @@ struct Args {
// 3. Print udp packets
}
+#[derive(Debug)]
pub struct Message {
pub target: MessageTarget,
pub invoker: Invoker,
@@ -72,6 +73,7 @@ pub enum State {
EndOfStream,
}
+#[derive(Debug)]
pub enum ApplicationMessage {
TextMessage(Message),
StateChange(State),
@@ -299,7 +301,7 @@ async fn async_main() {
debug!("Received CLI arguments: {:?}", std::env::args());
- let (tx, rx) = ::std::sync::mpsc::channel();
+ let (tx, mut rx) = tokio02::sync::mpsc::unbounded_channel();
let tx = Arc::new(Mutex::new(tx));
let (player, connection) = if args.local {
info!("Starting in CLI mode");
@@ -365,13 +367,13 @@ async fn async_main() {
}
loop {
- while let Ok(msg) = rx.recv() {
+ while let Some(msg) = rx.recv().await {
application.on_message(msg).unwrap();
}
}
}
-fn spawn_stdin_reader(tx: Arc<Mutex<Sender<ApplicationMessage>>>) {
+fn spawn_stdin_reader(tx: Arc<Mutex<UnboundedSender<ApplicationMessage>>>) {
thread::spawn(move || {
let stdin = ::std::io::stdin();
let lock = stdin.lock();
@@ -394,7 +396,10 @@ fn spawn_stdin_reader(tx: Arc<Mutex<Sender<ApplicationMessage>>>) {
});
}
-fn spawn_gstreamer_thread(player: Arc<AudioPlayer>, tx: Arc<Mutex<Sender<ApplicationMessage>>>) {
+fn spawn_gstreamer_thread(
+ player: Arc<AudioPlayer>,
+ tx: Arc<Mutex<UnboundedSender<ApplicationMessage>>>,
+) {
thread::spawn(move || loop {
player.poll();
diff --git a/src/teamspeak.rs b/src/teamspeak.rs
index 2cd87e7..79dc1bc 100644
--- a/src/teamspeak.rs
+++ b/src/teamspeak.rs
@@ -1,8 +1,8 @@
use futures::compat::Future01CompatExt;
use futures01::{future::Future, sink::Sink};
+use tokio02::sync::mpsc::UnboundedSender;
use crate::{ApplicationMessage, Message};
-use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex};
use tsclientlib::Event::ConEvents;
use tsclientlib::{events::Event, ClientId, ConnectOptions, Connection, MessageTarget};
@@ -30,7 +30,7 @@ fn get_message<'a>(event: &Event) -> Option<Message> {
impl TeamSpeakConnection {
pub async fn new(
- tx: Arc<Mutex<Sender<ApplicationMessage>>>,
+ tx: Arc<Mutex<UnboundedSender<ApplicationMessage>>>,
options: ConnectOptions,
) -> Result<TeamSpeakConnection, tsclientlib::Error> {
let conn = Connection::new(options).compat().await?;