1616use std:: sync:: Arc ;
1717
1818use chrono:: Duration ;
19+ use chrono:: Utc ;
1920use common_base:: base:: tokio;
2021use common_catalog:: table_context:: TableContext ;
2122use common_exception:: Result ;
2223use common_expression:: DataBlock ;
24+ use common_storages_factory:: Table ;
2325use common_storages_fuse:: io:: MetaWriter ;
2426use common_storages_fuse:: io:: SegmentWriter ;
2527use common_storages_fuse:: statistics:: gen_columns_statistics;
28+ use common_storages_fuse:: statistics:: merge_statistics;
2629use common_storages_fuse:: FuseTable ;
2730use futures_util:: TryStreamExt ;
2831use storages_common_table_meta:: meta:: Location ;
2932use storages_common_table_meta:: meta:: SegmentInfo ;
3033use storages_common_table_meta:: meta:: Statistics ;
3134use storages_common_table_meta:: meta:: TableSnapshot ;
35+ use storages_common_table_meta:: meta:: TableSnapshotV2 ;
3236use storages_common_table_meta:: meta:: Versioned ;
3337use uuid:: Uuid ;
3438
@@ -191,6 +195,7 @@ async fn test_fuse_purge_orphan_retention() -> Result<()> {
191195 fuse_table,
192196 segment_locations. clone ( ) ,
193197 Some ( new_timestamp) ,
198+ TableSnapshot :: VERSION ,
194199 )
195200 . await ?;
196201
@@ -200,6 +205,7 @@ async fn test_fuse_purge_orphan_retention() -> Result<()> {
200205 fuse_table,
201206 segment_locations. clone ( ) ,
202207 Some ( new_timestamp) ,
208+ TableSnapshot :: VERSION ,
203209 )
204210 . await ?;
205211
@@ -215,6 +221,7 @@ async fn test_fuse_purge_orphan_retention() -> Result<()> {
215221 fuse_table,
216222 segment_locations. clone ( ) ,
217223 Some ( new_timestamp) ,
224+ TableSnapshot :: VERSION ,
218225 )
219226 . await ?;
220227 }
@@ -243,29 +250,144 @@ async fn test_fuse_purge_orphan_retention() -> Result<()> {
243250 Ok ( ( ) )
244251}
245252
253+ #[ tokio:: test( flavor = "multi_thread" ) ]
254+ async fn test_fuse_purge_older_version ( ) -> Result < ( ) > {
255+ let fixture = TestFixture :: new ( ) . await ;
256+ let ctx = fixture. ctx ( ) ;
257+ fixture. create_default_table ( ) . await ?;
258+
259+ let now = Utc :: now ( ) ;
260+
261+ let schema = TestFixture :: default_table_schema ( ) ;
262+ let mut table = fixture. latest_default_table ( ) . await ?;
263+ let mut fuse_table = FuseTable :: try_from_table ( table. as_ref ( ) ) ?;
264+ let location_gen = fuse_table. meta_location_generator ( ) ;
265+ let operator = fuse_table. get_operator ( ) ;
266+
267+ {
268+ let num_of_segments = 3 ;
269+ let blocks_per_segment = 2 ;
270+ let segments =
271+ utils:: generate_segments ( fuse_table, num_of_segments, blocks_per_segment) . await ?;
272+
273+ // create snapshot 0, the format version is 2.
274+ let locations = vec ! [ segments[ 0 ] . 0 . clone( ) ] ;
275+ let id = Uuid :: new_v4 ( ) ;
276+ let mut snapshot_0 = TableSnapshotV2 :: new (
277+ id,
278+ & None ,
279+ None ,
280+ schema. as_ref ( ) . clone ( ) ,
281+ segments[ 0 ] . 1 . summary . clone ( ) ,
282+ locations,
283+ None ,
284+ None ,
285+ ) ;
286+ snapshot_0. timestamp = Some ( now - Duration :: hours ( 13 ) ) ;
287+
288+ let new_snapshot_location = location_gen
289+ . snapshot_location_from_uuid ( & snapshot_0. snapshot_id , TableSnapshotV2 :: VERSION ) ?;
290+ utils:: write_snapshot_v2 ( & operator, & new_snapshot_location, & snapshot_0) . await ?;
291+
292+ // create snapshot 1, the format version is 3.
293+ let mut locations = Vec :: with_capacity ( 2 ) ;
294+ for i in [ 1 , 0 ] {
295+ locations. push ( segments[ i] . 0 . clone ( ) ) ;
296+ }
297+ let mut snapshot_1 = TableSnapshot :: new (
298+ Uuid :: new_v4 ( ) ,
299+ & snapshot_0. timestamp ,
300+ Some ( ( snapshot_0. snapshot_id , TableSnapshotV2 :: VERSION ) ) ,
301+ schema. as_ref ( ) . clone ( ) ,
302+ Statistics :: default ( ) ,
303+ locations,
304+ None ,
305+ None ,
306+ ) ;
307+ snapshot_1. timestamp = Some ( now - Duration :: hours ( 12 ) ) ;
308+ snapshot_1. summary = merge_statistics ( & snapshot_0. summary , & segments[ 1 ] . 1 . summary ) ?;
309+ let new_snapshot_location = location_gen
310+ . snapshot_location_from_uuid ( & snapshot_1. snapshot_id , TableSnapshot :: VERSION ) ?;
311+ snapshot_1
312+ . write_meta ( & operator, & new_snapshot_location)
313+ . await ?;
314+
315+ // create snapshot 2, the format version is 3.
316+ let mut locations = Vec :: with_capacity ( 3 ) ;
317+ for i in [ 2 , 1 , 0 ] {
318+ locations. push ( segments[ i] . 0 . clone ( ) ) ;
319+ }
320+ let mut snapshot_2 = TableSnapshot :: from_previous ( & snapshot_1) ;
321+ snapshot_2. segments = locations;
322+ snapshot_2. timestamp = Some ( now) ;
323+ snapshot_2. summary = merge_statistics ( & snapshot_1. summary , & segments[ 2 ] . 1 . summary ) ?;
324+ let new_snapshot_location = location_gen
325+ . snapshot_location_from_uuid ( & snapshot_2. snapshot_id , TableSnapshot :: VERSION ) ?;
326+ snapshot_2
327+ . write_meta ( & operator, & new_snapshot_location)
328+ . await ?;
329+ FuseTable :: commit_to_meta_server (
330+ ctx. as_ref ( ) ,
331+ fuse_table. get_table_info ( ) ,
332+ location_gen,
333+ snapshot_2,
334+ None ,
335+ & None ,
336+ & operator,
337+ )
338+ . await ?;
339+ }
340+
341+ let table_ctx: Arc < dyn TableContext > = ctx. clone ( ) ;
342+ table = fixture. latest_default_table ( ) . await ?;
343+ fuse_table = FuseTable :: try_from_table ( table. as_ref ( ) ) ?;
344+ fuse_table. do_purge ( & table_ctx, true ) . await ?;
345+
346+ let expected_num_of_snapshot = 1 ;
347+ let expected_num_of_segment = 3 ;
348+ let expected_num_of_blocks = 6 ;
349+ let expected_num_of_index = expected_num_of_blocks;
350+ check_data_dir (
351+ & fixture,
352+ "do_gc: retention period is 0" ,
353+ expected_num_of_snapshot,
354+ 0 ,
355+ expected_num_of_segment,
356+ expected_num_of_blocks,
357+ expected_num_of_index,
358+ Some ( ( ) ) ,
359+ None ,
360+ )
361+ . await ?;
362+ Ok ( ( ) )
363+ }
364+
246365mod utils {
366+ use std:: io:: Error ;
247367 use std:: sync:: Arc ;
248368
249369 use chrono:: DateTime ;
250370 use chrono:: Utc ;
251371 use common_storages_factory:: Table ;
252372 use common_storages_fuse:: io:: MetaWriter ;
253373 use common_storages_fuse:: FuseStorageFormat ;
374+ use opendal:: Operator ;
254375
255376 use super :: * ;
256377
257378 pub async fn generate_snapshot_with_segments (
258379 fuse_table : & FuseTable ,
259380 segment_locations : Vec < Location > ,
260381 time_stamp : Option < DateTime < Utc > > ,
382+ version : u64 ,
261383 ) -> Result < String > {
262384 let current_snapshot = fuse_table. read_table_snapshot ( ) . await ?. unwrap ( ) ;
263385 let operator = fuse_table. get_operator ( ) ;
264386 let location_gen = fuse_table. meta_location_generator ( ) ;
265387 let mut new_snapshot = TableSnapshot :: from_previous ( current_snapshot. as_ref ( ) ) ;
266388 new_snapshot. segments = segment_locations;
267- let new_snapshot_location = location_gen
268- . snapshot_location_from_uuid ( & new_snapshot. snapshot_id , TableSnapshot :: VERSION ) ?;
389+ let new_snapshot_location =
390+ location_gen . snapshot_location_from_uuid ( & new_snapshot. snapshot_id , version ) ?;
269391 if let Some ( ts) = time_stamp {
270392 new_snapshot. timestamp = Some ( ts)
271393 }
@@ -320,4 +442,14 @@ mod utils {
320442 let segment_location = segment_writer. write_segment_no_cache ( & segment_info) . await ?;
321443 Ok ( ( segment_location, segment_info) )
322444 }
445+
446+ pub async fn write_snapshot_v2 (
447+ data_accessor : & Operator ,
448+ location : & str ,
449+ meta : & TableSnapshotV2 ,
450+ ) -> Result < ( ) > {
451+ let bs = serde_json:: to_vec ( & meta) . map_err ( Error :: other) ?;
452+ data_accessor. write ( location, bs) . await ?;
453+ Ok ( ( ) )
454+ }
323455}
0 commit comments