falco_plugin/listen/routine.rs
1use crate::error::as_result::WithLastError;
2use crate::error::last_error::LastError;
3use falco_plugin_api::{
4 ss_plugin_bool, ss_plugin_owner_t, ss_plugin_rc, ss_plugin_routine_fn_t,
5 ss_plugin_routine_state_t, ss_plugin_routine_t, ss_plugin_routine_vtable, ss_plugin_t,
6};
7use std::ops::ControlFlow;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::sync::Arc;
10use thiserror::Error;
11
12const IDLE: usize = 0;
13const RUNNING: usize = 1;
14const DROP_REQUESTED: usize = 2;
15
16#[derive(Error, Debug)]
17pub(super) enum ThreadPoolError {
18 #[error("Missing entry {0} in thread pool operations vtable")]
19 BadVtable(&'static str),
20}
21
22/// Shared coordination state between [`Routine`] and `cb_wrapper`.
23///
24/// This is what gets passed to the thread pool (via `Arc::into_raw`) as the
25/// `ss_plugin_routine_state_t` pointer. Both [`Routine`] and the callback
26/// hold `Arc` clones, so the `phase` flag is guaranteed to outlive both sides.
27///
28/// The `func` pointer and `dtor` are set once at creation and never modified.
29/// `func` points to a heap-allocated closure (`Box::into_raw`); exactly one
30/// side frees it, determined by the `phase` protocol:
31///
32/// - **`cb_wrapper`**: swap `RUNNING` into `phase`. If the previous value was
33/// `DROP_REQUESTED`, free the closure and return 0. Otherwise, execute the
34/// closure, then CAS `RUNNING → IDLE`; if that fails (`DROP_REQUESTED`),
35/// free the closure.
36/// - **`Routine::drop`**: swap `DROP_REQUESTED` into `phase`. If the previous
37/// value was `IDLE`, free the closure. Otherwise (`RUNNING`), the callback
38/// will free it on return.
39struct SharedState {
40 phase: AtomicUsize,
41 /// Pointer to the heap-allocated closure, created via `Box::into_raw`.
42 func: *mut (),
43 /// Typed destructor that calls `Box::from_raw` on `func`.
44 dtor: unsafe fn(*mut ()),
45}
46
47// SAFETY: `func` (a raw pointer) is the only non-Send/Sync field.
48// The `phase` protocol ensures it is only accessed by one side at a time.
49unsafe impl Send for SharedState {}
50unsafe impl Sync for SharedState {}
51
52/// # A handle for a routine running in the background
53///
54/// Returned by [`ThreadPool::subscribe`]. Dropping the handle:
55/// 1. Calls `unsubscribe` (prevents future scheduling)
56/// 2. Frees the closure and all captured state (immediately if the callback
57/// is not running, or deferred to the callback otherwise)
58#[must_use]
59pub struct Routine {
60 routine: *mut ss_plugin_routine_t,
61 owner: *mut ss_plugin_owner_t,
62 unsubscribe_fn: unsafe extern "C" fn(
63 o: *mut ss_plugin_owner_t,
64 r: *mut ss_plugin_routine_t,
65 ) -> ss_plugin_rc,
66 state: Arc<SharedState>,
67}
68
69impl std::fmt::Debug for Routine {
70 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71 f.debug_struct("Routine")
72 .field("routine", &self.routine)
73 .finish_non_exhaustive()
74 }
75}
76
77// SAFETY: `routine` and `owner` are opaque framework handles passed only to
78// the C API; they are never dereferenced on the Rust side.
79unsafe impl Send for Routine {}
80unsafe impl Sync for Routine {}
81
82impl Drop for Routine {
83 fn drop(&mut self) {
84 if !self.routine.is_null() {
85 // Prevents future scheduling.
86 unsafe { (self.unsubscribe_fn)(self.owner, self.routine) };
87 }
88
89 // Atomically set DROP_REQUESTED and check the previous state.
90 // - Was IDLE: callback is not running — free the closure now.
91 // - Was RUNNING: callback will see DROP_REQUESTED when it finishes
92 // and free the closure itself.
93 let prev = self.state.phase.swap(DROP_REQUESTED, Ordering::AcqRel);
94
95 if prev == IDLE {
96 unsafe {
97 (self.state.dtor)(self.state.func);
98 }
99 // The thread pool's Arc clone (from Arc::into_raw) is not
100 // reclaimed here: a callback may have already been dispatched
101 // but not yet started executing. It will see DROP_REQUESTED
102 // and return without touching the closure. The Arc clone is
103 // a small leak (AtomicUsize + two pointers) in this case.
104 }
105 }
106}
107
108/// # Thread pool for managing background tasks
109///
110/// The thread pool operates on "routines", which are effectively closures called repeatedly
111/// by the thread pool until they return [`ControlFlow::Break`].
112///
113/// To submit a task, pass it to [`ThreadPool::subscribe`] and store the received handle.
114/// Dropping the handle automatically unsubscribes and frees the routine.
115#[derive(Debug)]
116pub struct ThreadPool {
117 owner: *mut ss_plugin_owner_t,
118 subscribe: unsafe extern "C" fn(
119 o: *mut ss_plugin_owner_t,
120 f: ss_plugin_routine_fn_t,
121 i: *mut ss_plugin_routine_state_t,
122 ) -> *mut ss_plugin_routine_t,
123 unsubscribe: unsafe extern "C" fn(
124 o: *mut ss_plugin_owner_t,
125 r: *mut ss_plugin_routine_t,
126 ) -> ss_plugin_rc,
127
128 last_error: LastError,
129}
130
131impl ThreadPool {
132 pub(super) fn try_from(
133 owner: *mut ss_plugin_owner_t,
134 vtable: *const ss_plugin_routine_vtable,
135 last_error: LastError,
136 ) -> Result<Self, ThreadPoolError> {
137 let vtable = unsafe { vtable.as_ref() }.ok_or(ThreadPoolError::BadVtable("vtable"))?;
138
139 let subscribe = vtable
140 .subscribe
141 .ok_or(ThreadPoolError::BadVtable("subscribe"))?;
142 let unsubscribe = vtable
143 .unsubscribe
144 .ok_or(ThreadPoolError::BadVtable("unsubscribe"))?;
145
146 Ok(Self {
147 owner,
148 subscribe,
149 unsubscribe,
150 last_error,
151 })
152 }
153
154 /// Run a task in a background thread
155 ///
156 /// Returns a [`Routine`]. Dropping the handle automatically unsubscribes
157 /// the routine and frees the closure (immediately if idle, or after the
158 /// current callback invocation finishes).
159 pub fn subscribe<F>(&self, func: F) -> Result<Routine, anyhow::Error>
160 where
161 F: FnMut() -> ControlFlow<()> + Send + 'static,
162 {
163 unsafe extern "C" fn cb_wrapper<F>(
164 _plugin: *mut ss_plugin_t,
165 data: *mut ss_plugin_routine_state_t,
166 ) -> ss_plugin_bool
167 where
168 F: FnMut() -> ControlFlow<()> + Send + 'static,
169 {
170 // Reconstruct the Arc from the raw pointer. Only cb_wrapper
171 // touches this refcount — Routine::drop never reclaims it.
172 // If we return Continue (1), we forget it to preserve the
173 // refcount for the next call. Otherwise we let it drop.
174 let arc = unsafe { Arc::from_raw(data as *const SharedState) };
175
176 // Swap RUNNING into phase. If previous value was DROP_REQUESTED,
177 // the handle has been dropped and the closure already freed.
178 let prev = arc.phase.swap(RUNNING, Ordering::AcqRel);
179 if prev == DROP_REQUESTED {
180 // arc drops here, reclaiming the refcount.
181 return 0;
182 }
183
184 // We hold RUNNING — safe to access the closure.
185 let f = unsafe { &mut *(arc.func as *mut F) };
186 let result = match f() {
187 ControlFlow::Continue(()) => 1,
188 ControlFlow::Break(()) => 0,
189 };
190
191 // Try to go back to IDLE. If drop set DROP_REQUESTED, we free.
192 if arc
193 .phase
194 .compare_exchange(RUNNING, IDLE, Ordering::AcqRel, Ordering::Acquire)
195 .is_err()
196 {
197 // DROP_REQUESTED: free the closure.
198 unsafe {
199 (arc.dtor)(arc.func);
200 }
201 // arc drops here, reclaiming the refcount.
202 return 0;
203 }
204
205 if result == 1 {
206 // Continue — preserve the refcount for the next call.
207 std::mem::forget(arc);
208 }
209 // else: Break — arc drops here, reclaiming the refcount.
210
211 result
212 }
213
214 unsafe fn cb_drop<F>(ptr: *mut ()) {
215 unsafe {
216 drop(Box::from_raw(ptr as *mut F));
217 }
218 }
219
220 let callback = Some(
221 cb_wrapper::<F>
222 as unsafe extern "C" fn(
223 _plugin: *mut ss_plugin_t,
224 data: *mut ss_plugin_routine_state_t,
225 ) -> ss_plugin_bool,
226 );
227
228 let func_ptr = Box::into_raw(Box::new(func));
229
230 let state = Arc::new(SharedState {
231 phase: AtomicUsize::new(IDLE),
232 func: func_ptr as *mut (),
233 dtor: cb_drop::<F>,
234 });
235
236 // Give the thread pool its own Arc clone via into_raw.
237 let tp_clone = Arc::clone(&state);
238 let raw_ptr = Arc::into_raw(tp_clone);
239
240 let ptr = unsafe {
241 (self.subscribe)(
242 self.owner,
243 callback,
244 raw_ptr as *mut ss_plugin_routine_state_t,
245 )
246 };
247
248 if ptr.is_null() {
249 // Reclaim the thread pool's Arc clone.
250 unsafe {
251 Arc::from_raw(raw_ptr);
252 }
253 // Free the closure.
254 unsafe {
255 drop(Box::from_raw(func_ptr));
256 }
257 Err(anyhow::anyhow!("Failed to subscribe function")).with_last_error(&self.last_error)
258 } else {
259 Ok(Routine {
260 routine: ptr,
261 owner: self.owner,
262 unsubscribe_fn: self.unsubscribe,
263 state,
264 })
265 }
266 }
267}