Skip to content

Commit 5e4214a

Browse files
committed
Update tests to tokio from async-std
1 parent 0f99af9 commit 5e4214a

13 files changed

+68
-42
lines changed

Cargo.lock

Lines changed: 12 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

up-streamer/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,4 @@ usubscription-static-file = {path="../utils/usubscription-static-file"}
3939
async-broadcast = { version = "0.7.0" }
4040
chrono = { version = "0.4.31", features = [] }
4141
integration-test-utils = { path = "../utils/integration-test-utils" }
42+
tokio-condvar = { version = "0.3.0" }

up-streamer/src/endpoint.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
* SPDX-License-Identifier: Apache-2.0
1212
********************************************************************************/
1313

14-
use std::sync::Arc;
1514
use log::*;
15+
use std::sync::Arc;
1616
use up_rust::UTransport;
1717

1818
const ENDPOINT_TAG: &str = "Endpoint:";
@@ -26,7 +26,7 @@ const ENDPOINT_FN_NEW_TAG: &str = "new():";
2626
///
2727
/// ```
2828
/// use std::sync::Arc;
29-
/// use async_std::sync::Mutex;
29+
/// use tokio::sync::Mutex;
3030
/// use up_rust::UTransport;
3131
/// use up_streamer::Endpoint;
3232
///

up-streamer/src/ustreamer.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,12 @@ const USTREAMER_FN_DELETE_FORWARDING_RULE_TAG: &str = "delete_forwarding_rule():
4141
const THREAD_NUM: usize = 10;
4242

4343
// Create a separate tokio Runtime for running the callback
44-
lazy_static::lazy_static! {
44+
lazy_static! {
4545
static ref CB_RUNTIME: Runtime = tokio::runtime::Builder::new_multi_thread()
46-
.worker_threads(THREAD_NUM)
47-
.enable_all()
48-
.build()
49-
.expect("Unable to create callback runtime");
46+
.worker_threads(THREAD_NUM)
47+
.enable_all()
48+
.build()
49+
.expect("Unable to create callback runtime");
5050
}
5151

5252
fn uauthority_to_uuri(authority_name: &str) -> UUri {
@@ -367,7 +367,7 @@ impl ForwardingListeners {
367367
/// use usubscription_static_file::USubscriptionStaticFile;
368368
/// use std::sync::Arc;
369369
/// use std::path::PathBuf;
370-
/// use async_std::sync::Mutex;
370+
/// use tokio::sync::Mutex;
371371
/// use up_rust::{UListener, UTransport};
372372
/// use up_streamer::{Endpoint, UStreamer};
373373
/// # pub mod up_client_foo {
@@ -1054,7 +1054,7 @@ mod tests {
10541054
}
10551055
}
10561056

1057-
#[async_std::test]
1057+
#[tokio::test(flavor = "multi_thread")]
10581058
async fn test_simple_with_a_single_input_and_output_endpoint() {
10591059
// Local endpoint
10601060
let local_authority = "local";
@@ -1123,7 +1123,7 @@ mod tests {
11231123
.is_err());
11241124
}
11251125

1126-
#[async_std::test]
1126+
#[tokio::test(flavor = "multi_thread")]
11271127
async fn test_advanced_where_there_is_a_local_endpoint_and_two_remote_endpoints() {
11281128
// Local endpoint
11291129
let local_authority = "local";
@@ -1188,7 +1188,7 @@ mod tests {
11881188
.is_ok());
11891189
}
11901190

1191-
#[async_std::test]
1191+
#[tokio::test(flavor = "multi_thread")]
11921192
async fn test_advanced_where_there_is_a_local_endpoint_and_two_remote_endpoints_but_the_remote_endpoints_have_the_same_instance_of_utransport(
11931193
) {
11941194
// Local endpoint

up-streamer/tests/single_local_single_remote.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@
1212
********************************************************************************/
1313

1414
use async_broadcast::broadcast;
15-
use async_std::sync::{Condvar, Mutex};
16-
use async_std::task;
1715
use futures::future::join;
1816
use integration_test_utils::{
1917
check_messages_in_order, check_send_receive_message_discrepancy, local_authority,
@@ -29,14 +27,16 @@ use log::debug;
2927
use std::sync::atomic::{AtomicU64, Ordering};
3028
use std::sync::Arc;
3129
use std::time::Duration;
30+
use tokio::sync::Mutex;
31+
use tokio_condvar::Condvar;
3232
use up_rust::{UListener, UTransport};
3333
use up_streamer::{Endpoint, UStreamer};
3434
use usubscription_static_file::USubscriptionStaticFile;
3535

3636
const DURATION_TO_RUN_CLIENTS: u128 = 1_000;
3737
const SENT_MESSAGE_VEC_CAPACITY: usize = 10_000;
3838

39-
#[async_std::test]
39+
#[tokio::test(flavor = "multi_thread")]
4040
async fn single_local_single_remote() {
4141
// using async_broadcast to simulate communication protocol
4242
let (tx_1, rx_1) = broadcast(10000);
@@ -175,7 +175,7 @@ async fn single_local_single_remote() {
175175

176176
debug!("after signal_to_resume");
177177

178-
task::sleep(Duration::from_millis(DURATION_TO_RUN_CLIENTS as u64)).await;
178+
tokio::time::sleep(Duration::from_millis(DURATION_TO_RUN_CLIENTS as u64)).await;
179179

180180
debug!("past wait on clients to run, now tell them to stop");
181181
{

up-streamer/tests/single_local_two_remote_add_remove_rules.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@
1212
********************************************************************************/
1313

1414
use async_broadcast::broadcast;
15-
use async_std::sync::{Condvar, Mutex};
16-
use async_std::task;
1715
use futures::future::join;
1816
use integration_test_utils::{
1917
check_messages_in_order, check_send_receive_message_discrepancy, local_authority,
@@ -30,14 +28,16 @@ use log::debug;
3028
use std::sync::atomic::{AtomicU64, Ordering};
3129
use std::sync::Arc;
3230
use std::time::Duration;
31+
use tokio::sync::Mutex;
32+
use tokio_condvar::Condvar;
3333
use up_rust::{UListener, UTransport};
3434
use up_streamer::{Endpoint, UStreamer};
3535
use usubscription_static_file::USubscriptionStaticFile;
3636

3737
const DURATION_TO_RUN_CLIENTS: u128 = 500;
3838
const SENT_MESSAGE_VEC_CAPACITY: usize = 20_000;
3939

40-
#[async_std::test]
40+
#[tokio::test(flavor = "multi_thread")]
4141
async fn single_local_two_remote_add_remove_rules() {
4242
// using async_broadcast to simulate communication protocol
4343
let (tx_1, rx_1) = broadcast(20000);
@@ -206,7 +206,7 @@ async fn single_local_two_remote_add_remove_rules() {
206206

207207
debug!("signalled to resume");
208208

209-
task::sleep(Duration::from_millis(DURATION_TO_RUN_CLIENTS as u64)).await;
209+
tokio::time::sleep(Duration::from_millis(DURATION_TO_RUN_CLIENTS as u64)).await;
210210

211211
{
212212
let mut local_command = local_command.lock().await;
@@ -289,7 +289,7 @@ async fn single_local_two_remote_add_remove_rules() {
289289

290290
debug!("signalled local, remote_a, remote_b to resume");
291291

292-
task::sleep(Duration::from_millis(DURATION_TO_RUN_CLIENTS as u64)).await;
292+
tokio::time::sleep(Duration::from_millis(DURATION_TO_RUN_CLIENTS as u64)).await;
293293

294294
debug!("after running local, remote_a, remote_b");
295295

@@ -345,7 +345,7 @@ async fn single_local_two_remote_add_remove_rules() {
345345

346346
debug!("signalled all to resume: local & remote_b");
347347

348-
task::sleep(Duration::from_millis(DURATION_TO_RUN_CLIENTS as u64)).await;
348+
tokio::time::sleep(Duration::from_millis(DURATION_TO_RUN_CLIENTS as u64)).await;
349349

350350
{
351351
let mut local_command = local_command.lock().await;

up-streamer/tests/single_local_two_remote_authorities_different_remote_transport.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@
1212
********************************************************************************/
1313

1414
use async_broadcast::broadcast;
15-
use async_std::sync::{Condvar, Mutex};
16-
use async_std::task;
1715
use futures::future::join;
1816
use integration_test_utils::{
1917
check_messages_in_order, check_send_receive_message_discrepancy, local_authority,
@@ -30,14 +28,16 @@ use log::debug;
3028
use std::sync::atomic::{AtomicU64, Ordering};
3129
use std::sync::Arc;
3230
use std::time::Duration;
31+
use tokio::sync::Mutex;
32+
use tokio_condvar::Condvar;
3333
use up_rust::{UListener, UTransport};
3434
use up_streamer::{Endpoint, UStreamer};
3535
use usubscription_static_file::USubscriptionStaticFile;
3636

3737
const DURATION_TO_RUN_CLIENTS: u128 = 1_000;
3838
const SENT_MESSAGE_VEC_CAPACITY: usize = 10_000;
3939

40-
#[async_std::test]
40+
#[tokio::test(flavor = "multi_thread")]
4141
async fn single_local_two_remote_authorities_different_remote_transport() {
4242
// using async_broadcast to simulate communication protocol
4343
let (tx_1, rx_1) = broadcast(20000);
@@ -253,7 +253,7 @@ async fn single_local_two_remote_authorities_different_remote_transport() {
253253
// Now signal both clients to resume
254254
signal_to_resume(all_signal_should_pause.clone()).await;
255255

256-
task::sleep(Duration::from_millis(DURATION_TO_RUN_CLIENTS as u64)).await;
256+
tokio::time::sleep(Duration::from_millis(DURATION_TO_RUN_CLIENTS as u64)).await;
257257

258258
{
259259
let mut local_command = local_command.lock().await;

up-streamer/tests/single_local_two_remote_authorities_same_remote_transport.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@
1212
********************************************************************************/
1313

1414
use async_broadcast::broadcast;
15-
use async_std::sync::{Condvar, Mutex};
16-
use async_std::task;
1715
use futures::future::join;
1816
use integration_test_utils::{
1917
check_messages_in_order, check_send_receive_message_discrepancy, local_authority,
@@ -30,14 +28,16 @@ use log::debug;
3028
use std::sync::atomic::{AtomicU64, Ordering};
3129
use std::sync::Arc;
3230
use std::time::Duration;
31+
use tokio::sync::Mutex;
32+
use tokio_condvar::Condvar;
3333
use up_rust::{UListener, UTransport};
3434
use up_streamer::{Endpoint, UStreamer};
3535
use usubscription_static_file::USubscriptionStaticFile;
3636

3737
const DURATION_TO_RUN_CLIENTS: u128 = 1_000;
3838
const SENT_MESSAGE_VEC_CAPACITY: usize = 10_000;
3939

40-
#[async_std::test]
40+
#[tokio::test(flavor = "multi_thread")]
4141
async fn single_local_two_remote_authorities_same_remote_transport() {
4242
// using async_broadcast to simulate communication protocol
4343
let (tx_1, rx_1) = broadcast(20000);
@@ -257,7 +257,7 @@ async fn single_local_two_remote_authorities_same_remote_transport() {
257257
// Now signal both clients to resume
258258
signal_to_resume(all_signal_should_pause.clone()).await;
259259

260-
task::sleep(Duration::from_millis(DURATION_TO_RUN_CLIENTS as u64)).await;
260+
tokio::time::sleep(Duration::from_millis(DURATION_TO_RUN_CLIENTS as u64)).await;
261261

262262
{
263263
let mut local_command = local_command.lock().await;

up-streamer/tests/usubscription.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use up_rust::{UCode, UStatus, UTransport};
1717
use up_streamer::{Endpoint, UStreamer};
1818
use usubscription_static_file::USubscriptionStaticFile;
1919

20-
#[async_std::test]
20+
#[tokio::test(flavor = "multi_thread")]
2121
async fn usubscription_bad_data() {
2222
let utransport_foo: Arc<dyn UTransport> =
2323
Arc::new(UPClientFailingRegister::new("upclient_foo").await);

utils/integration-test-utils/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ license.workspace = true
2323

2424
[dependencies]
2525
async-broadcast = { version = "0.7.0" }
26-
async-std = { workspace = true, features = ["unstable"] }
2726
async-trait = { workspace = true }
2827
env_logger = { workspace = true }
2928
futures = { workspace = true }
@@ -33,3 +32,5 @@ serde_json = { workspace = true }
3332
up-rust = { workspace = true }
3433
up-streamer = { path = "../../up-streamer" }
3534
rand = "0.8.5"
35+
tokio = { workspace = true }
36+
tokio-condvar = { version = "0.3.0" }

0 commit comments

Comments
 (0)