Skip to content

Commit 4c9963a

Browse files
authored
Async deployment of manifests with kad (eclipse-leda#53)
* make KAD async * too many periods * clean up * more cleanup * make Failed to load manifest... message debug-level * Use paths and reduce log spam for manifest partners
1 parent 8fdab5d commit 4c9963a

File tree

4 files changed

+72
-61
lines changed

4 files changed

+72
-61
lines changed

src/rust/kanto-auto-deployer/Cargo.lock

Lines changed: 21 additions & 20 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/rust/kanto-auto-deployer/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ build = "build.rs"
1212

1313
[dependencies]
1414
prost = "0.10.4"
15-
tokio = { version = "1.20.0", features = ["rt-multi-thread"] }
15+
tokio = { version = "1.20.0", features = ["rt-multi-thread", "fs"] }
1616
tokio-stream = { version = "0.1.12", default-features = false }
1717
tonic = { version = "0.7.2" }
1818
tower = { version = "0.4.13", default-features = false }
@@ -32,6 +32,7 @@ enclose = { version = "1.1.8", optional = true }
3232
rumqttc = { version = "0.17.0", optional = true }
3333
rustls-native-certs = { version = "=0.6.0", optional = true }
3434
lazy_static = { version = "1.4.0", optional = true}
35+
futures = "0.3.29"
3536

3637
[build-dependencies]
3738
tonic-build = "0.7.2"

src/rust/kanto-auto-deployer/src/main.rs

Lines changed: 48 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,12 @@
1111
// * SPDX-License-Identifier: Apache-2.0
1212
// ********************************************************************************
1313
use glob::glob;
14-
use std::fs;
15-
use std::path::PathBuf;
14+
use std::path::{PathBuf, Path};
1615
use std::sync::Arc;
1716

1817
use anyhow::Result;
1918
use clap::Parser;
19+
use futures::future;
2020
use std::sync::atomic::AtomicBool;
2121
use tokio::net::UnixStream;
2222
use tokio_retry::{strategy, RetryIf};
@@ -247,8 +247,8 @@ async fn deploy_new(_client: &mut CmClient, new_cont: kanto_cnt::Container) -> R
247247
Ok(())
248248
}
249249

250-
async fn deploy(socket: &str, retries: RetryTimes, file_path: &str, recreate: bool) -> Result<()> {
251-
let container_str = fs::read_to_string(file_path)?;
250+
async fn deploy(socket: &str, retries: RetryTimes, file_path: &Path, recreate: bool) -> Result<()> {
251+
let container_str = tokio::fs::read_to_string(file_path).await?;
252252
let mut _client = get_client(socket, retries).await?;
253253
let parsed_json = manifest_parser::try_parse_manifest(&container_str);
254254
if let Ok(new_container) = parsed_json {
@@ -258,58 +258,65 @@ async fn deploy(socket: &str, retries: RetryTimes, file_path: &str, recreate: bo
258258
.iter()
259259
.find(|c| c.name == new_container.name);
260260
if let Some(existing_cont) = existing_instance {
261-
return handle_existing(&mut _client, new_container, existing_cont, recreate).await;
261+
handle_existing(&mut _client, new_container, existing_cont, recreate).await
262262
} else {
263-
return deploy_new(&mut _client, new_container).await;
263+
deploy_new(&mut _client, new_container).await
264264
}
265265
} else {
266-
log::error!("Wrong json in [{}]", file_path);
266+
Err(anyhow::anyhow!("Wrong json in [{:?}]", file_path))
267267
}
268-
Ok(())
269268
}
270269

271270
async fn deploy_directory(directory_path: &str, socket: &str, retries: RetryTimes) -> Result<()> {
272-
let mut file_path = String::from(directory_path);
273-
let mut path = String::new();
274-
275-
path.push_str(&file_path.clone());
276-
file_path.push_str("/*.json");
277-
278-
let mut b_found = false;
279-
280-
log::info!("Reading manifests from [{}]", path);
281-
282-
let mut full_name = String::new();
283-
for entry in glob(&file_path).expect("Failed to parse glob pattern") {
284-
let name = entry
285-
.expect("Path to entry is unreadable")
286-
.display()
287-
.to_string();
288-
full_name.push_str(&name);
289-
b_found = true;
290-
match deploy(socket, retries, &full_name, false).await {
291-
Ok(_) => {}
292-
Err(e) => log::error!("[CM error] Failed to create: {:?}", e.root_cause()),
293-
};
294-
full_name.clear();
271+
let manifest_glob = format!("{}/*.json", directory_path);
272+
log::info!("Reading manifests from [{}]", directory_path);
273+
274+
let found_manifest_paths: Vec<PathBuf> = glob(&manifest_glob)?
275+
.filter_map(Result::ok)
276+
.collect();
277+
if found_manifest_paths.is_empty() {
278+
return Err(anyhow::anyhow!("No manifests found in {directory_path}"));
295279
}
296-
if !b_found {
297-
log::error!("No manifests are found in [{}]", path);
280+
281+
let deployments = future::join_all(
282+
found_manifest_paths
283+
.iter()
284+
.map(|p| deploy(socket, retries, p, false)),
285+
)
286+
.await;
287+
288+
let (successful, failed): (Vec<_>, Vec<_>) = deployments.into_iter().partition(Result::is_ok);
289+
290+
log::debug!(
291+
"Successfully deployed {}, Failed: {}, Out of {}",
292+
successful.len(),
293+
failed.len(),
294+
found_manifest_paths.len()
295+
);
296+
297+
failed
298+
.iter()
299+
.for_each(|e| log::error!("[CM error] {:?}", e));
300+
301+
if !failed.is_empty() {
302+
return Err(anyhow::anyhow!(
303+
"One or more deployments failed. Check the logs above for more information."
304+
));
298305
}
306+
299307
Ok(())
300308
}
301309

302310
#[cfg(feature = "filewatcher")]
303-
async fn redeploy_on_change(e: fs_watcher::Event, socket: &str) {
311+
async fn redeploy_on_change(event: fs_watcher::Event, socket: &str) {
304312
// In daemon mode we wait until a connection is available to proceed
305313
// Unwrapping in this case is safe.
306-
for path in &e.paths {
314+
for path in &event.paths {
307315
if !is_filetype(path, "json") {
308316
continue;
309317
}
310-
if e.kind.is_create() || e.kind.is_modify() {
311-
let json_path = String::from(path.to_string_lossy());
312-
if let Err(e) = deploy(socket, RetryTimes::Forever, &json_path, true).await {
318+
if event.kind.is_create() || event.kind.is_modify() {
319+
if let Err(e) = deploy(socket, RetryTimes::Forever, path, true).await {
313320
log::error!("[CM error] {:?}", e.root_cause());
314321
};
315322
}
@@ -351,7 +358,9 @@ async fn main() -> Result<()> {
351358
}
352359

353360
// One-shot deployment of all manifests in directory
354-
deploy_directory(&manifests_path, &socket_path, retry_times).await?;
361+
if let Err(e) = deploy_directory(&manifests_path, &socket_path, retry_times).await {
362+
log::error!("Failed to deploy directory: {e}")
363+
}
355364

356365
#[cfg(feature = "filewatcher")]
357366
if cli.daemon {

src/rust/kanto-auto-deployer/src/manifest_parser.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ pub fn try_parse_manifest(container_str: &str) -> Result<Container, Box<dyn std:
125125
ctr
126126
}
127127
Err(_) => {
128-
log::warn!("Failed to load manifest directly. Will attempt auto-conversion from init-dir format.");
128+
log::debug!("Failed to load manifest directly. Will attempt auto-conversion from init-dir format.");
129129
let manifest = serde_json::from_str(container_str)?;
130130
let manifest = expand_container_manifest(&manifest)?;
131131
let internal_state = map_to_internal_state_manifest(manifest)?;

0 commit comments

Comments
 (0)