Skip to content

Commit 10dada0

Browse files
authored
Update async API (#510)
* Update async API * Update docstrings * Fix missing function * Update doc * Enable rowGroups
1 parent dc8d2fa commit 10dada0

File tree

3 files changed

+225
-239
lines changed

3 files changed

+225
-239
lines changed

src/read_options.rs

Lines changed: 86 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
1+
use parquet::arrow::arrow_reader::ArrowReaderBuilder;
2+
use parquet::arrow::ProjectionMask;
3+
use parquet::schema::types::SchemaDescriptor;
14
use serde::{Deserialize, Serialize};
25
use wasm_bindgen::prelude::*;
36

7+
use crate::error::{ParquetWasmError, Result};
8+
49
#[wasm_bindgen(typescript_custom_section)]
510
const TS_ReaderOptions: &'static str = r#"
611
export type ReaderOptions = {
@@ -12,6 +17,10 @@ export type ReaderOptions = {
1217
limit?: number;
1318
/* Provide an offset to skip over the given number of rows. */
1419
offset?: number;
20+
/* The column names from the file to read. */
21+
columns?: string[];
22+
/* The number of concurrent requests to make in the async reader. */
23+
concurrency?: number;
1524
};
1625
"#;
1726

@@ -21,7 +30,7 @@ extern "C" {
2130
pub type ReaderOptions;
2231
}
2332

24-
#[derive(Serialize, Deserialize, Default)]
33+
#[derive(Clone, Serialize, Deserialize, Default)]
2534
#[serde(rename_all = "camelCase")]
2635
pub struct JsReaderOptions {
2736
/// The number of rows in each batch. If not provided, the upstream parquet default is 1024.
@@ -35,12 +44,87 @@ pub struct JsReaderOptions {
3544

3645
/// Provide an offset to skip over the given number of rows
3746
pub offset: Option<usize>,
47+
48+
/// The column names from the file to read.
49+
pub columns: Option<Vec<String>>,
50+
51+
/// The number of concurrent requests to make in the async reader.
52+
pub concurrency: Option<usize>,
53+
}
54+
55+
impl JsReaderOptions {
56+
pub fn apply_to_builder<T>(
57+
&self,
58+
mut builder: ArrowReaderBuilder<T>,
59+
) -> Result<ArrowReaderBuilder<T>> {
60+
if let Some(batch_size) = self.batch_size {
61+
builder = builder.with_batch_size(batch_size);
62+
}
63+
64+
if let Some(limit) = self.limit {
65+
builder = builder.with_limit(limit);
66+
}
67+
68+
if let Some(offset) = self.offset {
69+
builder = builder.with_offset(offset);
70+
}
71+
72+
if let Some(columns) = &self.columns {
73+
let parquet_schema = builder.parquet_schema();
74+
let projection_mask = generate_projection_mask(columns, parquet_schema)?;
75+
76+
builder = builder.with_projection(projection_mask);
77+
}
78+
79+
Ok(builder)
80+
}
3881
}
3982

4083
impl TryFrom<ReaderOptions> for JsReaderOptions {
4184
type Error = serde_wasm_bindgen::Error;
4285

43-
fn try_from(value: ReaderOptions) -> Result<Self, Self::Error> {
86+
fn try_from(value: ReaderOptions) -> std::result::Result<Self, Self::Error> {
4487
serde_wasm_bindgen::from_value(value.obj)
4588
}
4689
}
90+
91+
fn generate_projection_mask<S: AsRef<str>>(
92+
columns: &[S],
93+
pq_schema: &SchemaDescriptor,
94+
) -> Result<ProjectionMask> {
95+
let col_paths = pq_schema
96+
.columns()
97+
.iter()
98+
.map(|col| col.path().string())
99+
.collect::<Vec<_>>();
100+
let indices: Vec<usize> = columns
101+
.iter()
102+
.map(|col| {
103+
let col = col.as_ref();
104+
let field_indices: Vec<usize> = col_paths
105+
.iter()
106+
.enumerate()
107+
.filter(|(_idx, path)| {
108+
// identical OR the path starts with the column AND the substring is immediately followed by the
109+
// path separator
110+
path.as_str() == col
111+
|| path.starts_with(col) && {
112+
let left_index = path.find(col).unwrap();
113+
path.chars().nth(left_index + col.len()).unwrap() == '.'
114+
}
115+
})
116+
.map(|(idx, _)| idx)
117+
.collect();
118+
if field_indices.is_empty() {
119+
Err(ParquetWasmError::UnknownColumn(col.to_string()))
120+
} else {
121+
Ok(field_indices)
122+
}
123+
})
124+
.collect::<Result<Vec<Vec<usize>>>>()?
125+
.into_iter()
126+
.flatten()
127+
.collect();
128+
let projection_mask = ProjectionMask::leaves(pq_schema, indices);
129+
Ok(projection_mask)
130+
}

0 commit comments

Comments
 (0)