sync_ls/server/
dap_srv.rs

1use dapts::IRequest;
2
3use super::*;
4
5impl<S: 'static> TypedLspClient<S> {
6    /// Sends a dap event to the client.
7    pub fn send_dap_event<E: dapts::IEvent>(&self, body: E::Body) {
8        let req_id = self.req_queue.lock().outgoing.alloc_request_id();
9
10        self.send_dap_event_(dap::Event::new(req_id as i64, E::EVENT.to_owned(), body));
11    }
12
13    /// Sends an untyped dap_event to the client.
14    pub fn send_dap_event_(&self, evt: dap::Event) {
15        self.sender.send_message(evt.into());
16    }
17}
18
19impl<Args: Initializer> LsBuilder<DapMessage, Args>
20where
21    Args::S: 'static,
22{
23    /// Registers an async command handler.
24    pub fn with_command<R: Serialize + 'static>(
25        mut self,
26        cmd: &'static str,
27        handler: AsyncHandler<Args::S, Vec<JsonValue>, R>,
28    ) -> Self {
29        self.command_handlers.insert(
30            cmd,
31            Box::new(move |s, req| erased_response(handler(s, req))),
32        );
33        self
34    }
35
36    /// Registers a raw request handler that handlers a kind of untyped lsp
37    /// request.
38    pub fn with_raw_request<R: dapts::IRequest>(
39        mut self,
40        handler: RawHandler<Args::S, JsonValue>,
41    ) -> Self {
42        self.req_handlers.insert(R::COMMAND, Box::new(handler));
43        self
44    }
45
46    // todo: unsafe typed
47    /// Registers an raw request handler that handlers a kind of typed lsp
48    /// request.
49    pub fn with_request_<R: dapts::IRequest>(
50        mut self,
51        handler: fn(&mut Args::S, R::Arguments) -> ScheduleResult,
52    ) -> Self {
53        self.req_handlers.insert(
54            R::COMMAND,
55            Box::new(move |s, req| handler(s, from_json(req)?)),
56        );
57        self
58    }
59
60    /// Registers a typed request handler.
61    pub fn with_request<R: dapts::IRequest>(
62        mut self,
63        handler: AsyncHandler<Args::S, R::Arguments, R::Response>,
64    ) -> Self {
65        self.req_handlers.insert(
66            R::COMMAND,
67            Box::new(move |s, req| erased_response(handler(s, from_json(req)?))),
68        );
69        self
70    }
71}
72
73impl<Args: Initializer> LsDriver<DapMessage, Args>
74where
75    Args::S: 'static,
76{
77    /// Starts the debug adaptor on the given connection.
78    ///
79    /// If `is_replay` is true, the server will wait for all pending requests to
80    /// finish before exiting. This is useful for testing the language server.
81    ///
82    /// See [`transport::MirrorArgs`] for information about the record-replay
83    /// feature.
84    #[cfg(feature = "system")]
85    pub fn start(
86        &mut self,
87        inbox: TConnectionRx<DapMessage>,
88        is_replay: bool,
89    ) -> anyhow::Result<()> {
90        let res = self.start_(inbox);
91
92        if is_replay {
93            let client = self.client.clone();
94            let _ = std::thread::spawn(move || {
95                let since = tinymist_std::time::Instant::now();
96                let timeout = std::env::var("REPLAY_TIMEOUT")
97                    .ok()
98                    .and_then(|s| s.parse().ok())
99                    .unwrap_or(60);
100                client.handle.block_on(async {
101                    while client.has_pending_requests() {
102                        if since.elapsed().as_secs() > timeout {
103                            log::error!("replay timeout reached, {timeout}s");
104                            client.begin_panic();
105                        }
106
107                        tokio::time::sleep(tinymist_std::time::Duration::from_millis(10)).await;
108                    }
109                })
110            })
111            .join();
112        }
113
114        res
115    }
116
117    /// Starts the debug adaptor on the given connection.
118    #[cfg(feature = "system")]
119    pub fn start_(&mut self, inbox: TConnectionRx<DapMessage>) -> anyhow::Result<()> {
120        use EventOrMessage::*;
121
122        while let Ok(msg) = inbox.recv() {
123            let loop_start = Instant::now();
124            match msg {
125                Evt(event) => {
126                    let Some(event_handler) = self.events.get(&event.as_ref().type_id()) else {
127                        log::warn!("unhandled event: {:?}", event.as_ref().type_id());
128                        continue;
129                    };
130
131                    let s = match &mut self.state {
132                        State::Uninitialized(u) => ServiceState::Uninitialized(u.as_deref_mut()),
133                        State::Initializing(s) | State::Ready(s) => ServiceState::Ready(s),
134                        State::ShuttingDown => {
135                            log::warn!("server is shutting down");
136                            continue;
137                        }
138                    };
139
140                    event_handler(s, &self.client, event)?;
141                }
142                Msg(DapMessage::Request(req)) => {
143                    let client = self.client.clone();
144                    let req_id = (req.seq as i32).into();
145                    client.register_request(&req.command, &req_id, loop_start);
146                    let fut = client.schedule_tail(req_id, self.on_request(req));
147                    self.client.handle.spawn(fut);
148                }
149                Msg(DapMessage::Event(not)) => {
150                    self.on_event(loop_start, not)?;
151                }
152                Msg(DapMessage::Response(resp)) => {
153                    let s = match &mut self.state {
154                        State::Ready(s) => s,
155                        _ => {
156                            log::warn!("server is not ready yet");
157                            continue;
158                        }
159                    };
160
161                    self.client.clone().complete_dap_request(s, resp)
162                }
163            }
164        }
165
166        log::warn!("client exited without proper shutdown sequence");
167        Ok(())
168    }
169
170    /// Registers and handles a request. This should only be called once per
171    /// incoming request.
172    fn on_request(&mut self, req: dap::Request) -> ScheduleResult {
173        match (&mut self.state, &*req.command) {
174            (State::Uninitialized(args), dapts::request::Initialize::COMMAND) => {
175                // todo: what will happen if the request cannot be deserialized?
176                let params = serde_json::from_value::<Args::I>(req.arguments);
177                match params {
178                    Ok(params) => {
179                        let args = args.take().expect("already initialized");
180                        let (s, res) = args.initialize(params);
181                        self.state = State::Ready(s);
182                        res
183                    }
184                    Err(e) => just_result(Err(invalid_request(e))),
185                }
186            }
187            // (state, dap::events::Initialized::METHOD) => {
188            //     let mut s = State::ShuttingDown;
189            //     std::mem::swap(state, &mut s);
190            //     match s {
191            //         State::Initializing(s) => {
192            //             *state = State::Ready(s);
193            //         }
194            //         _ => {
195            //             std::mem::swap(state, &mut s);
196            //         }
197            //     }
198
199            //     let s = match state {
200            //         State::Ready(s) => s,
201            //         _ => {
202            //             log::warn!("server is not ready yet");
203            //             return Ok(());
204            //         }
205            //     };
206            //     handle(s, not)
207            // }
208            (State::Uninitialized(..) | State::Initializing(..), _) => {
209                just_result(Err(not_initialized()))
210            }
211            (_, dapts::request::Initialize::COMMAND) => {
212                just_result(Err(invalid_request("server is already initialized")))
213            }
214            // todo: generalize this
215            // (State::Ready(..), request::ExecuteCommand::METHOD) => {
216            // reschedule!(self.on_execute_command(req))
217            // }
218            (State::Ready(s), _) => 'serve_req: {
219                let method = req.command.as_str();
220
221                let is_disconnect = method == dapts::request::Disconnect::COMMAND;
222
223                let Some(handler) = self.requests.get(method) else {
224                    log::warn!("unhandled dap request: {method}");
225                    break 'serve_req just_result(Err(method_not_found()));
226                };
227
228                let resp = handler(s, req.arguments);
229
230                if is_disconnect {
231                    self.state = State::ShuttingDown;
232                }
233
234                resp
235            }
236            (State::ShuttingDown, _) => {
237                just_result(Err(invalid_request("server is shutting down")))
238            }
239        }
240    }
241
242    /// Handles an incoming event.
243    fn on_event(&mut self, received_at: Instant, not: dap::Event) -> anyhow::Result<()> {
244        self.client.hook.start_notification(&not.event);
245        let handle = |s,
246                      dap::Event {
247                          seq: _,
248                          event,
249                          body,
250                      }: dap::Event| {
251            let Some(handler) = self.notifications.get(event.as_str()) else {
252                log::warn!("unhandled event: {event}");
253                return Ok(());
254            };
255
256            let result = handler(s, body);
257            self.client
258                .hook
259                .stop_notification(&event, received_at, result);
260
261            Ok(())
262        };
263
264        match (&mut self.state, &*not.event) {
265            (State::Ready(state), _) => handle(state, not),
266            // todo: whether it is safe to ignore events
267            (State::Uninitialized(..) | State::Initializing(..), method) => {
268                log::warn!("server is not ready yet, while received event {method}");
269                Ok(())
270            }
271            (State::ShuttingDown, method) => {
272                log::warn!("server is shutting down, while received event {method}");
273                Ok(())
274            }
275        }
276    }
277}