Skip to content

Commit 9179f1a

Browse files
committed
address review feedback
- Assert that `StoreOpaque::suspend` is not called in a non-blocking context except in specific circumstances - Typecheck async-ness for dynamic host functions - Use type parameter instead of value parameter in `call_host[_dynamic]` Signed-off-by: Joel Dice <[email protected]>
1 parent 46cd6d0 commit 9179f1a

File tree

2 files changed

+71
-18
lines changed

2 files changed

+71
-18
lines changed

crates/wasmtime/src/runtime/component/concurrent.rs

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -574,6 +574,7 @@ enum SuspendReason {
574574
Waiting {
575575
set: TableId<WaitableSet>,
576576
thread: QualifiedThreadId,
577+
skip_may_block_check: bool,
577578
},
578579
/// The fiber has finished handling its most recent work item and is waiting
579580
/// for another (or to be dropped if it is no longer needed).
@@ -582,7 +583,10 @@ enum SuspendReason {
582583
/// chance to run.
583584
Yielding { thread: QualifiedThreadId },
584585
/// The fiber was explicitly suspended with a call to `thread.suspend` or `thread.switch-to`.
585-
ExplicitlySuspending { thread: QualifiedThreadId },
586+
ExplicitlySuspending {
587+
thread: QualifiedThreadId,
588+
skip_may_block_check: bool,
589+
},
586590
}
587591

588592
/// Represents a pending call into guest code for a given guest task.
@@ -805,6 +809,7 @@ pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
805809
store.suspend(SuspendReason::Waiting {
806810
set,
807811
thread: caller,
812+
skip_may_block_check: false,
808813
})?;
809814
}
810815
}
@@ -1445,7 +1450,7 @@ impl StoreOpaque {
14451450
SuspendReason::ExplicitlySuspending { thread, .. } => {
14461451
state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
14471452
}
1448-
SuspendReason::Waiting { set, thread } => {
1453+
SuspendReason::Waiting { set, thread, .. } => {
14491454
let old = state
14501455
.get_mut(set)?
14511456
.waiting
@@ -1485,6 +1490,26 @@ impl StoreOpaque {
14851490
None
14861491
};
14871492

1493+
// We should not have reached here unless either there's no current
1494+
// task, or the current task is permitted to block. In addition, we
1495+
// special-case `thread.switch-to` and waiting for a subtask to go from
1496+
// `starting` to `started`, both of which we consider non-blocking
1497+
// operations despite requiring a suspend.
1498+
assert!(
1499+
matches!(
1500+
reason,
1501+
SuspendReason::ExplicitlySuspending {
1502+
skip_may_block_check: true,
1503+
..
1504+
} | SuspendReason::Waiting {
1505+
skip_may_block_check: true,
1506+
..
1507+
}
1508+
) || old_guest_thread
1509+
.map(|thread| self.concurrent_state_mut().may_block(thread.task))
1510+
.unwrap_or(true)
1511+
);
1512+
14881513
let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
14891514
assert!(suspend_reason.is_none());
14901515
*suspend_reason = Some(reason);
@@ -1540,6 +1565,7 @@ impl StoreOpaque {
15401565
self.suspend(SuspendReason::Waiting {
15411566
set,
15421567
thread: caller,
1568+
skip_may_block_check: false,
15431569
})?;
15441570
let state = self.concurrent_state_mut();
15451571
waitable.join(state, old_set)
@@ -2308,6 +2334,7 @@ impl Instance {
23082334
let async_caller = storage.is_none();
23092335
let state = store.0.concurrent_state_mut();
23102336
let guest_thread = state.guest_thread.unwrap();
2337+
let callee_async = state.get_mut(guest_thread.task)?.async_function;
23112338
let may_enter_after_call = state
23122339
.get_mut(guest_thread.task)?
23132340
.call_post_return_automatically();
@@ -2401,6 +2428,14 @@ impl Instance {
24012428
store.0.suspend(SuspendReason::Waiting {
24022429
set,
24032430
thread: caller,
2431+
// Normally, `StoreOpaque::suspend` would assert it's being
2432+
// called from a context where blocking is allowed. However, if
2433+
// `async_caller` is `true`, we'll only "block" long enough for
2434+
// the callee to start, i.e. we won't repeat this loop, so we
2435+
// tell `suspend` it's okay even if we're not allowed to block.
2436+
// Alternatively, if the callee is not an async function, then
2437+
// we know it won't block anyway.
2438+
skip_may_block_check: async_caller || !callee_async,
24042439
})?;
24052440

24062441
let state = store.0.concurrent_state_mut();
@@ -2885,6 +2920,9 @@ impl Instance {
28852920
self.id().get(store).check_may_leave(caller)?;
28862921

28872922
if !self.options(store, options).async_ {
2923+
// The caller may only call `waitable-set.wait` from an async task
2924+
// (i.e. a task created via a call to an async export).
2925+
// Otherwise, we'll trap.
28882926
store.concurrent_state_mut().check_blocking()?;
28892927
}
28902928

@@ -3136,6 +3174,10 @@ impl Instance {
31363174
} else {
31373175
SuspendReason::ExplicitlySuspending {
31383176
thread: guest_thread,
3177+
// Tell `StoreOpaque::suspend` it's okay to suspend here since
3178+
// we're handling a `thread.switch-to` call; otherwise it would
3179+
// panic if we called it in a non-blocking context.
3180+
skip_may_block_check: to_thread.is_some(),
31393181
}
31403182
};
31413183

@@ -3193,6 +3235,7 @@ impl Instance {
31933235
store.suspend(SuspendReason::Waiting {
31943236
set,
31953237
thread: guest_thread,
3238+
skip_may_block_check: false,
31963239
})?;
31973240
}
31983241
}

crates/wasmtime/src/runtime/component/func/host.rs

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -126,13 +126,12 @@ impl HostFunc {
126126
let data = SendSyncPtr::new(NonNull::new(data.as_ptr() as *mut F).unwrap());
127127
unsafe {
128128
call_host_and_handle_result::<T>(cx, |store, instance| {
129-
call_host(
129+
call_host::<_, _, _, _, S>(
130130
store,
131131
instance,
132132
TypeFuncIndex::from_u32(ty),
133133
OptionsIndex::from_u32(options),
134134
NonNull::slice_from_raw_parts(storage, storage_len).as_mut(),
135-
S::ASYNC,
136135
move |store, args| (*data.as_ptr())(store, args),
137136
)
138137
})
@@ -155,12 +154,17 @@ impl HostFunc {
155154
{
156155
Arc::new(HostFunc {
157156
entrypoint: dynamic_entrypoint::<T, F, S>,
158-
// This function performs dynamic type checks and subsequently does
159-
// not need to perform up-front type checks. Instead everything is
160-
// dynamically managed at runtime.
161-
//
162-
// TODO: Where does async checking happen?
163-
typecheck: Box::new(move |_expected_index, _expected_types| Ok(())),
157+
// This function performs dynamic type checks on its parameters and
158+
// results and subsequently does not need to perform up-front type
159+
// checks. However, we _do_ verify async-ness here.
160+
typecheck: Box::new(move |ty, types| {
161+
let ty = &types.types[ty];
162+
if S::ASYNC != ty.async_ {
163+
bail!("type mismatch with async");
164+
}
165+
166+
Ok(())
167+
}),
164168
func: Box::new(func),
165169
})
166170
}
@@ -253,26 +257,27 @@ where
253257
/// * `Return` - the result of the host function
254258
/// * `F` - the `closure` to actually receive the `Params` and return the
255259
/// `Return`
260+
/// * `S` - the expected `FunctionStyle`
256261
///
257262
/// It's expected that `F` will "un-tuple" the arguments to pass to a host
258263
/// closure.
259264
///
260265
/// This function is in general `unsafe` as the validity of all the parameters
261266
/// must be upheld. Generally that's done by ensuring this is only called from
262267
/// the select few places it's intended to be called from.
263-
unsafe fn call_host<T, Params, Return, F>(
268+
unsafe fn call_host<T, Params, Return, F, S>(
264269
store: StoreContextMut<'_, T>,
265270
instance: Instance,
266271
ty: TypeFuncIndex,
267272
options: OptionsIndex,
268273
storage: &mut [MaybeUninit<ValRaw>],
269-
async_function: bool,
270274
closure: F,
271275
) -> Result<()>
272276
where
273277
F: Fn(StoreContextMut<'_, T>, Params) -> HostResult<Return> + Send + Sync + 'static,
274278
Params: Lift,
275279
Return: Lower + 'static,
280+
S: FunctionStyle,
276281
{
277282
let (component, store) = instance.component_and_store_mut(store.0);
278283
let mut store = StoreContextMut(store);
@@ -369,7 +374,10 @@ where
369374
);
370375
}
371376
} else {
372-
if async_function {
377+
if S::ASYNC {
378+
// The caller has synchronously lowered an async function, meaning
379+
// the caller can only call it from an async task (i.e. a task
380+
// created via a call to an async export). Otherwise, we'll trap.
373381
concurrent::check_blocking(store.0)?;
374382
}
375383

@@ -731,13 +739,12 @@ where
731739
}
732740
}
733741

734-
unsafe fn call_host_dynamic<T, F>(
742+
unsafe fn call_host_dynamic<T, F, S>(
735743
store: StoreContextMut<'_, T>,
736744
instance: Instance,
737745
ty: TypeFuncIndex,
738746
options: OptionsIndex,
739747
storage: &mut [MaybeUninit<ValRaw>],
740-
async_function: bool,
741748
closure: F,
742749
) -> Result<()>
743750
where
@@ -751,6 +758,7 @@ where
751758
+ Sync
752759
+ 'static,
753760
T: 'static,
761+
S: FunctionStyle,
754762
{
755763
let (component, store) = instance.component_and_store_mut(store.0);
756764
let mut store = StoreContextMut(store);
@@ -853,7 +861,10 @@ where
853861
);
854862
}
855863
} else {
856-
if async_function {
864+
if S::ASYNC {
865+
// The caller has synchronously lowered an async function, meaning
866+
// the caller can only call it from an async task (i.e. a task
867+
// created via a call to an async export). Otherwise, we'll trap.
857868
concurrent::check_blocking(store.0)?;
858869
}
859870

@@ -975,13 +986,12 @@ where
975986
let data = SendSyncPtr::new(NonNull::new(data.as_ptr() as *mut F).unwrap());
976987
unsafe {
977988
call_host_and_handle_result(cx, |store, instance| {
978-
call_host_dynamic::<T, _>(
989+
call_host_dynamic::<T, _, S>(
979990
store,
980991
instance,
981992
TypeFuncIndex::from_u32(ty),
982993
OptionsIndex::from_u32(options),
983994
NonNull::slice_from_raw_parts(storage, storage_len).as_mut(),
984-
S::ASYNC,
985995
&*data.as_ptr(),
986996
)
987997
})

0 commit comments

Comments
 (0)