@@ -2,8 +2,6 @@ mod fork;
22pub mod meta_store;
33pub mod replication_wal;
44
5- use std:: collections:: hash_map:: Entry ;
6- use std:: collections:: HashMap ;
75use std:: fmt;
86use std:: path:: { Path , PathBuf } ;
97use std:: sync:: atomic:: { AtomicBool , Ordering } ;
@@ -175,8 +173,6 @@ pub trait MakeNamespace: Sync + Send + 'static {
175173 timestamp : Option < NaiveDateTime > ,
176174 meta_store : & MetaStore ,
177175 ) -> crate :: Result < Namespace < Self :: Database > > ;
178-
179- async fn exists ( & self , namespace : & NamespaceName ) -> bool ;
180176}
181177
182178/// Creates new primary `Namespace`
@@ -292,34 +288,6 @@ impl MakeNamespace for PrimaryNamespaceMaker {
292288 let ns = fork_task. fork ( ) . await ?;
293289 Ok ( ns)
294290 }
295-
296- async fn exists ( & self , namespace : & NamespaceName ) -> bool {
297- let ns_path = self . config . base_path . join ( "dbs" ) . join ( namespace. as_str ( ) ) ;
298- if let Ok ( true ) = ns_path. try_exists ( ) {
299- return true ;
300- }
301-
302- if let Some ( replication_options) = self . config . bottomless_replication . as_ref ( ) {
303- tracing:: info!( "Bottomless: {:?}" , replication_options) ;
304- match bottomless:: replicator:: Replicator :: has_backup_of ( namespace, replication_options)
305- . await
306- {
307- Ok ( true ) => {
308- tracing:: debug!( "Bottomless: Backup found" ) ;
309- return true ;
310- }
311- Ok ( false ) => {
312- tracing:: debug!( "Bottomless: No backup found" ) ;
313- }
314- Err ( err) => {
315- tracing:: debug!( "Bottomless: Error checking backup: {}" , err) ;
316- }
317- }
318- } else {
319- tracing:: debug!( "Bottomless: No backup configured" ) ;
320- }
321- false
322- }
323291}
324292
325293/// Creates new replica `Namespace`
@@ -375,11 +343,6 @@ impl MakeNamespace for ReplicaNamespaceMaker {
375343 ) -> crate :: Result < Namespace < Self :: Database > > {
376344 return Err ( ForkError :: ForkReplica . into ( ) ) ;
377345 }
378-
379- async fn exists ( & self , namespace : & NamespaceName ) -> bool {
380- let ns_path = self . config . base_path . join ( "dbs" ) . join ( namespace. as_str ( ) ) ;
381- ns_path. try_exists ( ) . unwrap_or ( false )
382- }
383346}
384347
385348type NamespaceEntry < T > = Arc < RwLock < Option < Namespace < T > > > > ;
@@ -408,13 +371,14 @@ struct NamespaceStoreInner<M: MakeNamespace> {
408371}
409372
410373impl < M : MakeNamespace > NamespaceStore < M > {
411- pub fn new (
374+ pub async fn new (
412375 make_namespace : M ,
413376 allow_lazy_creation : bool ,
414377 snapshot_at_shutdown : bool ,
415378 meta_store_path : impl AsRef < Path > ,
416379 max_active_namespaces : usize ,
417- ) -> Self {
380+ ) -> crate :: Result < Self > {
381+ let metadata = MetaStore :: new ( meta_store_path) . await ?;
418382 tracing:: trace!( "Max active namespaces: {max_active_namespaces}" ) ;
419383 let store = Cache :: < NamespaceName , NamespaceEntry < M :: Database > > :: builder ( )
420384 . async_eviction_listener ( move |name, ns, cause| {
@@ -437,7 +401,7 @@ impl<M: MakeNamespace> NamespaceStore<M> {
437401 . max_capacity ( max_active_namespaces as u64 )
438402 . time_to_idle ( Duration :: from_secs ( 86400 ) )
439403 . build ( ) ;
440- Self {
404+ Ok ( Self {
441405 inner : Arc :: new ( NamespaceStoreInner {
442406 store,
443407 metadata,
@@ -446,7 +410,7 @@ impl<M: MakeNamespace> NamespaceStore<M> {
446410 has_shutdown : AtomicBool :: new ( false ) ,
447411 snapshot_at_shutdown,
448412 } ) ,
449- }
413+ } )
450414 }
451415
452416 pub async fn destroy ( & self , namespace : NamespaceName ) -> crate :: Result < ( ) > {
@@ -559,6 +523,11 @@ impl<M: MakeNamespace> NamespaceStore<M> {
559523 return Err ( Error :: NamespaceStoreShutdown ) ;
560524 }
561525
526+ // check that the source namespace exists
527+ if !self . inner . metadata . exists ( & from) {
528+ return Err ( crate :: error:: Error :: NamespaceDoesntExist ( from. to_string ( ) ) ) ;
529+ }
530+
562531 let to_entry = self
563532 . inner
564533 . store
@@ -569,11 +538,6 @@ impl<M: MakeNamespace> NamespaceStore<M> {
569538 return Err ( crate :: error:: Error :: NamespaceAlreadyExist ( to. to_string ( ) ) ) ;
570539 }
571540
572- // check that the source namespace exists
573- if !self . inner . make_namespace . exists ( & from) . await {
574- return Err ( crate :: error:: Error :: NamespaceDoesntExist ( from. to_string ( ) ) ) ;
575- }
576-
577541 let from_entry = self
578542 . inner
579543 . store
@@ -637,7 +601,7 @@ impl<M: MakeNamespace> NamespaceStore<M> {
637601 let namespace = namespace. clone ( ) ;
638602 async move {
639603 if namespace != NamespaceName :: default ( )
640- && !self . inner . make_namespace . exists ( & namespace) . await
604+ && !self . inner . metadata . exists ( & namespace)
641605 && !self . inner . allow_lazy_creation
642606 {
643607 return Err ( Error :: NamespaceDoesntExist ( namespace. to_string ( ) ) ) ;
@@ -650,6 +614,7 @@ impl<M: MakeNamespace> NamespaceStore<M> {
650614 RestoreOption :: Latest ,
651615 NamespaceBottomlessDbId :: NotProvided ,
652616 self . make_reset_cb ( ) ,
617+ & self . inner . metadata ,
653618 )
654619 . await ?;
655620 tracing:: info!( "loaded namespace: `{namespace}`" ) ;
@@ -707,7 +672,7 @@ impl<M: MakeNamespace> NamespaceStore<M> {
707672 // otherwise it's an error.
708673 if self . inner . allow_lazy_creation || namespace == NamespaceName :: default ( ) {
709674 tracing:: trace!( "auto-creating the namespace" ) ;
710- } else if self . inner . make_namespace . exists ( & namespace) . await {
675+ } else if self . inner . metadata . exists ( & namespace) {
711676 return Err ( Error :: NamespaceAlreadyExist ( namespace. to_string ( ) ) ) ;
712677 }
713678
@@ -722,6 +687,7 @@ impl<M: MakeNamespace> NamespaceStore<M> {
722687 restore_option,
723688 bottomless_db_id_for_init,
724689 self . make_reset_cb ( ) ,
690+ & self . inner . metadata ,
725691 )
726692 . await ;
727693 match ns {
0 commit comments