Is there any way to make the notify debounce watcher async?

Question · Votes: 0 · Answers: 2

I asked the same question in the notify repository: "Is there any way to make debounce-watcher async?"

But even with `tokio::sync::mpsc::channel`, the program still hangs at `while let Some(res) = rx.recv().await`, waiting for events.

I have checked notify's async watcher example, async_monitor, and I don't understand why my program gets stuck. Am I missing something?

Here is my code:

use notify::{RecursiveMode, Watcher, ReadDirectoryChangesWatcher};
use std::{thread, path::Path, time::Duration};
use chrono::prelude::*;
use notify_debouncer_full::{new_debouncer, Debouncer, FileIdMap, DebounceEventResult};
use tokio::runtime::{Runtime, Handle};
use tokio::sync::mpsc;

fn get_runtime_handle() -> (Handle, Option<Runtime>) {
    match Handle::try_current() {
        Ok(h) => (h, None),
        Err(_) => {
              let rt = Runtime::new().unwrap();
              (rt.handle().clone(), Some(rt))
            }
    }
}

pub struct NotifyHandler {
    pub notify_watcher: Option<Debouncer<ReadDirectoryChangesWatcher, FileIdMap>>,
}

impl NotifyHandler {    
    pub async fn initialize_notify_scheduler(&mut self) {
        let (mut tx, mut rx) = tokio::sync::mpsc::channel(1);
    
        let debouncer = new_debouncer(Duration::from_secs(3), None, move |result: DebounceEventResult| {
            let (handle, _rt) = get_runtime_handle();
            handle.block_on(async {
                tx.send(result).await.unwrap();
            })
        });
    
        match debouncer {
            Ok(watcher)=> {
                println!("Initialize notify watcher success");
                self.notify_watcher = Some(watcher);

                while let Some(res) = rx.recv().await {
                    match res {
                        Ok(events) => {
                            println!("events: {:?}", events);
                        },
                        Err(errors) => {
                            println!("errors: {:?}", errors)
                        }
                    }
                }
            },
            Err(error) => {
                println!("{:?}", error);
            }
        }
    }

    
    pub fn watch(&mut self, path: &str) -> notify::Result<()> {
        let watch_path = Path::new(path);

        if watch_path.exists() {
            let is_file = watch_path.is_file();
            println!("Valid path {} is file {}", path, is_file);
        } else {
            println!("watch path {:?} not exists", watch_path);
        }

        if let Some(mut watcher) = self.notify_watcher.take() {
            watcher
                .watcher()
                .watch(watch_path, RecursiveMode::Recursive)?;         

            watcher
                .cache()
                .add_root(watch_path, RecursiveMode::Recursive);                    
        }

        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let mut notifier: NotifyHandler = NotifyHandler { notify_watcher: None };

    notifier.initialize_notify_scheduler().await;

    loop {
        thread::sleep(Duration::from_secs(2));

        let time: DateTime<Local> = Local::now();

        println!("{}: Hello, world!", time.format("%Y-%m-%d %H:%M:%S").to_string());
    }
}

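Here is a version of my code without `block_on()` that uses `await` instead. It has a problem with the ownership of `result` and `tx`: the closure no longer compiles.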

use notify::{RecursiveMode, Watcher, ReadDirectoryChangesWatcher, Error};
use std::{path::Path, time::Duration};
use chrono::prelude::*;
use notify_debouncer_full::{new_debouncer, Debouncer, FileIdMap, DebounceEventResult, DebouncedEvent};
use tokio::sync::mpsc::Receiver;

pub struct NotifyHandler {
    pub notify_watcher: Option<Debouncer<ReadDirectoryChangesWatcher, FileIdMap>>,
    pub receiver: Option<Receiver<Result<Vec<DebouncedEvent>, Vec<Error>>>>
}

impl NotifyHandler {    
    pub async fn initialize_notify_scheduler(&mut self) {
        let (tx, rx) = tokio::sync::mpsc::channel(1);
    
        let debouncer = new_debouncer(Duration::from_secs(3), None, move |result: DebounceEventResult| {
            tokio::spawn(async move {
                if let Err(e) = tx.send(result).await {
                    println!("Error sending event result: {:?}", e);
                }
            });
        });
    
        match debouncer {
            Ok(watcher)=> {
                println!("Initialize notify watcher success");
                self.notify_watcher = Some(watcher);

                self.receiver = Some(rx);
            },
            Err(error) => {
                println!("{:?}", error);
            }
        }
    }

    
    pub async fn watch(&mut self, path: &str) -> notify::Result<()> {
        let watch_path = Path::new(path);

        if watch_path.exists() {
            let is_file = watch_path.is_file();
            println!("Valid path {} is file {}", path, is_file);
        } else {
            println!("watch path {:?} not exists", watch_path);
        }

        if let Some(mut watcher) = self.notify_watcher.take() {
            watcher
                .watcher()
                .watch(watch_path, RecursiveMode::Recursive)?;         

            watcher
                .cache()
                .add_root(watch_path, RecursiveMode::Recursive);     

            if let Some(mut rx) = self.receiver.take() {
                tokio::spawn(async move {
                    while let Some(res) = rx.recv().await {
                        match res {
                            Ok(events) => {
                                println!("events: {:?}", events);
                            },
                            Err(errors) => {
                                println!("errors: {:?}", errors)
                            }
                        }
                    }                    
                });  
            }
        }

        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let mut notifier: NotifyHandler = NotifyHandler { notify_watcher: None, receiver: None };

    notifier.initialize_notify_scheduler().await;
    notifier.watch("D:\\TEMP\\TestNote.txt").await.unwrap();

    loop {
        tokio::time::sleep(Duration::from_secs(3)).await;

        let time: DateTime<Local> = Local::now();

        println!("{}: Hello, world!", time.format("%Y-%m-%d %H:%M:%S").to_string());
    }
}

The compiler reports:

expected a closure that implements the `FnMut` trait, but this closure only implements `FnOnce`
required for `[closure@src\main.rs:16:69: 16:103]` to implement `DebounceEventHandler`
main.rs(16, 69): the requirement to implement `FnMut` derives from here
main.rs(18, 33): closure is `FnOnce` because it moves the variable `tx` out of its environment
main.rs(16, 25): required by a bound introduced by this call
lib.rs(634, 25): required by a bound in `new_debouncer`
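
The diagnostic above can be reproduced without notify or tokio. The following is a minimal sketch using only the standard library; `call_twice` and `collect_via_clone` are hypothetical names, and `call_twice` merely stands in for an API such as `new_debouncer` that requires an `FnMut` handler. Moving `tx` itself into something created inside the closure consumes the captured sender on the first call, which makes the closure `FnOnce`; cloning it on each call leaves the captured sender in place.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for an API that, like a debouncer, invokes its
/// handler more than once and therefore requires `FnMut`.
fn call_twice<F: FnMut(u32)>(mut handler: F) {
    handler(1);
    handler(2);
}

/// Sends every value from a freshly spawned thread and collects the results.
fn collect_via_clone() -> Vec<u32> {
    let (tx, rx) = mpsc::channel::<u32>();

    call_twice(move |value| {
        // Clone per call: the closure keeps owning the original `tx`,
        // so it can be called again and implements `FnMut`.
        // Writing `thread::spawn(move || tx.send(value))` without the clone
        // would move `tx` out of the closure and make it `FnOnce`.
        let tx = tx.clone();
        let worker = thread::spawn(move || {
            tx.send(value).unwrap();
        });
        worker.join().unwrap();
    });

    // The closure (and with it the original `tx`) was dropped when
    // `call_twice` returned, so the receiver's iterator terminates.
    rx.iter().collect()
}

fn main() {
    println!("{:?}", collect_via_clone());
}
```

The same reasoning would apply to a `tokio::sync::mpsc::Sender`, which is also `Clone`.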
asynchronous rust rust-tokio notify
2 Answers
0 votes

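Many thanks to @fakeshadow. For anyone who may run into the same problem, the key is the runtime: obtain the Tokio runtime `Handle` outside the closure and spawn onto it from the debouncer callback, cloning `tx` on every call. Here is the working code.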

[package]
name = "notify_test"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
chrono = "0.4.26"
notify = { version = "6.0.1", default-features = false, features = ["macos_kqueue"] }
notify-debouncer-full = "0.2.0"
tokio = { version = "1", features = ["full"] }

use chrono::prelude::*;
use notify::{Error, ReadDirectoryChangesWatcher, RecursiveMode, Watcher};
use notify_debouncer_full::{
    new_debouncer, DebounceEventResult, DebouncedEvent, Debouncer, FileIdMap,
};
use std::{path::Path, time::Duration};
use tokio::{runtime::Handle, sync::mpsc::Receiver};

pub struct NotifyHandler {
    pub notify_watcher: Option<Debouncer<ReadDirectoryChangesWatcher, FileIdMap>>,
    pub receiver: Option<Receiver<Result<Vec<DebouncedEvent>, Vec<Error>>>>,
}

impl NotifyHandler {
    pub async fn initialize_notify_scheduler(&mut self) {
        let (tx, rx) = tokio::sync::mpsc::channel(1);
        let rt = Handle::current();

        let debouncer = new_debouncer(
            Duration::from_secs(3),
            None,
            move |result: DebounceEventResult| {
                let tx = tx.clone();

                println!("calling by notify -> {:?}", &result);
                rt.spawn(async move {
                    if let Err(e) = tx.send(result).await {
                        println!("Error sending event result: {:?}", e);
                    }
                });
            },
        );

        match debouncer {
            Ok(watcher) => {
                println!("Initialize notify watcher success");
                self.notify_watcher = Some(watcher);

                self.receiver = Some(rx);
            }
            Err(error) => {
                println!("{:?}", error);
            }
        }
    }

    pub async fn watch(&mut self, path: &str) -> notify::Result<()> {
        let watch_path = Path::new(path);

        if watch_path.exists() {
            let is_file = watch_path.is_file();
            println!("Valid path {} is file {}", path, is_file);
        } else {
            println!("watch path {:?} not exists", watch_path);
        }

        if let Some(watcher) = self.notify_watcher.as_mut() {
            watcher
                .watcher()
                .watch(watch_path, RecursiveMode::Recursive)?;

            watcher
                .cache()
                .add_root(watch_path, RecursiveMode::Recursive);

            if let Some(mut rx) = self.receiver.take() {
                tokio::spawn(async move {
                    while let Some(res) = rx.recv().await {
                        match res {
                            Ok(events) => {
                                println!("events: {:?}", events);
                            }
                            Err(errors) => {
                                println!("errors: {:?}", errors)
                            }
                        }
                    }
                });
            }
        }

        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let mut notifier: NotifyHandler = NotifyHandler {
        notify_watcher: None,
        receiver: None,
    };

    notifier.initialize_notify_scheduler().await;
    notifier.watch("D:\\Temp\\program\\test_md.txt").await.unwrap();

    loop {
        tokio::time::sleep(Duration::from_secs(3)).await;

        let time: DateTime<Local> = Local::now();

        println!(
            "{}: Hello, world!",
            time.format("%Y-%m-%d %H:%M:%S").to_string()
        );
    }
}


-3 votes

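It looks like you are trying to use the `notify` crate for file system event notifications, combined with the debouncer from the `notify_debouncer_full` crate to delay events. The problem you are running into is that the program gets stuck at the `while let Some(res) = rx.recv().await` line.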

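The problem lies in how you use the tokio `channel` and the runtime. You should not create a new runtime and handle inside the debouncer's closure. Instead, you should use the same runtime as the main `tokio::main` function. The code structure can also be improved to make it more efficient.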

Here is an updated version of the code, with the problem fixed:

use notify::{RecursiveMode, Watcher, ReadDirectoryChangesWatcher};
use std::{path::Path, time::Duration};
use chrono::prelude::*;
use notify_debouncer_full::{new_debouncer, Debouncer, FileIdMap, DebounceEventResult};
use tokio::{runtime::Handle, sync::mpsc::Receiver};

pub struct NotifyHandler {
    pub notify_watcher: Option<Debouncer<ReadDirectoryChangesWatcher, FileIdMap>>,
    // Receiving half of the channel; watch() hands it to the event-processing task.
    pub receiver: Option<Receiver<DebounceEventResult>>,
}

impl NotifyHandler {
    pub async fn initialize_notify_scheduler(&mut self) {
        let (tx, rx) = tokio::sync::mpsc::channel(1);
        // Handle of the runtime started by #[tokio::main]. The debouncer calls
        // the closure on its own thread, where tokio::spawn has no runtime.
        let rt = Handle::current();

        let debouncer = new_debouncer(Duration::from_secs(3), None, move |result: DebounceEventResult| {
            // Clone the sender on every call so the closure implements FnMut.
            let tx = tx.clone();
            // Send the result to the channel without using a new runtime.
            rt.spawn(async move {
                if let Err(e) = tx.send(result).await {
                    println!("Error sending event result: {:?}", e);
                }
            });
        });

        match debouncer {
            Ok(watcher) => {
                println!("Initialize notify watcher success");
                self.notify_watcher = Some(watcher);
                self.receiver = Some(rx);

                // No need to use a loop here, the main function will keep running and processing events as they arrive.
            },
            Err(error) => {
                println!("{:?}", error);
            }
        }
    }

    pub fn watch(&mut self, path: &str) -> notify::Result<()> {
        let watch_path = Path::new(path);

        if watch_path.exists() {
            let is_file = watch_path.is_file();
            println!("Valid path {} is file {}", path, is_file);
        } else {
            println!("watch path {:?} not exists", watch_path);
        }

        // Borrow the debouncer instead of take(): dropping it would stop the watcher.
        if let Some(watcher) = self.notify_watcher.as_mut() {
            watcher
                .watcher()
                .watch(watch_path, RecursiveMode::Recursive)?;

            watcher
                .cache()
                .add_root(watch_path, RecursiveMode::Recursive);

            // Spawn a new task to process events without blocking the main thread.
            if let Some(mut rx) = self.receiver.take() {
                tokio::spawn(async move {
                    while let Some(res) = rx.recv().await {
                        match res {
                            Ok(events) => {
                                println!("events: {:?}", events);
                            },
                            Err(errors) => {
                                println!("errors: {:?}", errors)
                            }
                        }
                    }
                });
            }
        }

        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let mut notifier: NotifyHandler = NotifyHandler { notify_watcher: None, receiver: None };

    notifier.initialize_notify_scheduler().await;
    // Register the path to watch; without this call no events are ever produced.
    notifier.watch("D:\\TEMP\\TestNote.txt").unwrap();

    loop {
        tokio::time::sleep(Duration::from_secs(2)).await;

        let time: DateTime<Local> = Local::now();

        println!("{}: Hello, world!", time.format("%Y-%m-%d %H:%M:%S").to_string());
    }
}

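In the updated code, `tokio::spawn` creates a new async task that processes the events received from the channel, so the main thread keeps running without being blocked. In addition, `tokio::time::sleep` is used instead of `thread::sleep` to pause the main task, which lets other tasks keep making progress.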
