sync_ls/server/
dap_srv.rs1use dapts::IRequest;
2
3use super::*;
4
5impl<S: 'static> TypedLspClient<S> {
6 pub fn send_dap_event<E: dapts::IEvent>(&self, body: E::Body) {
8 let req_id = self.req_queue.lock().outgoing.alloc_request_id();
9
10 self.send_dap_event_(dap::Event::new(req_id as i64, E::EVENT.to_owned(), body));
11 }
12
13 pub fn send_dap_event_(&self, evt: dap::Event) {
15 self.sender.send_message(evt.into());
16 }
17}
18
19impl<Args: Initializer> LsBuilder<DapMessage, Args>
20where
21 Args::S: 'static,
22{
23 pub fn with_command<R: Serialize + 'static>(
25 mut self,
26 cmd: &'static str,
27 handler: AsyncHandler<Args::S, Vec<JsonValue>, R>,
28 ) -> Self {
29 self.command_handlers.insert(
30 cmd,
31 Box::new(move |s, req| erased_response(handler(s, req))),
32 );
33 self
34 }
35
36 pub fn with_raw_request<R: dapts::IRequest>(
39 mut self,
40 handler: RawHandler<Args::S, JsonValue>,
41 ) -> Self {
42 self.req_handlers.insert(R::COMMAND, Box::new(handler));
43 self
44 }
45
46 pub fn with_request_<R: dapts::IRequest>(
50 mut self,
51 handler: fn(&mut Args::S, R::Arguments) -> ScheduleResult,
52 ) -> Self {
53 self.req_handlers.insert(
54 R::COMMAND,
55 Box::new(move |s, req| handler(s, from_json(req)?)),
56 );
57 self
58 }
59
60 pub fn with_request<R: dapts::IRequest>(
62 mut self,
63 handler: AsyncHandler<Args::S, R::Arguments, R::Response>,
64 ) -> Self {
65 self.req_handlers.insert(
66 R::COMMAND,
67 Box::new(move |s, req| erased_response(handler(s, from_json(req)?))),
68 );
69 self
70 }
71}
72
73impl<Args: Initializer> LsDriver<DapMessage, Args>
74where
75 Args::S: 'static,
76{
77 #[cfg(feature = "system")]
85 pub fn start(
86 &mut self,
87 inbox: TConnectionRx<DapMessage>,
88 is_replay: bool,
89 ) -> anyhow::Result<()> {
90 let res = self.start_(inbox);
91
92 if is_replay {
93 let client = self.client.clone();
94 let _ = std::thread::spawn(move || {
95 let since = tinymist_std::time::Instant::now();
96 let timeout = std::env::var("REPLAY_TIMEOUT")
97 .ok()
98 .and_then(|s| s.parse().ok())
99 .unwrap_or(60);
100 client.handle.block_on(async {
101 while client.has_pending_requests() {
102 if since.elapsed().as_secs() > timeout {
103 log::error!("replay timeout reached, {timeout}s");
104 client.begin_panic();
105 }
106
107 tokio::time::sleep(tinymist_std::time::Duration::from_millis(10)).await;
108 }
109 })
110 })
111 .join();
112 }
113
114 res
115 }
116
117 #[cfg(feature = "system")]
119 pub fn start_(&mut self, inbox: TConnectionRx<DapMessage>) -> anyhow::Result<()> {
120 use EventOrMessage::*;
121
122 while let Ok(msg) = inbox.recv() {
123 let loop_start = Instant::now();
124 match msg {
125 Evt(event) => {
126 let Some(event_handler) = self.events.get(&event.as_ref().type_id()) else {
127 log::warn!("unhandled event: {:?}", event.as_ref().type_id());
128 continue;
129 };
130
131 let s = match &mut self.state {
132 State::Uninitialized(u) => ServiceState::Uninitialized(u.as_deref_mut()),
133 State::Initializing(s) | State::Ready(s) => ServiceState::Ready(s),
134 State::ShuttingDown => {
135 log::warn!("server is shutting down");
136 continue;
137 }
138 };
139
140 event_handler(s, &self.client, event)?;
141 }
142 Msg(DapMessage::Request(req)) => {
143 let client = self.client.clone();
144 let req_id = (req.seq as i32).into();
145 client.register_request(&req.command, &req_id, loop_start);
146 let fut = client.schedule_tail(req_id, self.on_request(req));
147 self.client.handle.spawn(fut);
148 }
149 Msg(DapMessage::Event(not)) => {
150 self.on_event(loop_start, not)?;
151 }
152 Msg(DapMessage::Response(resp)) => {
153 let s = match &mut self.state {
154 State::Ready(s) => s,
155 _ => {
156 log::warn!("server is not ready yet");
157 continue;
158 }
159 };
160
161 self.client.clone().complete_dap_request(s, resp)
162 }
163 }
164 }
165
166 log::warn!("client exited without proper shutdown sequence");
167 Ok(())
168 }
169
170 fn on_request(&mut self, req: dap::Request) -> ScheduleResult {
173 match (&mut self.state, &*req.command) {
174 (State::Uninitialized(args), dapts::request::Initialize::COMMAND) => {
175 let params = serde_json::from_value::<Args::I>(req.arguments);
177 match params {
178 Ok(params) => {
179 let args = args.take().expect("already initialized");
180 let (s, res) = args.initialize(params);
181 self.state = State::Ready(s);
182 res
183 }
184 Err(e) => just_result(Err(invalid_request(e))),
185 }
186 }
187 (State::Uninitialized(..) | State::Initializing(..), _) => {
209 just_result(Err(not_initialized()))
210 }
211 (_, dapts::request::Initialize::COMMAND) => {
212 just_result(Err(invalid_request("server is already initialized")))
213 }
214 (State::Ready(s), _) => 'serve_req: {
219 let method = req.command.as_str();
220
221 let is_disconnect = method == dapts::request::Disconnect::COMMAND;
222
223 let Some(handler) = self.requests.get(method) else {
224 log::warn!("unhandled dap request: {method}");
225 break 'serve_req just_result(Err(method_not_found()));
226 };
227
228 let resp = handler(s, req.arguments);
229
230 if is_disconnect {
231 self.state = State::ShuttingDown;
232 }
233
234 resp
235 }
236 (State::ShuttingDown, _) => {
237 just_result(Err(invalid_request("server is shutting down")))
238 }
239 }
240 }
241
242 fn on_event(&mut self, received_at: Instant, not: dap::Event) -> anyhow::Result<()> {
244 self.client.hook.start_notification(¬.event);
245 let handle = |s,
246 dap::Event {
247 seq: _,
248 event,
249 body,
250 }: dap::Event| {
251 let Some(handler) = self.notifications.get(event.as_str()) else {
252 log::warn!("unhandled event: {event}");
253 return Ok(());
254 };
255
256 let result = handler(s, body);
257 self.client
258 .hook
259 .stop_notification(&event, received_at, result);
260
261 Ok(())
262 };
263
264 match (&mut self.state, &*not.event) {
265 (State::Ready(state), _) => handle(state, not),
266 (State::Uninitialized(..) | State::Initializing(..), method) => {
268 log::warn!("server is not ready yet, while received event {method}");
269 Ok(())
270 }
271 (State::ShuttingDown, method) => {
272 log::warn!("server is shutting down, while received event {method}");
273 Ok(())
274 }
275 }
276 }
277}