Skip to content

Commit 94b1c25

Browse files
authored
Merge pull request #60 from stepancheg/with-gil-2
Move Python::attach calls outside of tokio event loop in future_into_py
2 parents 2b243ef + f6ebac3 commit 94b1c25

File tree

4 files changed

+109
-34
lines changed

4 files changed

+109
-34
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ To see unreleased changes, please see the CHANGELOG on the main branch.
1111
<!-- towncrier release notes start -->
1212

1313
- Avoid attaching to the runtime when cloning TaskLocals by using std::sync::Arc. [#62](https://github.com/PyO3/pyo3-async-runtimes/pull/62)
14+
- **Breaking**: Finalize the future without holding GIL inside async-std/tokio runtime.
15+
Trait `Runtime` now requires `spawn_blocking` function,
16+
`future_into_py` functions now require future return type to be `Send`.
17+
[#60](https://github.com/PyO3/pyo3-async-runtimes/pull/60)
1418

1519
## [0.26.0] - 2025-09-02
1620

src/async_std.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
use async_std::task;
1818
use futures::FutureExt;
1919
use pyo3::prelude::*;
20-
use std::{any::Any, cell::RefCell, future::Future, panic::AssertUnwindSafe, pin::Pin};
20+
use std::{any::Any, cell::RefCell, future::Future, panic, panic::AssertUnwindSafe, pin::Pin};
2121

2222
use crate::{
2323
generic::{self, ContextExt, JoinError, LocalContextExt, Runtime, SpawnLocalExt},
@@ -74,6 +74,15 @@ impl Runtime for AsyncStdRuntime {
7474
.map_err(AsyncStdJoinErr)
7575
})
7676
}
77+
78+
fn spawn_blocking<F>(f: F) -> Self::JoinHandle
79+
where
80+
F: FnOnce() + Send + 'static,
81+
{
82+
task::spawn_blocking(move || {
83+
panic::catch_unwind(AssertUnwindSafe(f)).map_err(|e| AsyncStdJoinErr(Box::new(e)))
84+
})
85+
}
7786
}
7887

7988
impl ContextExt for AsyncStdRuntime {
@@ -276,7 +285,7 @@ pub fn future_into_py_with_locals<F, T>(
276285
) -> PyResult<Bound<PyAny>>
277286
where
278287
F: Future<Output = PyResult<T>> + Send + 'static,
279-
T: for<'py> IntoPyObject<'py>,
288+
T: for<'py> IntoPyObject<'py> + Send + 'static,
280289
{
281290
generic::future_into_py_with_locals::<AsyncStdRuntime, F, T>(py, locals, fut)
282291
}
@@ -322,7 +331,7 @@ where
322331
pub fn future_into_py<F, T>(py: Python, fut: F) -> PyResult<Bound<PyAny>>
323332
where
324333
F: Future<Output = PyResult<T>> + Send + 'static,
325-
T: for<'py> IntoPyObject<'py>,
334+
T: for<'py> IntoPyObject<'py> + Send + 'static,
326335
{
327336
generic::future_into_py::<AsyncStdRuntime, _, T>(py, fut)
328337
}

src/generic.rs

Lines changed: 84 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,11 @@ pub trait Runtime: Send + 'static {
5252
fn spawn<F>(fut: F) -> Self::JoinHandle
5353
where
5454
F: Future<Output = ()> + Send + 'static;
55+
56+
/// Spawn a function onto this runtime's blocking event loop
57+
fn spawn_blocking<F>(f: F) -> Self::JoinHandle
58+
where
59+
F: FnOnce() + Send + 'static;
5560
}
5661

5762
/// Extension trait for async/await runtimes that support spawning local tasks
@@ -161,6 +166,10 @@ where
161166
/// # {
162167
/// # unreachable!()
163168
/// # }
169+
/// #
170+
/// # fn spawn_blocking<F>(f: F) -> Self::JoinHandle where F: FnOnce() + Send + 'static {
171+
/// # unreachable!()
172+
/// # }
164173
/// # }
165174
/// #
166175
/// # impl ContextExt for MyCustomRuntime {
@@ -265,6 +274,10 @@ where
265274
/// # {
266275
/// # unreachable!()
267276
/// # }
277+
/// #
278+
/// # fn spawn_blocking<F>(f: F) -> Self::JoinHandle where F: FnOnce() + Send + 'static {
279+
/// # unreachable!()
280+
/// # }
268281
/// # }
269282
/// #
270283
/// # impl ContextExt for MyCustomRuntime {
@@ -415,6 +428,10 @@ fn set_result(
415428
/// # {
416429
/// # unreachable!()
417430
/// # }
431+
/// #
432+
/// # fn spawn_blocking<F>(f: F) -> Self::JoinHandle where F: FnOnce() + Send + 'static {
433+
/// # unreachable!()
434+
/// # }
418435
/// # }
419436
/// #
420437
/// # impl ContextExt for MyCustomRuntime {
@@ -540,6 +557,10 @@ where
540557
/// # {
541558
/// # unreachable!()
542559
/// # }
560+
/// #
561+
/// # fn spawn_blocking<F>(f: F) -> Self::JoinHandle where F: FnOnce() + Send + 'static {
562+
/// # unreachable!()
563+
/// # }
543564
/// # }
544565
/// #
545566
/// # impl ContextExt for MyCustomRuntime {
@@ -581,7 +602,7 @@ pub fn future_into_py_with_locals<R, F, T>(
581602
where
582603
R: Runtime + ContextExt,
583604
F: Future<Output = PyResult<T>> + Send + 'static,
584-
T: for<'py> IntoPyObject<'py>,
605+
T: for<'py> IntoPyObject<'py> + Send + 'static,
585606
{
586607
let (cancel_tx, cancel_rx) = oneshot::channel();
587608

@@ -606,44 +627,50 @@ where
606627
)
607628
.await;
608629

609-
Python::attach(move |py| {
610-
if cancelled(future_tx1.bind(py))
611-
.map_err(dump_err(py))
612-
.unwrap_or(false)
613-
{
614-
return;
615-
}
616-
617-
let _ = set_result(
618-
&locals2.event_loop(py),
619-
future_tx1.bind(py),
620-
result.and_then(|val| val.into_py_any(py)),
621-
)
622-
.map_err(dump_err(py));
623-
});
624-
})
625-
.await
626-
{
627-
if e.is_panic() {
630+
// We should not hold GIL inside async-std/tokio event loop,
631+
// because a blocked task may prevent other tasks from progressing.
632+
R::spawn_blocking(|| {
628633
Python::attach(move |py| {
629-
if cancelled(future_tx2.bind(py))
634+
if cancelled(future_tx1.bind(py))
630635
.map_err(dump_err(py))
631636
.unwrap_or(false)
632637
{
633638
return;
634639
}
635640

636-
let panic_message = format!(
637-
"rust future panicked: {}",
638-
get_panic_message(&e.into_panic())
639-
);
640641
let _ = set_result(
641-
locals.0.event_loop.bind(py),
642-
future_tx2.bind(py),
643-
Err(RustPanic::new_err(panic_message)),
642+
&locals2.event_loop(py),
643+
future_tx1.bind(py),
644+
result.and_then(|val| val.into_py_any(py)),
644645
)
645646
.map_err(dump_err(py));
646647
});
648+
});
649+
})
650+
.await
651+
{
652+
if e.is_panic() {
653+
R::spawn_blocking(|| {
654+
Python::attach(move |py| {
655+
if cancelled(future_tx2.bind(py))
656+
.map_err(dump_err(py))
657+
.unwrap_or(false)
658+
{
659+
return;
660+
}
661+
662+
let panic_message = format!(
663+
"rust future panicked: {}",
664+
get_panic_message(&e.into_panic())
665+
);
666+
let _ = set_result(
667+
locals.0.event_loop.bind(py),
668+
future_tx2.bind(py),
669+
Err(RustPanic::new_err(panic_message)),
670+
)
671+
.map_err(dump_err(py));
672+
});
673+
});
647674
}
648675
}
649676
});
@@ -812,6 +839,10 @@ impl PyDoneCallback {
812839
/// # {
813840
/// # unreachable!()
814841
/// # }
842+
/// #
843+
/// # fn spawn_blocking<F>(f: F) -> Self::JoinHandle where F: FnOnce() + Send + 'static {
844+
/// # unreachable!()
845+
/// # }
815846
/// # }
816847
/// #
817848
/// # impl ContextExt for MyCustomRuntime {
@@ -844,7 +875,7 @@ pub fn future_into_py<R, F, T>(py: Python, fut: F) -> PyResult<Bound<PyAny>>
844875
where
845876
R: Runtime + ContextExt,
846877
F: Future<Output = PyResult<T>> + Send + 'static,
847-
T: for<'py> IntoPyObject<'py>,
878+
T: for<'py> IntoPyObject<'py> + Send + 'static,
848879
{
849880
future_into_py_with_locals::<R, F, T>(py, get_current_locals::<R>(py)?, fut)
850881
}
@@ -921,6 +952,10 @@ where
921952
/// # {
922953
/// # unreachable!()
923954
/// # }
955+
/// #
956+
/// # fn spawn_blocking<F>(f: F) -> Self::JoinHandle where F: FnOnce() + Send + 'static {
957+
/// # unreachable!()
958+
/// # }
924959
/// # }
925960
/// #
926961
/// # impl ContextExt for MyCustomRuntime {
@@ -1126,6 +1161,10 @@ where
11261161
/// # {
11271162
/// # unreachable!()
11281163
/// # }
1164+
/// #
1165+
/// # fn spawn_blocking<F>(f: F) -> Self::JoinHandle where F: FnOnce() + Send + 'static {
1166+
/// # unreachable!()
1167+
/// # }
11291168
/// # }
11301169
/// #
11311170
/// # impl ContextExt for MyCustomRuntime {
@@ -1240,6 +1279,10 @@ where
12401279
/// # {
12411280
/// # unreachable!()
12421281
/// # }
1282+
/// #
1283+
/// # fn spawn_blocking<F>(f: F) -> Self::JoinHandle where F: FnOnce() + Send + 'static {
1284+
/// # unreachable!()
1285+
/// # }
12431286
/// # }
12441287
/// #
12451288
/// # impl ContextExt for MyCustomRuntime {
@@ -1389,6 +1432,10 @@ where
13891432
/// # {
13901433
/// # unreachable!()
13911434
/// # }
1435+
/// #
1436+
/// # fn spawn_blocking<F>(f: F) -> Self::JoinHandle where F: FnOnce() + Send + 'static {
1437+
/// # unreachable!()
1438+
/// # }
13921439
/// # }
13931440
/// #
13941441
/// # impl ContextExt for MyCustomRuntime {
@@ -1584,6 +1631,10 @@ async def forward(gen, sender):
15841631
/// # {
15851632
/// # unreachable!()
15861633
/// # }
1634+
/// #
1635+
/// # fn spawn_blocking<F>(f: F) -> Self::JoinHandle where F: FnOnce() + Send + 'static {
1636+
/// # unreachable!()
1637+
/// # }
15871638
/// # }
15881639
/// #
15891640
/// # impl ContextExt for MyCustomRuntime {
@@ -1737,6 +1788,10 @@ where
17371788
/// # {
17381789
/// # unreachable!()
17391790
/// # }
1791+
/// #
1792+
/// # fn spawn_blocking<F>(f: F) -> Self::JoinHandle where F: FnOnce() + Send + 'static {
1793+
/// # unreachable!()
1794+
/// # }
17401795
/// # }
17411796
/// #
17421797
/// # impl ContextExt for MyCustomRuntime {

src/tokio.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,13 @@ impl GenericRuntime for TokioRuntime {
9494
fut.await;
9595
})
9696
}
97+
98+
fn spawn_blocking<F>(f: F) -> Self::JoinHandle
99+
where
100+
F: FnOnce() + Send + 'static,
101+
{
102+
get_runtime().spawn_blocking(f)
103+
}
97104
}
98105

99106
impl ContextExt for TokioRuntime {
@@ -318,7 +325,7 @@ pub fn future_into_py_with_locals<F, T>(
318325
) -> PyResult<Bound<PyAny>>
319326
where
320327
F: Future<Output = PyResult<T>> + Send + 'static,
321-
T: for<'py> IntoPyObject<'py>,
328+
T: for<'py> IntoPyObject<'py> + Send + 'static,
322329
{
323330
generic::future_into_py_with_locals::<TokioRuntime, F, T>(py, locals, fut)
324331
}
@@ -364,7 +371,7 @@ where
364371
pub fn future_into_py<F, T>(py: Python, fut: F) -> PyResult<Bound<PyAny>>
365372
where
366373
F: Future<Output = PyResult<T>> + Send + 'static,
367-
T: for<'py> IntoPyObject<'py>,
374+
T: for<'py> IntoPyObject<'py> + Send + 'static,
368375
{
369376
generic::future_into_py::<TokioRuntime, _, T>(py, fut)
370377
}

0 commit comments

Comments
 (0)