@@ -9,6 +9,7 @@ use common::{
99 } ,
1010 maybe_val,
1111 query:: {
12+ Expression ,
1213 IndexRange ,
1314 IndexRangeExpression ,
1415 Order ,
@@ -29,6 +30,7 @@ use types::{
2930 ExportRequestor ,
3031} ;
3132use value:: {
33+ ConvexValue ,
3234 DeveloperDocumentId ,
3335 FieldPath ,
3436 ResolvedDocumentId ,
@@ -59,6 +61,9 @@ pub static EXPORTS_STATE_FIELD: LazyLock<FieldPath> =
5961pub static EXPORTS_TS_FIELD : LazyLock < FieldPath > =
6062 LazyLock :: new ( || "start_ts" . parse ( ) . expect ( "Invalid built-in field" ) ) ;
6163
64+ pub static EXPORTS_EXPIRATION_TS_FIELD : LazyLock < FieldPath > =
65+ LazyLock :: new ( || "expiration_ts" . parse ( ) . expect ( "Invalid built-in field" ) ) ;
66+
6267static EXPORTS_REQUESTOR_FIELD : LazyLock < FieldPath > =
6368 LazyLock :: new ( || "requestor" . parse ( ) . expect ( "Invalid built-in field" ) ) ;
6469
@@ -123,6 +128,13 @@ impl<'a, RT: Runtime> ExportsModel<'a, RT> {
123128 . await
124129 }
125130
131+ #[ cfg( test) ]
132+ pub async fn insert_export ( & mut self , export : Export ) -> anyhow:: Result < ResolvedDocumentId > {
133+ SystemMetadataModel :: new_global ( self . tx )
134+ . insert ( & EXPORTS_TABLE , export. try_into ( ) ?)
135+ . await
136+ }
137+
126138 pub async fn list ( & mut self ) -> anyhow:: Result < Vec < ParsedDocument < Export > > > {
127139 let value_query = Query :: full_table_scan ( EXPORTS_TABLE . clone ( ) , Order :: Asc ) ;
128140 let mut query_stream = ResolvedQuery :: new ( self . tx , TableNamespace :: Global , value_query) ?;
@@ -134,6 +146,36 @@ impl<'a, RT: Runtime> ExportsModel<'a, RT> {
134146 Ok ( result)
135147 }
136148
149+ pub async fn list_unexpired_cloud_backups (
150+ & mut self ,
151+ ) -> anyhow:: Result < Vec < ParsedDocument < Export > > > {
152+ let index_range = IndexRange {
153+ index_name : EXPORTS_BY_REQUESTOR . clone ( ) ,
154+ range : vec ! [ IndexRangeExpression :: Eq (
155+ EXPORTS_REQUESTOR_FIELD . clone( ) ,
156+ ConvexValue :: try_from( ExportRequestor :: CloudBackup . to_string( ) ) ?. into( ) ,
157+ ) ] ,
158+ order : Order :: Asc ,
159+ } ;
160+ let completed_filter = Expression :: Eq (
161+ Expression :: Field ( EXPORTS_STATE_FIELD . clone ( ) ) . into ( ) ,
162+ Expression :: Literal ( maybe_val ! ( "completed" ) ) . into ( ) ,
163+ ) ;
164+ let expired_filter = Expression :: Gt (
165+ Expression :: Field ( EXPORTS_EXPIRATION_TS_FIELD . clone ( ) ) . into ( ) ,
166+ Expression :: Literal ( maybe_val ! ( i64 :: from( * self . tx. begin_timestamp( ) ) ) ) . into ( ) ,
167+ ) ;
168+ let query = Query :: index_range ( index_range)
169+ . filter ( Expression :: And ( vec ! [ completed_filter, expired_filter] ) ) ;
170+ let mut query_stream = ResolvedQuery :: new ( self . tx , TableNamespace :: Global , query) ?;
171+ let mut result = vec ! [ ] ;
172+ while let Some ( doc) = query_stream. next ( self . tx , None ) . await ? {
173+ let row: ParsedDocument < Export > = doc. try_into ( ) ?;
174+ result. push ( row) ;
175+ }
176+ Ok ( result)
177+ }
178+
137179 pub async fn latest_requested ( & mut self ) -> anyhow:: Result < Option < ParsedDocument < Export > > > {
138180 self . export_in_state ( "requested" ) . await
139181 }
@@ -331,4 +373,74 @@ mod tests {
331373 ) ;
332374 Ok ( ( ) )
333375 }
376+
377+ #[ convex_macro:: test_runtime]
378+ async fn test_list_unexpired_cloud_snapshots ( rt : TestRuntime ) -> anyhow:: Result < ( ) > {
379+ let DbFixtures { db, .. } = DbFixtures :: new_with_model ( & rt) . await ?;
380+ let mut tx = db. begin_system ( ) . await ?;
381+ let ts = * tx. begin_timestamp ( ) ;
382+ let ts_u64: u64 = ts. into ( ) ;
383+ let mut exports_model = ExportsModel :: new ( & mut tx) ;
384+
385+ // Insert an incomplete cloud backup
386+ exports_model
387+ . insert_export ( Export :: requested (
388+ ExportFormat :: Zip {
389+ include_storage : false ,
390+ } ,
391+ ComponentId :: test_user ( ) ,
392+ ExportRequestor :: CloudBackup ,
393+ ts_u64 + 1000 ,
394+ ) )
395+ . await ?;
396+ let backups = exports_model. list_unexpired_cloud_backups ( ) . await ?;
397+ assert ! ( backups. is_empty( ) ) ;
398+
399+ // Insert a completed snapshot export
400+ let export = Export :: requested (
401+ ExportFormat :: Zip {
402+ include_storage : false ,
403+ } ,
404+ ComponentId :: test_user ( ) ,
405+ ExportRequestor :: SnapshotExport ,
406+ ts_u64 + 1000 ,
407+ )
408+ . in_progress ( ts) ?
409+ . completed ( ts, ts, ObjectKey :: try_from ( "asdf" ) ?) ?;
410+ exports_model. insert_export ( export) . await ?;
411+ let backups = exports_model. list_unexpired_cloud_backups ( ) . await ?;
412+ assert ! ( backups. is_empty( ) ) ;
413+
414+ // Insert a completed but expired cloud backup
415+ let export = Export :: requested (
416+ ExportFormat :: Zip {
417+ include_storage : false ,
418+ } ,
419+ ComponentId :: test_user ( ) ,
420+ ExportRequestor :: CloudBackup ,
421+ ts_u64 - 1000 ,
422+ )
423+ . in_progress ( ts) ?
424+ . completed ( ts, ts, ObjectKey :: try_from ( "asdf" ) ?) ?;
425+ exports_model. insert_export ( export) . await ?;
426+ let backups = exports_model. list_unexpired_cloud_backups ( ) . await ?;
427+ assert ! ( backups. is_empty( ) ) ;
428+
429+ // Insert a completed cloud backup
430+ let export = Export :: requested (
431+ ExportFormat :: Zip {
432+ include_storage : false ,
433+ } ,
434+ ComponentId :: test_user ( ) ,
435+ ExportRequestor :: CloudBackup ,
436+ ts_u64 + 1000 ,
437+ )
438+ . in_progress ( ts) ?
439+ . completed ( ts, ts, ObjectKey :: try_from ( "asdf" ) ?) ?;
440+ exports_model. insert_export ( export) . await ?;
441+ let backups = exports_model. list_unexpired_cloud_backups ( ) . await ?;
442+ assert_eq ! ( backups. len( ) , 1 ) ;
443+
444+ Ok ( ( ) )
445+ }
334446}
0 commit comments