From 8fe65c0e790e15eab8a3009c7ae6eb272a6bf862 Mon Sep 17 00:00:00 2001 From: Jokler Date: Tue, 14 Jan 2020 04:46:57 +0100 Subject: Use async channel to guarantee tokio never blocks --- src/audio_player.rs | 6 +++--- src/main.rs | 15 ++++++++++----- src/teamspeak.rs | 4 ++-- 3 files changed, 15 insertions(+), 10 deletions(-) (limited to 'src') diff --git a/src/audio_player.rs b/src/audio_player.rs index c4b4b26..68fcc7b 100644 --- a/src/audio_player.rs +++ b/src/audio_player.rs @@ -9,8 +9,8 @@ use gstreamer_audio::{StreamVolume, StreamVolumeFormat}; use crate::{ApplicationMessage, State}; use glib::BoolError; use log::{debug, error, info, warn}; -use std::sync::mpsc::Sender; use std::sync::{Arc, Mutex}; +use tokio02::sync::mpsc::UnboundedSender; static GST_INIT: Once = Once::new(); @@ -38,7 +38,7 @@ pub struct AudioPlayer { http_src: gst::Element, volume: gst::Element, - sender: Arc>>, + sender: Arc>>, } fn make_element(factoryname: &str, display_name: &str) -> Result { @@ -87,7 +87,7 @@ fn add_decode_bin_new_pad_callback( impl AudioPlayer { pub fn new( - sender: Arc>>, + sender: Arc>>, callback: Option>, ) -> Result { GST_INIT.call_once(|| gst::init().unwrap()); diff --git a/src/main.rs b/src/main.rs index 0941c27..b93b815 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,6 +8,7 @@ use futures::future::{FutureExt, TryFutureExt}; use log::{debug, info}; use structopt::clap::AppSettings; use structopt::StructOpt; +use tokio02::sync::mpsc::UnboundedSender; use tsclientlib::{ClientId, ConnectOptions, Identity, Invoker, MessageTarget}; mod audio_player; @@ -17,7 +18,6 @@ mod youtube_dl; use audio_player::*; use playlist::*; -use std::sync::mpsc::Sender; use teamspeak::*; #[derive(StructOpt, Debug)] @@ -58,6 +58,7 @@ struct Args { // 3. Print udp packets } +#[derive(Debug)] pub struct Message { pub target: MessageTarget, pub invoker: Invoker, @@ -72,6 +73,7 @@ pub enum State { EndOfStream, } +#[derive(Debug)] pub enum ApplicationMessage { TextMessage(Message), StateChange(State), @@ -299,7 +301,7 @@ async fn async_main() { debug!("Received CLI arguments: {:?}", std::env::args()); - let (tx, rx) = ::std::sync::mpsc::channel(); + let (tx, mut rx) = tokio02::sync::mpsc::unbounded_channel(); let tx = Arc::new(Mutex::new(tx)); let (player, connection) = if args.local { info!("Starting in CLI mode"); @@ -365,13 +367,13 @@ async fn async_main() { } loop { - while let Ok(msg) = rx.recv() { + while let Some(msg) = rx.recv().await { application.on_message(msg).unwrap(); } } } -fn spawn_stdin_reader(tx: Arc>>) { +fn spawn_stdin_reader(tx: Arc>>) { thread::spawn(move || { let stdin = ::std::io::stdin(); let lock = stdin.lock(); @@ -394,7 +396,10 @@ fn spawn_stdin_reader(tx: Arc>>) { }); } -fn spawn_gstreamer_thread(player: Arc, tx: Arc>>) { +fn spawn_gstreamer_thread( + player: Arc, + tx: Arc>>, +) { thread::spawn(move || loop { player.poll(); diff --git a/src/teamspeak.rs b/src/teamspeak.rs index 2cd87e7..79dc1bc 100644 --- a/src/teamspeak.rs +++ b/src/teamspeak.rs @@ -1,8 +1,8 @@ use futures::compat::Future01CompatExt; use futures01::{future::Future, sink::Sink}; +use tokio02::sync::mpsc::UnboundedSender; use crate::{ApplicationMessage, Message}; -use std::sync::mpsc::Sender; use std::sync::{Arc, Mutex}; use tsclientlib::Event::ConEvents; use tsclientlib::{events::Event, ClientId, ConnectOptions, Connection, MessageTarget}; @@ -30,7 +30,7 @@ fn get_message<'a>(event: &Event) -> Option { impl TeamSpeakConnection { pub async fn new( - tx: Arc>>, + tx: Arc>>, options: ConnectOptions, ) -> Result { let conn = Connection::new(options).compat().await?; -- cgit v1.2.3-70-g09d2