// falco_plugin/listen/routine.rs

1use crate::error::as_result::WithLastError;
2use crate::error::last_error::LastError;
3use falco_plugin_api::{
4    ss_plugin_bool, ss_plugin_owner_t, ss_plugin_rc, ss_plugin_routine_fn_t,
5    ss_plugin_routine_state_t, ss_plugin_routine_t, ss_plugin_routine_vtable, ss_plugin_t,
6};
7use std::ops::ControlFlow;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::sync::Arc;
10use thiserror::Error;
11
/// Phase value: the closure is not being executed and the handle has not
/// been dropped. This is the initial phase of every routine.
const IDLE: usize = 0;
/// Phase value: the thread pool callback has claimed the closure and may be
/// executing it.
const RUNNING: usize = 1;
/// Phase value: the [`Routine`] handle has been dropped; whichever side
/// observes this is responsible for (or has already taken care of) freeing
/// the closure.
const DROP_REQUESTED: usize = 2;
15
16#[derive(Error, Debug)]
17pub(super) enum ThreadPoolError {
18    #[error("Missing entry {0} in thread pool operations vtable")]
19    BadVtable(&'static str),
20}
21
/// Shared coordination state between [`Routine`] and `cb_wrapper`.
///
/// This is what gets passed to the thread pool (via `Arc::into_raw`) as the
/// `ss_plugin_routine_state_t` pointer. Both [`Routine`] and the callback
/// hold `Arc` clones, so the `phase` flag is guaranteed to outlive both sides.
///
/// The `func` pointer and `dtor` are set once at creation and never modified.
/// `func` points to a heap-allocated closure (`Box::into_raw`); exactly one
/// side frees it, determined by the `phase` protocol:
///
/// - **`cb_wrapper`**: swap `RUNNING` into `phase`. If the previous value was
///   `DROP_REQUESTED`, the handle is gone and the closure has already been
///   freed, so return 0 without touching it. Otherwise, execute the closure,
///   then CAS `RUNNING → IDLE`; if that fails (`DROP_REQUESTED`), free the
///   closure.
/// - **`Routine::drop`**: swap `DROP_REQUESTED` into `phase`. If the previous
///   value was `IDLE`, free the closure. Otherwise (`RUNNING`), the callback
///   will free it on return.
///
/// Note that this type has no `Drop` impl: releasing the last `Arc` frees
/// only this struct, never the closure behind `func`. The closure is freed
/// exclusively through `dtor`, as described above.
struct SharedState {
    /// Current protocol phase: one of `IDLE`, `RUNNING` or `DROP_REQUESTED`.
    phase: AtomicUsize,
    /// Pointer to the heap-allocated closure, created via `Box::into_raw`.
    func: *mut (),
    /// Typed destructor that calls `Box::from_raw` on `func`.
    dtor: unsafe fn(*mut ()),
}

// SAFETY: `func` (a raw pointer) is the only non-Send/Sync field.
// The `phase` protocol ensures it is only accessed by one side at a time.
// The closure it points to is itself required to be `Send` by the bounds on
// `ThreadPool::subscribe`, so handing it to another thread is allowed.
unsafe impl Send for SharedState {}
unsafe impl Sync for SharedState {}
51
52/// # A handle for a routine running in the background
53///
54/// Returned by [`ThreadPool::subscribe`]. Dropping the handle:
55/// 1. Calls `unsubscribe` (prevents future scheduling)
56/// 2. Frees the closure and all captured state (immediately if the callback
57///    is not running, or deferred to the callback otherwise)
58#[must_use]
59pub struct Routine {
60    routine: *mut ss_plugin_routine_t,
61    owner: *mut ss_plugin_owner_t,
62    unsubscribe_fn: unsafe extern "C" fn(
63        o: *mut ss_plugin_owner_t,
64        r: *mut ss_plugin_routine_t,
65    ) -> ss_plugin_rc,
66    state: Arc<SharedState>,
67}
68
69impl std::fmt::Debug for Routine {
70    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71        f.debug_struct("Routine")
72            .field("routine", &self.routine)
73            .finish_non_exhaustive()
74    }
75}
76
77// SAFETY: `routine` and `owner` are opaque framework handles passed only to
78// the C API; they are never dereferenced on the Rust side.
79unsafe impl Send for Routine {}
80unsafe impl Sync for Routine {}
81
82impl Drop for Routine {
83    fn drop(&mut self) {
84        if !self.routine.is_null() {
85            // Prevents future scheduling.
86            unsafe { (self.unsubscribe_fn)(self.owner, self.routine) };
87        }
88
89        // Atomically set DROP_REQUESTED and check the previous state.
90        // - Was IDLE: callback is not running — free the closure now.
91        // - Was RUNNING: callback will see DROP_REQUESTED when it finishes
92        //   and free the closure itself.
93        let prev = self.state.phase.swap(DROP_REQUESTED, Ordering::AcqRel);
94
95        if prev == IDLE {
96            unsafe {
97                (self.state.dtor)(self.state.func);
98            }
99            // The thread pool's Arc clone (from Arc::into_raw) is not
100            // reclaimed here: a callback may have already been dispatched
101            // but not yet started executing. It will see DROP_REQUESTED
102            // and return without touching the closure. The Arc clone is
103            // a small leak (AtomicUsize + two pointers) in this case.
104        }
105    }
106}
107
108/// # Thread pool for managing background tasks
109///
110/// The thread pool operates on "routines", which are effectively closures called repeatedly
111/// by the thread pool until they return [`ControlFlow::Break`].
112///
113/// To submit a task, pass it to [`ThreadPool::subscribe`] and store the received handle.
114/// Dropping the handle automatically unsubscribes and frees the routine.
115#[derive(Debug)]
116pub struct ThreadPool {
117    owner: *mut ss_plugin_owner_t,
118    subscribe: unsafe extern "C" fn(
119        o: *mut ss_plugin_owner_t,
120        f: ss_plugin_routine_fn_t,
121        i: *mut ss_plugin_routine_state_t,
122    ) -> *mut ss_plugin_routine_t,
123    unsubscribe: unsafe extern "C" fn(
124        o: *mut ss_plugin_owner_t,
125        r: *mut ss_plugin_routine_t,
126    ) -> ss_plugin_rc,
127
128    last_error: LastError,
129}
130
131impl ThreadPool {
132    pub(super) fn try_from(
133        owner: *mut ss_plugin_owner_t,
134        vtable: *const ss_plugin_routine_vtable,
135        last_error: LastError,
136    ) -> Result<Self, ThreadPoolError> {
137        let vtable = unsafe { vtable.as_ref() }.ok_or(ThreadPoolError::BadVtable("vtable"))?;
138
139        let subscribe = vtable
140            .subscribe
141            .ok_or(ThreadPoolError::BadVtable("subscribe"))?;
142        let unsubscribe = vtable
143            .unsubscribe
144            .ok_or(ThreadPoolError::BadVtable("unsubscribe"))?;
145
146        Ok(Self {
147            owner,
148            subscribe,
149            unsubscribe,
150            last_error,
151        })
152    }
153
154    /// Run a task in a background thread
155    ///
156    /// Returns a [`Routine`]. Dropping the handle automatically unsubscribes
157    /// the routine and frees the closure (immediately if idle, or after the
158    /// current callback invocation finishes).
159    pub fn subscribe<F>(&self, func: F) -> Result<Routine, anyhow::Error>
160    where
161        F: FnMut() -> ControlFlow<()> + Send + 'static,
162    {
163        unsafe extern "C" fn cb_wrapper<F>(
164            _plugin: *mut ss_plugin_t,
165            data: *mut ss_plugin_routine_state_t,
166        ) -> ss_plugin_bool
167        where
168            F: FnMut() -> ControlFlow<()> + Send + 'static,
169        {
170            // Reconstruct the Arc from the raw pointer. Only cb_wrapper
171            // touches this refcount — Routine::drop never reclaims it.
172            // If we return Continue (1), we forget it to preserve the
173            // refcount for the next call. Otherwise we let it drop.
174            let arc = unsafe { Arc::from_raw(data as *const SharedState) };
175
176            // Swap RUNNING into phase. If previous value was DROP_REQUESTED,
177            // the handle has been dropped and the closure already freed.
178            let prev = arc.phase.swap(RUNNING, Ordering::AcqRel);
179            if prev == DROP_REQUESTED {
180                // arc drops here, reclaiming the refcount.
181                return 0;
182            }
183
184            // We hold RUNNING — safe to access the closure.
185            let f = unsafe { &mut *(arc.func as *mut F) };
186            let result = match f() {
187                ControlFlow::Continue(()) => 1,
188                ControlFlow::Break(()) => 0,
189            };
190
191            // Try to go back to IDLE. If drop set DROP_REQUESTED, we free.
192            if arc
193                .phase
194                .compare_exchange(RUNNING, IDLE, Ordering::AcqRel, Ordering::Acquire)
195                .is_err()
196            {
197                // DROP_REQUESTED: free the closure.
198                unsafe {
199                    (arc.dtor)(arc.func);
200                }
201                // arc drops here, reclaiming the refcount.
202                return 0;
203            }
204
205            if result == 1 {
206                // Continue — preserve the refcount for the next call.
207                std::mem::forget(arc);
208            }
209            // else: Break — arc drops here, reclaiming the refcount.
210
211            result
212        }
213
214        unsafe fn cb_drop<F>(ptr: *mut ()) {
215            unsafe {
216                drop(Box::from_raw(ptr as *mut F));
217            }
218        }
219
220        let callback = Some(
221            cb_wrapper::<F>
222                as unsafe extern "C" fn(
223                    _plugin: *mut ss_plugin_t,
224                    data: *mut ss_plugin_routine_state_t,
225                ) -> ss_plugin_bool,
226        );
227
228        let func_ptr = Box::into_raw(Box::new(func));
229
230        let state = Arc::new(SharedState {
231            phase: AtomicUsize::new(IDLE),
232            func: func_ptr as *mut (),
233            dtor: cb_drop::<F>,
234        });
235
236        // Give the thread pool its own Arc clone via into_raw.
237        let tp_clone = Arc::clone(&state);
238        let raw_ptr = Arc::into_raw(tp_clone);
239
240        let ptr = unsafe {
241            (self.subscribe)(
242                self.owner,
243                callback,
244                raw_ptr as *mut ss_plugin_routine_state_t,
245            )
246        };
247
248        if ptr.is_null() {
249            // Reclaim the thread pool's Arc clone.
250            unsafe {
251                Arc::from_raw(raw_ptr);
252            }
253            // Free the closure.
254            unsafe {
255                drop(Box::from_raw(func_ptr));
256            }
257            Err(anyhow::anyhow!("Failed to subscribe function")).with_last_error(&self.last_error)
258        } else {
259            Ok(Routine {
260                routine: ptr,
261                owner: self.owner,
262                unsubscribe_fn: self.unsubscribe,
263                state,
264            })
265        }
266    }
267}