tinymist_project/
watch.rs

1//! upstream <https://github.com/rust-lang/rust-analyzer/tree/master/crates/vfs-notify>
2//!
3//! An implementation of `watch_deps` using `notify` crate.
4//!
5//! The file watching bits here are untested and quite probably buggy. For this
6//! reason, by default we don't watch files and rely on editor's file watching
7//! capabilities.
8//!
9//! Hopefully, one day a reliable file watching/walking crate appears on
10//! crates.io, and we can reduce this to trivial glue code.
11
12use std::collections::HashMap;
13
14use notify::{Config, RecommendedWatcher, RecursiveMode, Watcher};
15use tinymist_std::{ImmutPath, error::IgnoreLogging};
16use tinymist_world::vfs::notify::NotifyDeps;
17use tokio::sync::mpsc;
18use typst::diag::FileError;
19
20use tinymist_world::vfs::{
21    FileChangeSet, FileSnapshot, PathAccessModel,
22    notify::{FilesystemEvent, NotifyMessage, UpstreamUpdateEvent},
23    system::SystemAccessModel,
24};
25
26type WatcherPair = (RecommendedWatcher, mpsc::UnboundedReceiver<NotifyEvent>);
27type NotifyEvent = notify::Result<notify::Event>;
28type FileEntry = (/* key */ ImmutPath, /* value */ FileSnapshot);
29
30/// The state of a watched file.
31///
32/// It is used to determine some dirty editors' implementation.
33#[derive(Debug)]
34enum WatchState {
35    /// The file is stable, which means we believe that it keeps synchronized
36    /// as expected.
37    Stable,
38    /// The file is empty or removed, but there is a chance that the file is not
39    /// stable. So we need to recheck the file after a while.
40    EmptyOrRemoval {
41        recheck_at: usize,
42        payload: FileSnapshot,
43    },
44}
45
46/// By default, the state is stable.
47impl Default for WatchState {
48    fn default() -> Self {
49        Self::Stable
50    }
51}
52
/// The data entry of a watched file.
#[derive(Debug)]
struct WatchedEntry {
    /// The lifetime of the entry, i.e. the value of the actor's `lifetime`
    /// counter when the entry was last requested as a dependency.
    ///
    /// The entry will be removed if the entry is too old.
    // todo: generalize lifetime
    lifetime: usize,
    /// A flag for whether the builtin watcher is really watching the path.
    watching: bool,
    /// A flag for watch update: it is cleared at the start of every
    /// dependency synchronization and set once the path is visited.
    seen: bool,
    /// The state of the entry.
    state: WatchState,
    /// Previous content of the file, `None` until a snapshot is recorded.
    prev: Option<FileSnapshot>,
}
70
/// A self-produced event that checks whether the file is stable after a
/// while.
///
/// It is sent by the actor to itself when a file looks empty or removed, so
/// that the change can be confirmed later.
#[derive(Debug)]
struct UndeterminedNotifyEvent {
    /// The time when the event is produced.
    at_realtime: tinymist_std::time::Instant,
    /// The logical tick when the event is produced.
    at_logical_tick: usize,
    /// The path of the file.
    path: ImmutPath,
}
81
// Drop order is significant.
/// The actor that watches files.
///
/// It is used to watch files and send filesystem events to the consumer.
#[derive(Debug)]
pub struct NotifyActor<F: FnMut(FilesystemEvent)> {
    /// The access model of the actor.
    /// It is fixed to the concrete `SystemAccessModel` for now.
    inner: SystemAccessModel,

    /// The lifetime counter of the watched files. It is increased on every
    /// dependency synchronization and used to expire old entries.
    lifetime: usize,
    /// The logical tick of the actor. It is increased once per handled event.
    logical_tick: usize,

    /// Internal channel for recheck events (sending end).
    undetermined_send: mpsc::UnboundedSender<UndeterminedNotifyEvent>,
    /// Internal channel for recheck events (receiving end).
    undetermined_recv: mpsc::UnboundedReceiver<UndeterminedNotifyEvent>,

    /// The entries held for watching, one entry per file.
    watched_entries: HashMap<ImmutPath, WatchedEntry>,

    /// The consumer callback, which is called with every filesystem event
    /// produced by the actor.
    interrupted_by_events: F,

    /// The builtin watcher object, `None` if it failed to be created.
    watcher: Option<WatcherPair>,
}
108
109impl<F: FnMut(FilesystemEvent) + Send + Sync> NotifyActor<F> {
110    /// Create a new actor.
111    pub fn new(interrupted_by_events: F) -> Self {
112        let (undetermined_send, undetermined_recv) = mpsc::unbounded_channel();
113        let (watcher_tx, watcher_rx) = mpsc::unbounded_channel();
114        let watcher = log_notify_error(
115            RecommendedWatcher::new(
116                move |event| {
117                    watcher_tx.send(event).log_error("failed to send fs notify");
118                },
119                Config::default(),
120            ),
121            "failed to create watcher",
122        );
123
124        NotifyActor {
125            inner: SystemAccessModel,
126            // we start from 1 to distinguish from 0 (default value)
127            lifetime: 1,
128            logical_tick: 1,
129
130            interrupted_by_events,
131
132            undetermined_send,
133            undetermined_recv,
134
135            watched_entries: HashMap::new(),
136            watcher: watcher.map(|it| (it, watcher_rx)),
137        }
138    }
139
140    /// Get the notify event from the watcher.
141    async fn get_notify_event(watcher: &mut Option<WatcherPair>) -> Option<NotifyEvent> {
142        match watcher {
143            Some((_, watcher_receiver)) => watcher_receiver.recv().await,
144            None => None,
145        }
146    }
147
148    /// Main loop of the actor.
149    pub async fn run(mut self, mut inbox: mpsc::UnboundedReceiver<NotifyMessage>) {
150        use NotifyMessage::*;
151        /// The event of the actor.
152        #[derive(Debug)]
153        enum ActorEvent {
154            /// Recheck the notify event.
155            ReCheck(UndeterminedNotifyEvent),
156            /// external message to change notifier's state
157            Message(Option<NotifyMessage>),
158            /// notify event from builtin watcher
159            NotifyEvent(NotifyEvent),
160        }
161
162        'event_loop: loop {
163            // Get the event from the inbox or the watcher.
164            let event = tokio::select! {
165                it = inbox.recv() => ActorEvent::Message(it),
166                Some(it) = Self::get_notify_event(&mut self.watcher) => ActorEvent::NotifyEvent(it),
167                Some(it) = self.undetermined_recv.recv() => ActorEvent::ReCheck(it),
168            };
169
170            // Increase the logical tick per event.
171            self.logical_tick += 1;
172
173            // log::info!("vfs-notify event {event:?}");
174            // function entries to handle some event
175            match event {
176                ActorEvent::Message(None) => {
177                    log::info!("NotifyActor: failed to get event, exiting...");
178                    break 'event_loop;
179                }
180                ActorEvent::Message(Some(Settle)) => {
181                    log::info!("NotifyActor: settle event received");
182                    break 'event_loop;
183                }
184                ActorEvent::Message(Some(UpstreamUpdate(event))) => {
185                    self.invalidate_upstream(event);
186                }
187                ActorEvent::Message(Some(SyncDependency(paths))) => {
188                    if let Some(changeset) = self.update_watches(paths.as_ref()) {
189                        (self.interrupted_by_events)(FilesystemEvent::Update(changeset, true));
190                    }
191                }
192                ActorEvent::NotifyEvent(event) => {
193                    // log::info!("notify event {event:?}");
194                    if let Some(event) = log_notify_error(event, "failed to notify") {
195                        self.notify_event(event);
196                    }
197                }
198                ActorEvent::ReCheck(event) => {
199                    self.recheck_notify_event(event).await;
200                }
201            }
202        }
203
204        log::info!("NotifyActor: exited");
205    }
206
207    /// Update the watches of corresponding invalidation
208    fn invalidate_upstream(&mut self, event: UpstreamUpdateEvent) {
209        // Update watches of invalidated files.
210        let changeset = self.update_watches(&event.invalidates).unwrap_or_default();
211
212        // Send the event to the consumer.
213        (self.interrupted_by_events)(FilesystemEvent::UpstreamUpdate {
214            changeset,
215            upstream_event: Some(event),
216        });
217    }
218
219    /// Update the watches of corresponding files.
220    fn update_watches(&mut self, paths: &dyn NotifyDeps) -> Option<FileChangeSet> {
221        // Increase the lifetime per external message.
222        self.lifetime += 1;
223
224        let mut changeset = FileChangeSet::default();
225
226        // Mark the old entries as unseen.
227        for path in self.watched_entries.values_mut() {
228            path.seen = false;
229        }
230
231        // Update watched entries.
232        //
233        // Also check whether the file is updated since there is a window
234        // between unwatch the file and watch the file again.
235        paths.dependencies(&mut |path| {
236            let mut contained = false;
237            // Update or insert the entry with the new lifetime.
238            let entry = self
239                .watched_entries
240                .entry(path.clone())
241                .and_modify(|watch_entry| {
242                    contained = true;
243                    watch_entry.lifetime = self.lifetime;
244                })
245                .or_insert_with(|| WatchedEntry {
246                    lifetime: self.lifetime,
247                    watching: false,
248                    seen: false,
249                    state: WatchState::Stable,
250                    prev: None,
251                });
252
253            if entry.seen {
254                return;
255            }
256            entry.seen = true;
257
258            // Update in-memory metadata for now.
259            let meta = path.metadata().map_err(|e| FileError::from_io(e, path));
260
261            if let Some((watcher, _)) = &mut self.watcher {
262                // Case1. meta = Err(..) We cannot get the metadata successfully, so we
263                // are okay to ignore this file for watching.
264                //
265                // Case2. meta = Ok(..) Watch the file if it's not watched.
266                if meta
267                    .as_ref()
268                    .is_ok_and(|meta| !meta.is_dir() && (!contained || !entry.watching))
269                {
270                    log::debug!("watching {path:?}");
271                    entry.watching = log_notify_error(
272                        watcher.watch(path.as_ref(), RecursiveMode::NonRecursive),
273                        "failed to watch",
274                    )
275                    .is_some();
276                }
277
278                changeset.may_insert(self.notify_entry_update(path.clone()));
279            } else {
280                let watched = self.inner.content(path);
281                changeset.inserts.push((path.clone(), watched.into()));
282            }
283        });
284
285        // Remove old entries.
286        // Note: since we have increased the lifetime, it is safe to remove the
287        // old entries after updating the watched entries.
288        self.watched_entries.retain(|path, entry| {
289            if !entry.seen && entry.watching {
290                log::debug!("unwatch {path:?}");
291                if let Some(watcher) = &mut self.watcher {
292                    log_notify_error(watcher.0.unwatch(path), "failed to unwatch");
293                    entry.watching = false;
294                }
295            }
296
297            let fresh = self.lifetime - entry.lifetime < 30;
298            if !fresh {
299                changeset.removes.push(path.clone());
300            }
301            fresh
302        });
303
304        (!changeset.is_empty()).then_some(changeset)
305    }
306
307    /// Notify the event from the builtin watcher.
308    fn notify_event(&mut self, event: notify::Event) {
309        // Account file updates.
310        let mut changeset = FileChangeSet::default();
311        for path in event.paths.iter() {
312            // todo: remove this clone: path.into()
313            changeset.may_insert(self.notify_entry_update(path.as_path().into()));
314        }
315
316        // Workaround for notify-rs' implicit unwatch on remove/rename
317        // (triggered by some editors when saving files) with the
318        // inotify backend. By keeping track of the potentially
319        // unwatched files, we can allow those we still depend on to be
320        // watched again later on.
321        if matches!(
322            event.kind,
323            notify::EventKind::Remove(notify::event::RemoveKind::File)
324                | notify::EventKind::Modify(notify::event::ModifyKind::Name(
325                    notify::event::RenameMode::From
326                ))
327        ) {
328            for path in &event.paths {
329                let Some(entry) = self.watched_entries.get_mut(path.as_path()) else {
330                    continue;
331                };
332                if !entry.watching {
333                    continue;
334                }
335                // Remove affected path from the watched map to restart
336                // watching on it later again.
337                if let Some(watcher) = &mut self.watcher {
338                    log_notify_error(watcher.0.unwatch(path), "failed to unwatch");
339                }
340                entry.watching = false;
341            }
342        }
343
344        // Send file updates.
345        if !changeset.is_empty() {
346            (self.interrupted_by_events)(FilesystemEvent::Update(changeset, false));
347        }
348    }
349
    /// Checks the file at `path` and decides whether its update should be
    /// sent to the consumer.
    ///
    /// Returns the path and its new snapshot if the update should be sent.
    /// Returns `None` if the path is not an entry of `watched_entries`, if
    /// nothing changed, or if the change looks like an unfinished write
    /// (the file is empty or cannot be read). In the last case a recheck
    /// event is scheduled instead.
    fn notify_entry_update(&mut self, path: ImmutPath) -> Option<FileEntry> {
        // The following code in rust-analyzer is commented out
        // todo: check whether we need this
        // if meta.file_type().is_dir() && self
        //   .watched_entries.iter().any(|entry| entry.contains_dir(&path))
        // {
        //     self.watch(path);
        //     return None;
        // }

        // Find the entry; paths without an entry are ignored.
        let entry = self.watched_entries.get_mut(&path)?;

        // Take a snapshot of the current content (or the error of reading it).
        let file = FileSnapshot::from(self.inner.content(&path));

        // Fast path: compare the previous snapshot with the current one and
        // return `None` whenever the file change should not be sent.
        match (entry.prev.as_deref(), file.as_ref()) {
            // Send the update in the following cases:
            // + Case 1: there is no previous snapshot.
            // + Case 2: the previous snapshot is an error, and the current
            //   content is ok.
            (None, ..) | (Some(Err(..)), Ok(..)) => {}
            // The file cannot be read currently.
            (Some(it), Err(err)) => match &mut entry.state {
                // If the file is stable, check whether the editor is removing
                // or truncating the file. They are possibly flushing the file
                // but not finished yet.
                WatchState::Stable => {
                    if matches!(err.as_ref(), FileError::NotFound(..) | FileError::Other(..)) {
                        // Defer the decision: remember the snapshot and
                        // schedule a recheck instead of sending it now.
                        entry.state = WatchState::EmptyOrRemoval {
                            recheck_at: self.logical_tick,
                            payload: file.clone(),
                        };
                        entry.prev = Some(file);
                        let event = UndeterminedNotifyEvent {
                            at_realtime: tinymist_std::time::Instant::now(),
                            at_logical_tick: self.logical_tick,
                            path: path.clone(),
                        };
                        log_send_error("recheck", self.undetermined_send.send(event));
                        return None;
                    }
                    // Otherwise, we push the error to the consumer.

                    // Ignore the error if it is the same as the previous one.
                    if it.as_ref().is_err_and(|it| it == err) {
                        return None;
                    }
                }

                // Checking the sequence of errors is very complicated, so we
                // simplify a bit: any subsequent error is regarded as the
                // same error.
                WatchState::EmptyOrRemoval { payload, .. } => {
                    // Only replace the payload of the pending recheck.
                    *payload = file;
                    return None;
                }
            },
            // Both snapshots are readable: compare the contents to decide
            // the state transition.
            (Some(Ok(prev_content)), Ok(next_content)) => {
                // The content is exactly the same, so there is no change.
                if prev_content == next_content {
                    return None;
                }

                match entry.state {
                    // If the file is stable, check whether the editor is
                    // removing or truncating the file. They are possibly
                    // flushing the file but not finished yet.
                    WatchState::Stable => {
                        if next_content.is_empty() {
                            // Defer the decision, same as for the errors above.
                            entry.state = WatchState::EmptyOrRemoval {
                                recheck_at: self.logical_tick,
                                payload: file.clone(),
                            };
                            entry.prev = Some(file);
                            let event = UndeterminedNotifyEvent {
                                at_realtime: tinymist_std::time::Instant::now(),
                                at_logical_tick: self.logical_tick,
                                path,
                            };
                            log_send_error("recheck", self.undetermined_send.send(event));
                            return None;
                        }
                    }

                    // Still empty
                    WatchState::EmptyOrRemoval { .. } if next_content.is_empty() => return None,
                    // Otherwise, we push the diff to the consumer.
                    WatchState::EmptyOrRemoval { .. } => {}
                }
            }
        };

        // The update is going to be sent to the consumer, so the entry
        // becomes stable and remembers the sent snapshot.
        entry.state = WatchState::Stable;
        entry.prev = Some(file.clone());

        // Slow path: trigger the file change for consumer
        Some((path, file))
    }
455
456    /// Recheck the notify event after a while.
457    async fn recheck_notify_event(&mut self, event: UndeterminedNotifyEvent) -> Option<()> {
458        let now = tinymist_std::time::Instant::now();
459        log::debug!("recheck event {event:?} at {now:?}");
460
461        // The async scheduler is not accurate, so we need to ensure a window here
462        let reserved = now - event.at_realtime;
463        if reserved < tinymist_std::time::Duration::from_millis(50) {
464            let send = self.undetermined_send.clone();
465            tokio::spawn(async move {
466                // todo: sleep in browser
467                tokio::time::sleep(tinymist_std::time::Duration::from_millis(50) - reserved).await;
468                log_send_error("reschedule", send.send(event));
469            });
470            return None;
471        }
472
473        // Check whether the entry is still valid
474        let entry = self.watched_entries.get_mut(&event.path)?;
475
476        // Check the state of the entry
477        match std::mem::take(&mut entry.state) {
478            // If the entry is stable, we do nothing
479            WatchState::Stable => {}
480            // If the entry is not stable, and no other event is produced after
481            // this event, we send the event to the consumer.
482            WatchState::EmptyOrRemoval {
483                recheck_at,
484                payload,
485            } => {
486                if recheck_at == event.at_logical_tick {
487                    log::debug!("notify event real happened {event:?}, state: {payload:?}");
488
489                    if Some(&payload) == entry.prev.as_ref() {
490                        return None;
491                    }
492
493                    // Send the underlying change to the consumer
494                    let mut changeset = FileChangeSet::default();
495                    changeset.inserts.push((event.path, payload));
496
497                    (self.interrupted_by_events)(FilesystemEvent::Update(changeset, false));
498                }
499            }
500        };
501
502        Some(())
503    }
504}
505
506#[inline]
507fn log_notify_error<T>(res: notify::Result<T>, reason: &'static str) -> Option<T> {
508    res.map_err(|err| log::warn!("{reason}: notify error: {err}"))
509        .ok()
510}
511
512#[inline]
513fn log_send_error<T>(chan: &'static str, res: Result<(), mpsc::error::SendError<T>>) -> bool {
514    res.map_err(|err| log::warn!("NotifyActor: send to {chan} error: {err}"))
515        .is_ok()
516}
517
518/// Watches on a set of *files*.
519pub async fn watch_deps(
520    inbox: mpsc::UnboundedReceiver<NotifyMessage>,
521    interrupted_by_events: impl FnMut(FilesystemEvent) + Send + Sync + 'static,
522) {
523    log::info!("NotifyActor: start watching files...");
524    // Watch messages to notify
525    tokio::spawn(NotifyActor::new(interrupted_by_events).run(inbox));
526}