moly_kit/utils/
asynchronous.rs1use std::pin::Pin;
15
16use futures::{
17 future::{AbortHandle, Abortable, Future, abortable},
18 stream::Stream,
19};
20
/// Implementation detail behind [`PlatformSend`].
///
/// On `wasm32` targets this trait has no requirements and is implemented for
/// every sized type.
#[cfg(target_arch = "wasm32")]
pub trait PlatformSendInner {}

#[cfg(target_arch = "wasm32")]
impl<T> PlatformSendInner for T {}

/// Implementation detail behind [`PlatformSend`].
///
/// On every target other than `wasm32` this trait has `Send` as a supertrait
/// and is implemented for every sized `Send` type.
#[cfg(not(target_arch = "wasm32"))]
pub trait PlatformSendInner: Send {}

#[cfg(not(target_arch = "wasm32"))]
impl<T: Send> PlatformSendInner for T {}
30
31pub trait PlatformSend: PlatformSendInner {}
37impl<T> PlatformSend for T where T: PlatformSendInner {}
38
39pub trait PlatformSendFuture: Future + PlatformSend {}
41impl<F, O> PlatformSendFuture for F where F: Future<Output = O> + PlatformSend {}
42
43pub trait PlatformSendStream: Stream + PlatformSend {}
45impl<S, T> PlatformSendStream for S where S: Stream<Item = T> + PlatformSend {}
46
47pub type BoxPlatformSendFuture<'a, T> = Pin<Box<dyn PlatformSendFuture<Output = T> + 'a>>;
49
50pub type BoxPlatformSendStream<'a, T> = Pin<Box<dyn PlatformSendStream<Item = T> + 'a>>;
52
53pub fn spawn(fut: impl PlatformSendFuture<Output = ()> + 'static) {
62 spawn_impl(fut);
63}
64
/// Native (`async-rt` feature, non-`wasm32`) backend for `spawn`.
///
/// If the calling context already has a Tokio runtime, the future is spawned
/// on it. Otherwise a warning is logged and the future is spawned on a
/// multi-threaded runtime stored in a function-local static, which is built
/// the first time it is needed.
///
/// # Panics
///
/// Panics if that fallback runtime cannot be built.
#[cfg(all(feature = "async-rt", not(target_arch = "wasm32")))]
fn spawn_impl(fut: impl Future<Output = ()> + 'static + Send) {
    use std::sync::OnceLock;
    use tokio::runtime::{Builder, Handle, Runtime};

    // Fallback runtime, built at most once and kept for later calls.
    static RUNTIME: OnceLock<Runtime> = OnceLock::new();

    match Handle::try_current() {
        Ok(current) => {
            // The returned join handle is intentionally dropped.
            current.spawn(fut);
        }
        Err(_) => {
            log::warn!("No Tokio runtime found on this native platform. Creating a shared runtime.");
            let shared = RUNTIME.get_or_init(|| {
                let mut builder = Builder::new_multi_thread();
                builder
                    .enable_io()
                    .enable_time()
                    .thread_name("moly-kit-tokio");
                builder
                    .build()
                    .expect("Failed to create Tokio runtime for MolyKit")
            });
            shared.spawn(fut);
        }
    }
}
88
/// Web (`async-web` feature, `wasm32`) backend for `spawn`.
///
/// Forwards the future to `wasm_bindgen_futures::spawn_local`; no `Send` bound
/// is required here.
#[cfg(all(feature = "async-web", target_arch = "wasm32"))]
fn spawn_impl(fut: impl Future<Output = ()> + 'static) {
    wasm_bindgen_futures::spawn_local(fut)
}
94
95pub struct AbortOnDropHandle(AbortHandle);
110
111impl Drop for AbortOnDropHandle {
112 fn drop(&mut self) {
113 self.abort();
114 }
115}
116
117impl AbortOnDropHandle {
118 pub fn abort(&mut self) {
120 self.0.abort();
121 }
122}
123
124pub fn abort_on_drop<F, T>(future: F) -> (Abortable<F>, AbortOnDropHandle)
128where
129 F: PlatformSendFuture<Output = T> + 'static,
130{
131 let (abort_handle, abort_registration) = abortable(future);
132 (abort_handle, AbortOnDropHandle(abort_registration))
133}
134
mod thread_token {
    use std::any::Any;
    use std::cell::RefCell;
    use std::collections::HashMap;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering};

    // Source of unique keys: every new token takes the next counter value.
    static NEXT_KEY: AtomicU64 = AtomicU64::new(0);

    thread_local! {
        // Per-thread table from token key to the value stored for that token.
        // A value therefore only exists on the thread that created its token.
        static STORAGE: RefCell<HashMap<u64, Option<Box<dyn Any>>>> = RefCell::new(HashMap::new());
    }

    /// Shared state behind every clone of a [`ThreadToken`].
    ///
    /// Holds only the key; the value itself lives in the thread-local table.
    /// `PhantomData<fn() -> T>` records the value type without storing a `T`.
    struct ThreadTokenInner<T: 'static> {
        key: u64,
        _phantom: std::marker::PhantomData<fn() -> T>,
    }

    impl<T> Drop for ThreadTokenInner<T> {
        /// Removes the stored value once the last clone of the token is gone.
        ///
        /// # Panics
        ///
        /// Panics if the current thread's table has no entry for this key,
        /// i.e. the last clone is dropped on a thread other than the one that
        /// created the token.
        fn drop(&mut self) {
            // Only detach the entry while the table is borrowed. The value is
            // dropped after the borrow is released, so a value whose own
            // destructor drops another token can re-enter the table safely.
            let removed = STORAGE.with_borrow_mut(|storage| storage.remove(&self.key));
            drop(removed.expect("Token dropped in a different thread."));
        }
    }

    /// Cheaply clonable handle to a value kept in thread-local storage.
    ///
    /// The handle only carries a key, so the value never moves with it. The
    /// value can only be reached from the thread that called
    /// [`ThreadToken::new`].
    pub struct ThreadToken<T: 'static>(Arc<ThreadTokenInner<T>>);

    impl<T> Clone for ThreadToken<T> {
        /// Returns another handle to the same stored value.
        fn clone(&self) -> Self {
            Self(Arc::clone(&self.0))
        }
    }

    impl<T> std::fmt::Debug for ThreadToken<T> {
        /// Prints the value's type name and the token key, never the value.
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(
                f,
                "ThreadToken<{}>({})",
                std::any::type_name::<T>(),
                self.0.key
            )
        }
    }

    impl<T> ThreadToken<T> {
        /// Stores `value` in the current thread's table and returns a token
        /// for it.
        pub fn new(value: T) -> Self {
            // The counter only needs to hand out distinct numbers.
            let key = NEXT_KEY.fetch_add(1, Ordering::Relaxed);

            STORAGE.with_borrow_mut(|storage| {
                storage.insert(key, Some(Box::new(value)));
            });

            Self(Arc::new(ThreadTokenInner {
                key,
                _phantom: std::marker::PhantomData,
            }))
        }

        /// Runs `f` with a shared reference to the stored value and returns
        /// its result.
        ///
        /// The table is only borrowed immutably, so `f` may itself call
        /// `peek` or `clone_inner` on this or any other token.
        ///
        /// # Panics
        ///
        /// Panics if called from a thread other than the one that created the
        /// token, or if `f` creates a token, drops the last clone of a token,
        /// or calls `peek_mut` (all of which need exclusive table access).
        pub fn peek<R>(&self, f: impl FnOnce(&T) -> R) -> R {
            STORAGE.with_borrow(|storage| {
                let value = storage
                    .get(&self.0.key)
                    .expect("Token `peek` called from different thread")
                    .as_ref()
                    .expect("Token `peek` called after value was taken")
                    .downcast_ref::<T>()
                    .expect("value stored under a token key always has the token's type");
                f(value)
            })
        }

        /// Runs `f` with an exclusive reference to the stored value and
        /// returns its result.
        ///
        /// # Panics
        ///
        /// Panics if called from a thread other than the one that created the
        /// token, or if `f` touches the table in any way (peeking, creating,
        /// or dropping the last clone of any token), because the table stays
        /// exclusively borrowed while `f` runs.
        pub fn peek_mut<R>(&self, f: impl FnOnce(&mut T) -> R) -> R {
            STORAGE.with_borrow_mut(|storage| {
                let value = storage
                    .get_mut(&self.0.key)
                    .expect("Token `peek_mut` called from different thread")
                    .as_mut()
                    .expect("Token `peek_mut` called after value was taken")
                    .downcast_mut::<T>()
                    .expect("value stored under a token key always has the token's type");
                f(value)
            })
        }
    }

    impl<T: Clone> ThreadToken<T> {
        /// Returns a clone of the stored value.
        ///
        /// # Panics
        ///
        /// Panics under the same conditions as [`ThreadToken::peek`].
        pub fn clone_inner(&self) -> T {
            self.peek(T::clone)
        }
    }
}
250
251pub use thread_token::*;