diff options
| -rw-r--r-- | Cargo.lock | 1 | ||||
| -rw-r--r-- | Cargo.toml | 2 | ||||
| -rw-r--r-- | src/audio_player.rs | 6 | ||||
| -rw-r--r-- | src/main.rs | 15 | ||||
| -rw-r--r-- | src/teamspeak.rs | 4 |
5 files changed, 17 insertions, 11 deletions
@@ -2697,6 +2697,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e61fe9bd8b36108a3c679bb6de916e39595a7a7f18c5e314c0014bb1b6ba13f7" dependencies = [ "bytes 0.5.3", + "fnv", "iovec", "lazy_static", "memchr", @@ -18,7 +18,7 @@ tokio = "0.1" tokio-process = "0.2.4" tokio-signal = "0.2" # Changes enabled features on tsclientlib to fix errors on it -tokio2 = { package = "tokio", version = "0.2.0", features = ["tcp", "io-util"] } +tokio02 = { package = "tokio", version = "0.2.0", features = ["tcp", "io-util", "sync"] } futures01 = { package = "futures", version = "0.1.21" } futures-preview = { version = "=0.3.0-alpha.19", features = ["compat"] } diff --git a/src/audio_player.rs b/src/audio_player.rs index c4b4b26..68fcc7b 100644 --- a/src/audio_player.rs +++ b/src/audio_player.rs @@ -9,8 +9,8 @@ use gstreamer_audio::{StreamVolume, StreamVolumeFormat}; use crate::{ApplicationMessage, State}; use glib::BoolError; use log::{debug, error, info, warn}; -use std::sync::mpsc::Sender; use std::sync::{Arc, Mutex}; +use tokio02::sync::mpsc::UnboundedSender; static GST_INIT: Once = Once::new(); @@ -38,7 +38,7 @@ pub struct AudioPlayer { http_src: gst::Element, volume: gst::Element, - sender: Arc<Mutex<Sender<ApplicationMessage>>>, + sender: Arc<Mutex<UnboundedSender<ApplicationMessage>>>, } fn make_element(factoryname: &str, display_name: &str) -> Result<gst::Element, AudioPlayerError> { @@ -87,7 +87,7 @@ fn add_decode_bin_new_pad_callback( impl AudioPlayer { pub fn new( - sender: Arc<Mutex<Sender<ApplicationMessage>>>, + sender: Arc<Mutex<UnboundedSender<ApplicationMessage>>>, callback: Option<Box<dyn FnMut(&[u8]) + Send>>, ) -> Result<Self, AudioPlayerError> { GST_INIT.call_once(|| gst::init().unwrap()); diff --git a/src/main.rs b/src/main.rs index 0941c27..b93b815 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,6 +8,7 @@ use futures::future::{FutureExt, TryFutureExt}; use log::{debug, info}; use structopt::clap::AppSettings; use structopt::StructOpt; +use tokio02::sync::mpsc::UnboundedSender; use tsclientlib::{ClientId, ConnectOptions, Identity, Invoker, MessageTarget}; mod audio_player; @@ -17,7 +18,6 @@ mod youtube_dl; use audio_player::*; use playlist::*; -use std::sync::mpsc::Sender; use teamspeak::*; #[derive(StructOpt, Debug)] @@ -58,6 +58,7 @@ struct Args { // 3. Print udp packets } +#[derive(Debug)] pub struct Message { pub target: MessageTarget, pub invoker: Invoker, @@ -72,6 +73,7 @@ pub enum State { EndOfStream, } +#[derive(Debug)] pub enum ApplicationMessage { TextMessage(Message), StateChange(State), @@ -299,7 +301,7 @@ async fn async_main() { debug!("Received CLI arguments: {:?}", std::env::args()); - let (tx, rx) = ::std::sync::mpsc::channel(); + let (tx, mut rx) = tokio02::sync::mpsc::unbounded_channel(); let tx = Arc::new(Mutex::new(tx)); let (player, connection) = if args.local { info!("Starting in CLI mode"); @@ -365,13 +367,13 @@ async fn async_main() { } loop { - while let Ok(msg) = rx.recv() { + while let Some(msg) = rx.recv().await { application.on_message(msg).unwrap(); } } } -fn spawn_stdin_reader(tx: Arc<Mutex<Sender<ApplicationMessage>>>) { +fn spawn_stdin_reader(tx: Arc<Mutex<UnboundedSender<ApplicationMessage>>>) { thread::spawn(move || { let stdin = ::std::io::stdin(); let lock = stdin.lock(); @@ -394,7 +396,10 @@ fn spawn_stdin_reader(tx: Arc<Mutex<Sender<ApplicationMessage>>>) { }); } -fn spawn_gstreamer_thread(player: Arc<AudioPlayer>, tx: Arc<Mutex<Sender<ApplicationMessage>>>) { +fn spawn_gstreamer_thread( + player: Arc<AudioPlayer>, + tx: Arc<Mutex<UnboundedSender<ApplicationMessage>>>, +) { thread::spawn(move || loop { player.poll(); diff --git a/src/teamspeak.rs b/src/teamspeak.rs index 2cd87e7..79dc1bc 100644 --- a/src/teamspeak.rs +++ b/src/teamspeak.rs @@ -1,8 +1,8 @@ use futures::compat::Future01CompatExt; use futures01::{future::Future, sink::Sink}; +use tokio02::sync::mpsc::UnboundedSender; use crate::{ApplicationMessage, Message}; -use std::sync::mpsc::Sender; use std::sync::{Arc, Mutex}; use tsclientlib::Event::ConEvents; use tsclientlib::{events::Event, ClientId, ConnectOptions, Connection, MessageTarget}; @@ -30,7 +30,7 @@ fn get_message<'a>(event: &Event) -> Option<Message> { impl TeamSpeakConnection { pub async fn new( - tx: Arc<Mutex<Sender<ApplicationMessage>>>, + tx: Arc<Mutex<UnboundedSender<ApplicationMessage>>>, options: ConnectOptions, ) -> Result<TeamSpeakConnection, tsclientlib::Error> { let conn = Connection::new(options).compat().await?; |
