falco_plugin/plugin/listen/
routine.rs

1use crate::plugin::error::as_result::{AsResult, WithLastError};
2use crate::plugin::error::last_error::LastError;
3use falco_plugin_api::{
4    ss_plugin_bool, ss_plugin_owner_t, ss_plugin_rc, ss_plugin_routine_fn_t,
5    ss_plugin_routine_state_t, ss_plugin_routine_t, ss_plugin_routine_vtable, ss_plugin_t,
6};
7use std::ops::ControlFlow;
8use thiserror::Error;
9
10#[derive(Error, Debug)]
11pub enum ThreadPoolError {
12    #[error("Missing entry {0} in thread pool operations vtable")]
13    BadVtable(&'static str),
14}
15
16/// # A handle for a routine running in the background
17///
18/// This is an opaque object, coming from [`ThreadPool::subscribe`], that will drop
19/// the wrapped closure when dropped itself.
20///
21/// *Note*: it's your responsibility to hold on to the handle as long as the closure
22/// may be called. Sadly, our capabilities are limited here, so one approach might be
23/// to skip the destructor call with e.g. [`std::mem::ManuallyDrop`] and dropping the wrapper.
24/// This will leak memory but will be guaranteed safe.
25#[derive(Debug)]
26#[must_use]
27pub struct Routine {
28    routine: *mut ss_plugin_routine_t,
29    state: *mut ss_plugin_routine_state_t,
30    dtor: unsafe fn(*mut ss_plugin_routine_state_t) -> (),
31}
32
33impl Drop for Routine {
34    fn drop(&mut self) {
35        unsafe { (self.dtor)(self.state) }
36    }
37}
38
39/// # Thread pool for managing background tasks
40///
41/// The thread pool operates on "routines", which are effectively closures called repeatedly
42/// by the thread pool until they return [`ControlFlow::Break`].
43///
44/// To submit a task, pass it to [`ThreadPool::subscribe`] and store the received handle.
45/// To cancel a task, pass its handle to [`ThreadPool::unsubscribe`].
46#[derive(Debug)]
47pub struct ThreadPool {
48    owner: *mut ss_plugin_owner_t,
49    subscribe: unsafe extern "C-unwind" fn(
50        o: *mut ss_plugin_owner_t,
51        f: ss_plugin_routine_fn_t,
52        i: *mut ss_plugin_routine_state_t,
53    ) -> *mut ss_plugin_routine_t,
54    unsubscribe: unsafe extern "C-unwind" fn(
55        o: *mut ss_plugin_owner_t,
56        r: *mut ss_plugin_routine_t,
57    ) -> ss_plugin_rc,
58
59    last_error: LastError,
60}
61
62impl ThreadPool {
63    pub(in crate::plugin::listen) fn try_from(
64        owner: *mut ss_plugin_owner_t,
65        vtable: *const ss_plugin_routine_vtable,
66        last_error: LastError,
67    ) -> Result<Self, ThreadPoolError> {
68        let vtable = unsafe { vtable.as_ref() }.ok_or(ThreadPoolError::BadVtable("vtable"))?;
69
70        let subscribe = vtable
71            .subscribe
72            .ok_or(ThreadPoolError::BadVtable("subscribe"))?;
73        let unsubscribe = vtable
74            .unsubscribe
75            .ok_or(ThreadPoolError::BadVtable("unsubscribe"))?;
76
77        Ok(Self {
78            owner,
79            subscribe,
80            unsubscribe,
81            last_error,
82        })
83    }
84
85    /// Run a task in a background thread
86    pub fn subscribe<F>(&self, func: F) -> Result<Routine, anyhow::Error>
87    where
88        F: FnMut() -> ControlFlow<()> + Send + 'static,
89    {
90        unsafe extern "C-unwind" fn cb_wrapper<F>(
91            _plugin: *mut ss_plugin_t,
92            data: *mut ss_plugin_routine_state_t,
93        ) -> ss_plugin_bool
94        where
95            F: FnMut() -> ControlFlow<()> + Send + 'static,
96        {
97            let f = data as *mut F;
98            unsafe {
99                match (*f)() {
100                    ControlFlow::Continue(()) => 1,
101                    ControlFlow::Break(()) => 0,
102                }
103            }
104        }
105
106        unsafe fn cb_drop<F>(data: *mut ss_plugin_routine_state_t) {
107            let cb = data as *mut F;
108            let _ = Box::from_raw(cb);
109        }
110
111        let callback = Some(
112            cb_wrapper::<F>
113                as unsafe extern "C-unwind" fn(
114                    _plugin: *mut ss_plugin_t,
115                    data: *mut ss_plugin_routine_state_t,
116                ) -> ss_plugin_bool,
117        );
118
119        let boxed_func = Box::new(func);
120        let boxed_func = Box::into_raw(boxed_func) as *mut ss_plugin_routine_state_t;
121
122        let ptr = unsafe { (self.subscribe)(self.owner, callback, boxed_func) };
123
124        if ptr.is_null() {
125            Err(anyhow::anyhow!("Failed to subscribe function")).with_last_error(&self.last_error)
126        } else {
127            Ok(Routine {
128                routine: ptr,
129                state: boxed_func,
130                dtor: cb_drop::<F>,
131            })
132        }
133    }
134
135    /// Cancel a task running in a background thread
136    ///
137    /// *Note*: this does not kill a running task, only prevent it from being scheduled again
138    pub fn unsubscribe(&self, routine: &Routine) -> Result<(), anyhow::Error> {
139        unsafe {
140            (self.unsubscribe)(self.owner, routine.routine)
141                .as_result()
142                .with_last_error(&self.last_error)
143        }
144    }
145}