@@ -2,8 +2,6 @@ mod fork;
22pub mod meta_store;
33pub mod replication_wal;
44
5- use std:: collections:: hash_map:: Entry ;
6- use std:: collections:: HashMap ;
75use std:: fmt;
86use std:: path:: { Path , PathBuf } ;
97use std:: sync:: atomic:: { AtomicBool , Ordering } ;
@@ -203,8 +201,6 @@ pub trait MakeNamespace: Sync + Send + 'static {
203201 timestamp : Option < NaiveDateTime > ,
204202 meta_store : & MetaStore ,
205203 ) -> crate :: Result < Namespace < Self :: Database > > ;
206-
207- async fn exists ( & self , namespace : & NamespaceName ) -> bool ;
208204}
209205
210206/// Creates new primary `Namespace`
@@ -320,34 +316,6 @@ impl MakeNamespace for PrimaryNamespaceMaker {
320316 let ns = fork_task. fork ( ) . await ?;
321317 Ok ( ns)
322318 }
323-
324- async fn exists ( & self , namespace : & NamespaceName ) -> bool {
325- let ns_path = self . config . base_path . join ( "dbs" ) . join ( namespace. as_str ( ) ) ;
326- if let Ok ( true ) = ns_path. try_exists ( ) {
327- return true ;
328- }
329-
330- if let Some ( replication_options) = self . config . bottomless_replication . as_ref ( ) {
331- tracing:: info!( "Bottomless: {:?}" , replication_options) ;
332- match bottomless:: replicator:: Replicator :: has_backup_of ( namespace, replication_options)
333- . await
334- {
335- Ok ( true ) => {
336- tracing:: debug!( "Bottomless: Backup found" ) ;
337- return true ;
338- }
339- Ok ( false ) => {
340- tracing:: debug!( "Bottomless: No backup found" ) ;
341- }
342- Err ( err) => {
343- tracing:: debug!( "Bottomless: Error checking backup: {}" , err) ;
344- }
345- }
346- } else {
347- tracing:: debug!( "Bottomless: No backup configured" ) ;
348- }
349- false
350- }
351319}
352320
353321/// Creates new replica `Namespace`
@@ -403,11 +371,6 @@ impl MakeNamespace for ReplicaNamespaceMaker {
403371 ) -> crate :: Result < Namespace < Self :: Database > > {
404372 return Err ( ForkError :: ForkReplica . into ( ) ) ;
405373 }
406-
407- async fn exists ( & self , namespace : & NamespaceName ) -> bool {
408- let ns_path = self . config . base_path . join ( "dbs" ) . join ( namespace. as_str ( ) ) ;
409- ns_path. try_exists ( ) . unwrap_or ( false )
410- }
411374}
412375
413376type NamespaceEntry < T > = Arc < RwLock < Option < Namespace < T > > > > ;
@@ -436,13 +399,14 @@ struct NamespaceStoreInner<M: MakeNamespace> {
436399}
437400
438401impl < M : MakeNamespace > NamespaceStore < M > {
439- pub fn new (
402+ pub async fn new (
440403 make_namespace : M ,
441404 allow_lazy_creation : bool ,
442405 snapshot_at_shutdown : bool ,
443406 meta_store_path : impl AsRef < Path > ,
444407 max_active_namespaces : usize ,
445- ) -> Self {
408+ ) -> crate :: Result < Self > {
409+ let metadata = MetaStore :: new ( meta_store_path) . await ?;
446410 tracing:: trace!( "Max active namespaces: {max_active_namespaces}" ) ;
447411 let store = Cache :: < NamespaceName , NamespaceEntry < M :: Database > > :: builder ( )
448412 . async_eviction_listener ( move |name, ns, cause| {
@@ -465,7 +429,7 @@ impl<M: MakeNamespace> NamespaceStore<M> {
465429 . max_capacity ( max_active_namespaces as u64 )
466430 . time_to_idle ( Duration :: from_secs ( 86400 ) )
467431 . build ( ) ;
468- Self {
432+ Ok ( Self {
469433 inner : Arc :: new ( NamespaceStoreInner {
470434 store,
471435 metadata,
@@ -474,7 +438,7 @@ impl<M: MakeNamespace> NamespaceStore<M> {
474438 has_shutdown : AtomicBool :: new ( false ) ,
475439 snapshot_at_shutdown,
476440 } ) ,
477- }
441+ } )
478442 }
479443
480444 pub async fn destroy ( & self , namespace : NamespaceName ) -> crate :: Result < ( ) > {
@@ -587,6 +551,11 @@ impl<M: MakeNamespace> NamespaceStore<M> {
587551 return Err ( Error :: NamespaceStoreShutdown ) ;
588552 }
589553
554+ // check that the source namespace exists
555+ if !self . inner . metadata . exists ( & from) {
556+ return Err ( crate :: error:: Error :: NamespaceDoesntExist ( from. to_string ( ) ) ) ;
557+ }
558+
590559 let to_entry = self
591560 . inner
592561 . store
@@ -597,11 +566,6 @@ impl<M: MakeNamespace> NamespaceStore<M> {
597566 return Err ( crate :: error:: Error :: NamespaceAlreadyExist ( to. to_string ( ) ) ) ;
598567 }
599568
600- // check that the source namespace exists
601- if !self . inner . make_namespace . exists ( & from) . await {
602- return Err ( crate :: error:: Error :: NamespaceDoesntExist ( from. to_string ( ) ) ) ;
603- }
604-
605569 let from_entry = self
606570 . inner
607571 . store
@@ -665,7 +629,7 @@ impl<M: MakeNamespace> NamespaceStore<M> {
665629 let namespace = namespace. clone ( ) ;
666630 async move {
667631 if namespace != NamespaceName :: default ( )
668- && !self . inner . make_namespace . exists ( & namespace) . await
632+ && !self . inner . metadata . exists ( & namespace)
669633 && !self . inner . allow_lazy_creation
670634 {
671635 return Err ( Error :: NamespaceDoesntExist ( namespace. to_string ( ) ) ) ;
@@ -678,6 +642,7 @@ impl<M: MakeNamespace> NamespaceStore<M> {
678642 RestoreOption :: Latest ,
679643 NamespaceBottomlessDbId :: NotProvided ,
680644 self . make_reset_cb ( ) ,
645+ & self . inner . metadata ,
681646 )
682647 . await ?;
683648 tracing:: info!( "loaded namespace: `{namespace}`" ) ;
@@ -735,7 +700,7 @@ impl<M: MakeNamespace> NamespaceStore<M> {
735700 // otherwise it's an error.
736701 if self . inner . allow_lazy_creation || namespace == NamespaceName :: default ( ) {
737702 tracing:: trace!( "auto-creating the namespace" ) ;
738- } else if self . inner . make_namespace . exists ( & namespace) . await {
703+ } else if self . inner . metadata . exists ( & namespace) {
739704 return Err ( Error :: NamespaceAlreadyExist ( namespace. to_string ( ) ) ) ;
740705 }
741706
@@ -750,6 +715,7 @@ impl<M: MakeNamespace> NamespaceStore<M> {
750715 restore_option,
751716 bottomless_db_id_for_init,
752717 self . make_reset_cb ( ) ,
718+ & self . inner . metadata ,
753719 )
754720 . await ;
755721 match ns {
0 commit comments