moly_kit/utils/
asynchronous.rs

//! Asynchronous utilities for MolyKit.
//!
//! Mainly helps you deal with the runtime differences between native and web
//! and provides workarounds for integrating async code into Makepad.
//!
//! For example: `rfd::FileHandle` is `Send` on native, but not on web. And on web
//! it may need to be sent back to the UI through `Cx::post_action`, which requires
//! `Send` unconditionally.
//!
//! Since Makepad doesn't expose an equivalent to `wasm_bindgen_futures::spawn_local`
//! tied to its own event loop, we need to run Tokio on a separate thread, which causes
//! problems with `Send`.
13
14use std::pin::Pin;
15
16use futures::{
17    future::{AbortHandle, Abortable, Future, abortable},
18    stream::Stream,
19};
20
/// Platform-dependent marker backing [`PlatformSend`] (WASM flavour).
///
/// On `wasm32` the trait has no supertrait and is implemented for every type.
#[cfg(target_arch = "wasm32")]
pub trait PlatformSendInner {}

#[cfg(target_arch = "wasm32")]
impl<T> PlatformSendInner for T {}

/// Platform-dependent marker backing [`PlatformSend`] (native flavour).
///
/// Outside of `wasm32` the trait has [`Send`] as a supertrait and is
/// implemented only for types that are `Send`.
#[cfg(not(target_arch = "wasm32"))]
pub trait PlatformSendInner: Send {}

#[cfg(not(target_arch = "wasm32"))]
impl<T: Send> PlatformSendInner for T {}
30
31/// Implies [`Send`] only on native platforms, but not on WASM.
32///
33/// In other words:
34/// - On native this gets implemented by all types that implement [`Send`].
35/// - On WASM this gets implemented by all types, regardless of [`Send`].
36pub trait PlatformSend: PlatformSendInner {}
37impl<T> PlatformSend for T where T: PlatformSendInner {}
38
39/// A future that requires [`Send`] on native platforms, but not on WASM.
40pub trait PlatformSendFuture: Future + PlatformSend {}
41impl<F, O> PlatformSendFuture for F where F: Future<Output = O> + PlatformSend {}
42
43/// A stream that requires [`Send`] on native platforms, but not on WASM.
44pub trait PlatformSendStream: Stream + PlatformSend {}
45impl<S, T> PlatformSendStream for S where S: Stream<Item = T> + PlatformSend {}
46
47/// An owned dynamically typed Future that only requires [`Send`] on native platforms, but not on WASM.
48pub type BoxPlatformSendFuture<'a, T> = Pin<Box<dyn PlatformSendFuture<Output = T> + 'a>>;
49
50/// An owned dynamically typed Stream that only requires [`Send`] on native platforms, but not on WASM.
51pub type BoxPlatformSendStream<'a, T> = Pin<Box<dyn PlatformSendStream<Item = T> + 'a>>;
52
53/// Runs a future independently, in a platform-specific way.
54///
55/// - Uses tokio and requires [`Send`] on native platforms.
56/// - Uses wasm-bindgen-futures on WASM and does not require [`Send`].
57///
58/// **Note:** This function may spawn it's own runtime if it can't find an existing one.
59/// Currently, Makepad doesn't expose the entry point in certain platforms (like Android),
60/// making harder to configure a runtime manually.
61pub fn spawn(fut: impl PlatformSendFuture<Output = ()> + 'static) {
62    spawn_impl(fut);
63}
64
/// Native implementation of [`spawn`], backed by Tokio.
///
/// When the caller is already inside a Tokio runtime context the future is
/// spawned on that runtime. Otherwise it is spawned on a shared multi-threaded
/// runtime that is built lazily on first use and kept in a `static`.
///
/// # Panics
///
/// Panics if the shared fallback runtime cannot be built.
#[cfg(feature = "async-rt")]
#[cfg(not(target_arch = "wasm32"))]
fn spawn_impl(fut: impl Future<Output = ()> + 'static + Send) {
    use std::sync::OnceLock;
    use tokio::runtime::{Builder, Handle, Runtime};

    static RUNTIME: OnceLock<Runtime> = OnceLock::new();

    if let Ok(handle) = Handle::try_current() {
        handle.spawn(fut);
    } else {
        let rt = RUNTIME.get_or_init(|| {
            // Logged inside the initializer so the warning is emitted once,
            // when the runtime is really created, instead of on every spawn
            // that happens outside of a Tokio context.
            log::warn!("No Tokio runtime found on this native platform. Creating a shared runtime.");
            Builder::new_multi_thread()
                .enable_io()
                .enable_time()
                .thread_name("moly-kit-tokio")
                .build()
                .expect("Failed to create Tokio runtime for MolyKit")
        });
        rt.spawn(fut);
    }
}
88
/// WASM implementation of [`spawn`]: hands the future to
/// `wasm_bindgen_futures::spawn_local`, so no [`Send`] bound is needed.
#[cfg(feature = "async-web")]
#[cfg(target_arch = "wasm32")]
fn spawn_impl(fut: impl Future<Output = ()> + 'static) {
    use wasm_bindgen_futures::spawn_local;

    spawn_local(fut)
}
94
95/// A handle that aborts its associated future when dropped.
96///
97/// Similar to https://docs.rs/tokio-util/latest/tokio_util/task/struct.AbortOnDropHandle.html
98/// but runtime agnostic.
99///
100/// This is created from the [`abort_on_drop`] function.
101///
102/// This is useful in Makepad to ensure tasks gets cancelled on widget drop instead
103/// of keep running in the background unnoticed.
104///
105/// Note: In makepad, widgets may be cached or reused causing this to not work as expected
106/// in many scenarios.
107// TODO: Consider having a shared lightweight supervisor task that awakes makepad to check
108// for responding handles through it's event system, but only if there are active tasks.
109pub struct AbortOnDropHandle(AbortHandle);
110
111impl Drop for AbortOnDropHandle {
112    fn drop(&mut self) {
113        self.abort();
114    }
115}
116
117impl AbortOnDropHandle {
118    /// Manually aborts the future associated with this handle before it is dropped.
119    pub fn abort(&mut self) {
120        self.0.abort();
121    }
122}
123
124/// Constructs a future + [`AbortOnDropHandle`] pair.
125///
126/// See [`AbortOnDropHandle`] for more details.
127pub fn abort_on_drop<F, T>(future: F) -> (Abortable<F>, AbortOnDropHandle)
128where
129    F: PlatformSendFuture<Output = T> + 'static,
130{
131    let (abort_handle, abort_registration) = abortable(future);
132    (abort_handle, AbortOnDropHandle(abort_registration))
133}
134
mod thread_token {
    use std::any::Any;
    use std::cell::RefCell;
    use std::collections::HashMap;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering};

    // Source of keys, shared by all threads, so a key handed out on one thread
    // is never handed out again on another one.
    static NEXT_KEY: AtomicU64 = AtomicU64::new(0);

    thread_local! {
        // Per-thread map from token key to the value owned by that token.
        // NOTE(review): nothing in this module ever stores `None`; the `Option`
        // layer only backs the "value was taken" panics below.
        static STORAGE: RefCell<HashMap<u64, Option<Box<dyn Any>>>> = RefCell::new(HashMap::new());
    }

    /// Shared state behind every copy of a [`ThreadToken`].
    ///
    /// Dropping it (which happens when the last copy of the token is dropped)
    /// removes the associated value from the current thread's storage.
    struct ThreadTokenInner<T: 'static> {
        key: u64,
        _phantom: std::marker::PhantomData<fn() -> T>,
    }

    impl<T> Drop for ThreadTokenInner<T> {
        fn drop(&mut self) {
            // Move the entry out of the map first and only drop it once the
            // `RefCell` borrow has been released. The stored value may itself
            // own a `ThreadToken`; dropping it while the storage is still
            // borrowed would re-enter `STORAGE` and panic.
            let removed = STORAGE.with_borrow_mut(|storage| storage.remove(&self.key));
            let value = removed.expect("Token dropped in a different thread.");
            drop(value);
        }
    }

    /// Holds a value inside a thread-local storage.
    ///
    /// Then, this token can be used to access the underlying value as long as
    /// you are in the same thread that created it.
    ///
    /// This is useful on the web, where you are always in the same thread, but you
    /// need to pass some kind of non-`Send` value across `Send` boundaries of Makepad.
    ///
    /// **Warning**: Trying to read the value from a different thread will panic.
    ///
    /// **Warning**: This token is reference counted so you can have copies of it,
    /// but the last copy must be dropped in the same thread that created it to
    /// avoid leaks. If the last copy is dropped in a different thread, it will panic.
    pub struct ThreadToken<T: 'static>(Arc<ThreadTokenInner<T>>);

    impl<T> Clone for ThreadToken<T> {
        /// Returns another copy of the token pointing to the same stored value.
        fn clone(&self) -> Self {
            Self(Arc::clone(&self.0))
        }
    }

    impl<T> std::fmt::Debug for ThreadToken<T> {
        /// Formats the token as `ThreadToken<TypeName>(key)` without touching
        /// the stored value, so it is safe to call from any thread.
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(
                f,
                "ThreadToken<{}>({})",
                std::any::type_name::<T>(),
                self.0.key
            )
        }
    }

    impl<T> ThreadToken<T> {
        /// Put the given value in thread-local storage and return a token to access it.
        ///
        /// # Panics
        ///
        /// Panics if called from inside a [`ThreadToken::peek`] or
        /// [`ThreadToken::peek_mut`] closure on the same thread, because the
        /// storage is borrowed while those closures run.
        pub fn new(value: T) -> Self {
            let key = NEXT_KEY.fetch_add(1, Ordering::Relaxed);
            let boxed: Box<dyn Any> = Box::new(value);

            STORAGE.with_borrow_mut(|storage| {
                storage.insert(key, Some(boxed));
            });

            Self(Arc::new(ThreadTokenInner {
                key,
                _phantom: std::marker::PhantomData,
            }))
        }

        /// Immutable access to the value associated with this token.
        ///
        /// The storage is only borrowed immutably while `f` runs, so `f` may
        /// call `peek` (or `clone_inner`) on this or any other token.
        ///
        /// # Panics
        ///
        /// Panics if called from a thread other than the one that created the
        /// token, or from inside a [`ThreadToken::peek_mut`] closure. Creating
        /// a token, dropping the last copy of a token, or calling `peek_mut`
        /// inside `f` also panics.
        pub fn peek<R>(&self, f: impl FnOnce(&T) -> R) -> R {
            STORAGE.with_borrow(|storage| {
                let slot = storage
                    .get(&self.0.key)
                    .expect("Token `peek` called from different thread");

                let boxed = slot
                    .as_ref()
                    .expect("Token `peek` called after value was taken");

                // The entry for this key was inserted by `new` with a `T`.
                let value = boxed.downcast_ref::<T>().unwrap();
                f(value)
            })
        }

        /// Mutable access to the value associated with this token.
        ///
        /// # Panics
        ///
        /// Panics if called from a thread other than the one that created the
        /// token. The storage is exclusively borrowed while `f` runs, so any
        /// use of the storage inside `f` (peeking, creating a token, or
        /// dropping the last copy of a token) panics as well.
        pub fn peek_mut<R>(&self, f: impl FnOnce(&mut T) -> R) -> R {
            STORAGE.with_borrow_mut(|storage| {
                let slot = storage
                    .get_mut(&self.0.key)
                    .expect("Token `peek_mut` called from different thread");

                let boxed = slot
                    .as_mut()
                    .expect("Token `peek_mut` called after value was taken");

                // The entry for this key was inserted by `new` with a `T`.
                let value = boxed.downcast_mut::<T>().unwrap();
                f(value)
            })
        }
    }

    impl<T: Clone> ThreadToken<T> {
        /// Clone the associated value of this token and return it.
        ///
        /// # Panics
        ///
        /// Panics under the same conditions as [`ThreadToken::peek`].
        pub fn clone_inner(&self) -> T {
            self.peek(T::clone)
        }
    }
}
250
251pub use thread_token::*;