tinymist_vfs/
notify.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
use core::fmt;
use std::path::Path;

use rpds::RedBlackTreeMapSync;
use typst::diag::FileResult;

use crate::{Bytes, FileChangeSet, FileSnapshot, ImmutPath, PathAccessModel};

/// A memory event that is notified by some external source
#[derive(Debug, Clone)]
pub enum MemoryEvent {
    /// Reset all dependencies and update according to the given changeset
    ///
    /// We have not provided a way to reset all dependencies without updating
    /// yet, but you can create a memory event with empty changeset to achieve
    /// this:
    ///
    /// ```
    /// use tinymist_vfs::{FileChangeSet, notify::MemoryEvent};
    /// let event = MemoryEvent::Sync(FileChangeSet::default());
    /// ```
    Sync(FileChangeSet),
    /// Update according to the given changeset
    Update(FileChangeSet),
}

/// A upstream update event that is notified by some external source.
///
/// This event is used to notify some file watcher to invalidate some files
/// before applying upstream changes. This is very important to make some atomic
/// changes.
#[derive(Debug)]
pub struct UpstreamUpdateEvent {
    /// Associated files that the event causes to invalidate
    pub invalidates: Vec<ImmutPath>,
    /// Opaque data that is passed to the file watcher
    pub opaque: Box<dyn std::any::Any + Send>,
}

/// Aggregated filesystem events from some file watcher
#[derive(Debug)]
pub enum FilesystemEvent {
    /// Update file system files according to the given changeset
    Update(FileChangeSet),
    /// See [`UpstreamUpdateEvent`]
    UpstreamUpdate {
        /// New changeset produced by invalidation
        changeset: FileChangeSet,
        /// The upstream event that causes the invalidation
        upstream_event: Option<UpstreamUpdateEvent>,
    },
}

impl FilesystemEvent {
    pub fn split(self) -> (FileChangeSet, Option<UpstreamUpdateEvent>) {
        match self {
            FilesystemEvent::UpstreamUpdate {
                changeset,
                upstream_event,
            } => (changeset, upstream_event),
            FilesystemEvent::Update(changeset) => (changeset, None),
        }
    }
}

pub trait NotifyDeps: fmt::Debug + Send + Sync {
    fn dependencies(&self, f: &mut dyn FnMut(&ImmutPath));
}

impl NotifyDeps for Vec<ImmutPath> {
    fn dependencies(&self, f: &mut dyn FnMut(&ImmutPath)) {
        for path in self.iter() {
            f(path);
        }
    }
}

/// A message that is sent to some file watcher
#[derive(Debug)]
pub enum NotifyMessage {
    /// Oettle the watching
    Settle,
    /// Overrides all dependencies
    SyncDependency(Box<dyn NotifyDeps>),
    /// upstream invalidation This is very important to make some atomic changes
    ///
    /// Example:
    /// ```plain
    ///   /// Receive memory event
    ///   let event: MemoryEvent = retrieve();
    ///   let invalidates = event.invalidates();
    ///
    ///   /// Send memory change event to [`NotifyActor`]
    ///   let event = Box::new(event);
    ///   self.send(NotifyMessage::UpstreamUpdate{ invalidates, opaque: event });
    ///
    ///   /// Wait for [`NotifyActor`] to finish
    ///   let fs_event = self.fs_notify.block_receive();
    ///   let event: MemoryEvent = fs_event.opaque.downcast().unwrap();
    ///
    ///   /// Apply changes
    ///   self.lock();
    ///   update_memory(event);
    ///   apply_fs_changes(fs_event.changeset);
    ///   self.unlock();
    /// ```
    UpstreamUpdate(UpstreamUpdateEvent),
}

/// Provides notify access model which retrieves file system events and changes
/// from some notify backend.
///
/// It simply hold notified filesystem data in memory, but still have a fallback
/// access model, whose the typical underlying access model is
/// [`crate::system::SystemAccessModel`]
#[derive(Debug, Clone)]
pub struct NotifyAccessModel<M> {
    files: RedBlackTreeMapSync<ImmutPath, FileSnapshot>,
    /// The fallback access model when the file is not notified ever.
    pub inner: M,
}

impl<M: PathAccessModel> NotifyAccessModel<M> {
    /// Create a new notify access model
    pub fn new(inner: M) -> Self {
        Self {
            files: RedBlackTreeMapSync::default(),
            inner,
        }
    }

    /// Notify the access model with a filesystem event
    pub fn notify(&mut self, changeset: FileChangeSet) {
        for path in changeset.removes {
            self.files.remove_mut(&path);
        }

        for (path, contents) in changeset.inserts {
            self.files.insert_mut(path, contents);
        }
    }
}

impl<M: PathAccessModel> PathAccessModel for NotifyAccessModel<M> {
    #[inline]
    fn reset(&mut self) {
        self.inner.reset();
    }

    fn content(&self, src: &Path) -> FileResult<Bytes> {
        if let Some(entry) = self.files.get(src) {
            return entry.content().cloned();
        }

        self.inner.content(src)
    }
}