Skip to content

Commit cb97ae8

Browse files
dicejalexcrichton
andauthored
allow recursive Wasm invocation from concurrent host functions (#12152)
* allow recursive Wasm invocation from concurrent host functions The core changes here are: - remove an unnecessary assertion from `concurrent::prepare_call` - track instance states (e.g. backpressure, etc.) on a per `(Instance, RuntimeComponentInstanceIndex)` basis - both parts of that key are needed now that concurrent state is tracked on a per-store basis rather than a per-instance basis since `RuntimeComponentInstanceIndex`es are not globally unique While discussing the above with Alex, we realized the use of a `HashMap` to track per-instance states was both pessimal and unnecessary. Consequently, I've folded that state into `ComponentInstance::instance_handle_tables`, renaming it to `instance_states`. That involved a fair amount of code churn, but doesn't change behavior except as described in the second bullet point above. Thanks to Alex for the test case! Fixes #12098 Co-authored-by: Alex Crichton <[email protected]> Signed-off-by: Joel Dice <[email protected]> * use new `RuntimeInstance` type instead of tuples Signed-off-by: Joel Dice <[email protected]> --------- Signed-off-by: Joel Dice <[email protected]> Co-authored-by: Alex Crichton <[email protected]>
1 parent 05a711f commit cb97ae8

File tree

10 files changed

+381
-176
lines changed

10 files changed

+381
-176
lines changed

crates/wasmtime/src/runtime/component/concurrent.rs

Lines changed: 231 additions & 136 deletions
Large diffs are not rendered by default.

crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2492,7 +2492,7 @@ impl<T> StoreContextMut<'_, T> {
24922492
*producer.lock().unwrap() = Some((mine, buffer));
24932493

24942494
if write_buffer {
2495-
write( token, id, producer.clone(), kind).await?;
2495+
write(token, id, producer.clone(), kind).await?;
24962496
}
24972497

24982498
Ok(if dropped {
@@ -4261,21 +4261,21 @@ impl Instance {
42614261

42624262
impl ComponentInstance {
42634263
fn table_for_transmit(self: Pin<&mut Self>, ty: TransmitIndex) -> &mut HandleTable {
4264-
let (tables, types) = self.guest_tables();
4264+
let (states, types) = self.instance_states();
42654265
let runtime_instance = match ty {
42664266
TransmitIndex::Stream(ty) => types[ty].instance,
42674267
TransmitIndex::Future(ty) => types[ty].instance,
42684268
};
4269-
&mut tables[runtime_instance]
4269+
states[runtime_instance].handle_table()
42704270
}
42714271

42724272
fn table_for_error_context(
42734273
self: Pin<&mut Self>,
42744274
ty: TypeComponentLocalErrorContextTableIndex,
42754275
) -> &mut HandleTable {
4276-
let (tables, types) = self.guest_tables();
4276+
let (states, types) = self.instance_states();
42774277
let runtime_instance = types[ty].instance;
4278-
&mut tables[runtime_instance]
4278+
states[runtime_instance].handle_table()
42794279
}
42804280

42814281
fn get_mut_by_index(
@@ -4456,7 +4456,8 @@ impl Waitable {
44564456
} => {
44574457
let instance = instance.id().get_mut(store);
44584458
let runtime_instance = instance.component().types()[ty].instance;
4459-
let (rep, state) = instance.guest_tables().0[runtime_instance]
4459+
let (rep, state) = instance.instance_states().0[runtime_instance]
4460+
.handle_table()
44604461
.future_rep(ty, handle)
44614462
.unwrap();
44624463
assert_eq!(rep, self.rep());
@@ -4477,7 +4478,8 @@ impl Waitable {
44774478
} => {
44784479
let instance = instance.id().get_mut(store);
44794480
let runtime_instance = instance.component().types()[ty].instance;
4480-
let (rep, state) = instance.guest_tables().0[runtime_instance]
4481+
let (rep, state) = instance.instance_states().0[runtime_instance]
4482+
.handle_table()
44814483
.stream_rep(ty, handle)
44824484
.unwrap();
44834485
assert_eq!(rep, self.rep());

crates/wasmtime/src/runtime/component/func.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -794,7 +794,7 @@ impl Func {
794794
ResourceTables {
795795
host_table: Some(host_table),
796796
calls,
797-
guest: Some(instance.guest_tables()),
797+
guest: Some(instance.instance_states()),
798798
}
799799
.exit_call()?;
800800
}

crates/wasmtime/src/runtime/component/func/options.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ impl<'a, T: 'static> LowerContext<'a, T> {
286286
ResourceTables {
287287
host_table: Some(host_table),
288288
calls,
289-
guest: Some(instance.guest_tables()),
289+
guest: Some(instance.instance_states()),
290290
},
291291
host_resource_data,
292292
)
@@ -473,7 +473,7 @@ impl<'a> LiftContext<'a> {
473473
calls: self.calls,
474474
// Note that the unsafety here should be valid given the contract of
475475
// `LiftContext::new`.
476-
guest: Some(self.instance.as_mut().guest_tables()),
476+
guest: Some(self.instance.as_mut().instance_states()),
477477
},
478478
self.host_resource_data,
479479
)

crates/wasmtime/src/runtime/component/instance.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -659,7 +659,7 @@ fn resource_tables<'a>(
659659
ResourceTables {
660660
host_table: None,
661661
calls,
662-
guest: Some(instance.guest_tables()),
662+
guest: Some(instance.instance_states()),
663663
}
664664
}
665665

crates/wasmtime/src/runtime/component/store.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ impl ComponentStoreData {
4949
}
5050

5151
#[cfg(feature = "component-model-async")]
52-
pub(crate) fn assert_guest_tables_empty(&mut self) {
52+
pub(crate) fn assert_instance_states_empty(&mut self) {
5353
for (_, instance) in self.instances.iter_mut() {
5454
let Some(instance) = instance.as_mut() else {
5555
continue;
@@ -58,10 +58,11 @@ impl ComponentStoreData {
5858
assert!(
5959
instance
6060
.get_mut()
61-
.guest_tables()
61+
.instance_states()
6262
.0
63-
.iter()
64-
.all(|(_, table)| table.is_empty())
63+
.iter_mut()
64+
.all(|(_, state)| state.handle_table().is_empty()
65+
&& state.concurrent_state().pending_is_empty())
6566
);
6667
}
6768
}

crates/wasmtime/src/runtime/vm/component.rs

Lines changed: 49 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
use crate::component::{Component, Instance, InstancePre, ResourceType, RuntimeImport};
1010
use crate::module::ModuleRegistry;
1111
use crate::runtime::component::ComponentInstanceId;
12+
#[cfg(feature = "component-model-async")]
13+
use crate::runtime::component::concurrent::ConcurrentInstanceState;
1214
use crate::runtime::vm::instance::{InstanceLayout, OwnedInstance, OwnedVMContext};
1315
use crate::runtime::vm::vmcontext::VMFunctionBody;
1416
use crate::runtime::vm::{
@@ -45,6 +47,35 @@ pub use self::handle_table::{TransmitLocalState, Waitable};
4547
pub use self::resources::CallContext;
4648
pub use self::resources::{CallContexts, ResourceTables, TypedResource, TypedResourceIndex};
4749

50+
/// Represents the state of a (sub-)component instance.
51+
#[derive(Default)]
52+
pub struct InstanceState {
53+
/// Represents the Component Model Async state of a (sub-)component instance.
54+
#[cfg(feature = "component-model-async")]
55+
concurrent_state: ConcurrentInstanceState,
56+
57+
/// State of handles (e.g. resources, waitables, etc.) for this instance.
58+
///
59+
/// For resource handles, this is paired with other information to create a
60+
/// `ResourceTables` and manipulated through that. For other handles, this
61+
/// is used directly to translate guest handles to host representations and
62+
/// vice-versa.
63+
handle_table: HandleTable,
64+
}
65+
66+
impl InstanceState {
67+
/// Represents the Component Model Async state of a (sub-)component instance.
68+
#[cfg(feature = "component-model-async")]
69+
pub fn concurrent_state(&mut self) -> &mut ConcurrentInstanceState {
70+
&mut self.concurrent_state
71+
}
72+
73+
/// State of handles (e.g. resources, waitables, etc.) for this instance.
74+
pub fn handle_table(&mut self) -> &mut HandleTable {
75+
&mut self.handle_table
76+
}
77+
}
78+
4879
/// Runtime representation of a component instance and all state necessary for
4980
/// the instance itself.
5081
///
@@ -86,13 +117,9 @@ pub struct ComponentInstance {
86117
// borrowing a store mutably at the same time as a contained instance.
87118
component: Component,
88119

89-
/// State of handles (e.g. resources, waitables, etc.) for this component.
90-
///
91-
/// For resource handles, this is paired with other information to create a
92-
/// `ResourceTables` and manipulated through that. For other handles, this
93-
/// is used directly to translate guest handles to host representations and
94-
/// vice-versa.
95-
instance_handle_tables: PrimaryMap<RuntimeComponentInstanceIndex, HandleTable>,
120+
/// Contains state specific to each (sub-)component instance within this
121+
/// top-level instance.
122+
instance_states: PrimaryMap<RuntimeComponentInstanceIndex, InstanceState>,
96123

97124
/// What all compile-time-identified core instances are mapped to within the
98125
/// `Store` that this component belongs to.
@@ -293,16 +320,15 @@ impl ComponentInstance {
293320
) -> OwnedComponentInstance {
294321
let offsets = VMComponentOffsets::new(HostPtr, component.env_component());
295322
let num_instances = component.env_component().num_runtime_component_instances;
296-
let mut instance_handle_tables =
297-
PrimaryMap::with_capacity(num_instances.try_into().unwrap());
323+
let mut instance_states = PrimaryMap::with_capacity(num_instances.try_into().unwrap());
298324
for _ in 0..num_instances {
299-
instance_handle_tables.push(HandleTable::default());
325+
instance_states.push(InstanceState::default());
300326
}
301327

302328
let mut ret = OwnedInstance::new(ComponentInstance {
303329
id,
304330
offsets,
305-
instance_handle_tables,
331+
instance_states,
306332
instances: PrimaryMap::with_capacity(
307333
component
308334
.env_component()
@@ -825,22 +851,30 @@ impl ComponentInstance {
825851
resource_instance == component.defined_resource_instances[idx]
826852
}
827853

828-
/// Returns the runtime state of resources associated with this component.
854+
/// Returns the runtime state of resources and concurrency associated with
855+
/// this component.
829856
#[inline]
830-
pub fn guest_tables(
857+
pub fn instance_states(
831858
self: Pin<&mut Self>,
832859
) -> (
833-
&mut PrimaryMap<RuntimeComponentInstanceIndex, HandleTable>,
860+
&mut PrimaryMap<RuntimeComponentInstanceIndex, InstanceState>,
834861
&ComponentTypes,
835862
) {
836863
// safety: we've chosen the `pin` guarantee of `self` to not apply to
837864
// the map returned.
838865
unsafe {
839866
let me = self.get_unchecked_mut();
840-
(&mut me.instance_handle_tables, me.component.types())
867+
(&mut me.instance_states, me.component.types())
841868
}
842869
}
843870

871+
pub fn instance_state(
872+
self: Pin<&mut Self>,
873+
instance: RuntimeComponentInstanceIndex,
874+
) -> Option<&mut InstanceState> {
875+
self.instance_states().0.get_mut(instance)
876+
}
877+
844878
/// Returns the destructor and instance flags for the specified resource
845879
/// table type.
846880
///

crates/wasmtime/src/runtime/vm/component/libcalls.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use crate::component::Instance;
55
use crate::component::concurrent::WaitResult;
66
use crate::prelude::*;
77
#[cfg(feature = "component-model-async")]
8-
use crate::runtime::component::concurrent::ResourcePair;
8+
use crate::runtime::component::concurrent::{ResourcePair, RuntimeInstance};
99
use crate::runtime::vm::component::{ComponentInstance, VMComponentContext};
1010
use crate::runtime::vm::{HostResultHasUnwindSentinel, VMStore, VmSafe};
1111
use core::cell::Cell;
@@ -672,12 +672,15 @@ fn trap(_store: &mut dyn VMStore, _instance: Instance, code: u8) -> Result<()> {
672672
#[cfg(feature = "component-model-async")]
673673
fn backpressure_modify(
674674
store: &mut dyn VMStore,
675-
_instance: Instance,
675+
instance: Instance,
676676
caller_instance: u32,
677677
increment: u8,
678678
) -> Result<()> {
679-
store.concurrent_state_mut().backpressure_modify(
680-
RuntimeComponentInstanceIndex::from_u32(caller_instance),
679+
store.backpressure_modify(
680+
RuntimeInstance {
681+
instance: instance.id().instance(),
682+
index: RuntimeComponentInstanceIndex::from_u32(caller_instance),
683+
},
681684
|old| {
682685
if increment != 0 {
683686
old.checked_add(1)

crates/wasmtime/src/runtime/vm/component/resources.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
//! about ABI details can be found in lifting/lowering throughout Wasmtime,
2121
//! namely in the `Resource<T>` and `ResourceAny` types.
2222
23-
use super::{HandleTable, RemovedResource};
23+
use super::{HandleTable, InstanceState, RemovedResource};
2424
use crate::prelude::*;
2525
use core::error::Error;
2626
use core::fmt;
@@ -53,7 +53,7 @@ pub struct ResourceTables<'a> {
5353
/// `ResourceAny::resource_drop` which won't consult this table as it's
5454
/// only operating over the host table.
5555
pub guest: Option<(
56-
&'a mut PrimaryMap<RuntimeComponentInstanceIndex, HandleTable>,
56+
&'a mut PrimaryMap<RuntimeComponentInstanceIndex, InstanceState>,
5757
&'a ComponentTypes,
5858
)>,
5959

@@ -196,8 +196,8 @@ impl ResourceTables<'_> {
196196
match resource {
197197
TypedResource::Host(_) => self.host_table.as_mut().unwrap(),
198198
TypedResource::Component { ty, .. } => {
199-
let (tables, types) = self.guest.as_mut().unwrap();
200-
&mut tables[types[*ty].unwrap_concrete_instance()]
199+
let (states, types) = self.guest.as_mut().unwrap();
200+
states[types[*ty].unwrap_concrete_instance()].handle_table()
201201
}
202202
}
203203
}
@@ -206,8 +206,8 @@ impl ResourceTables<'_> {
206206
match index {
207207
TypedResourceIndex::Host(_) => self.host_table.as_mut().unwrap(),
208208
TypedResourceIndex::Component { ty, .. } => {
209-
let (tables, types) = self.guest.as_mut().unwrap();
210-
&mut tables[types[*ty].unwrap_concrete_instance()]
209+
let (states, types) = self.guest.as_mut().unwrap();
210+
states[types[*ty].unwrap_concrete_instance()].handle_table()
211211
}
212212
}
213213
}

tests/all/component_model/async.rs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -767,3 +767,73 @@ async fn cancel_host_future() -> Result<()> {
767767
}
768768
}
769769
}
770+
771+
#[tokio::test]
772+
#[cfg_attr(miri, ignore)]
773+
async fn run_wasm_in_call_async() -> Result<()> {
774+
_ = env_logger::try_init();
775+
776+
let mut config = Config::new();
777+
config.async_support(true);
778+
config.wasm_component_model_async(true);
779+
let engine = Engine::new(&config)?;
780+
781+
let a = Component::new(
782+
&engine,
783+
r#"
784+
(component
785+
(type $t (func async))
786+
(import "a" (func $f (type $t)))
787+
(core func $f (canon lower (func $f)))
788+
(core module $a
789+
(import "" "f" (func $f))
790+
(func (export "run") call $f)
791+
)
792+
(core instance $a (instantiate $a
793+
(with "" (instance (export "f" (func $f))))
794+
))
795+
(func (export "run") (type $t)
796+
(canon lift (core func $a "run")))
797+
)
798+
"#,
799+
)?;
800+
let b = Component::new(
801+
&engine,
802+
r#"
803+
(component
804+
(type $t (func async))
805+
(core module $a
806+
(func (export "run"))
807+
)
808+
(core instance $a (instantiate $a))
809+
(func (export "run") (type $t)
810+
(canon lift (core func $a "run")))
811+
)
812+
"#,
813+
)?;
814+
815+
type State = Option<Instance>;
816+
817+
let mut linker = Linker::new(&engine);
818+
linker
819+
.root()
820+
.func_wrap_concurrent("a", |accessor: &Accessor<State>, (): ()| {
821+
Box::pin(async move {
822+
let func = accessor.with(|mut access| {
823+
access
824+
.get()
825+
.unwrap()
826+
.get_typed_func::<(), ()>(&mut access, "run")
827+
})?;
828+
func.call_concurrent(accessor, ()).await?;
829+
Ok(())
830+
})
831+
})?;
832+
let mut store = Store::new(&engine, None);
833+
let instance_a = linker.instantiate_async(&mut store, &a).await?;
834+
let instance_b = linker.instantiate_async(&mut store, &b).await?;
835+
*store.data_mut() = Some(instance_b);
836+
let run = instance_a.get_typed_func::<(), ()>(&mut store, "run")?;
837+
run.call_async(&mut store, ()).await?;
838+
Ok(())
839+
}

0 commit comments

Comments
 (0)