Skip to content

Commit 51caf0c

Browse files
committed
add mutation serialize transform
1 parent f9f229e commit 51caf0c

File tree

13 files changed

+946
-123
lines changed

13 files changed

+946
-123
lines changed

src/common/io/src/constants.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ pub const INF_BYTES_LONG: &str = "Infinity";
2828
pub const DEFAULT_BLOCK_BUFFER_SIZE: usize = 100 * 1024 * 1024;
2929
// The size of the I/O read/write block index buffer by default.
3030
pub const DEFAULT_BLOCK_INDEX_BUFFER_SIZE: usize = 300 * 1024;
31+
// The size of the I/O read/write delete mark buffer by default.
32+
pub const DEFAULT_BLOCK_DELETE_MARK_SIZE: usize = 1000 * 1000;
3133
// The max number of a block by default.
3234
pub const DEFAULT_BLOCK_MAX_ROWS: usize = 1000 * 1000;
3335
// The min number of a block by default.

src/query/storages/fuse/src/fuse_part.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use common_expression::ColumnId;
2828
use common_expression::Scalar;
2929
use storages_common_table_meta::meta::ColumnMeta;
3030
use storages_common_table_meta::meta::Compression;
31+
use storages_common_table_meta::meta::Location;
3132

3233
#[derive(serde::Serialize, serde::Deserialize, PartialEq, Eq)]
3334
pub struct FusePartInfo {
@@ -42,7 +43,7 @@ pub struct FusePartInfo {
4243
pub sort_min_max: Option<(Scalar, Scalar)>,
4344
/// page range in the file
4445
pub range: Option<Range<usize>>,
45-
pub delete_mark: Option<String>,
46+
pub delete_mark: Option<(Location, u64)>,
4647
}
4748

4849
#[typetag::serde(name = "fuse")]
@@ -75,7 +76,7 @@ impl FusePartInfo {
7576
compression: Compression,
7677
sort_min_max: Option<(Scalar, Scalar)>,
7778
range: Option<Range<usize>>,
78-
delete_mark: Option<String>,
79+
delete_mark: Option<(Location, u64)>,
7980
) -> Arc<Box<dyn PartInfo>> {
8081
Arc::new(Box::new(FusePartInfo {
8182
location,

src/query/storages/fuse/src/fuse_table.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,10 @@ impl Table for FuseTable {
325325
true
326326
}
327327

328+
fn support_delete_mark(&self) -> bool {
329+
false
330+
}
331+
328332
fn cluster_keys(&self, ctx: Arc<dyn TableContext>) -> Vec<RemoteExpr<String>> {
329333
let table_meta = Arc::new(self.clone());
330334
if let Some((_, order)) = &self.cluster_key_meta {
@@ -552,7 +556,7 @@ impl Table for FuseTable {
552556
col_indices: Vec<usize>,
553557
pipeline: &mut Pipeline,
554558
) -> Result<()> {
555-
self.do_delete(ctx, filter, col_indices, pipeline).await
559+
self.do_delete2(ctx, filter, col_indices, pipeline).await
556560
}
557561

558562
async fn update(

src/query/storages/fuse/src/io/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ pub use files::Files;
2323
pub use locations::TableMetaLocationGenerator;
2424
pub use read::BlockReader;
2525
pub use read::BloomBlockFilterReader;
26+
pub use read::DeleteMarkReader;
2627
pub use read::MergeIOReadResult;
2728
pub use read::MetaReaders;
2829
pub use read::NativeReaderExt;

src/query/storages/fuse/src/io/read/delete/delete_mark_reader.rs

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -47,16 +47,26 @@ type CachedReader = InMemoryItemCacheReader<Bitmap, DeleteMetaLoader>;
4747

4848
#[async_trait::async_trait]
4949
pub trait DeleteMarkReader {
50-
async fn read_delete_mark(&self, dal: Operator, length: u64) -> Result<Arc<Bitmap>>;
50+
async fn read_delete_mark(
51+
&self,
52+
dal: Operator,
53+
size: u64,
54+
num_rows: usize,
55+
) -> Result<Arc<Bitmap>>;
5156
}
5257

5358
#[async_trait::async_trait]
5459
impl DeleteMarkReader for Location {
55-
async fn read_delete_mark(&self, dal: Operator, length: u64) -> Result<Arc<Bitmap>> {
60+
async fn read_delete_mark(
61+
&self,
62+
dal: Operator,
63+
size: u64,
64+
num_rows: usize,
65+
) -> Result<Arc<Bitmap>> {
5666
let (path, ver) = &self;
5767
let mask_version = DeleteMaskVersion::try_from(*ver)?;
5868
if matches!(mask_version, DeleteMaskVersion::V0(_)) {
59-
let res = load_delete_mark(dal, path, length).await?;
69+
let res = load_delete_mark(dal, path, size, num_rows).await?;
6070
Ok(res)
6171
} else {
6272
unreachable!()
@@ -66,9 +76,14 @@ impl DeleteMarkReader for Location {
6676

6777
/// load delete mark data
6878
#[tracing::instrument(level = "debug", skip_all)]
69-
pub async fn load_delete_mark(dal: Operator, path: &str, length: u64) -> Result<Arc<Bitmap>> {
79+
pub async fn load_delete_mark(
80+
dal: Operator,
81+
path: &str,
82+
size: u64,
83+
num_rows: usize,
84+
) -> Result<Arc<Bitmap>> {
7085
// 1. load delete mark meta
71-
let delete_mark_meta = load_mark_meta(dal.clone(), path, length).await?;
86+
let delete_mark_meta = load_mark_meta(dal.clone(), path, size).await?;
7287
let file_meta = &delete_mark_meta.0;
7388
if file_meta.row_groups.len() != 1 {
7489
return Err(ErrorCode::StorageOther(format!(
@@ -81,7 +96,7 @@ pub async fn load_delete_mark(dal: Operator, path: &str, length: u64) -> Result<
8196
let index_column_chunk_metas = file_meta.row_groups[0].columns();
8297
assert_eq!(index_column_chunk_metas.len(), 1);
8398
let column_meta = &index_column_chunk_metas[0];
84-
let marks = load_delete_mark_data(column_meta, path, &dal).await?;
99+
let marks = load_delete_mark_data(column_meta, path, &dal, num_rows).await?;
85100

86101
Ok(marks)
87102
}
@@ -92,12 +107,14 @@ async fn load_delete_mark_data<'a>(
92107
col_chunk_meta: &'a ColumnChunkMetaData,
93108
path: &'a str,
94109
dal: &'a Operator,
110+
num_rows: usize,
95111
) -> Result<Arc<Bitmap>> {
96112
let storage_runtime = GlobalIORuntime::instance();
97113
let bytes = {
98114
let meta = col_chunk_meta.metadata();
99115
let location = path.to_string();
100116
let loader = DeleteMetaLoader {
117+
num_rows,
101118
offset: meta.data_page_offset as u64,
102119
len: meta.total_compressed_size as u64,
103120
cache_key: location.clone(),
@@ -122,7 +139,7 @@ async fn load_delete_mark_data<'a>(
122139
/// Loads delete mark meta data
123140
/// read data from cache, or populate cache items if possible
124141
#[tracing::instrument(level = "debug", skip_all)]
125-
async fn load_mark_meta(dal: Operator, path: &str, length: u64) -> Result<Arc<DeleteMarkMeta>> {
142+
async fn load_mark_meta(dal: Operator, path: &str, size: u64) -> Result<Arc<DeleteMarkMeta>> {
126143
let path_owned = path.to_owned();
127144
async move {
128145
let reader = MetaReaders::delete_mark_meta_reader(dal);
@@ -132,7 +149,7 @@ async fn load_mark_meta(dal: Operator, path: &str, length: u64) -> Result<Arc<De
132149

133150
let load_params = LoadParams {
134151
location: path_owned,
135-
len_hint: Some(length),
152+
len_hint: Some(size),
136153
ver: version,
137154
};
138155

@@ -143,6 +160,7 @@ async fn load_mark_meta(dal: Operator, path: &str, length: u64) -> Result<Arc<De
143160
}
144161

145162
pub struct DeleteMetaLoader {
163+
pub num_rows: usize,
146164
pub offset: u64,
147165
pub len: u64,
148166
pub cache_key: String,
@@ -177,8 +195,13 @@ impl Loader<Bitmap> for DeleteMetaLoader {
177195
let column_type = self.column_descriptor.descriptor.primitive_type.clone();
178196
let filed_name = self.column_descriptor.path_in_schema[0].to_owned();
179197
let field = ArrowField::new(filed_name, DataType::Boolean, false);
180-
let mut array_iter =
181-
column_iter_to_arrays(vec![decompressor], vec![&column_type], field, None, 1)?;
198+
let mut array_iter = column_iter_to_arrays(
199+
vec![decompressor],
200+
vec![&column_type],
201+
field,
202+
None,
203+
self.num_rows,
204+
)?;
182205
if let Some(array) = array_iter.next() {
183206
let array = array?;
184207
let col =

src/query/storages/fuse/src/io/read/delete/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,5 @@
1313
// limitations under the License.
1414

1515
mod delete_mark_reader;
16+
17+
pub use delete_mark_reader::DeleteMarkReader;

src/query/storages/fuse/src/io/read/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ pub use block::NativeReaderExt;
2525
pub use block::UncompressedBuffer;
2626
pub use bloom::BloomBlockFilterReader;
2727
pub use bloom::InRuntime;
28+
pub use delete::DeleteMarkReader;
2829
pub use meta::MetaReaders;
2930
pub use meta::SegmentInfoReader;
3031
pub use meta::TableSnapshotReader;

src/query/storages/fuse/src/operations/delete.rs

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ use common_expression::Evaluator;
3535
use common_expression::Expr;
3636
use common_expression::FieldIndex;
3737
use common_expression::RemoteExpr;
38+
use common_expression::TableDataType;
39+
use common_expression::TableField;
3840
use common_expression::TableSchema;
3941
use common_expression::Value;
4042
use common_functions::scalars::BUILTIN_FUNCTIONS;
@@ -44,6 +46,8 @@ use common_sql::evaluator::BlockOperator;
4446
use storages_common_table_meta::meta::Location;
4547
use storages_common_table_meta::meta::TableSnapshot;
4648

49+
use super::mutation::MutationSerializeTransform;
50+
use super::mutation::ParquetDeleteSource;
4751
use crate::operations::mutation::MutationAction;
4852
use crate::operations::mutation::MutationPartInfo;
4953
use crate::operations::mutation::MutationSink;
@@ -148,6 +152,107 @@ impl FuseTable {
148152
Ok(())
149153
}
150154

155+
pub async fn do_delete2(
156+
&self,
157+
ctx: Arc<dyn TableContext>,
158+
filter: Option<RemoteExpr<String>>,
159+
col_indices: Vec<usize>,
160+
pipeline: &mut Pipeline,
161+
) -> Result<()> {
162+
let snapshot_opt = self.read_table_snapshot().await?;
163+
164+
// check if table is empty
165+
let snapshot = if let Some(val) = snapshot_opt {
166+
val
167+
} else {
168+
// no snapshot, no deletion
169+
return Ok(());
170+
};
171+
172+
if snapshot.summary.row_count == 0 {
173+
// empty snapshot, no deletion
174+
return Ok(());
175+
}
176+
177+
let scan_progress = ctx.get_scan_progress();
178+
// check if unconditional deletion
179+
if filter.is_none() {
180+
let progress_values = ProgressValues {
181+
rows: snapshot.summary.row_count as usize,
182+
bytes: snapshot.summary.uncompressed_byte_size as usize,
183+
};
184+
scan_progress.incr(&progress_values);
185+
// deleting the whole table... just a truncate
186+
let purge = false;
187+
return self.do_truncate(ctx.clone(), purge).await;
188+
}
189+
190+
let filter_expr = filter.unwrap();
191+
if col_indices.is_empty() {
192+
// here the situation: filter_expr is not null, but col_indices is empty, which
193+
// indicates the expr being evaluated is unrelated to the value of rows:
194+
// e.g.
195+
// `delete from t where 1 = 1`, `delete from t where now()`,
196+
// or `delete from t where RANDOM()::INT::BOOLEAN`
197+
// if the `filter_expr` is a "constant" nullary expression:
198+
// for the whole block, whether all of the rows should be kept or dropped,
199+
// we can just return from here, without accessing the block data
200+
if self.try_eval_const(ctx.clone(), &self.schema(), &filter_expr)? {
201+
let progress_values = ProgressValues {
202+
rows: snapshot.summary.row_count as usize,
203+
bytes: snapshot.summary.uncompressed_byte_size as usize,
204+
};
205+
scan_progress.incr(&progress_values);
206+
207+
// deleting the whole table... just a truncate
208+
let purge = false;
209+
return self.do_truncate(ctx.clone(), purge).await;
210+
}
211+
// do nothing.
212+
return Ok(());
213+
}
214+
215+
let projection = Projection::Columns(col_indices.clone());
216+
self.mutation_block_pruning(
217+
ctx.clone(),
218+
vec![filter_expr.clone()],
219+
projection.clone(),
220+
&snapshot,
221+
)
222+
.await?;
223+
let block_reader = self.create_block_reader(projection, ctx.clone())?;
224+
225+
let mut schema = block_reader.schema().as_ref().to_owned();
226+
schema.add_columns(&[TableField::new("_row_exists", TableDataType::Boolean)])?;
227+
let filter = Arc::new(
228+
filter_expr
229+
.as_expr(&BUILTIN_FUNCTIONS)
230+
.project_column_ref(|name| schema.index_of(name).unwrap()),
231+
);
232+
233+
let max_threads = ctx.get_settings().get_max_threads()? as usize;
234+
// Add source pipe.
235+
pipeline.add_source(
236+
|output| {
237+
ParquetDeleteSource::try_create(
238+
ctx.clone(),
239+
output,
240+
self,
241+
filter.clone(),
242+
block_reader.clone(),
243+
)
244+
},
245+
max_threads,
246+
)?;
247+
248+
self.try_add_mutation_transform2(ctx.clone(), snapshot.segments.clone(), pipeline)?;
249+
250+
pipeline.add_sink(|input| {
251+
MutationSink::try_create(self, ctx.clone(), snapshot.clone(), input)
252+
})?;
253+
Ok(())
254+
}
255+
151256
pub fn try_eval_const(
152257
&self,
153258
ctx: Arc<dyn TableContext>,
@@ -335,6 +440,44 @@ impl FuseTable {
335440
}
336441
}
337442

443+
pub fn try_add_mutation_transform2(
444+
&self,
445+
ctx: Arc<dyn TableContext>,
446+
base_segments: Vec<Location>,
447+
pipeline: &mut Pipeline,
448+
) -> Result<()> {
449+
if pipeline.is_empty() {
450+
return Err(ErrorCode::Internal("The pipeline is empty."));
451+
}
452+
453+
match pipeline.output_len() {
454+
0 => Err(ErrorCode::Internal("The output of the last pipe is 0.")),
455+
last_pipe_size => {
456+
let mut inputs_port = Vec::with_capacity(last_pipe_size);
457+
for _ in 0..last_pipe_size {
458+
inputs_port.push(InputPort::create());
459+
}
460+
let output_port = OutputPort::create();
461+
pipeline.add_pipe(Pipe::create(inputs_port.len(), 1, vec![PipeItem::create(
462+
MutationSerializeTransform::try_create(
463+
ctx,
464+
self.schema(),
465+
inputs_port.clone(),
466+
output_port.clone(),
467+
self.get_operator(),
468+
self.meta_location_generator().clone(),
469+
base_segments,
470+
self.get_block_compact_thresholds(),
471+
)?,
472+
inputs_port,
473+
vec![output_port],
474+
)]));
475+
476+
Ok(())
477+
}
478+
}
479+
}
480+
338481
pub fn cluster_stats_gen(&self, ctx: Arc<dyn TableContext>) -> Result<ClusterStatsGenerator> {
339482
if self.cluster_key_meta.is_none() {
340483
return Ok(ClusterStatsGenerator::default());

src/query/storages/fuse/src/operations/mutation/mod.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@ pub mod base_mutator;
1717
mod compact;
1818
pub mod mutation_meta;
1919
mod mutation_part;
20+
mod mutation_serialize_transform;
2021
pub mod mutation_sink;
2122
mod mutation_source;
2223
mod mutation_transform;
23-
#[allow(unused)]
24-
mod parquet_delete_transform;
24+
pub mod parquet_delete_source;
2525
pub mod recluster_mutator;
2626
mod serialize_data_transform;
2727

@@ -39,10 +39,13 @@ pub use mutation_meta::MutationSinkMeta;
3939
pub use mutation_meta::MutationTransformMeta;
4040
pub use mutation_meta::SerializeDataMeta;
4141
pub use mutation_part::MutationPartInfo;
42+
pub use mutation_serialize_transform::MutationSerializeTransform;
4243
pub use mutation_sink::MutationSink;
4344
pub use mutation_source::MutationAction;
4445
pub use mutation_source::MutationSource;
4546
pub use mutation_transform::MutationTransform;
47+
pub use parquet_delete_source::MutationSourceMeta;
48+
pub use parquet_delete_source::ParquetDeleteSource;
4649
pub use recluster_mutator::ReclusterMutator;
4750
pub use serialize_data_transform::SerializeDataTransform;
4851
pub use serialize_data_transform::SerializeState;

0 commit comments

Comments
 (0)