Skip to content

Commit fbb0bb5

Browse files
[ZEN-688] Add cancellation tokens support (#635)
* add cancellation tokens support * Merge branch 'main' into fix-concurrent-cancel-ct * support cancellation tokens for indirect callbacks * address review comments * fix is_cancelled evaluation
1 parent f7a9dff commit fbb0bb5

File tree

12 files changed

+320
-59
lines changed

12 files changed

+320
-59
lines changed

Cargo.lock

Lines changed: 28 additions & 28 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/cancellation.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
//
2+
// Copyright (c) 2025 ZettaScale Technology
3+
//
4+
// This program and the accompanying materials are made available under the
5+
// terms of the Eclipse Public License 2.0 which is available at
6+
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7+
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8+
//
9+
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10+
//
11+
// Contributors:
12+
// ZettaScale Zenoh Team, <[email protected]>
13+
//
14+
use pyo3::prelude::*;
15+
16+
use crate::{macros::wrapper, utils::wait};
17+
18+
wrapper!(zenoh::cancellation::CancellationToken: Clone, Default);
19+
20+
#[pymethods]
21+
impl CancellationToken {
22+
#[new]
23+
fn new() -> Self {
24+
Self::default()
25+
}
26+
27+
#[getter]
28+
fn is_cancelled(&self) -> bool {
29+
self.0.is_cancelled()
30+
}
31+
32+
fn cancel(&self, py: Python) -> PyResult<()> {
33+
wait(py, self.0.cancel())
34+
}
35+
36+
fn __repr__(&self) -> PyResult<String> {
37+
Ok(format!("{:?}", self.0))
38+
}
39+
}

src/ext.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -571,7 +571,7 @@ impl AdvancedSubscriber {
571571
py: Python,
572572
handler: Option<&Bound<PyAny>>,
573573
) -> PyResult<SampleMissListener> {
574-
let (handler, background) = into_handler(py, handler)?;
574+
let (handler, background) = into_handler(py, handler, None)?;
575575
let builder = self.get_ref()?.sample_miss_listener();
576576
let mut listener = wait(py, builder.with(handler))?;
577577
if background {
@@ -587,7 +587,7 @@ impl AdvancedSubscriber {
587587
handler: Option<&Bound<PyAny>>,
588588
history: Option<bool>,
589589
) -> PyResult<Subscriber> {
590-
let (handler, background) = into_handler(py, handler)?;
590+
let (handler, background) = into_handler(py, handler, None)?;
591591
let builder = build!(self.get_ref()?.detect_publishers(), history);
592592
let mut subscriber = wait(py, builder.with(handler))?;
593593
if background {
@@ -808,7 +808,7 @@ pub(crate) fn declare_advanced_subscriber(
808808
recovery: Option<RecoveryConfig>,
809809
subscriber_detection: Option<bool>,
810810
) -> PyResult<AdvancedSubscriber> {
811-
let (handler, background) = into_handler(py, handler)?;
811+
let (handler, background) = into_handler(py, handler, None)?;
812812
let mut builder = build!(
813813
session.0.declare_subscriber(key_expr).advanced(),
814814
allowed_origin,

0 commit comments

Comments
 (0)