1use std::collections::HashMap;
13
14use notify::{Config, RecommendedWatcher, RecursiveMode, Watcher};
15use tinymist_std::{ImmutPath, error::IgnoreLogging};
16use tinymist_world::vfs::notify::NotifyDeps;
17use tokio::sync::mpsc;
18use typst::diag::FileError;
19
20use tinymist_world::vfs::{
21 FileChangeSet, FileSnapshot, PathAccessModel,
22 notify::{FilesystemEvent, NotifyMessage, UpstreamUpdateEvent},
23 system::SystemAccessModel,
24};
25
/// A filesystem watcher together with the receiving end of the channel that
/// its callback forwards events into.
type WatcherPair = (RecommendedWatcher, mpsc::UnboundedReceiver<NotifyEvent>);
/// The raw result handed to the watcher callback: an event or a `notify` error.
type NotifyEvent = notify::Result<notify::Event>;
/// A path paired with the snapshot of its content.
type FileEntry = (ImmutPath, FileSnapshot);
29
30#[derive(Debug)]
34enum WatchState {
35 Stable,
38 EmptyOrRemoval {
41 recheck_at: usize,
42 payload: FileSnapshot,
43 },
44}
45
46impl Default for WatchState {
48 fn default() -> Self {
49 Self::Stable
50 }
51}
52
/// Bookkeeping for a single path tracked by the actor.
#[derive(Debug)]
struct WatchedEntry {
    /// Value of the actor's `lifetime` counter when this path was last
    /// requested by a dependency sync; used to evict entries that have not
    /// been requested for a while.
    lifetime: usize,
    /// Whether the path is currently registered with the filesystem watcher.
    watching: bool,
    /// Whether the path has already been visited during the current
    /// dependency sync.
    seen: bool,
    /// Whether a deferred change is pending for this path.
    state: WatchState,
    /// The most recent snapshot recorded for this path, if any.
    prev: Option<FileSnapshot>,
}
70
/// A deferred change that must be looked at again before it is reported.
#[derive(Debug)]
struct UndeterminedNotifyEvent {
    /// The wall-clock instant at which the change was deferred.
    at_realtime: tinymist_std::time::Instant,
    /// The actor's logical tick at which the change was deferred.
    at_logical_tick: usize,
    /// The path the deferred change belongs to.
    path: ImmutPath,
}
81
/// An actor that tracks a set of dependency paths and reports their changes
/// to a callback as [`FilesystemEvent`]s.
#[derive(Debug)]
pub struct NotifyActor<F: FnMut(FilesystemEvent)> {
    /// The access model used to read file content from disk.
    inner: SystemAccessModel,

    /// Counter incremented on every dependency sync.
    lifetime: usize,
    /// Counter incremented on every event handled by the event loop.
    logical_tick: usize,

    /// Sending side of the internal queue of deferred changes.
    undetermined_send: mpsc::UnboundedSender<UndeterminedNotifyEvent>,
    /// Receiving side of the internal queue of deferred changes.
    undetermined_recv: mpsc::UnboundedReceiver<UndeterminedNotifyEvent>,

    /// All paths currently tracked by the actor.
    watched_entries: HashMap<ImmutPath, WatchedEntry>,

    /// The callback that receives every reported filesystem event.
    interrupted_by_events: F,

    /// The filesystem watcher and its event receiver; `None` when the watcher
    /// could not be created.
    watcher: Option<WatcherPair>,
}
108
109impl<F: FnMut(FilesystemEvent) + Send + Sync> NotifyActor<F> {
110 pub fn new(interrupted_by_events: F) -> Self {
112 let (undetermined_send, undetermined_recv) = mpsc::unbounded_channel();
113 let (watcher_tx, watcher_rx) = mpsc::unbounded_channel();
114 let watcher = log_notify_error(
115 RecommendedWatcher::new(
116 move |event| {
117 watcher_tx.send(event).log_error("failed to send fs notify");
118 },
119 Config::default(),
120 ),
121 "failed to create watcher",
122 );
123
124 NotifyActor {
125 inner: SystemAccessModel,
126 lifetime: 1,
128 logical_tick: 1,
129
130 interrupted_by_events,
131
132 undetermined_send,
133 undetermined_recv,
134
135 watched_entries: HashMap::new(),
136 watcher: watcher.map(|it| (it, watcher_rx)),
137 }
138 }
139
140 async fn get_notify_event(watcher: &mut Option<WatcherPair>) -> Option<NotifyEvent> {
142 match watcher {
143 Some((_, watcher_receiver)) => watcher_receiver.recv().await,
144 None => None,
145 }
146 }
147
148 pub async fn run(mut self, mut inbox: mpsc::UnboundedReceiver<NotifyMessage>) {
150 use NotifyMessage::*;
151 #[derive(Debug)]
153 enum ActorEvent {
154 ReCheck(UndeterminedNotifyEvent),
156 Message(Option<NotifyMessage>),
158 NotifyEvent(NotifyEvent),
160 }
161
162 'event_loop: loop {
163 let event = tokio::select! {
165 it = inbox.recv() => ActorEvent::Message(it),
166 Some(it) = Self::get_notify_event(&mut self.watcher) => ActorEvent::NotifyEvent(it),
167 Some(it) = self.undetermined_recv.recv() => ActorEvent::ReCheck(it),
168 };
169
170 self.logical_tick += 1;
172
173 match event {
176 ActorEvent::Message(None) => {
177 log::info!("NotifyActor: failed to get event, exiting...");
178 break 'event_loop;
179 }
180 ActorEvent::Message(Some(Settle)) => {
181 log::info!("NotifyActor: settle event received");
182 break 'event_loop;
183 }
184 ActorEvent::Message(Some(UpstreamUpdate(event))) => {
185 self.invalidate_upstream(event);
186 }
187 ActorEvent::Message(Some(SyncDependency(paths))) => {
188 if let Some(changeset) = self.update_watches(paths.as_ref()) {
189 (self.interrupted_by_events)(FilesystemEvent::Update(changeset, true));
190 }
191 }
192 ActorEvent::NotifyEvent(event) => {
193 if let Some(event) = log_notify_error(event, "failed to notify") {
195 self.notify_event(event);
196 }
197 }
198 ActorEvent::ReCheck(event) => {
199 self.recheck_notify_event(event).await;
200 }
201 }
202 }
203
204 log::info!("NotifyActor: exited");
205 }
206
207 fn invalidate_upstream(&mut self, event: UpstreamUpdateEvent) {
209 let changeset = self.update_watches(&event.invalidates).unwrap_or_default();
211
212 (self.interrupted_by_events)(FilesystemEvent::UpstreamUpdate {
214 changeset,
215 upstream_event: Some(event),
216 });
217 }
218
/// Synchronizes the tracked entries with the given set of dependencies.
///
/// Every dependency path is inserted or refreshed, registered with the
/// watcher where applicable, and checked for content changes. Entries that
/// were not requested are unwatched, and entries that have not been
/// requested for 30 syncs are dropped and reported as removals.
///
/// Returns the collected changes, or `None` when nothing changed.
fn update_watches(&mut self, paths: &dyn NotifyDeps) -> Option<FileChangeSet> {
    // Start a new sync round; entries remember the round they were last requested in.
    self.lifetime += 1;

    let mut changeset = FileChangeSet::default();

    // Reset the per-round visited flag on every tracked entry.
    for path in self.watched_entries.values_mut() {
        path.seen = false;
    }

    paths.dependencies(&mut |path| {
        // `contained` records whether the path was already tracked before this round.
        let mut contained = false;
        let entry = self
            .watched_entries
            .entry(path.clone())
            .and_modify(|watch_entry| {
                contained = true;
                watch_entry.lifetime = self.lifetime;
            })
            .or_insert_with(|| WatchedEntry {
                lifetime: self.lifetime,
                watching: false,
                seen: false,
                state: WatchState::Stable,
                prev: None,
            });

        // Each path is processed at most once per round.
        if entry.seen {
            return;
        }
        entry.seen = true;

        let meta = path.metadata().map_err(|e| FileError::from_io(e, path));

        if let Some((watcher, _)) = &mut self.watcher {
            // Register the path with the watcher only when its metadata is
            // readable, it is not a directory, and it is either newly tracked
            // or not currently being watched.
            if meta
                .as_ref()
                .is_ok_and(|meta| !meta.is_dir() && (!contained || !entry.watching))
            {
                log::debug!("watching {path:?}");
                entry.watching = log_notify_error(
                    watcher.watch(path.as_ref(), RecursiveMode::NonRecursive),
                    "failed to watch",
                )
                .is_some();
            }

            // Report the entry only if its content differs from what was recorded.
            changeset.may_insert(self.notify_entry_update(path.clone()));
        } else {
            // Without a watcher, the content is read and reported on every sync.
            let watched = self.inner.content(path);
            changeset.inserts.push((path.clone(), watched.into()));
        }
    });

    self.watched_entries.retain(|path, entry| {
        // Entries that were not requested this round no longer need to be watched.
        if !entry.seen && entry.watching {
            log::debug!("unwatch {path:?}");
            if let Some(watcher) = &mut self.watcher {
                log_notify_error(watcher.0.unwatch(path), "failed to unwatch");
                entry.watching = false;
            }
        }

        // Keep an entry for up to 30 rounds after it was last requested;
        // older entries are dropped and reported as removed.
        let fresh = self.lifetime - entry.lifetime < 30;
        if !fresh {
            changeset.removes.push(path.clone());
        }
        fresh
    });

    (!changeset.is_empty()).then_some(changeset)
}
306
307 fn notify_event(&mut self, event: notify::Event) {
309 let mut changeset = FileChangeSet::default();
311 for path in event.paths.iter() {
312 changeset.may_insert(self.notify_entry_update(path.as_path().into()));
314 }
315
316 if matches!(
322 event.kind,
323 notify::EventKind::Remove(notify::event::RemoveKind::File)
324 | notify::EventKind::Modify(notify::event::ModifyKind::Name(
325 notify::event::RenameMode::From
326 ))
327 ) {
328 for path in &event.paths {
329 let Some(entry) = self.watched_entries.get_mut(path.as_path()) else {
330 continue;
331 };
332 if !entry.watching {
333 continue;
334 }
335 if let Some(watcher) = &mut self.watcher {
338 log_notify_error(watcher.0.unwatch(path), "failed to unwatch");
339 }
340 entry.watching = false;
341 }
342 }
343
344 if !changeset.is_empty() {
346 (self.interrupted_by_events)(FilesystemEvent::Update(changeset, false));
347 }
348 }
349
/// Re-reads a tracked path and decides whether its change should be reported.
///
/// Returns the path with its new snapshot when the change should be
/// reported right away. Returns `None` when the path is not tracked, when
/// nothing changed, or when the change was deferred for a later recheck
/// (the file became empty, or reading it failed with a not-found or other
/// error).
fn notify_entry_update(&mut self, path: ImmutPath) -> Option<FileEntry> {
    // Only tracked paths are considered.
    let entry = self.watched_entries.get_mut(&path)?;

    // Take a fresh snapshot of the file content.
    let file = FileSnapshot::from(self.inner.content(&path));

    match (entry.prev.as_deref(), file.as_ref()) {
        // There is no previous snapshot, or the previous read failed and the
        // current one succeeded: fall through and report the new snapshot.
        (None, ..) | (Some(Err(..)), Ok(..)) => {}
        // The current read failed and there is a previous snapshot.
        (Some(it), Err(err)) => match &mut entry.state {
            WatchState::Stable => {
                // Not-found and other errors are not reported immediately:
                // the snapshot is recorded, a recheck is queued, and nothing
                // is reported for now.
                if matches!(err.as_ref(), FileError::NotFound(..) | FileError::Other(..)) {
                    entry.state = WatchState::EmptyOrRemoval {
                        recheck_at: self.logical_tick,
                        payload: file.clone(),
                    };
                    entry.prev = Some(file);
                    let event = UndeterminedNotifyEvent {
                        at_realtime: tinymist_std::time::Instant::now(),
                        at_logical_tick: self.logical_tick,
                        path: path.clone(),
                    };
                    log_send_error("recheck", self.undetermined_send.send(event));
                    return None;
                }
                // Any remaining error is reported unless it equals the
                // previously recorded error.
                if it.as_ref().is_err_and(|it| it == err) {
                    return None;
                }
            }

            // A deferral is already pending: replace its payload with the
            // latest snapshot and keep waiting for the recheck.
            WatchState::EmptyOrRemoval { payload, .. } => {
                *payload = file;
                return None;
            }
        },
        // Both the previous and the current read succeeded.
        (Some(Ok(prev_content)), Ok(next_content)) => {
            // Identical content is never reported.
            if prev_content == next_content {
                return None;
            }

            match entry.state {
                WatchState::Stable => {
                    // A file that just became empty is deferred in the same
                    // way as a failed read.
                    if next_content.is_empty() {
                        entry.state = WatchState::EmptyOrRemoval {
                            recheck_at: self.logical_tick,
                            payload: file.clone(),
                        };
                        entry.prev = Some(file);
                        let event = UndeterminedNotifyEvent {
                            at_realtime: tinymist_std::time::Instant::now(),
                            at_logical_tick: self.logical_tick,
                            path,
                        };
                        log_send_error("recheck", self.undetermined_send.send(event));
                        return None;
                    }
                }

                // Still empty while a deferral is pending: keep waiting.
                WatchState::EmptyOrRemoval { .. } if next_content.is_empty() => return None,
                // Non-empty content arrived while a deferral was pending:
                // fall through and report it.
                WatchState::EmptyOrRemoval { .. } => {}
            }
        }
    };

    // The change is reported now: clear any pending deferral and record the
    // snapshot as the latest one.
    entry.state = WatchState::Stable;
    entry.prev = Some(file.clone());

    Some((path, file))
}
455
456 async fn recheck_notify_event(&mut self, event: UndeterminedNotifyEvent) -> Option<()> {
458 let now = tinymist_std::time::Instant::now();
459 log::debug!("recheck event {event:?} at {now:?}");
460
461 let reserved = now - event.at_realtime;
463 if reserved < tinymist_std::time::Duration::from_millis(50) {
464 let send = self.undetermined_send.clone();
465 tokio::spawn(async move {
466 tokio::time::sleep(tinymist_std::time::Duration::from_millis(50) - reserved).await;
468 log_send_error("reschedule", send.send(event));
469 });
470 return None;
471 }
472
473 let entry = self.watched_entries.get_mut(&event.path)?;
475
476 match std::mem::take(&mut entry.state) {
478 WatchState::Stable => {}
480 WatchState::EmptyOrRemoval {
483 recheck_at,
484 payload,
485 } => {
486 if recheck_at == event.at_logical_tick {
487 log::debug!("notify event real happened {event:?}, state: {payload:?}");
488
489 if Some(&payload) == entry.prev.as_ref() {
490 return None;
491 }
492
493 let mut changeset = FileChangeSet::default();
495 changeset.inserts.push((event.path, payload));
496
497 (self.interrupted_by_events)(FilesystemEvent::Update(changeset, false));
498 }
499 }
500 };
501
502 Some(())
503 }
504}
505
506#[inline]
507fn log_notify_error<T>(res: notify::Result<T>, reason: &'static str) -> Option<T> {
508 res.map_err(|err| log::warn!("{reason}: notify error: {err}"))
509 .ok()
510}
511
512#[inline]
513fn log_send_error<T>(chan: &'static str, res: Result<(), mpsc::error::SendError<T>>) -> bool {
514 res.map_err(|err| log::warn!("NotifyActor: send to {chan} error: {err}"))
515 .is_ok()
516}
517
518pub async fn watch_deps(
520 inbox: mpsc::UnboundedReceiver<NotifyMessage>,
521 interrupted_by_events: impl FnMut(FilesystemEvent) + Send + Sync + 'static,
522) {
523 log::info!("NotifyActor: start watching files...");
524 tokio::spawn(NotifyActor::new(interrupted_by_events).run(inbox));
526}