Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/wadm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,8 @@ where
command_publisher,
status_publisher,
manager,
Some(self.manifest_store.clone()),
multitenant_prefix.map(|s| s.to_string()),
))
}
}
59 changes: 54 additions & 5 deletions crates/wadm/src/scaler/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ use anyhow::Result;
use tracing::{error, warn};
use wadm_types::{
api::StatusInfo, CapabilityProperties, Component, ComponentProperties, ConfigProperty,
LinkProperty, Policy, Properties, SecretProperty, SharedApplicationComponentProperties,
SpreadScalerProperty, Trait, TraitProperty, DAEMONSCALER_TRAIT, LINK_TRAIT, SPREADSCALER_TRAIT,
LinkProperty, Manifest, Policy, Properties, SecretProperty,
SharedApplicationComponentProperties, SpreadScalerProperty, Trait, TraitProperty,
DAEMONSCALER_TRAIT, LINK_TRAIT, SPREADSCALER_TRAIT,
};
use wasmcloud_secrets_types::SECRET_PREFIX;

Expand Down Expand Up @@ -59,6 +60,7 @@ pub(crate) fn manifest_components_to_scalers<S, P, L>(
notifier_subject: &str,
notifier: &P,
snapshot_data: &SnapshotStore<S, L>,
deployed_apps: &[Manifest],
) -> ScalerList
where
S: ReadStore + Send + Sync + Clone + 'static,
Expand Down Expand Up @@ -103,6 +105,7 @@ where
notifier_subject,
notifier,
snapshot_data,
deployed_apps,
)
}
Properties::Capability { properties } => {
Expand Down Expand Up @@ -138,6 +141,7 @@ where
notifier_subject,
notifier,
snapshot_data,
deployed_apps,
)
}
});
Expand Down Expand Up @@ -175,6 +179,7 @@ fn component_scalers<S, P, L>(
notifier_subject: &str,
notifier: &P,
snapshot_data: &SnapshotStore<S, L>,
deployed_apps: &[Manifest],
) where
S: ReadStore + Send + Sync + Clone + 'static,
P: Publisher + Clone + Send + Sync + 'static,
Expand All @@ -185,7 +190,8 @@ fn component_scalers<S, P, L>(
let component_id = if properties.image.is_some() {
compute_component_id(manifest_name, properties.id.as_ref(), component_name)
} else {
compute_component_id(application_name, properties.id.as_ref(), component_name)
let component_id = resolve_shared_id(properties.id.as_ref(), properties.application.as_ref(), deployed_apps);
compute_component_id(application_name, component_id.as_ref(), component_name)
};
let (config_scalers, mut config_names) =
config_to_scalers(snapshot_data, manifest_name, &properties.config);
Expand Down Expand Up @@ -291,6 +297,7 @@ fn component_scalers<S, P, L>(
notifier_subject,
notifier,
snapshot_data,
deployed_apps,
)),
_ => None,
})
Expand Down Expand Up @@ -330,6 +337,7 @@ fn provider_scalers<S, P, L>(
notifier_subject: &str,
notifier: &P,
snapshot_data: &SnapshotStore<S, L>,
deployed_apps: &[Manifest],
) where
S: ReadStore + Send + Sync + Clone + 'static,
P: Publisher + Clone + Send + Sync + 'static,
Expand All @@ -339,7 +347,12 @@ fn provider_scalers<S, P, L>(
let provider_id = if properties.image.is_some() {
compute_component_id(manifest_name, properties.id.as_ref(), component_name)
} else {
compute_component_id(application_name, properties.id.as_ref(), component_name)
let component_id = resolve_shared_id(
properties.id.as_ref(),
properties.application.as_ref(),
deployed_apps,
);
compute_component_id(application_name, component_id.as_ref(), component_name)
};

let mut scaler_specified = false;
Expand Down Expand Up @@ -461,6 +474,7 @@ fn provider_scalers<S, P, L>(
notifier_subject,
notifier,
snapshot_data,
deployed_apps
)),
_ => None,
})
Expand Down Expand Up @@ -540,6 +554,7 @@ fn link_scaler<S, P, L>(
notifier_subject: &str,
notifier: &P,
snapshot_data: &SnapshotStore<S, L>,
deployed_apps: &[Manifest],
) -> BoxedScaler
where
S: ReadStore + Send + Sync + Clone + 'static,
Expand Down Expand Up @@ -597,7 +612,12 @@ where
)) as BoxedScaler;
}
};
let target = compute_component_id(target_manifest_name, target_id, target_component_name);
let target_id = resolve_shared_id(target_id, shared, deployed_apps);
let target = compute_component_id(
target_manifest_name,
target_id.as_ref(),
target_component_name,
);
Box::new(BackoffWrapper::new(
LinkScaler::new(
snapshot_data.clone(),
Expand Down Expand Up @@ -741,6 +761,35 @@ fn resolve_manifest_component<'a>(
}
}

fn resolve_shared_id<'a>(
target_id: Option<&'_ String>,
shared_app_info: Option<&'a SharedApplicationComponentProperties>,
deployed_apps: &[Manifest],
) -> Option<String> {
if let Some(shared_app) = shared_app_info {
let manifest = deployed_apps
.iter()
.find(|m| m.metadata.name == shared_app.name);

if let Some(manifest) = manifest {
manifest
.spec
.components
.iter()
.find(|comp| comp.name == shared_app.component)
.and_then(|comp| match &comp.properties {
Properties::Component { properties } => properties.id.clone(),
Properties::Capability { properties } => properties.id.clone(),
})
} else {
warn!("Shared application not found");
target_id.cloned()
}
} else {
target_id.cloned()
}
}

#[cfg(test)]
mod test {
use super::compute_component_id;
Expand Down
2 changes: 2 additions & 0 deletions crates/wadm/src/scaler/daemonscaler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -828,6 +828,8 @@ mod test {
lattice_source,
)
.await,
None,
None,
);
let blobby_spread_property = SpreadScalerProperty {
instances: 10,
Expand Down
34 changes: 33 additions & 1 deletion crates/wadm/src/scaler/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::{
events::Event,
publisher::Publisher,
scaler::{Command, Scaler},
server::ModelStorage,
storage::{snapshot::SnapshotStore, ReadStore},
workers::{CommandPublisher, ConfigSource, LinkSource, SecretSource, StatusPublisher},
};
Expand Down Expand Up @@ -109,6 +110,8 @@ pub struct ScalerManager<StateStore, P: Clone, L: Clone> {
command_publisher: CommandPublisher<P>,
status_publisher: StatusPublisher<P>,
snapshot_data: SnapshotStore<StateStore, L>,
account_id: Option<String>,
manifest_store: Option<ModelStorage>,
}

impl<StateStore, P: Clone, L: Clone> Drop for ScalerManager<StateStore, P, L> {
Expand Down Expand Up @@ -179,6 +182,13 @@ where
.filter_map(|manifest| manifest.transpose())
.map(|res| res.map(|(manifest, _)| manifest))
.collect::<Result<Vec<_>>>()?;

let deployed_apps = all_manifests
.clone()
.into_iter()
.filter_map(|m| m.get_deployed().cloned())
.collect::<Vec<_>>();

let snapshot_data = SnapshotStore::new(
state_store.clone(),
link_getter.clone(),
Expand All @@ -197,6 +207,7 @@ where
&subject,
&client,
&snapshot_data,
&deployed_apps,
);
Some((name, scalers))
})
Expand All @@ -213,6 +224,8 @@ where
command_publisher,
status_publisher,
snapshot_data,
manifest_store: Some(manifest_store),
account_id: multitenant_prefix.map(|s| s.to_string()),
};
let cloned = manager.clone();
let handle = tokio::spawn(async move { cloned.notify(messages).await });
Expand Down Expand Up @@ -245,6 +258,8 @@ where
command_publisher,
status_publisher,
snapshot_data,
manifest_store: None,
account_id: None,
}
}

Expand Down Expand Up @@ -279,7 +294,11 @@ where
.ok_or_else(|| anyhow::anyhow!("Data error: scalers no longer exist after creation"))
}

pub fn scalers_for_manifest<'a>(&'a self, manifest: &'a Manifest) -> ScalerList {
pub fn scalers_for_manifest<'a>(
&'a self,
manifest: &'a Manifest,
deployed_apps: &'a [Manifest],
) -> ScalerList {
manifest_components_to_scalers(
&manifest.spec.components,
&manifest.policy_lookup(),
Expand All @@ -288,6 +307,7 @@ where
&self.subject,
&self.client,
&self.snapshot_data,
deployed_apps,
)
}

Expand Down Expand Up @@ -429,6 +449,16 @@ where

match notification {
Notifications::CreateScalers(manifest) => {
let deployed_apps = if let Some(manifest_store) = &self.manifest_store {
manifest_store
.list(self.account_id.as_deref(), &self.lattice_id)
.await?
.into_iter()
.filter_map(|manifest| manifest.get_deployed().cloned())
.collect()
} else {
Vec::new()
};
// We don't want to trigger the notification, so just create the scalers and then insert
let scalers = manifest_components_to_scalers(
&manifest.spec.components,
Expand All @@ -438,6 +468,8 @@ where
&self.subject,
&self.client,
&self.snapshot_data,
&deployed_apps

);
let num_scalers = scalers.len();
self.add_raw_scalers(&manifest.metadata.name, scalers).await;
Expand Down
2 changes: 2 additions & 0 deletions crates/wadm/src/scaler/spreadscaler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1365,6 +1365,8 @@ mod test {
lattice_source,
)
.await,
None,
None,
);
let blobby_spread_property = SpreadScalerProperty {
instances: 9,
Expand Down
30 changes: 29 additions & 1 deletion crates/wadm/src/workers/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::consumers::{
use crate::events::*;
use crate::publisher::Publisher;
use crate::scaler::manager::{ScalerList, ScalerManager};
use crate::server::ModelStorage;
use crate::storage::{Component, Host, Provider, ProviderStatus, Store, WadmComponentInfo};
use crate::APP_SPEC_ANNOTATION;

Expand All @@ -25,6 +26,8 @@ pub struct EventWorker<StateStore, C: Clone, P: Clone> {
command_publisher: CommandPublisher<P>,
status_publisher: StatusPublisher<P>,
scalers: ScalerManager<StateStore, P, C>,
manifest_store: Option<ModelStorage>,
account_id: Option<String>,
}

impl<StateStore, C, P> EventWorker<StateStore, C, P>
Expand All @@ -48,13 +51,17 @@ where
command_publisher: CommandPublisher<P>,
status_publisher: StatusPublisher<P>,
manager: ScalerManager<StateStore, P, C>,
manifest_store: Option<async_nats::jetstream::kv::Store>,
account_id: Option<String>,
) -> EventWorker<StateStore, C, P> {
EventWorker {
store,
ctl_client,
command_publisher,
status_publisher,
scalers: manager,
manifest_store: manifest_store.map(ModelStorage::new),
account_id,
}
}

Expand Down Expand Up @@ -686,7 +693,20 @@ where
.scalers
.remove_raw_scalers(&data.manifest.metadata.name)
.await;
let scalers = self.scalers.scalers_for_manifest(&data.manifest);

let deployed_apps = if let Some(manifest_store) = &self.manifest_store {
manifest_store
.list(self.account_id.as_deref(), lattice_id)
.await?
.into_iter()
.filter_map(|manifest| manifest.get_deployed().cloned())
.collect()
} else {
Vec::new()
};
let scalers = self
.scalers
.scalers_for_manifest(&data.manifest, &deployed_apps);

// Refresh the snapshot data before cleaning up and/or adding scalers
self.scalers.refresh_data().await?;
Expand Down Expand Up @@ -1075,6 +1095,8 @@ mod test {
lattice_source,
)
.await,
None,
None,
);

let host1_id = "DS1";
Expand Down Expand Up @@ -1809,6 +1831,8 @@ mod test {
lattice_source,
)
.await,
None,
None,
);

let provider_id = "HYPERDRIVE";
Expand Down Expand Up @@ -1955,6 +1979,8 @@ mod test {
lattice_source,
)
.await,
None,
None,
);

let host_id = "CLOUDCITY";
Expand Down Expand Up @@ -2059,6 +2085,8 @@ mod test {
lattice_source,
)
.await,
None,
None,
);

let host_id = "jabbaspalace";
Expand Down