Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/meta/api/src/schema_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ pub trait SchemaApi: Send + Sync {
async fn mget_table_names_by_ids(
&self,
table_ids: &[MetaId],
get_dropped_table: bool,
) -> Result<Vec<Option<String>>, KVAppError>;

async fn mget_database_names_by_ids(
Expand Down
3 changes: 2 additions & 1 deletion src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1639,6 +1639,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
async fn mget_table_names_by_ids(
&self,
table_ids: &[MetaId],
get_dropped_table: bool,
) -> Result<Vec<Option<String>>, KVAppError> {
debug!(req :? =(&table_ids); "SchemaApi: {}", func_name!());

Expand All @@ -1654,7 +1655,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
let seq_metas = self.get_pb_values_vec(id_idents).await?;
for (i, seq_meta_opt) in seq_metas.iter().enumerate() {
if let Some(seq_meta) = seq_meta_opt {
if seq_meta.data.drop_on.is_some() {
if seq_meta.data.drop_on.is_some() && !get_dropped_table {
table_names[i] = None;
}
} else {
Expand Down
1 change: 1 addition & 0 deletions src/query/catalog/src/catalog/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ pub trait Catalog: DynClone + Send + Sync + Debug {
&self,
tenant: &Tenant,
table_ids: &[MetaId],
get_dropped_table: bool,
) -> Result<Vec<Option<String>>>;

// Get the db name by meta id.
Expand Down
5 changes: 3 additions & 2 deletions src/query/service/src/catalogs/default/database_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,14 +288,15 @@ impl Catalog for DatabaseCatalog {
&self,
tenant: &Tenant,
table_ids: &[MetaId],
get_dropped_table: bool,
) -> Result<Vec<Option<String>>> {
let sys_table_names = self
.immutable_catalog
.mget_table_names_by_ids(tenant, table_ids)
.mget_table_names_by_ids(tenant, table_ids, get_dropped_table)
.await?;
let mut_table_names = self
.mutable_catalog
.mget_table_names_by_ids(tenant, table_ids)
.mget_table_names_by_ids(tenant, table_ids, get_dropped_table)
.await?;

let mut table_names = Vec::with_capacity(table_ids.len());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ impl Catalog for ImmutableCatalog {
&self,
_tenant: &Tenant,
table_ids: &[MetaId],
_get_dropped_table: bool,
) -> Result<Vec<Option<String>>> {
let mut table_name = Vec::with_capacity(table_ids.len());
for id in table_ids {
Expand Down
7 changes: 6 additions & 1 deletion src/query/service/src/catalogs/default/mutable_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,8 +447,13 @@ impl Catalog for MutableCatalog {
&self,
_tenant: &Tenant,
table_ids: &[MetaId],
get_dropped_table: bool,
) -> Result<Vec<Option<String>>> {
let res = self.ctx.meta.mget_table_names_by_ids(table_ids).await?;
let res = self
.ctx
.meta
.mget_table_names_by_ids(table_ids, get_dropped_table)
.await?;
Ok(res)
}

Expand Down
5 changes: 4 additions & 1 deletion src/query/service/src/catalogs/default/session_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,11 @@ impl Catalog for SessionCatalog {
&self,
tenant: &Tenant,
table_ids: &[MetaId],
get_dropped_table: bool,
) -> databend_common_exception::Result<Vec<Option<String>>> {
self.inner.mget_table_names_by_ids(tenant, table_ids).await
self.inner
.mget_table_names_by_ids(tenant, table_ids, get_dropped_table)
.await
}

async fn get_table_name_by_id(&self, table_id: MetaId) -> Result<Option<String>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,9 @@ async fn show_account_grants(
.collect::<HashSet<u64>>();
let mut table_ids = table_id_set.into_iter().collect::<Vec<u64>>();
table_ids.sort();
let table_names = catalog.mget_table_names_by_ids(&tenant, &table_ids).await?;
let table_names = catalog
.mget_table_names_by_ids(&tenant, &table_ids, false)
.await?;
let table_map = table_ids
.into_iter()
.zip(table_names.into_iter())
Expand Down
5 changes: 4 additions & 1 deletion src/query/service/tests/it/sql/exec/get_table_bind_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,11 @@ impl Catalog for FakedCatalog {
&self,
tenant: &Tenant,
table_ids: &[MetaId],
get_dropped_table: bool,
) -> Result<Vec<Option<String>>> {
self.cat.mget_table_names_by_ids(tenant, table_ids).await
self.cat
.mget_table_names_by_ids(tenant, table_ids, get_dropped_table)
.await
}

async fn get_db_name_by_id(&self, db_id: MetaId) -> Result<String> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -940,8 +940,11 @@ impl Catalog for FakedCatalog {
&self,
tenant: &Tenant,
table_id: &[MetaId],
get_dropped_table: bool,
) -> Result<Vec<Option<String>>> {
self.cat.mget_table_names_by_ids(tenant, table_id).await
self.cat
.mget_table_names_by_ids(tenant, table_id, get_dropped_table)
.await
}

async fn get_table_name_by_id(&self, table_id: MetaId) -> Result<Option<String>> {
Expand Down
1 change: 1 addition & 0 deletions src/query/storages/hive/hive/src/hive_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ impl Catalog for HiveCatalog {
&self,
_tenant: &Tenant,
_table_ids: &[MetaId],
_get_dropped_table: bool,
) -> Result<Vec<Option<String>>> {
Err(ErrorCode::Unimplemented(
"Cannot get tables name by ids in HIVE catalog",
Expand Down
1 change: 1 addition & 0 deletions src/query/storages/iceberg/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ impl Catalog for IcebergCatalog {
&self,
_tenant: &Tenant,
_table_ids: &[MetaId],
_get_dropped_table: bool,
) -> Result<Vec<Option<String>>> {
Err(ErrorCode::Unimplemented(
"Cannot get tables name by ids in HIVE catalog",
Expand Down
4 changes: 3 additions & 1 deletion src/query/storages/system/src/streams_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,9 @@ impl<const T: bool> AsyncSystemTable for StreamsTable<T> {

let mut source_tb_ids = source_tb_id_set.into_iter().collect::<Vec<u64>>();
source_tb_ids.sort();
let source_tb_names = ctl.mget_table_names_by_ids(&tenant, &source_tb_ids).await?;
let source_tb_names = ctl
.mget_table_names_by_ids(&tenant, &source_tb_ids, false)
.await?;
let source_tb_map = source_tb_ids
.into_iter()
.zip(source_tb_names.into_iter())
Expand Down
55 changes: 27 additions & 28 deletions src/query/storages/system/src/tables_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeSet;
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
Expand Down Expand Up @@ -252,7 +253,7 @@ where TablesTable<WITH_HISTORY, WITHOUT_VIEW>: HistoryAware
let user_api = UserApiProvider::instance();

let mut dbs = Vec::new();
let mut tables_names: Vec<String> = Vec::new();
let mut tables_names: BTreeSet<String> = BTreeSet::new();
let mut invalid_tables_ids = false;
let mut tables_ids: Vec<u64> = Vec::new();
let mut db_name: Vec<String> = Vec::new();
Expand Down Expand Up @@ -317,9 +318,7 @@ where TablesTable<WITH_HISTORY, WITHOUT_VIEW>: HistoryAware
}
} else if col_name == "name" {
if let Scalar::String(t_name) = scalar {
if !tables_names.contains(t_name) {
tables_names.push(t_name.clone());
}
tables_names.insert(t_name.clone());
}
}
Ok(())
Expand Down Expand Up @@ -348,17 +347,19 @@ where TablesTable<WITH_HISTORY, WITHOUT_VIEW>: HistoryAware
}
}
}
if let Err(err) = ctl.mget_table_names_by_ids(&tenant, &tables_ids).await {
warn!("Failed to get tables: {}, {}", ctl.name(), err);
} else {
let new_tables_names = ctl
.mget_table_names_by_ids(&tenant, &tables_ids)
.await?
.into_iter()
.flatten()
.filter(|table| !tables_names.contains(table))
.collect::<Vec<_>>();
tables_names.extend(new_tables_names);
match ctl
.mget_table_names_by_ids(&tenant, &tables_ids, false)
.await
{
Ok(new_tables) => {
let new_table_names: BTreeSet<_> =
new_tables.into_iter().flatten().collect();
tables_names.extend(new_table_names);
}
Err(err) => {
// swallow the errors related with mget tables
warn!("Failed to get tables: {}, {}", ctl.name(), err);
}
}

for table_name in &tables_names {
Expand Down Expand Up @@ -430,21 +431,19 @@ where TablesTable<WITH_HISTORY, WITHOUT_VIEW>: HistoryAware
}
}

if !WITH_HISTORY {
match ctl.mget_table_names_by_ids(&tenant, &tables_ids).await {
Ok(tables) => {
for table in tables.into_iter().flatten() {
if !tables_names.contains(&table) {
tables_names.push(table.clone());
}
}
}
Err(err) => {
let msg =
format!("Failed to get tables: {}, {}", ctl.name(), err);
warn!("{}", msg);
match ctl
.mget_table_names_by_ids(&tenant, &tables_ids, WITH_HISTORY)
.await
{
Ok(tables) => {
for table in tables.into_iter().flatten() {
tables_names.insert(table.clone());
}
}
Err(err) => {
let msg = format!("Failed to get tables: {}, {}", ctl.name(), err);
warn!("{}", msg);
}
}
}
}
Expand Down
Loading