
Commit 3c6a4fe

fix: load of large zip file. (#19010)
* fix load of file compressed by zip.
1 parent 5851a25 commit 3c6a4fe

File tree

6 files changed: +74 -12 lines changed


.gitignore

Lines changed: 1 addition & 0 deletions
@@ -69,6 +69,7 @@ __pycache__/
 .python-version

 *.zip
+!tests/data/ontime_200.csv.zip
 *.profraw

 # tpch data set

src/common/compress/src/decode.rs

Lines changed: 20 additions & 3 deletions
@@ -357,21 +357,38 @@ impl DecompressDecoder {
         Ok(main)
     }

-    pub fn decompress_all_zip(compressed: &[u8]) -> databend_common_exception::Result<Vec<u8>> {
+    pub fn decompress_all_zip(
+        compressed: &[u8],
+        path: &str,
+        memory_limit: usize,
+    ) -> databend_common_exception::Result<Vec<u8>> {
         let mut zip = ZipArchive::new(Cursor::new(compressed)).map_err(|e| {
             ErrorCode::InvalidCompressionData(format!("compression data invalid: {e}"))
         })?;
         if zip.len() > 1 {
+            // if we want to support loading of zip with multi files later, need to resolve
+            // 1. separate output bytes of files
+            // 2. a formal way to repr the path of each file, for metadata and error reporting
+            // 3. atomic file load
             return Err(ErrorCode::InvalidCompressionData(
                 "Zip only supports single file",
             ));
         }
+        if memory_limit > 0 {
+            if let Some(size) = zip.decompressed_size() {
+                if size + compressed.len() as u128 > memory_limit as u128 / 2 {
+                    return Err(ErrorCode::BadBytes(format!(
+                        "zip file {path} is too large, decompressed_size = {size}",
+                    )));
+                }
+            }
+        }
         let mut file = zip.by_index(0).map_err(|e| {
             ErrorCode::InvalidCompressionData(format!("compression data invalid: {e}"))
         })?;
         let mut bytes = Vec::new();
+        // todo: split to 16MB batches
         file.read_to_end(&mut bytes)?;
-
         Ok(bytes)
     }

@@ -611,7 +628,7 @@ mod tests {
         rng.fill_bytes(&mut content);

         let compressed_content = CompressCodec::compress_all_zip(&content, "unload.csv")?;
-        let result = DecompressDecoder::decompress_all_zip(&compressed_content)?;
+        let result = DecompressDecoder::decompress_all_zip(&compressed_content, "", 0)?;

         assert_eq!(result, content);
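
The guard above uses the archive's own metadata, so oversized files are rejected before any bytes are inflated; the new path parameter is only used to name the file in the error message, and a memory_limit of 0 disables the check (as the updated unit test shows). As a standalone illustration only (not Databend code), the same check restated against the zip crate directly; check_zip_fits is a hypothetical name:

use std::io::Cursor;

use zip::ZipArchive;

// Illustrative sketch of the size guard; not part of the Databend codebase.
fn check_zip_fits(compressed: &[u8], memory_limit: usize) -> Result<(), String> {
    let zip = ZipArchive::new(Cursor::new(compressed)).map_err(|e| e.to_string())?;
    // memory_limit == 0 means "no limit", as in the updated unit test.
    if memory_limit > 0 {
        if let Some(size) = zip.decompressed_size() {
            // The compressed input and the decompressed output coexist in memory,
            // hence the comparison against half of the limit.
            if size + compressed.len() as u128 > memory_limit as u128 / 2 {
                return Err(format!("zip is too large, decompressed_size = {size}"));
            }
        }
    }
    Ok(())
}

In the diff itself this logic sits inside decompress_all_zip, which then reads the single entry via by_index(0); the halving is presumably there because both copies of the data are held at once.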

src/query/sql/src/planner/semantic/type_check.rs

Lines changed: 6 additions & 1 deletion
@@ -51,6 +51,7 @@ use databend_common_ast::parser::parse_expr;
 use databend_common_ast::parser::tokenize_sql;
 use databend_common_ast::parser::Dialect;
 use databend_common_ast::Span;
+use databend_common_base::runtime::GLOBAL_MEM_STAT;
 use databend_common_catalog::catalog::CatalogManager;
 use databend_common_catalog::plan::InternalColumn;
 use databend_common_catalog::plan::InternalColumnType;

@@ -5405,7 +5406,11 @@ impl<'a> TypeChecker<'a> {
             Some(algo) => {
                 log::trace!("Decompressing module using {:?} algorithm", algo);
                 if algo == CompressAlgorithm::Zip {
-                    DecompressDecoder::decompress_all_zip(&code_blob)
+                    DecompressDecoder::decompress_all_zip(
+                        &code_blob,
+                        &module_path,
+                        GLOBAL_MEM_STAT.get_limit() as usize,
+                    )
                 } else {
                     let mut decoder = DecompressDecoder::new(algo);
                     decoder.decompress_all(&code_blob)

src/query/storages/stage/src/read/row_based/processors/decompressor.rs

Lines changed: 30 additions & 8 deletions
@@ -14,6 +14,7 @@

 use std::sync::Arc;

+use databend_common_base::runtime::GLOBAL_MEM_STAT;
 use databend_common_compress::CompressAlgorithm;
 use databend_common_compress::DecompressDecoder;
 use databend_common_compress::DecompressState;

@@ -32,6 +33,7 @@ pub struct Decompressor {
     algo: Option<CompressAlgorithm>,
     decompressor: Option<(DecompressDecoder, usize)>,
     path: Option<String>,
+    zip_buf: Vec<u8>,
 }

 impl Decompressor {

@@ -41,6 +43,7 @@ impl Decompressor {
             algo,
             path: None,
             decompressor: None,
+            zip_buf: Vec::new(),
         })
     }

@@ -55,6 +58,7 @@ impl Decompressor {

         if let Some(algo) = algo {
             if matches!(algo, CompressAlgorithm::Zip) {
+                self.zip_buf.clear();
                 return;
             }
             let decompressor = DecompressDecoder::new(algo);

@@ -82,15 +86,33 @@ impl AccumulatingTransform for Decompressor {
             }
         }
         if matches!(self.algo, Some(CompressAlgorithm::Zip)) {
-            let bytes = DecompressDecoder::decompress_all_zip(&batch.data)?;
+            let memory_limit = GLOBAL_MEM_STAT.get_limit() as usize;
+            if memory_limit > 0 && self.zip_buf.len() + batch.data.len() > memory_limit / 3 {
+                return Err(ErrorCode::BadBytes(format!(
+                    "zip file {} is larger than memory_limit/3 ({})",
+                    batch.path,
+                    memory_limit / 3
+                )));
+            }
+            self.zip_buf.extend_from_slice(&batch.data);

-            let new_batch = Box::new(BytesBatch {
-                data: bytes,
-                path: batch.path.clone(),
-                offset: batch.data.len(),
-                is_eof: batch.is_eof,
-            });
-            return Ok(vec![DataBlock::empty_with_meta(new_batch)]);
+            return if batch.is_eof {
+                let bytes = DecompressDecoder::decompress_all_zip(
+                    &self.zip_buf,
+                    &batch.path,
+                    memory_limit,
+                )?;
+                let new_batch = Box::new(BytesBatch {
+                    data: bytes,
+                    path: batch.path.clone(),
+                    offset: 0,
+                    is_eof: batch.is_eof,
+                });
+                self.zip_buf.clear();
+                Ok(vec![DataBlock::empty_with_meta(new_batch)])
+            } else {
+                Ok(vec![])
+            };
         }
         if let Some((de, offset)) = &mut self.decompressor {
             let mut data = de.decompress_batch(&batch.data).map_err(|e| {
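
The key change here is that zip input is no longer decompressed per incoming batch: a zip archive's central directory sits at the end of the file, so the processor now accumulates every batch into zip_buf and only calls decompress_all_zip once is_eof is set, with the buffered compressed bytes capped at a third of the memory limit. A minimal, self-contained sketch of that accumulate-until-EOF pattern, using a hypothetical ZipAccumulator type and the zip crate directly instead of DecompressDecoder:

use std::io::{Cursor, Read};

use zip::ZipArchive;

// Hypothetical stand-in for the Decompressor's zip path: buffer chunks,
// refuse to grow past memory_limit / 3, and decode only at EOF.
struct ZipAccumulator {
    buf: Vec<u8>,
    memory_limit: usize,
}

impl ZipAccumulator {
    fn push(&mut self, chunk: &[u8], is_eof: bool) -> Result<Option<Vec<u8>>, String> {
        if self.memory_limit > 0 && self.buf.len() + chunk.len() > self.memory_limit / 3 {
            return Err(format!(
                "zip input is larger than memory_limit/3 ({})",
                self.memory_limit / 3
            ));
        }
        self.buf.extend_from_slice(chunk);
        if !is_eof {
            // Nothing can be decoded yet: the central directory is at the end.
            return Ok(None);
        }
        // At EOF, open the whole buffered archive and read its single entry.
        let bytes = {
            let mut zip =
                ZipArchive::new(Cursor::new(self.buf.as_slice())).map_err(|e| e.to_string())?;
            let mut file = zip.by_index(0).map_err(|e| e.to_string())?;
            let mut out = Vec::new();
            file.read_to_end(&mut out).map_err(|e| e.to_string())?;
            out
        };
        self.buf.clear();
        Ok(Some(bytes))
    }
}

In the real processor the decompressed bytes are wrapped back into a single BytesBatch with offset: 0, so downstream row-based readers see the whole file exactly once.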

tests/data/ontime_200.csv.zip

8.27 KB
Binary file not shown.

tests/sqllogictests/suites/stage/on_time.test

Lines changed: 17 additions & 0 deletions
@@ -1,4 +1,8 @@
 # use prepared data ontime
+# Test with small buffer to simulate chunked reading
+statement ok
+set input_read_buffer_size = 1024
+
 statement ok
 truncate table ontime

@@ -92,3 +96,16 @@ select count(1), avg(Year), sum(DayOfWeek) from ontime

 statement ok
 truncate table ontime
+
+query TIITI
+copy into ontime from @data/ontime_200.csv.zip pattern = '' FILE_FORMAT = (type = csv skip_header = 1 compression = 'zip')
+----
+ontime_200.csv.zip 199 0 NULL NULL
+
+query III
+select count(1), avg(Year), sum(DayOfWeek) from ontime
+----
+199 2020.0 769
+
+statement ok
+truncate table ontime
