Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions src/meta/api/src/data_mask_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,26 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_meta_app::data_mask::data_mask_name_ident;
use databend_common_meta_app::data_mask::CreateDatamaskReply;
use databend_common_meta_app::data_mask::CreateDatamaskReq;
use databend_common_meta_app::data_mask::DataMaskId;
use databend_common_meta_app::data_mask::DataMaskNameIdent;
use databend_common_meta_app::data_mask::DatamaskMeta;
use databend_common_meta_app::tenant::Tenant;
use databend_common_meta_app::tenant_key::errors::ExistError;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::SeqV;

use crate::errors::MaskingPolicyError;
use crate::kv_app_error::KVAppError;
use crate::meta_txn_error::MetaTxnError;

#[async_trait::async_trait]
pub trait DatamaskApi: Send + Sync {
async fn create_data_mask(
&self,
req: CreateDatamaskReq,
) -> Result<CreateDatamaskReply, KVAppError>;
) -> Result<Result<CreateDatamaskReply, ExistError<data_mask_name_ident::Resource>>, MetaError>;

/// On success, returns the dropped id and data mask.
/// Returning None, means nothing is removed.
Expand Down
161 changes: 86 additions & 75 deletions src/meta/api/src/data_mask_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_meta_app::app_error::AppError;
use databend_common_meta_app::data_mask::data_mask_name_ident;
use databend_common_meta_app::data_mask::CreateDatamaskReply;
use databend_common_meta_app::data_mask::CreateDatamaskReq;
use databend_common_meta_app::data_mask::DataMaskId;
Expand All @@ -24,8 +24,10 @@ use databend_common_meta_app::data_mask::MaskPolicyTableIdIdent;
use databend_common_meta_app::data_mask::MaskPolicyTableIdListIdent;
use databend_common_meta_app::data_mask::MaskpolicyTableIdList;
use databend_common_meta_app::id_generator::IdGenerator;
use databend_common_meta_app::row_access_policy::RowAccessPolicyNameIdent;
use databend_common_meta_app::schema::CreateOption;
use databend_common_meta_app::tenant::Tenant;
use databend_common_meta_app::tenant_key::errors::ExistError;
use databend_common_meta_app::KeyWithTenant;
use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::DirName;
Expand All @@ -38,7 +40,6 @@ use log::debug;
use crate::data_mask_api::DatamaskApi;
use crate::errors::MaskingPolicyError;
use crate::fetch_id;
use crate::kv_app_error::KVAppError;
use crate::kv_pb_api::KVPbApi;
use crate::meta_txn_error::MetaTxnError;
use crate::txn_backoff::txn_backoff;
Expand All @@ -55,87 +56,98 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
async fn create_data_mask(
&self,
req: CreateDatamaskReq,
) -> Result<CreateDatamaskReply, KVAppError> {
) -> Result<Result<CreateDatamaskReply, ExistError<data_mask_name_ident::Resource>>, MetaError>
{
debug!(req :? =(&req); "DatamaskApi: {}", func_name!());

let name_ident = &req.name;

let mut trials = txn_backoff(None, func_name!());
let id = loop {
trials.next().unwrap()?.await;
let row_access_name_ident = RowAccessPolicyNameIdent::new(
name_ident.tenant().clone(),
name_ident.data_mask_name().to_string(),
);
if self.get_pb(&row_access_name_ident).await?.is_some() {
return Ok(Err(
name_ident.exist_error("name conflicts with an existing masking policy")
));
}

let mut txn = TxnRequest::default();
let masking_policy_id = fetch_id(self, IdGenerator::data_mask_id()).await?;

let res = self.get_id_and_value(name_ident).await?;
debug!(res :? = res, name_key :? =(name_ident); "create_data_mask");

let mut curr_seq = 0;

if let Some((seq_id, seq_meta)) = res {
match req.create_option {
CreateOption::Create => {
return Err(AppError::DatamaskAlreadyExists(
name_ident.exist_error(func_name!()),
)
.into());
}
CreateOption::CreateIfNotExists => {
return Ok(CreateDatamaskReply { id: *seq_id.data });
}
CreateOption::CreateOrReplace => {
let id_ident = seq_id.data.into_t_ident(name_ident.tenant());

txn_delete_exact(&mut txn, &id_ident, seq_meta.seq);

curr_seq = seq_id.seq;
}
};
}
let mut txn = TxnRequest::default();

let res = self.get_id_and_value(name_ident).await?;
debug!(res :? = res, name_key :? =(name_ident); "create_data_mask");

// Create data mask by inserting these record:
// name -> id
// id -> policy
// data mask name -> data mask table id list

let id = fetch_id(self, IdGenerator::data_mask_id()).await?;

let id = DataMaskId::new(id);
let id_ident = DataMaskIdIdent::new_generic(name_ident.tenant(), id);
let id_list_key = MaskPolicyTableIdListIdent::new_from(name_ident.clone());

debug!(
id :? =(&id_ident),
name_key :? =(name_ident);
"new datamask id"
);

{
let meta: DatamaskMeta = req.data_mask_meta.clone();
let id_list = MaskpolicyTableIdList::default();
txn.condition.push(txn_cond_eq_seq(name_ident, curr_seq));
txn.if_then.extend(vec![
txn_op_put_pb(name_ident, &id, None)?, // name -> db_id
txn_op_put_pb(&id_ident, &meta, None)?, // id -> meta
// TODO: Tentative retention for compatibility MaskPolicyTableIdListIdent related logic. It can be directly deleted later
txn_op_put_pb(&id_list_key, &id_list, None)?, // data mask name -> id_list
]);

let (succ, _responses) = send_txn(self, txn).await?;

debug!(
name :? =(name_ident),
id :? =(&id_ident),
succ = succ;
"create_data_mask"
);

if succ {
break id;
let mut curr_seq = 0;

if let Some((seq_id, seq_meta)) = res {
match req.create_option {
CreateOption::Create => {
return Ok(Err(
name_ident.exist_error(format!("{} already exists", req.name))
));
}
}
};
CreateOption::CreateIfNotExists => {
return Ok(Ok(CreateDatamaskReply { id: *seq_id.data }));
}
CreateOption::CreateOrReplace => {
let id_ident = seq_id.data.into_t_ident(name_ident.tenant());

txn_delete_exact(&mut txn, &id_ident, seq_meta.seq);

curr_seq = seq_id.seq;
}
};
}

Ok(CreateDatamaskReply { id: *id })
// Create data mask by inserting these record:
// name -> id
// id -> policy
// data mask name -> data mask table id list

let id = DataMaskId::new(masking_policy_id);
let id_ident = DataMaskIdIdent::new_generic(name_ident.tenant(), id);
let id_list_key = MaskPolicyTableIdListIdent::new_from(name_ident.clone());

debug!(
id :? =(&id_ident),
name_key :? =(name_ident);
"new datamask id"
);

{
let meta: DatamaskMeta = req.data_mask_meta.clone();
let id_list = MaskpolicyTableIdList::default();
txn.condition.push(txn_cond_eq_seq(name_ident, curr_seq));
txn.condition
.push(txn_cond_eq_seq(&row_access_name_ident, 0));
txn.if_then.extend(vec![
txn_op_put_pb(name_ident, &id, None)?, // name -> db_id
txn_op_put_pb(&id_ident, &meta, None)?, // id -> meta
// TODO: Tentative retention for compatibility MaskPolicyTableIdListIdent related logic. It can be directly deleted later
txn_op_put_pb(&id_list_key, &id_list, None)?, // data mask name -> id_list
]);
}

let (succ, _responses) = send_txn(self, txn).await?;

debug!(
name :? =(name_ident),
id :? =(&id_ident),
succ = succ;
"create_data_mask"
);

if succ {
Ok(Ok(CreateDatamaskReply {
id: masking_policy_id,
}))
} else {
Ok(Err(
name_ident.exist_error(format!("{} already exists", req.name))
))
}
}

async fn drop_data_mask(
Expand Down Expand Up @@ -173,7 +185,6 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
// Policy is in use - cannot drop
if !table_policy_refs.is_empty() {
return Ok(Err(MaskingPolicyError::policy_in_use(
tenant.tenant_name().to_string(),
name_ident.data_mask_name().to_string(),
)));
}
Expand Down
4 changes: 2 additions & 2 deletions src/meta/api/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub enum MaskingPolicyError {
}

impl MaskingPolicyError {
pub fn policy_in_use(_tenant: impl Into<String>, policy_name: impl Into<String>) -> Self {
pub fn policy_in_use(policy_name: impl Into<String>) -> Self {
Self::PolicyInUse {
policy_name: policy_name.into(),
}
Expand All @@ -72,7 +72,7 @@ pub enum RowAccessPolicyError {
}

impl RowAccessPolicyError {
pub fn policy_in_use(_tenant: impl Into<String>, policy_name: impl Into<String>) -> Self {
pub fn policy_in_use(policy_name: impl Into<String>) -> Self {
Self::PolicyInUse {
policy_name: policy_name.into(),
}
Expand Down
3 changes: 3 additions & 0 deletions src/meta/api/src/kv_fetch_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ pub async fn fetch_id<T: kvapi::Key>(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
generator: T,
) -> Result<u64, MetaError> {
// Each `upsert` bumps the seq atomically inside metasrv, therefore every caller
// receives a unique, monotonically increasing id even when multiple sessions
// fetch from the same generator concurrently.
let res = kv_api
.upsert_kv(UpsertKV::update(generator.to_string_key(), b""))
.await?;
Expand Down
2 changes: 1 addition & 1 deletion src/meta/api/src/row_access_policy_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub trait RowAccessPolicyApi: Send + Sync {
req: CreateRowAccessPolicyReq,
) -> Result<
Result<CreateRowAccessPolicyReply, ExistError<row_access_policy_name_ident::Resource>>,
MetaTxnError,
MetaError,
>;

/// On success, returns the dropped id and row policy.
Expand Down
Loading