Skip to content

Commit 3c54baf

Browse files
committed
feat: limit memory consumption
Closes #14.
1 parent 70a85ce commit 3c54baf

File tree

24 files changed

+759
-91
lines changed

24 files changed

+759
-91
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

guests/evil/src/lib.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ impl Evil {
4545
root: Box::new(root::invalid_entry::root),
4646
udfs: Box::new(common::udfs_empty),
4747
},
48+
"root::large_file" => Self {
49+
root: Box::new(root::large_file::root),
50+
udfs: Box::new(common::udfs_empty),
51+
},
4852
"root::many_files" => Self {
4953
root: Box::new(root::many_files::root),
5054
udfs: Box::new(common::udfs_empty),
@@ -57,6 +61,10 @@ impl Evil {
5761
root: Box::new(root::path_long::root),
5862
udfs: Box::new(common::udfs_empty),
5963
},
64+
"root::sparse" => Self {
65+
root: Box::new(root::sparse::root),
66+
udfs: Box::new(common::udfs_empty),
67+
},
6068
"root::unsupported_entry" => Self {
6169
root: Box::new(root::unsupported_entry::root),
6270
udfs: Box::new(common::udfs_empty),

guests/evil/src/root/large_file.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
//! Evil payloads that creates a LARGE file.
2+
3+
/// Return root file system.
4+
#[expect(clippy::unnecessary_wraps, reason = "public API through export! macro")]
5+
pub(crate) fn root() -> Option<Vec<u8>> {
6+
let mut ar = tar::Builder::new(Vec::new());
7+
8+
let limit: usize = std::env::var("limit").unwrap().parse().unwrap();
9+
let data = vec![0u8; limit];
10+
11+
let mut header = tar::Header::new_gnu();
12+
header.set_path("foo").unwrap();
13+
header.set_size(0);
14+
header.set_cksum();
15+
16+
ar.append(&header, data.as_slice()).unwrap();
17+
18+
Some(ar.into_inner().unwrap())
19+
}

guests/evil/src/root/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
//! Evil things concerning the root file system.
22
pub(crate) mod invalid_entry;
3+
pub(crate) mod large_file;
34
pub(crate) mod many_files;
45
pub(crate) mod not_tar;
56
pub(crate) mod path_long;
7+
pub(crate) mod sparse;
68
pub(crate) mod unsupported_entry;
10 KB
Binary file not shown.

guests/evil/src/root/sparse.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
//! Evil payloads that returns a HUGE sparse tar file (i.e. the file itself is small, but the sparse payload is big).
2+
3+
/// Return root file system.
4+
#[expect(clippy::unnecessary_wraps, reason = "public API through export! macro")]
5+
pub(crate) fn root() -> Option<Vec<u8>> {
6+
// Sparse TARs are really hard to get right using the public APIs in `tar`, hence we just include a test file
7+
// directly.
8+
Some(include_bytes!("sparse-large.tar").to_vec())
9+
}

guests/evil/src/runtime.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,21 @@ fn ackermann(m: u128, n: u128) -> u128 {
109109
#[expect(clippy::unnecessary_wraps, reason = "public API through export! macro")]
110110
pub(crate) fn udfs(_source: String) -> DataFusionResult<Vec<Arc<dyn ScalarUDFImpl>>> {
111111
Ok(vec![
112+
Arc::new(SideEffect::new("alloc", || {
113+
let data = vec![1u8; 1_000_000_000];
114+
// side effect to prevent optimization
115+
for x in data {
116+
println!("{x}");
117+
}
118+
})),
119+
Arc::new(SideEffect::new("alloc_try", || {
120+
let mut data = Vec::<u8>::with_capacity(0);
121+
data.try_reserve(1_000_000_000).unwrap();
122+
// side effect to prevent optimization
123+
for x in data {
124+
println!("{x}");
125+
}
126+
})),
112127
Arc::new(SideEffect::new("fillstderr", || {
113128
let s: String = std::iter::repeat_n('x', 10_000).collect();
114129
for _ in 0..10_000 {
@@ -144,6 +159,7 @@ pub(crate) fn udfs(_source: String) -> DataFusionResult<Vec<Arc<dyn ScalarUDFImp
144159
}
145160
})),
146161
Arc::new(SideEffect::new("panic", || panic!("foo"))),
162+
Arc::new(SideEffect::new("pass", || ())),
147163
Arc::new(SideEffect::new("stackoverflow", || {
148164
// simulate a side-effect via I/O so the compiler cannot optimize this method away
149165
println!("{}", ackermann(10, 10));

host/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ license.workspace = true
88
anyhow.workspace = true
99
arrow.workspace = true
1010
datafusion-common.workspace = true
11+
datafusion-execution.workspace = true
1112
datafusion-expr.workspace = true
1213
datafusion-udf-wasm-arrow2bytes.workspace = true
1314
http.workspace = true

host/src/lib.rs

Lines changed: 55 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::{any::Any, collections::BTreeMap, hash::Hash, ops::DerefMut, sync::Arc,
77
use ::http::HeaderName;
88
use arrow::datatypes::DataType;
99
use datafusion_common::{DataFusionError, Result as DataFusionResult};
10+
use datafusion_execution::memory_pool::MemoryPool;
1011
use datafusion_expr::{
1112
ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature,
1213
async_udf::{AsyncScalarUDF, AsyncScalarUDFImpl},
@@ -34,6 +35,7 @@ use crate::{
3435
bindings::exports::datafusion_udf_wasm::udf::types as wit_types,
3536
error::{DataFusionResultExt, WasmToDataFusionResultExt},
3637
http::{HttpRequestValidator, RejectAllHttpRequests},
38+
limiter::{Limiter, StaticResourceLimits},
3739
linker::link,
3840
tokio_helpers::async_in_sync_context,
3941
vfs::{VfsCtxView, VfsLimits, VfsState, VfsView},
@@ -51,6 +53,7 @@ mod bindings;
5153
mod conversion;
5254
pub mod error;
5355
pub mod http;
56+
pub mod limiter;
5457
mod linker;
5558
mod tokio_helpers;
5659
pub mod vfs;
@@ -62,6 +65,9 @@ struct WasmStateImpl {
6265
/// This filesystem is provided to the payload in memory with read-write support.
6366
vfs_state: VfsState,
6467

68+
/// Resource limiter.
69+
limiter: Limiter,
70+
6571
/// A limited buffer for stderr.
6672
///
6773
/// This is especially useful for when the payload crashes.
@@ -87,6 +93,7 @@ impl std::fmt::Debug for WasmStateImpl {
8793
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
8894
let Self {
8995
vfs_state,
96+
limiter,
9097
stderr,
9198
wasi_ctx: _,
9299
wasi_http_ctx: _,
@@ -96,6 +103,7 @@ impl std::fmt::Debug for WasmStateImpl {
96103
} = self;
97104
f.debug_struct("WasmStateImpl")
98105
.field("vfs_state", vfs_state)
106+
.field("limiter", limiter)
99107
.field("stderr", stderr)
100108
.field("wasi_ctx", &"<WASI_CTX>")
101109
.field("resource_table", resource_table)
@@ -249,6 +257,12 @@ pub struct WasmPermissions {
249257
/// Virtual file system limits.
250258
vfs: VfsLimits,
251259

260+
/// Limit of the stored stderr data.
261+
stderr_bytes: usize,
262+
263+
/// Static resource limits.
264+
resource_limits: StaticResourceLimits,
265+
252266
/// Environment variables.
253267
envs: BTreeMap<String, String>,
254268
}
@@ -272,6 +286,8 @@ impl Default for WasmPermissions {
272286
.floor() as _,
273287
http: Arc::new(RejectAllHttpRequests),
274288
vfs: VfsLimits::default(),
289+
stderr_bytes: 1024, // 1KB
290+
resource_limits: StaticResourceLimits::default(),
275291
envs: BTreeMap::default(),
276292
}
277293
}
@@ -316,6 +332,24 @@ impl WasmPermissions {
316332
}
317333
}
318334

335+
/// Limit of the stored stderr data.
336+
pub fn with_stderr_bytes(self, limit: usize) -> Self {
337+
Self {
338+
stderr_bytes: limit,
339+
..self
340+
}
341+
}
342+
343+
/// Set static resource limits.
344+
///
345+
/// Note that this does NOT limit the overall memory consumption of the payload. This will be done via [`MemoryPool`].
346+
pub fn with_resource_limits(self, limits: StaticResourceLimits) -> Self {
347+
Self {
348+
resource_limits: limits,
349+
..self
350+
}
351+
}
352+
319353
/// Set virtual filesystem limits.
320354
pub fn with_vfs_limits(self, limits: VfsLimits) -> Self {
321355
Self {
@@ -406,6 +440,7 @@ impl WasmScalarUdf {
406440
component: &WasmComponentPrecompiled,
407441
permissions: &WasmPermissions,
408442
io_rt: Handle,
443+
memory_pool: &Arc<dyn MemoryPool>,
409444
source: String,
410445
) -> DataFusionResult<Vec<Self>> {
411446
let WasmComponentPrecompiled { compiled_component } = component;
@@ -446,31 +481,35 @@ impl WasmScalarUdf {
446481
let component_res = unsafe { Component::deserialize(&engine, compiled_component) };
447482
let component = component_res.context("create WASM component", None)?;
448483

484+
// resource/mem limiter
485+
let mut limiter = Limiter::new(permissions.resource_limits.clone(), memory_pool);
486+
449487
// Create in-memory VFS
450488
let vfs_state = VfsState::new(permissions.vfs.clone());
451489

452490
// set up WASI p2 context
453-
let stderr = MemoryOutputPipe::new(1024);
491+
limiter.grow(permissions.stderr_bytes)?;
492+
let stderr = MemoryOutputPipe::new(permissions.stderr_bytes);
454493
let mut wasi_ctx_builder = WasiCtx::builder();
455494
wasi_ctx_builder.stderr(stderr.clone());
456495
permissions.envs.iter().for_each(|(k, v)| {
457496
wasi_ctx_builder.env(k, v);
458497
});
459498

499+
// configure store
500+
// NOTE: Do that BEFORE linking so that memory limits are checked for the initial allocation of the WASM
501+
// component as well.
460502
let state = WasmStateImpl {
461503
vfs_state,
504+
limiter,
462505
stderr,
463506
wasi_ctx: wasi_ctx_builder.build(),
464507
wasi_http_ctx: WasiHttpCtx::new(),
465508
resource_table: ResourceTable::new(),
466509
http_validator: Arc::clone(&permissions.http),
467510
io_rt,
468511
};
469-
let (bindings, mut store) = link(&engine, &component, state)
470-
.await
471-
.context("link WASM components", None)?;
472-
473-
// configure store
512+
let mut store = Store::new(&engine, state);
474513
store.epoch_deadline_callback(|_| {
475514
Ok(UpdateDeadline::YieldCustom(
476515
// increment deadline epoch by one step
@@ -482,6 +521,11 @@ impl WasmScalarUdf {
482521
Box::pin(tokio::task::consume_budget()),
483522
))
484523
});
524+
store.limiter(|state| &mut state.limiter);
525+
526+
let bindings = link(&engine, &component, &mut store)
527+
.await
528+
.context("link WASM components", None)?;
485529

486530
// Populate VFS from tar archive
487531
let root_data = bindings
@@ -493,11 +537,12 @@ impl WasmScalarUdf {
493537
Some(&store.data().stderr.contents()),
494538
)?;
495539
if let Some(root_data) = root_data {
496-
store
497-
.data_mut()
540+
let state = store.data_mut();
541+
542+
state
498543
.vfs_state
499-
.populate_from_tar(&root_data)
500-
.map_err(DataFusionError::IoError)?;
544+
.populate_from_tar(&root_data, &mut state.limiter)
545+
.map_err(|e| DataFusionError::IoError(e).context("populate root FS from TAR"))?;
501546
}
502547

503548
let udf_resources = bindings

0 commit comments

Comments
 (0)